Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| index 2b9530ee2e288418b0aac6c622bfe7801376b33f..c2cac5176f7ebb5d6ebf366e9f40151db8324c05 100644 |
| --- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| +++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| @@ -17,6 +17,10 @@ import ( |
| "google.golang.org/grpc/codes" |
| ) |
| +// maxArchiveErrors is the maximum number of archival errors that we will |
| +// tolerate before accepting a failed archival. |
| +const maxArchiveErrors = 2 |
| + |
| // ArchiveStream implements the logdog.ServicesServer interface. |
| func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamRequest) (*google.Empty, error) { |
| if err := Auth(c); err != nil { |
| @@ -60,6 +64,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| // Post the archival results to the Coordinator. |
| now := clock.Now(c).UTC() |
| + var ierr error |
| err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| di := ds.Get(c) |
| if err := di.Get(ls); err != nil { |
| @@ -70,6 +75,37 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| return nil |
| } |
| + // If this request contained an error, we will reject it with |
| + // FailedPrecondition status if we are below our error threshold. |
| + if req.Error { |
| + if ls.ArchiveErrors < maxArchiveErrors { |
| + log.Fields{ |
| + "path": req.Path, |
| + "errorCount": ls.ArchiveErrors, |
| + "maxArchiveErrors": maxArchiveErrors, |
| + }.Warningf(c, "Rejecting failed archival: below error threshold.") |
| + |
| + // Increment our error count. |
| + ls.ArchiveErrors++ |
| + if err := ls.Put(di); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to update log stream error count.") |
| + return err |
| + } |
| + |
| + // Fail this RPC call to keep the archival task in the queue. |
| + ierr = grpcutil.Errf(codes.FailedPrecondition, "below error threshold (%d < %d)", ls.ArchiveErrors, maxArchiveErrors) |
| + return nil |
| + } |
| + |
| + // We have exceeded our error threshold, so we will continue to archive |
| + // this log stream. |
| + log.Fields{ |
|
Vadim Sh.
2016/03/31 22:29:59
maybe mark the entity with some indexable boolean,
dnj
2016/04/01 22:57:04
Done. I introduced "ArchiveState", which can be ei
|
| + "path": req.Path, |
| + "errorCount": ls.ArchiveErrors, |
| + "maxArchiveErrors": maxArchiveErrors, |
| + }.Warningf(c, "Log stream has exceeded archive error threshold.") |
| + } |
| + |
| // Update archival information. Make sure this actually marks the stream as |
| // archived. |
| ls.Updated = now |
| @@ -96,6 +132,10 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| log.WithError(err).Errorf(c, "Failed to mark stream as archived.") |
| return nil, grpcutil.Internal |
| } |
| + if ierr != nil { |
| + log.WithError(ierr).Errorf(c, "Failed to mark stream as archived (inner).") |
| + return nil, ierr |
| + } |
| return &google.Empty{}, nil |
| } |