Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(171)

Unified Diff: server/internal/logdog/archivist/archivist.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/archivist/archivist.go
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
index 1d79af1d0225532985066c380010d92f0ad693f4..4a3cefa3ff7edf6cd0958082ac0fc98828fcaa75 100644
--- a/server/internal/logdog/archivist/archivist.go
+++ b/server/internal/logdog/archivist/archivist.go
@@ -6,6 +6,7 @@ package archivist
import (
"fmt"
+ "time"
"github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
@@ -52,23 +53,37 @@ type Archivist struct {
const storageBufferSize = types.MaxLogEntryDataSize * 64
// ArchiveTask processes and executes a single log stream archive task.
-func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error {
+//
+// It returns true on success (delete the task) and false on failure (don't
+// delete the task). The return value of true should only be used if the task
+// is truely complete, as deleting a named task in the task queue will cause it
Vadim Sh. 2016/04/07 01:21:32 typo: truly also you don't use named task (anymor
dnj 2016/04/11 17:20:04 Done.
+// to not be reschedulable for a long period of time (10 days).
+//
+// If the supplied Context is Done, operation may terminate before completion,
+// returning the Context's error.
+func (a *Archivist) ArchiveTask(c context.Context, desc []byte, age time.Duration) bool {
var task logdog.ArchiveTask
if err := proto.Unmarshal(desc, &task); err != nil {
log.WithError(err).Errorf(c, "Failed to decode archive task.")
- return err
+ return false
+ }
+ if err := a.Archive(c, &task, age); err != nil {
+ log.WithError(err).Errorf(c, "Archival task failed.")
+ return false
}
- return a.Archive(c, &task)
+ return true
}
-// Archive archives a single log stream. If unsuccessful, an error is returned.
+// Archive processes and executes a single log stream archive task.
//
-// This error may be wrapped in errors.Transient if it is believed to have been
-// caused by a transient failure.
+// It returns an error if the archival isn't complete and confirmed by the
+// Coordinator.
//
// If the supplied Context is Done, operation may terminate before completion,
// returning the Context's error.
-func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
+func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask, age time.Duration) error {
+ complete := (age <= t.CompletePeriod.Duration())
+
// Load the log stream's current state. If it is already archived, we will
// return an immediate success.
ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
@@ -77,18 +92,17 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
})
switch {
case err != nil:
- log.WithError(err).Errorf(c, "Failed to load log stream.")
- return err
+ return fmt.Errorf("failed to load log stream: %v", err)
case ls.State == nil:
- return errors.New("missing state")
+ return errors.New("log stream did not include state")
case ls.State.ProtoVersion != logpb.Version:
log.Fields{
"protoVersion": ls.State.ProtoVersion,
"expectedVersion": logpb.Version,
}.Errorf(c, "Unsupported log stream protobuf version.")
- return errors.New("unsupported protobuf version")
+ return errors.New("unsupported log stream protobuf version")
case ls.Desc == nil:
- return errors.New("missing descriptor")
+ return errors.New("log stream did not include a descriptor")
case ls.State.Purged:
log.Warningf(c, "Log stream is purged.")
@@ -96,6 +110,9 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
case ls.State.Archived:
log.Infof(c, "Log stream is already archived.")
return nil
+
+ case complete && ls.State.TerminalIndex < 0:
+ return errors.WrapTransient(errors.New("cannot archive complete stream with no terminal index"))
}
// Deserialize and validate the descriptor protobuf.
@@ -111,24 +128,45 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
task := &archiveTask{
Archivist: a,
ArchiveTask: t,
+ complete: complete,
ls: ls,
desc: &desc,
}
- if err := task.archive(c); err != nil {
- log.WithError(err).Errorf(c, "Failed to perform archival operation.")
- return err
+ aerr := task.archive(c)
+ if aerr != nil {
+ // If this is a transient error, fail immediately with it. Use
+ // isTransientError because err can be an errors.MultiError.
+ //
+ // Returning a transient error here will cause task queue to reschedule.
+ if isTransientError(aerr) {
+ log.WithError(aerr).Errorf(c, "TRANSIENT error during archival operation.")
+ return aerr
+ }
+
+ // This is a non-transient error, so we will report it as an error to the
+ // service. It may accept the archival (success), in which case we'll delete
+ // our task, or respond with an error code (Aborted) indicating that this
+ // archival run should be retried and that we should not delete the task.
+ log.WithError(aerr).Errorf(c, "Archival failed with non-transient error.")
+ task.ar.Error = true
}
+
log.Fields{
"streamURL": task.ar.StreamUrl,
"indexURL": task.ar.IndexUrl,
"dataURL": task.ar.DataUrl,
"terminalIndex": task.ar.TerminalIndex,
- "complete": task.ar.Complete,
- }.Debugf(c, "Finished archive construction.")
+ "logEntryCount": task.ar.LogEntryCount,
+ "hadError": task.ar.Error,
+ "complete": task.ar.Complete(),
+ }.Debugf(c, "Finished archive construction; reporting archive state.")
if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
- log.WithError(err).Errorf(c, "Failed to mark log stream as archived.")
- return err
+ // All service errors will be reported and returned as transient. We will
+ // NOT delete this archival task unless the service has explicitly
+ // instructed us to via success code.
+ log.WithError(err).Errorf(c, "Failed to report archive state.")
+ return errors.WrapTransient(errors.New("failed to report archive state"))
}
return nil
}
@@ -138,6 +176,9 @@ type archiveTask struct {
*Archivist
*logdog.ArchiveTask
+ // complete, if true, will cause incomplete streams to return transient
+ // errors.
+ complete bool
// ls is the log stream state.
ls *logdog.LoadStreamResponse
// desc is the unmarshaled log stream descriptor.
@@ -151,6 +192,10 @@ type archiveTask struct {
// Coordinator State. Upon success, the State will be updated with the result
// of the archival operation.
func (t *archiveTask) archive(c context.Context) (err error) {
+ // Minimal ArchiveRequest parameters.
+ t.ar.Path = t.Path
+ t.ar.TerminalIndex = -1
+
// Generate our archival object managers.
bext := t.desc.BinaryFileExt
if bext == "" {
@@ -212,7 +257,7 @@ func (t *archiveTask) archive(c context.Context) (err error) {
return
}
if ierr := o.Close(); ierr != nil {
- err = ierr
+ err = joinArchiveErrors(err, ierr)
}
}
defer closeOM(streamO)
@@ -224,7 +269,7 @@ func (t *archiveTask) archive(c context.Context) (err error) {
Context: c,
st: t.Storage,
path: types.StreamPath(t.Path),
- contiguous: t.Complete,
+ contiguous: t.complete,
terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex),
lastIndex: -1,
}
@@ -241,41 +286,40 @@ func (t *archiveTask) archive(c context.Context) (err error) {
Logger: log.Get(c),
}
- err = archive.Archive(m)
- if err != nil {
+ if err = archive.Archive(m); err != nil {
log.WithError(err).Errorf(c, "Failed to archive log stream.")
return
}
- t.ar.TerminalIndex = int64(ss.lastIndex)
if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex {
- // Fail, if we were requested to archive only the complete log.
- if t.Complete {
- log.Fields{
- "terminalIndex": tidx,
- "lastIndex": t.ar.TerminalIndex,
- }.Errorf(c, "Log stream archival stopped prior to terminal index.")
- return errors.New("stream finished short of terminal index")
- }
-
- if t.ar.TerminalIndex < 0 {
+ // Fail if we were requested to archive only the complete log. We consider
+ // this a transient error with the expectation that the missing entries will
+ // show up in future retries.
+ switch {
+ case t.complete && ss.hasMissingEntries:
+ log.Errorf(c, "Log stream has missing entries, but completeness is required.")
+ return errors.WrapTransient(errors.New("stream has missing entries"))
+
+ case ss.logEntryCount == 0:
// If our last log index was <0, then no logs were archived.
log.Warningf(c, "No log entries were archived.")
- } else {
+
+ default:
// Update our terminal index.
log.Fields{
- "from": tidx,
- "to": t.ar.TerminalIndex,
- }.Infof(c, "Updated log stream terminal index.")
+ "terminalIndex": ss.lastIndex,
+ "logEntryCount": ss.logEntryCount,
+ "hasMissingEntries": ss.hasMissingEntries,
+ }.Debugf(c, "Finished archiving log stream.")
}
}
// Update our state with archival results.
- t.ar.Path = t.Path
+ t.ar.TerminalIndex = int64(ss.lastIndex)
+ t.ar.LogEntryCount = ss.logEntryCount
t.ar.StreamSize = streamO.Count()
t.ar.IndexSize = indexO.Count()
t.ar.DataSize = dataO.Count()
- t.ar.Complete = !ss.hasMissingEntries
return
}
@@ -345,6 +389,7 @@ type storageSource struct {
buf []*logpb.LogEntry
lastIndex types.MessageIndex
+ logEntryCount int64
hasMissingEntries bool // true if some log entries were missing.
}
@@ -378,6 +423,9 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
if err := s.bufferEntries(s.lastIndex + 1); err != nil {
if err == storage.ErrDoesNotExist {
log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
+ if s.terminalIndex >= 0 {
+ s.hasMissingEntries = true
+ }
return nil, archive.ErrEndOfStream
}
@@ -386,13 +434,26 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
}
}
+ // If we have no more buffered entries, we have exhausted our log stream.
if len(s.buf) == 0 {
- log.Fields{
- "lastIndex": s.lastIndex,
- }.Debugf(s, "Encountered end of stream.")
+ // If we have a terminal index, but we didn't actually emit that index,
+ // mark that we have missing entries.
+ if s.terminalIndex >= 0 && s.lastIndex != s.terminalIndex {
+ log.Fields{
+ "terminalIndex": s.terminalIndex,
+ "lastIndex": s.lastIndex,
+ }.Warningf(s, "Log stream stopped before terminal index.")
+ s.hasMissingEntries = true
+ } else {
+ log.Fields{
+ "lastIndex": s.lastIndex,
+ }.Debugf(s, "Encountered end of stream.")
+ }
+
return nil, archive.ErrEndOfStream
}
+ // Pop the next log entry and advance the stream.
var le *logpb.LogEntry
le, s.buf = s.buf[0], s.buf[1:]
@@ -402,14 +463,13 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
nidx := (s.lastIndex + 1)
if sidx != nidx {
s.hasMissingEntries = true
-
- if s.contiguous {
- log.Fields{
- "index": sidx,
- "nextIndex": nidx,
- }.Errorf(s, "Non-contiguous log stream while enforcing.")
- return nil, errors.New("non-contiguous log stream")
- }
+ }
+ if s.contiguous && s.hasMissingEntries {
+ log.Fields{
+ "index": sidx,
+ "nextIndex": nidx,
+ }.Warningf(s, "Non-contiguous log stream while enforcing.")
+ return nil, archive.ErrEndOfStream
}
// If we're enforcing a maximum terminal index, return end of stream if this
@@ -423,5 +483,38 @@ func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
}
s.lastIndex = sidx
+ s.logEntryCount++
return le, nil
}
+
+func joinArchiveErrors(err error, add error) error {
+ switch {
+ case err == nil:
+ return add
+ case add == nil:
+ return err
+
+ default:
+ merr, ok := err.(errors.MultiError)
+ if !ok {
+ merr = errors.MultiError{err, nil}[:1]
+ }
+ merr = append(merr, add)
+ return merr
+ }
+}
+
+func isTransientError(err error) bool {
+ if errors.IsTransient(err) {
+ return true
+ }
+
+ if merr, ok := err.(errors.MultiError); ok {
+ for _, ierr := range merr {
+ if isTransientError(ierr) {
+ return true
+ }
+ }
+ }
+ return false
+}

Powered by Google App Engine
This is Rietveld 408576698