Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(288)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/archiveStream.go

Issue 1853433002: LogDog: Handle archive failures. (Closed) Base URL: https://github.com/luci/luci-go@logdog-gs-update
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 ds "github.com/luci/gae/service/datastore" 8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/appengine/logdog/coordinator" 9 "github.com/luci/luci-go/appengine/logdog/coordinator"
10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/grpcutil" 12 "github.com/luci/luci-go/common/grpcutil"
13 "github.com/luci/luci-go/common/logdog/types" 13 "github.com/luci/luci-go/common/logdog/types"
14 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/common/proto/google" 15 "github.com/luci/luci-go/common/proto/google"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 "google.golang.org/grpc/codes" 17 "google.golang.org/grpc/codes"
18 ) 18 )
19 19
20 // maxArchiveErrors is the maximum number of archival errors that we will
21 // tolerate before accepting a failed archival.
22 const maxArchiveErrors = 2
23
20 // ArchiveStream implements the logdog.ServicesServer interface. 24 // ArchiveStream implements the logdog.ServicesServer interface.
21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { 25 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) {
22 if err := Auth(c); err != nil { 26 if err := Auth(c); err != nil {
23 return nil, err 27 return nil, err
24 } 28 }
25 29
26 log.Fields{ 30 log.Fields{
27 "path": req.Path, 31 "path": req.Path,
28 }.Infof(c, "Marking log stream as archived.") 32 }.Infof(c, "Marking log stream as archived.")
29 33
(...skipping 23 matching lines...) Expand all
53 case ds.ErrNoSuchEntity: 57 case ds.ErrNoSuchEntity:
54 break 58 break
55 59
56 default: 60 default:
57 log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.") 61 log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.")
58 return nil, grpcutil.Internal 62 return nil, grpcutil.Internal
59 } 63 }
60 64
61 // Post the archival results to the Coordinator. 65 // Post the archival results to the Coordinator.
62 now := clock.Now(c).UTC() 66 now := clock.Now(c).UTC()
67 var ierr error
63 err := ds.Get(c).RunInTransaction(func(c context.Context) error { 68 err := ds.Get(c).RunInTransaction(func(c context.Context) error {
64 di := ds.Get(c) 69 di := ds.Get(c)
65 if err := di.Get(ls); err != nil { 70 if err := di.Get(ls); err != nil {
66 return err 71 return err
67 } 72 }
68 if ls.Archived() { 73 if ls.Archived() {
69 log.Infof(c, "Log stream already marked as archived.") 74 log.Infof(c, "Log stream already marked as archived.")
70 return nil 75 return nil
71 } 76 }
72 77
78 // If this request contained an error, we will reject it with
79 // FailedPrecondition status if we are below our error threshold.
80 if req.Error {
81 if ls.ArchiveErrors < maxArchiveErrors {
82 log.Fields{
83 "path": req.Path,
84 "errorCount": ls.ArchiveErrors,
85 "maxArchiveErrors": maxArchiveErrors,
86 }.Warningf(c, "Rejecting failed archival: below error threshold.")
87
88 // Increment our error count.
89 ls.ArchiveErrors++
90 if err := ls.Put(di); err != nil {
91 log.WithError(err).Errorf(c, "Failed to update log stream error count.")
92 return err
93 }
94
95 // Fail this RPC call to keep the archival task in the queue.
96 ierr = grpcutil.Errf(codes.FailedPrecondition, " below error threshold (%d < %d)", ls.ArchiveErrors, maxArchiveErrors)
97 return nil
98 }
99
100 // We have exceeded our error threshold, so we will continue to archive
101 // this log stream.
102 log.Fields{
Vadim Sh. 2016/03/31 22:29:59 maybe mark the entity with some indexable boolean,
dnj 2016/04/01 22:57:04 Done. I introduced "ArchiveState", which can be ei
103 "path": req.Path,
104 "errorCount": ls.ArchiveErrors,
105 "maxArchiveErrors": maxArchiveErrors,
106 }.Warningf(c, "Log stream has exceeded archive error thr eshold.")
107 }
108
73 // Update archival information. Make sure this actually marks the stream as 109 // Update archival information. Make sure this actually marks the stream as
74 // archived. 110 // archived.
75 ls.Updated = now 111 ls.Updated = now
76 ls.State = coordinator.LSArchived 112 ls.State = coordinator.LSArchived
77 ls.ArchiveWhole = req.Complete 113 ls.ArchiveWhole = req.Complete
78 ls.TerminalIndex = req.TerminalIndex 114 ls.TerminalIndex = req.TerminalIndex
79 ls.ArchiveStreamURL = req.StreamUrl 115 ls.ArchiveStreamURL = req.StreamUrl
80 ls.ArchiveStreamSize = req.StreamSize 116 ls.ArchiveStreamSize = req.StreamSize
81 ls.ArchiveIndexURL = req.IndexUrl 117 ls.ArchiveIndexURL = req.IndexUrl
82 ls.ArchiveIndexSize = req.IndexSize 118 ls.ArchiveIndexSize = req.IndexSize
83 ls.ArchiveDataURL = req.DataUrl 119 ls.ArchiveDataURL = req.DataUrl
84 ls.ArchiveDataSize = req.DataSize 120 ls.ArchiveDataSize = req.DataSize
85 121
86 // Update the log stream. 122 // Update the log stream.
87 if err := ls.Put(di); err != nil { 123 if err := ls.Put(di); err != nil {
Vadim Sh. 2016/03/31 22:29:59 this line got me confused :) usually it's di.Put(l
dnj 2016/04/01 22:57:03 Yeah LogStream.Put is the same as di.Put, but forc
88 log.WithError(err).Errorf(c, "Failed to update log strea m.") 124 log.WithError(err).Errorf(c, "Failed to update log strea m.")
89 return err 125 return err
90 } 126 }
91 127
92 log.Infof(c, "Successfully marked stream as archived.") 128 log.Infof(c, "Successfully marked stream as archived.")
93 return nil 129 return nil
94 }, nil) 130 }, nil)
95 if err != nil { 131 if err != nil {
96 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") 132 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ")
97 return nil, grpcutil.Internal 133 return nil, grpcutil.Internal
98 } 134 }
135 if ierr != nil {
136 log.WithError(ierr).Errorf(c, "Failed to mark stream as archived (inner).")
137 return nil, ierr
138 }
99 139
100 return &google.Empty{}, nil 140 return &google.Empty{}, nil
101 } 141 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698