Chromium Code Reviews| Index: server/internal/logdog/archivist/archivist.go |
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go |
| index 1d79af1d0225532985066c380010d92f0ad693f4..4a3cefa3ff7edf6cd0958082ac0fc98828fcaa75 100644 |
| --- a/server/internal/logdog/archivist/archivist.go |
| +++ b/server/internal/logdog/archivist/archivist.go |
| @@ -6,6 +6,7 @@ package archivist |
| import ( |
| "fmt" |
| + "time" |
| "github.com/golang/protobuf/proto" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| @@ -52,23 +53,37 @@ type Archivist struct { |
| const storageBufferSize = types.MaxLogEntryDataSize * 64 |
| // ArchiveTask processes and executes a single log stream archive task. |
| -func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error { |
| +// |
| +// It returns true on success (delete the task) and false on failure (don't |
| +// delete the task). The return value of true should only be used if the task |
| +// is truly complete, as deleting a named task in the task queue will cause it |
|
Vadim Sh.
2016/04/07 01:21:32
typo: truly
also you don't use named task (anymore)
dnj
2016/04/11 17:20:04
Done.
|
| +// to not be reschedulable for a long period of time (10 days). |
| +// |
| +// If the supplied Context is Done, operation may terminate before completion, |
| +// returning the Context's error. |
| +func (a *Archivist) ArchiveTask(c context.Context, desc []byte, age time.Duration) bool { |
| var task logdog.ArchiveTask |
| if err := proto.Unmarshal(desc, &task); err != nil { |
| log.WithError(err).Errorf(c, "Failed to decode archive task.") |
| - return err |
| + return false |
| + } |
| + if err := a.Archive(c, &task, age); err != nil { |
| + log.WithError(err).Errorf(c, "Archival task failed.") |
| + return false |
| } |
| - return a.Archive(c, &task) |
| + return true |
| } |
| -// Archive archives a single log stream. If unsuccessful, an error is returned. |
| +// Archive processes and executes a single log stream archive task. |
| // |
| -// This error may be wrapped in errors.Transient if it is believed to have been |
| -// caused by a transient failure. |
| +// It returns an error if the archival isn't complete and confirmed by the |
| +// Coordinator. |
| // |
| // If the supplied Context is Done, operation may terminate before completion, |
| // returning the Context's error. |
| -func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { |
| +func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask, age time.Duration) error { |
| + complete := (age <= t.CompletePeriod.Duration()) |
| + |
| // Load the log stream's current state. If it is already archived, we will |
| // return an immediate success. |
| ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
| @@ -77,18 +92,17 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { |
| }) |
| switch { |
| case err != nil: |
| - log.WithError(err).Errorf(c, "Failed to load log stream.") |
| - return err |
| + return fmt.Errorf("failed to load log stream: %v", err) |
| case ls.State == nil: |
| - return errors.New("missing state") |
| + return errors.New("log stream did not include state") |
| case ls.State.ProtoVersion != logpb.Version: |
| log.Fields{ |
| "protoVersion": ls.State.ProtoVersion, |
| "expectedVersion": logpb.Version, |
| }.Errorf(c, "Unsupported log stream protobuf version.") |
| - return errors.New("unsupported protobuf version") |
| + return errors.New("unsupported log stream protobuf version") |
| case ls.Desc == nil: |
| - return errors.New("missing descriptor") |
| + return errors.New("log stream did not include a descriptor") |
| case ls.State.Purged: |
| log.Warningf(c, "Log stream is purged.") |
| @@ -96,6 +110,9 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { |
| case ls.State.Archived: |
| log.Infof(c, "Log stream is already archived.") |
| return nil |
| + |
| + case complete && ls.State.TerminalIndex < 0: |
| + return errors.WrapTransient(errors.New("cannot archive complete stream with no terminal index")) |
| } |
| // Deserialize and validate the descriptor protobuf. |
| @@ -111,24 +128,45 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { |
| task := &archiveTask{ |
| Archivist: a, |
| ArchiveTask: t, |
| + complete: complete, |
| ls: ls, |
| desc: &desc, |
| } |
| - if err := task.archive(c); err != nil { |
| - log.WithError(err).Errorf(c, "Failed to perform archival operation.") |
| - return err |
| + aerr := task.archive(c) |
| + if aerr != nil { |
| + // If this is a transient error, fail immediately with it. Use |
| + // isTransientError because err can be an errors.MultiError. |
| + // |
| + // Returning a transient error here will cause task queue to reschedule. |
| + if isTransientError(aerr) { |
| + log.WithError(aerr).Errorf(c, "TRANSIENT error during archival operation.") |
| + return aerr |
| + } |
| + |
| + // This is a non-transient error, so we will report it as an error to the |
| + // service. It may accept the archival (success), in which case we'll delete |
| + // our task, or respond with an error code (Aborted) indicating that this |
| + // archival run should be retried and that we should not delete the task. |
| + log.WithError(aerr).Errorf(c, "Archival failed with non-transient error.") |
| + task.ar.Error = true |
| } |
| + |
| log.Fields{ |
| "streamURL": task.ar.StreamUrl, |
| "indexURL": task.ar.IndexUrl, |
| "dataURL": task.ar.DataUrl, |
| "terminalIndex": task.ar.TerminalIndex, |
| - "complete": task.ar.Complete, |
| - }.Debugf(c, "Finished archive construction.") |
| + "logEntryCount": task.ar.LogEntryCount, |
| + "hadError": task.ar.Error, |
| + "complete": task.ar.Complete(), |
| + }.Debugf(c, "Finished archive construction; reporting archive state.") |
| if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { |
| - log.WithError(err).Errorf(c, "Failed to mark log stream as archived.") |
| - return err |
| + // All service errors will be reported and returned as transient. We will |
| + // NOT delete this archival task unless the service has explicitly |
| + // instructed us to via success code. |
| + log.WithError(err).Errorf(c, "Failed to report archive state.") |
| + return errors.WrapTransient(errors.New("failed to report archive state")) |
| } |
| return nil |
| } |
| @@ -138,6 +176,9 @@ type archiveTask struct { |
| *Archivist |
| *logdog.ArchiveTask |
| + // complete, if true, will cause incomplete streams to return transient |
| + // errors. |
| + complete bool |
| // ls is the log stream state. |
| ls *logdog.LoadStreamResponse |
| // desc is the unmarshaled log stream descriptor. |
| @@ -151,6 +192,10 @@ type archiveTask struct { |
| // Coordinator State. Upon success, the State will be updated with the result |
| // of the archival operation. |
| func (t *archiveTask) archive(c context.Context) (err error) { |
| + // Minimal ArchiveRequest parameters. |
| + t.ar.Path = t.Path |
| + t.ar.TerminalIndex = -1 |
| + |
| // Generate our archival object managers. |
| bext := t.desc.BinaryFileExt |
| if bext == "" { |
| @@ -212,7 +257,7 @@ func (t *archiveTask) archive(c context.Context) (err error) { |
| return |
| } |
| if ierr := o.Close(); ierr != nil { |
| - err = ierr |
| + err = joinArchiveErrors(err, ierr) |
| } |
| } |
| defer closeOM(streamO) |
| @@ -224,7 +269,7 @@ func (t *archiveTask) archive(c context.Context) (err error) { |
| Context: c, |
| st: t.Storage, |
| path: types.StreamPath(t.Path), |
| - contiguous: t.Complete, |
| + contiguous: t.complete, |
| terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), |
| lastIndex: -1, |
| } |
| @@ -241,41 +286,40 @@ func (t *archiveTask) archive(c context.Context) (err error) { |
| Logger: log.Get(c), |
| } |
| - err = archive.Archive(m) |
| - if err != nil { |
| + if err = archive.Archive(m); err != nil { |
| log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| return |
| } |
| - t.ar.TerminalIndex = int64(ss.lastIndex) |
| if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { |
| - // Fail, if we were requested to archive only the complete log. |
| - if t.Complete { |
| - log.Fields{ |
| - "terminalIndex": tidx, |
| - "lastIndex": t.ar.TerminalIndex, |
| - }.Errorf(c, "Log stream archival stopped prior to terminal index.") |
| - return errors.New("stream finished short of terminal index") |
| - } |
| - |
| - if t.ar.TerminalIndex < 0 { |
| + // Fail if we were requested to archive only the complete log. We consider |
| + // this a transient error with the expectation that the missing entries will |
| + // show up in future retries. |
| + switch { |
| + case t.complete && ss.hasMissingEntries: |
| + log.Errorf(c, "Log stream has missing entries, but completeness is required.") |
| + return errors.WrapTransient(errors.New("stream has missing entries")) |
| + |
| + case ss.logEntryCount == 0: |
| // If our last log index was <0, then no logs were archived. |
| log.Warningf(c, "No log entries were archived.") |
| - } else { |
| + |
| + default: |
| // Update our terminal index. |
| log.Fields{ |
| - "from": tidx, |
| - "to": t.ar.TerminalIndex, |
| - }.Infof(c, "Updated log stream terminal index.") |
| + "terminalIndex": ss.lastIndex, |
| + "logEntryCount": ss.logEntryCount, |
| + "hasMissingEntries": ss.hasMissingEntries, |
| + }.Debugf(c, "Finished archiving log stream.") |
| } |
| } |
| // Update our state with archival results. |
| - t.ar.Path = t.Path |
| + t.ar.TerminalIndex = int64(ss.lastIndex) |
| + t.ar.LogEntryCount = ss.logEntryCount |
| t.ar.StreamSize = streamO.Count() |
| t.ar.IndexSize = indexO.Count() |
| t.ar.DataSize = dataO.Count() |
| - t.ar.Complete = !ss.hasMissingEntries |
| return |
| } |
| @@ -345,6 +389,7 @@ type storageSource struct { |
| buf []*logpb.LogEntry |
| lastIndex types.MessageIndex |
| + logEntryCount int64 |
| hasMissingEntries bool // true if some log entries were missing. |
| } |
| @@ -378,6 +423,9 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { |
| if err := s.bufferEntries(s.lastIndex + 1); err != nil { |
| if err == storage.ErrDoesNotExist { |
| log.Warningf(s, "Archive target stream does not exist in intermediate storage.") |
| + if s.terminalIndex >= 0 { |
| + s.hasMissingEntries = true |
| + } |
| return nil, archive.ErrEndOfStream |
| } |
| @@ -386,13 +434,26 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { |
| } |
| } |
| + // If we have no more buffered entries, we have exhausted our log stream. |
| if len(s.buf) == 0 { |
| - log.Fields{ |
| - "lastIndex": s.lastIndex, |
| - }.Debugf(s, "Encountered end of stream.") |
| + // If we have a terminal index, but we didn't actually emit that index, |
| + // mark that we have missing entries. |
| + if s.terminalIndex >= 0 && s.lastIndex != s.terminalIndex { |
| + log.Fields{ |
| + "terminalIndex": s.terminalIndex, |
| + "lastIndex": s.lastIndex, |
| + }.Warningf(s, "Log stream stopped before terminal index.") |
| + s.hasMissingEntries = true |
| + } else { |
| + log.Fields{ |
| + "lastIndex": s.lastIndex, |
| + }.Debugf(s, "Encountered end of stream.") |
| + } |
| + |
| return nil, archive.ErrEndOfStream |
| } |
| + // Pop the next log entry and advance the stream. |
| var le *logpb.LogEntry |
| le, s.buf = s.buf[0], s.buf[1:] |
| @@ -402,14 +463,13 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { |
| nidx := (s.lastIndex + 1) |
| if sidx != nidx { |
| s.hasMissingEntries = true |
| - |
| - if s.contiguous { |
| - log.Fields{ |
| - "index": sidx, |
| - "nextIndex": nidx, |
| - }.Errorf(s, "Non-contiguous log stream while enforcing.") |
| - return nil, errors.New("non-contiguous log stream") |
| - } |
| + } |
| + if s.contiguous && s.hasMissingEntries { |
| + log.Fields{ |
| + "index": sidx, |
| + "nextIndex": nidx, |
| + }.Warningf(s, "Non-contiguous log stream while enforcing.") |
| + return nil, archive.ErrEndOfStream |
| } |
| // If we're enforcing a maximum terminal index, return end of stream if this |
| @@ -423,5 +483,38 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { |
| } |
| s.lastIndex = sidx |
| + s.logEntryCount++ |
| return le, nil |
| } |
| + |
| +func joinArchiveErrors(err error, add error) error { |
| + switch { |
| + case err == nil: |
| + return add |
| + case add == nil: |
| + return err |
| + |
| + default: |
| + merr, ok := err.(errors.MultiError) |
| + if !ok { |
| + merr = errors.MultiError{err, nil}[:1] |
| + } |
| + merr = append(merr, add) |
| + return merr |
| + } |
| +} |
| + |
| +func isTransientError(err error) bool { |
| + if errors.IsTransient(err) { |
| + return true |
| + } |
| + |
| + if merr, ok := err.(errors.MultiError); ok { |
| + for _, ierr := range merr { |
| + if isTransientError(ierr) { |
| + return true |
| + } |
| + } |
| + } |
| + return false |
| +} |