Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2277)

Unified Diff: appengine/logdog/coordinator/endpoints/services/archiveStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go
diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
index 2b9530ee2e288418b0aac6c622bfe7801376b33f..84b3e884edf8d161bc4d3bbd1f4d428791800e9e 100644
--- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go
+++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
@@ -7,6 +7,7 @@ package services
import (
ds "github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/appengine/logdog/coordinator"
+ "github.com/luci/luci-go/appengine/logdog/coordinator/config"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/grpcutil"
@@ -23,9 +24,20 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
return nil, err
}
+ cfg, err := config.Load(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to load configuration.")
+ return nil, grpcutil.Internal
+ }
+ ccfg := cfg.GetCoordinator() // config.Load ensures this is not nil.
+
log.Fields{
- "path": req.Path,
- }.Infof(c, "Marking log stream as archived.")
+ "path": req.Path,
+ "complete": req.Complete(),
+ "terminalIndex": req.TerminalIndex,
+ "logEntryCount": req.LogEntryCount,
+ "error": req.Error,
+ }.Infof(c, "Received archival request.")
// Verify that the request is minimially valid.
path := types.StreamPath(req.Path)
@@ -42,40 +54,87 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
ls := coordinator.LogStreamFromPath(path)
- // (Non-transactional) Is the log stream already archived?
- switch err := ds.Get(c).Get(ls); err {
- case nil:
- if ls.Archived() {
- log.Infof(c, "Log stream already marked as archived (non-transactional).")
- return &google.Empty{}, nil
- }
-
- case ds.ErrNoSuchEntity:
- break
-
- default:
- log.WithError(err).Errorf(c, "Failed to check for log stream archvial state.")
- return nil, grpcutil.Internal
- }
+ log.Fields{
+ "id": ls.HashID(),
+ }.Infof(c, "Log stream ID.")
// Post the archival results to the Coordinator.
now := clock.Now(c).UTC()
- err := ds.Get(c).RunInTransaction(func(c context.Context) error {
+ var ierr error
Vadim Sh. 2016/04/07 01:21:32 nit: reset ierr to nil at the beginning of a trans
dnj 2016/04/11 17:20:03 Done.
+ err = ds.Get(c).RunInTransaction(func(c context.Context) error {
+ // Note that within this transaction, we have two return values:
+ // - Non-nil to abort the transaction.
+ // - Specific error via "ierr".
di := ds.Get(c)
if err := di.Get(ls); err != nil {
return err
}
- if ls.Archived() {
- log.Infof(c, "Log stream already marked as archived.")
+
+ // If our log stream is not in LSArchiveTasked, we will reject this archive
+ // request with FailedPrecondition.
+ switch {
+ case ls.Archived():
+ // Return nil if the log stream is already archived (idempotent).
+ log.Warningf(c, "Log stream is already archived.")
return nil
+
+ case ls.State != coordinator.LSArchiveTasked:
+ log.Fields{
+ "state": ls.State,
+ }.Errorf(c, "Log stream is not in archival tasked state.")
+ ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stream has not tasked an archival.")
+ return ierr
+ }
+
+ // If this request contained an error, we will reject it with Aborted status
+ // if we are below our error threshold.
+ if req.Error {
+ // Increment our error count.
+ ls.ArchiveErrors++
+ if err := di.Put(ls); err != nil {
+ log.WithError(err).Errorf(c, "Failed to update log stream error count.")
+ return err
+ }
+
+ retries := int(ccfg.ArchiveRetries)
+ if ls.ArchiveErrors < retries {
+ log.Fields{
+ "path": req.Path,
+ "errorCount": ls.ArchiveErrors,
+ "archiveRetries": retries,
+ }.Warningf(c, "Rejecting failed archival: below error threshold.")
+
+ // Fail this RPC call to keep the archival task in the queue.
+ //
+ // We return this via "ierr" because we want the transaction to succeed.
+ ierr = grpcutil.Errf(codes.Aborted, "below error threshold (%d < %d)", ls.ArchiveErrors, retries)
+ return nil
+ }
+
+ // We have exceeded our error threshold, so we will continue to archive
+ // this log stream.
+ log.Fields{
+ "path": req.Path,
+ "errorCount": ls.ArchiveErrors,
+ "archiveRetries": retries,
+ }.Warningf(c, "Log stream has exceeded archive error threshold. Archiving empty stream.")
+
+ req.TerminalIndex = -1
+ req.LogEntryCount = 0
}
// Update archival information. Make sure this actually marks the stream as
// archived.
- ls.Updated = now
ls.State = coordinator.LSArchived
- ls.ArchiveWhole = req.Complete
+ ls.ArchivedTime = now
+
+ if ls.TerminalIndex < 0 {
+ // Also set the terminated time.
+ ls.TerminatedTime = now
+ }
ls.TerminalIndex = req.TerminalIndex
+
+ ls.ArchiveLogEntryCount = req.LogEntryCount
ls.ArchiveStreamURL = req.StreamUrl
ls.ArchiveStreamSize = req.StreamSize
ls.ArchiveIndexURL = req.IndexUrl
@@ -84,7 +143,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
ls.ArchiveDataSize = req.DataSize
// Update the log stream.
- if err := ls.Put(di); err != nil {
+ if err := di.Put(ls); err != nil {
log.WithError(err).Errorf(c, "Failed to update log stream.")
return err
}
@@ -92,8 +151,12 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
log.Infof(c, "Successfully marked stream as archived.")
return nil
}, nil)
- if err != nil {
+ if ierr != nil {
log.WithError(err).Errorf(c, "Failed to mark stream as archived.")
+ return nil, ierr
+ }
+ if err != nil {
+ log.WithError(err).Errorf(c, "Internal error.")
return nil, grpcutil.Internal
}

Powered by Google App Engine
This is Rietveld 408576698