Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(280)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/archiveStream.go

Issue 1853433002: LogDog: Handle archive failures. (Closed) Base URL: https://github.com/luci/luci-go@logdog-gs-update
Patch Set: Regenerate protobufs. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 ds "github.com/luci/gae/service/datastore" 8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/appengine/logdog/coordinator" 9 "github.com/luci/luci-go/appengine/logdog/coordinator"
10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/grpcutil" 12 "github.com/luci/luci-go/common/grpcutil"
13 "github.com/luci/luci-go/common/logdog/types" 13 "github.com/luci/luci-go/common/logdog/types"
14 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/common/proto/google" 15 "github.com/luci/luci-go/common/proto/google"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 "google.golang.org/grpc/codes" 17 "google.golang.org/grpc/codes"
18 ) 18 )
19 19
20 // maxArchiveErrors is the maximum number of archival errors that we will
21 // tolerate before accepting a failed archival.
22 const maxArchiveErrors = 2
23
20 // ArchiveStream implements the logdog.ServicesServer interface. 24 // ArchiveStream implements the logdog.ServicesServer interface.
21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { 25 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) {
22 if err := Auth(c); err != nil { 26 if err := Auth(c); err != nil {
23 return nil, err 27 return nil, err
24 } 28 }
25 29
26 log.Fields{ 30 log.Fields{
27 "path": req.Path, 31 "path": req.Path,
28 }.Infof(c, "Marking log stream as archived.") 32 }.Infof(c, "Marking log stream as archived.")
29 33
(...skipping 23 matching lines...) Expand all
53 case ds.ErrNoSuchEntity: 57 case ds.ErrNoSuchEntity:
54 break 58 break
55 59
56 default: 60 default:
57 log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.") 61 log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.")
58 return nil, grpcutil.Internal 62 return nil, grpcutil.Internal
59 } 63 }
60 64
61 // Post the archival results to the Coordinator. 65 // Post the archival results to the Coordinator.
62 now := clock.Now(c).UTC() 66 now := clock.Now(c).UTC()
67 var ierr error
63 err := ds.Get(c).RunInTransaction(func(c context.Context) error { 68 err := ds.Get(c).RunInTransaction(func(c context.Context) error {
64 di := ds.Get(c) 69 di := ds.Get(c)
65 if err := di.Get(ls); err != nil { 70 if err := di.Get(ls); err != nil {
66 return err 71 return err
67 } 72 }
68 if ls.Archived() { 73 if ls.Archived() {
69 log.Infof(c, "Log stream already marked as archived.") 74 log.Infof(c, "Log stream already marked as archived.")
70 return nil 75 return nil
71 } 76 }
72 77
78 // If this request contained an error, we will reject it with
79 // FailedPrecondition status if we are below our error threshold .
80 if req.Error {
81 if ls.ArchiveErrors < maxArchiveErrors {
82 log.Fields{
83 "path": req.Path,
84 "errorCount": ls.ArchiveErrors,
85 "maxArchiveErrors": maxArchiveErrors,
86 }.Warningf(c, "Rejecting failed archival: below error threshold.")
87
88 // Increment our error count.
89 ls.ArchiveErrors++
90 if err := ls.Put(di); err != nil {
91 log.WithError(err).Errorf(c, "Failed to update log stream error count.")
92 return err
93 }
94
95 // Fail this RPC call to keep the archival task in the queue.
96 ierr = grpcutil.Errf(codes.FailedPrecondition, " below error threshold (%d < %d)", ls.ArchiveErrors, maxArchiveErrors)
97 return nil
98 }
99
100 // We have exceeded our error threshold, so we will cont inue to archive
101 // this log stream.
102 log.Fields{
103 "path": req.Path,
104 "errorCount": ls.ArchiveErrors,
105 "maxArchiveErrors": maxArchiveErrors,
106 }.Warningf(c, "Log stream has exceeded archive error thr eshold.")
107 }
108
73 // Update archival information. Make sure this actually marks th e stream as 109 // Update archival information. Make sure this actually marks th e stream as
74 // archived. 110 // archived.
75 ls.Updated = now 111 ls.Updated = now
76 ls.State = coordinator.LSArchived 112 ls.State = coordinator.LSArchived
77 ls.ArchiveWhole = req.Complete 113 ls.ArchiveWhole = req.Complete
78 ls.TerminalIndex = req.TerminalIndex 114 ls.TerminalIndex = req.TerminalIndex
79 ls.ArchiveStreamURL = req.StreamUrl 115 ls.ArchiveStreamURL = req.StreamUrl
80 ls.ArchiveStreamSize = req.StreamSize 116 ls.ArchiveStreamSize = req.StreamSize
81 ls.ArchiveIndexURL = req.IndexUrl 117 ls.ArchiveIndexURL = req.IndexUrl
82 ls.ArchiveIndexSize = req.IndexSize 118 ls.ArchiveIndexSize = req.IndexSize
83 ls.ArchiveDataURL = req.DataUrl 119 ls.ArchiveDataURL = req.DataUrl
84 ls.ArchiveDataSize = req.DataSize 120 ls.ArchiveDataSize = req.DataSize
85 121
122 // Determine archive state.
123 switch {
124 case req.Error:
125 ls.ArchiveState = coordinator.ArchivedWithErrors
126 case !req.Complete:
127 ls.ArchiveState = coordinator.ArchivedPartially
128 default:
129 ls.ArchiveState = coordinator.Archived
130 }
131
86 // Update the log stream. 132 // Update the log stream.
87 if err := ls.Put(di); err != nil { 133 if err := ls.Put(di); err != nil {
88 log.WithError(err).Errorf(c, "Failed to update log strea m.") 134 log.WithError(err).Errorf(c, "Failed to update log strea m.")
89 return err 135 return err
90 } 136 }
91 137
92 log.Infof(c, "Successfully marked stream as archived.") 138 log.Infof(c, "Successfully marked stream as archived.")
93 return nil 139 return nil
94 }, nil) 140 }, nil)
95 if err != nil { 141 if err != nil {
96 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") 142 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ")
97 return nil, grpcutil.Internal 143 return nil, grpcutil.Internal
98 } 144 }
145 if ierr != nil {
146 log.WithError(ierr).Errorf(c, "Failed to mark stream as archived (inner).")
147 return nil, ierr
148 }
99 149
100 return &google.Empty{}, nil 150 return &google.Empty{}, nil
101 } 151 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698