Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Unified Diff: server/internal/logdog/archivist/archivist.go

Issue 1874563005: Archivist asserts completeness through keys scan. (Closed) Base URL: https://github.com/luci/luci-go@logdog-storage-keysonly
Patch Set: Remove unused members. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/archivist/archivist.go
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
index ca0a4c2d05f7175f6132bc49c17a628c0676481a..33e837df9b4862dfbc92e3b4e8660ae7663a2e98 100644
--- a/server/internal/logdog/archivist/archivist.go
+++ b/server/internal/logdog/archivist/archivist.go
@@ -172,10 +172,20 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
}
// Are we required to archive a complete log stream?
- complete := (age <= at.CompletePeriod.Duration())
- if complete && ls.State.TerminalIndex < 0 {
- log.Warningf(c, "Cannot archive complete stream with no terminal index.")
- return false, errors.New("completeness required, but stream has no terminal index")
+ if age <= at.CompletePeriod.Duration() {
+ tidx := ls.State.TerminalIndex
+
+ if tidx < 0 {
+ log.Warningf(c, "Cannot archive complete stream with no terminal index.")
+ return false, errors.New("completeness required, but stream has no terminal index")
+ }
+
+ // If we're requiring completeness, perform a keys-only scan of intermediate
+ // storage to ensure that we have all of the records before we bother
+ // streaming to storage only to find that we are missing data.
+ if err := a.checkComplete(types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil {
+ return false, err
+ }
}
ar := logdog.ArchiveStreamRequest{
@@ -193,7 +203,7 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
if err != nil {
log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
} else {
- err = staged.stage(c, complete)
+ err = staged.stage(c)
}
switch {
@@ -259,6 +269,40 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
return true, nil
}
+// checkComplete performs a quick keys-only scan of intermediate storage and
+// returns an error unless every log entry index in [0, tidx] is present.
+func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex) error {
+	sreq := storage.GetRequest{
+		Path:     path,
+		KeysOnly: true,
+	}
+	nextIndex := types.MessageIndex(0)
+	done := false // Set only when the scan actually reaches tidx.
+	var ierr error
+	err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool {
+		switch {
+		case idx != nextIndex:
+			ierr = fmt.Errorf("missing log entry index %d (next %d)", nextIndex, idx)
+			return false
+		case idx == tidx:
+			// We have hit our terminal index, so all of the log data is here!
+			done = true
+			return false
+		default:
+			nextIndex++
+			return true
+		}
+	})
+	if ierr == nil && err == nil && !done {
+		// The scan ran out of records before the terminal index: incomplete.
+		ierr = fmt.Errorf("missing log entry index %d (terminal %d)", nextIndex, tidx)
+	}
+	if ierr != nil {
+		return ierr
+	}
+	return err
+}
+
func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
*stagedArchival, error) {
sa := stagedArchival{
@@ -306,14 +350,9 @@ type stagedArchival struct {
path types.StreamPath
desc logpb.LogStreamDescriptor
- stream stagingPaths
- streamSize int64
-
- index stagingPaths
- indexSize int64
-
- data stagingPaths
- dataSize int64
+ stream stagingPaths
+ index stagingPaths
+ data stagingPaths
finalized bool
terminalIndex int64
@@ -323,7 +362,7 @@ type stagedArchival struct {
// stage executes the archival process, archiving to the staged storage paths.
//
// If stage fails, it may return a transient error.
-func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
+func (sa *stagedArchival) stage(c context.Context) (err error) {
log.Fields{
"streamURL": sa.stream.staged,
"indexURL": sa.index.staged,
@@ -396,7 +435,6 @@ func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
Context: c,
st: sa.Storage,
path: sa.path,
- contiguous: complete,
terminalIndex: types.MessageIndex(sa.terminalIndex),
lastIndex: -1,
}
@@ -418,28 +456,17 @@ func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
return
}
- if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) {
- // Fail if we were requested to archive only the complete log. We consider
- // this a transient error with the expectation that the missing entries will
- // show up in future retries.
- switch {
- case complete && ss.hasMissingEntries:
- log.Errorf(c, "Log stream has missing entries, but completeness is required.")
- err = errors.WrapTransient(errors.New("stream has missing entries"))
- return
-
- case ss.logEntryCount == 0:
- // If our last log index was <0, then no logs were archived.
- log.Warningf(c, "No log entries were archived.")
+ switch {
+ case ss.logEntryCount == 0:
+ // If our last log index was <0, then no logs were archived.
+ log.Warningf(c, "No log entries were archived.")
- default:
- // Update our terminal index.
- log.Fields{
- "terminalIndex": ss.lastIndex,
- "logEntryCount": ss.logEntryCount,
- "hasMissingEntries": ss.hasMissingEntries,
- }.Debugf(c, "Finished archiving log stream.")
- }
+ default:
+ // Update our terminal index.
+ log.Fields{
+ "terminalIndex": ss.lastIndex,
+ "logEntryCount": ss.logEntryCount,
+ }.Debugf(c, "Finished archiving log stream.")
}
// Update our state with archival results.
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698