| Index: server/internal/logdog/archivist/archivist.go
|
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
|
| index ca0a4c2d05f7175f6132bc49c17a628c0676481a..33e837df9b4862dfbc92e3b4e8660ae7663a2e98 100644
|
| --- a/server/internal/logdog/archivist/archivist.go
|
| +++ b/server/internal/logdog/archivist/archivist.go
|
| @@ -172,10 +172,20 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
|
| }
|
|
|
| // Are we required to archive a complete log stream?
|
| - complete := (age <= at.CompletePeriod.Duration())
|
| - if complete && ls.State.TerminalIndex < 0 {
|
| - log.Warningf(c, "Cannot archive complete stream with no terminal index.")
|
| - return false, errors.New("completeness required, but stream has no terminal index")
|
| + if age <= at.CompletePeriod.Duration() {
|
| + tidx := ls.State.TerminalIndex
|
| +
|
| + if tidx < 0 {
|
| + log.Warningf(c, "Cannot archive complete stream with no terminal index.")
|
| + return false, errors.New("completeness required, but stream has no terminal index")
|
| + }
|
| +
|
| + // If we're requiring completeness, perform a keys-only scan of intermediate
|
| + // storage to ensure that we have all of the records before we bother
|
| + // streaming to storage only to find that we are missing data.
|
| + if err := a.checkComplete(types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil {
|
| + return false, err
|
| + }
|
| }
|
|
|
| ar := logdog.ArchiveStreamRequest{
|
| @@ -193,7 +203,7 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
|
| } else {
|
| - err = staged.stage(c, complete)
|
| + err = staged.stage(c)
|
| }
|
|
|
| switch {
|
| @@ -259,6 +269,40 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
|
| return true, nil
|
| }
|
|
|
| +// checkComplete performs a quick scan of intermediate storage to ensure that
|
| +// all of the log stream's records are available.
|
| +func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex) error {
|
| + sreq := storage.GetRequest{
|
| + Path: path,
|
| + KeysOnly: true,
|
| + }
|
| +
|
| + nextIndex := types.MessageIndex(0)
|
| + var ierr error
|
| + err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool {
|
| + switch {
|
| + case idx != nextIndex:
|
| + ierr = fmt.Errorf("missing log entry index %d (next %d)", nextIndex, idx)
|
| + return false
|
| +
|
| + case idx == tidx:
|
| + // We have hit our terminal index, so all of the log data is here!
|
| + return false
|
| +
|
| + default:
|
| + nextIndex++
|
| + return true
|
| + }
|
| + })
|
| + if ierr != nil {
|
| + return ierr
|
| + }
|
| + if err != nil {
|
| + return err
|
| + }
|
| + return nil
|
| +}
|
| +
|
| func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
|
| *stagedArchival, error) {
|
| sa := stagedArchival{
|
| @@ -306,14 +350,9 @@ type stagedArchival struct {
|
| path types.StreamPath
|
| desc logpb.LogStreamDescriptor
|
|
|
| - stream stagingPaths
|
| - streamSize int64
|
| -
|
| - index stagingPaths
|
| - indexSize int64
|
| -
|
| - data stagingPaths
|
| - dataSize int64
|
| + stream stagingPaths
|
| + index stagingPaths
|
| + data stagingPaths
|
|
|
| finalized bool
|
| terminalIndex int64
|
| @@ -323,7 +362,7 @@ type stagedArchival struct {
|
| // stage executes the archival process, archiving to the staged storage paths.
|
| //
|
| // If stage fails, it may return a transient error.
|
| -func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
|
| +func (sa *stagedArchival) stage(c context.Context) (err error) {
|
| log.Fields{
|
| "streamURL": sa.stream.staged,
|
| "indexURL": sa.index.staged,
|
| @@ -396,7 +435,6 @@ func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
|
| Context: c,
|
| st: sa.Storage,
|
| path: sa.path,
|
| - contiguous: complete,
|
| terminalIndex: types.MessageIndex(sa.terminalIndex),
|
| lastIndex: -1,
|
| }
|
| @@ -418,28 +456,17 @@ func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
|
| return
|
| }
|
|
|
| - if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) {
|
| - // Fail if we were requested to archive only the complete log. We consider
|
| - // this a transient error with the expectation that the missing entries will
|
| - // show up in future retries.
|
| - switch {
|
| - case complete && ss.hasMissingEntries:
|
| - log.Errorf(c, "Log stream has missing entries, but completeness is required.")
|
| - err = errors.WrapTransient(errors.New("stream has missing entries"))
|
| - return
|
| -
|
| - case ss.logEntryCount == 0:
|
| - // If our last log index was <0, then no logs were archived.
|
| - log.Warningf(c, "No log entries were archived.")
|
| + switch {
|
| + case ss.logEntryCount == 0:
|
| + // If our last log index was <0, then no logs were archived.
|
| + log.Warningf(c, "No log entries were archived.")
|
|
|
| - default:
|
| - // Update our terminal index.
|
| - log.Fields{
|
| - "terminalIndex": ss.lastIndex,
|
| - "logEntryCount": ss.logEntryCount,
|
| - "hasMissingEntries": ss.hasMissingEntries,
|
| - }.Debugf(c, "Finished archiving log stream.")
|
| - }
|
| + default:
|
| + // Update our terminal index.
|
| + log.Fields{
|
| + "terminalIndex": ss.lastIndex,
|
| + "logEntryCount": ss.logEntryCount,
|
| + }.Debugf(c, "Finished archiving log stream.")
|
| }
|
|
|
| // Update our state with archival results.
|
|
|