Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| index 2b9530ee2e288418b0aac6c622bfe7801376b33f..84b3e884edf8d161bc4d3bbd1f4d428791800e9e 100644 |
| --- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| +++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| @@ -7,6 +7,7 @@ package services |
| import ( |
| ds "github.com/luci/gae/service/datastore" |
| "github.com/luci/luci-go/appengine/logdog/coordinator" |
| + "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| "github.com/luci/luci-go/common/clock" |
| "github.com/luci/luci-go/common/grpcutil" |
| @@ -23,9 +24,20 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| return nil, err |
| } |
| + cfg, err := config.Load(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load configuration.") |
| + return nil, grpcutil.Internal |
| + } |
| + ccfg := cfg.GetCoordinator() // config.Load ensures this is not nil. |
| + |
| log.Fields{ |
| - "path": req.Path, |
| - }.Infof(c, "Marking log stream as archived.") |
| + "path": req.Path, |
| + "complete": req.Complete(), |
| + "terminalIndex": req.TerminalIndex, |
| + "logEntryCount": req.LogEntryCount, |
| + "error": req.Error, |
| + }.Infof(c, "Received archival request.") |
| // Verify that the request is minimially valid. |
| path := types.StreamPath(req.Path) |
| @@ -42,40 +54,87 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| ls := coordinator.LogStreamFromPath(path) |
| - // (Non-transactional) Is the log stream already archived? |
| - switch err := ds.Get(c).Get(ls); err { |
| - case nil: |
| - if ls.Archived() { |
| - log.Infof(c, "Log stream already marked as archived (non-transactional).") |
| - return &google.Empty{}, nil |
| - } |
| - |
| - case ds.ErrNoSuchEntity: |
| - break |
| - |
| - default: |
| - log.WithError(err).Errorf(c, "Failed to check for log stream archvial state.") |
| - return nil, grpcutil.Internal |
| - } |
| + log.Fields{ |
| + "id": ls.HashID(), |
| + }.Infof(c, "Log stream ID.") |
| // Post the archival results to the Coordinator. |
| now := clock.Now(c).UTC() |
| - err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| + var ierr error |
|
Vadim Sh.
2016/04/07 01:21:32
nit: reset ierr to nil at the beginning of a trans
dnj
2016/04/11 17:20:03
Done.
|
| + err = ds.Get(c).RunInTransaction(func(c context.Context) error { |
| + // Note that within this transaction, we have two return values: |
| + // - Non-nil to abort the transaction. |
| + // - Specific error via "ierr". |
| di := ds.Get(c) |
| if err := di.Get(ls); err != nil { |
| return err |
| } |
| - if ls.Archived() { |
| - log.Infof(c, "Log stream already marked as archived.") |
| + |
| + // If our log stream is not in LSArchiveTasked, we will reject this archive |
| + // request with FailedPrecondition. |
| + switch { |
| + case ls.Archived(): |
| + // Return nil if the log stream is already archived (idempotent). |
| + log.Warningf(c, "Log stream is already archived.") |
| return nil |
| + |
| + case ls.State != coordinator.LSArchiveTasked: |
| + log.Fields{ |
| + "state": ls.State, |
| + }.Errorf(c, "Log stream is not in archival tasked state.") |
| + ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stream has not tasked an archival.") |
| + return ierr |
| + } |
| + |
| + // If this request contained an error, we will reject it with Aborted status |
| + // if we are below our error threshold. |
| + if req.Error { |
| + // Increment our error count. |
| + ls.ArchiveErrors++ |
| + if err := di.Put(ls); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to update log stream error count.") |
| + return err |
| + } |
| + |
| + retries := int(ccfg.ArchiveRetries) |
| + if ls.ArchiveErrors < retries { |
| + log.Fields{ |
| + "path": req.Path, |
| + "errorCount": ls.ArchiveErrors, |
| + "archiveRetries": retries, |
| + }.Warningf(c, "Rejecting failed archival: below error threshold.") |
| + |
| + // Fail this RPC call to keep the archival task in the queue. |
| + // |
| + // We return this via "ierr" because we want the transaction to succeed. |
| + ierr = grpcutil.Errf(codes.Aborted, "below error threshold (%d < %d)", ls.ArchiveErrors, retries) |
| + return nil |
| + } |
| + |
| + // We have exceeded our error threshold, so we will continue to archive |
| + // this log stream. |
| + log.Fields{ |
| + "path": req.Path, |
| + "errorCount": ls.ArchiveErrors, |
| + "archiveRetries": retries, |
| + }.Warningf(c, "Log stream has exceeded archive error threshold. Archiving empty stream.") |
| + |
| + req.TerminalIndex = -1 |
| + req.LogEntryCount = 0 |
| } |
| // Update archival information. Make sure this actually marks the stream as |
| // archived. |
| - ls.Updated = now |
| ls.State = coordinator.LSArchived |
| - ls.ArchiveWhole = req.Complete |
| + ls.ArchivedTime = now |
| + |
| + if ls.TerminalIndex < 0 { |
| + // Also set the terminated time. |
| + ls.TerminatedTime = now |
| + } |
| ls.TerminalIndex = req.TerminalIndex |
| + |
| + ls.ArchiveLogEntryCount = req.LogEntryCount |
| ls.ArchiveStreamURL = req.StreamUrl |
| ls.ArchiveStreamSize = req.StreamSize |
| ls.ArchiveIndexURL = req.IndexUrl |
| @@ -84,7 +143,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| ls.ArchiveDataSize = req.DataSize |
| // Update the log stream. |
| - if err := ls.Put(di); err != nil { |
| + if err := di.Put(ls); err != nil { |
| log.WithError(err).Errorf(c, "Failed to update log stream.") |
| return err |
| } |
| @@ -92,8 +151,12 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| log.Infof(c, "Successfully marked stream as archived.") |
| return nil |
| }, nil) |
| - if err != nil { |
| + if ierr != nil { |
| log.WithError(err).Errorf(c, "Failed to mark stream as archived.") |
| + return nil, ierr |
| + } |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Internal error.") |
| return nil, grpcutil.Internal |
| } |