Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(312)

Unified Diff: appengine/logdog/coordinator/endpoints/services/archiveStream.go

Issue 1853433002: LogDog: Handle archive failures. (Closed) Base URL: https://github.com/luci/luci-go@logdog-gs-update
Patch Set: Regenerate protobufs. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go
diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
index 2b9530ee2e288418b0aac6c622bfe7801376b33f..34d2af8a41901c3e841a80f6f3ead729f2431501 100644
--- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go
+++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
@@ -17,6 +17,10 @@ import (
"google.golang.org/grpc/codes"
)
+// maxArchiveErrors is the archival error threshold: ArchiveStream rejects an errored request
+// (FailedPrecondition) while the stream's ArchiveErrors is below it, and accepts it otherwise.
+const maxArchiveErrors = 2
+
// ArchiveStream implements the logdog.ServicesServer interface.
func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamRequest) (*google.Empty, error) {
if err := Auth(c); err != nil {
@@ -60,6 +64,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
// Post the archival results to the Coordinator.
now := clock.Now(c).UTC()
+ var ierr error
err := ds.Get(c).RunInTransaction(func(c context.Context) error {
di := ds.Get(c)
if err := di.Get(ls); err != nil {
@@ -70,6 +75,37 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
return nil
}
+ // If this request contained an error, we will reject it with
+ // FailedPrecondition status if we are below our error threshold.
+ if req.Error {
+ if ls.ArchiveErrors < maxArchiveErrors {
+ log.Fields{
+ "path": req.Path,
+ "errorCount": ls.ArchiveErrors,
+ "maxArchiveErrors": maxArchiveErrors,
+ }.Warningf(c, "Rejecting failed archival: below error threshold.")
+
+ // Increment our error count.
+ ls.ArchiveErrors++
+ if err := ls.Put(di); err != nil {
+ log.WithError(err).Errorf(c, "Failed to update log stream error count.")
+ return err
+ }
+
+ // Fail this RPC call to keep the archival task in the queue.
+ ierr = grpcutil.Errf(codes.FailedPrecondition, "below error threshold (%d < %d)", ls.ArchiveErrors, maxArchiveErrors)
+ return nil
+ }
+
+ // We have exceeded our error threshold, so we will continue to archive
+ // this log stream.
+ log.Fields{
+ "path": req.Path,
+ "errorCount": ls.ArchiveErrors,
+ "maxArchiveErrors": maxArchiveErrors,
+ }.Warningf(c, "Log stream has exceeded archive error threshold.")
+ }
+
// Update archival information. Make sure this actually marks the stream as
// archived.
ls.Updated = now
@@ -83,6 +119,16 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
ls.ArchiveDataURL = req.DataUrl
ls.ArchiveDataSize = req.DataSize
+ // Determine archive state.
+ switch {
+ case req.Error:
+ ls.ArchiveState = coordinator.ArchivedWithErrors
+ case !req.Complete:
+ ls.ArchiveState = coordinator.ArchivedPartially
+ default:
+ ls.ArchiveState = coordinator.Archived
+ }
+
// Update the log stream.
if err := ls.Put(di); err != nil {
log.WithError(err).Errorf(c, "Failed to update log stream.")
@@ -96,6 +142,10 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
log.WithError(err).Errorf(c, "Failed to mark stream as archived.")
return nil, grpcutil.Internal
}
+ if ierr != nil {
+ log.WithError(ierr).Errorf(c, "Failed to mark stream as archived (inner).")
+ return nil, ierr
+ }
return &google.Empty{}, nil
}

Powered by Google App Engine
This is Rietveld 408576698