| Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| index 2b9530ee2e288418b0aac6c622bfe7801376b33f..34d2af8a41901c3e841a80f6f3ead729f2431501 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| @@ -17,6 +17,10 @@ import (
|
| "google.golang.org/grpc/codes"
|
| )
|
|
|
| +// maxArchiveErrors is the maximum number of archival errors that we will
|
| +// tolerate before accepting a failed archival.
|
| +const maxArchiveErrors = 2
|
| +
|
| // ArchiveStream implements the logdog.ServicesServer interface.
|
| func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamRequest) (*google.Empty, error) {
|
| if err := Auth(c); err != nil {
|
| @@ -60,6 +64,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
|
|
|
| // Post the archival results to the Coordinator.
|
| now := clock.Now(c).UTC()
|
| + var ierr error
|
| err := ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| di := ds.Get(c)
|
| if err := di.Get(ls); err != nil {
|
| @@ -70,6 +75,37 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
|
| return nil
|
| }
|
|
|
| + // If this request contained an error, we will reject it with
|
| + // FailedPrecondition status if we are below our error threshold.
|
| + if req.Error {
|
| + if ls.ArchiveErrors < maxArchiveErrors {
|
| + log.Fields{
|
| + "path": req.Path,
|
| + "errorCount": ls.ArchiveErrors,
|
| + "maxArchiveErrors": maxArchiveErrors,
|
| + }.Warningf(c, "Rejecting failed archival: below error threshold.")
|
| +
|
| + // Increment our error count.
|
| + ls.ArchiveErrors++
|
| + if err := ls.Put(di); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to update log stream error count.")
|
| + return err
|
| + }
|
| +
|
| + // Fail this RPC call to keep the archival task in the queue.
|
| + ierr = grpcutil.Errf(codes.FailedPrecondition, "below error threshold (%d < %d)", ls.ArchiveErrors, maxArchiveErrors)
|
| + return nil
|
| + }
|
| +
|
| + // We have exceeded our error threshold, so we will continue to archive
|
| + // this log stream.
|
| + log.Fields{
|
| + "path": req.Path,
|
| + "errorCount": ls.ArchiveErrors,
|
| + "maxArchiveErrors": maxArchiveErrors,
|
| + }.Warningf(c, "Log stream has exceeded archive error threshold.")
|
| + }
|
| +
|
| // Update archival information. Make sure this actually marks the stream as
|
| // archived.
|
| ls.Updated = now
|
| @@ -83,6 +119,16 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
|
| ls.ArchiveDataURL = req.DataUrl
|
| ls.ArchiveDataSize = req.DataSize
|
|
|
| + // Determine archive state.
|
| + switch {
|
| + case req.Error:
|
| + ls.ArchiveState = coordinator.ArchivedWithErrors
|
| + case !req.Complete:
|
| + ls.ArchiveState = coordinator.ArchivedPartially
|
| + default:
|
| + ls.ArchiveState = coordinator.Archived
|
| + }
|
| +
|
| // Update the log stream.
|
| if err := ls.Put(di); err != nil {
|
| log.WithError(err).Errorf(c, "Failed to update log stream.")
|
| @@ -96,6 +142,10 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
|
| log.WithError(err).Errorf(c, "Failed to mark stream as archived.")
|
| return nil, grpcutil.Internal
|
| }
|
| + if ierr != nil {
|
| + log.WithError(ierr).Errorf(c, "Failed to mark stream as archived (inner).")
|
| + return nil, ierr
|
| + }
|
|
|
| return &google.Empty{}, nil
|
| }
|
|
|