| Index: server/internal/logdog/archivist/archivist.go
|
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
|
| index 1d79af1d0225532985066c380010d92f0ad693f4..31eb691a8fe656d9bcfc6edcea668dd32eeaa969 100644
|
| --- a/server/internal/logdog/archivist/archivist.go
|
| +++ b/server/internal/logdog/archivist/archivist.go
|
| @@ -116,7 +116,19 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
|
| }
|
| if err := task.archive(c); err != nil {
|
| log.WithError(err).Errorf(c, "Failed to perform archival operation.")
|
| - return err
|
| +
|
| + // Fail only if completeness is a requirement. Allow the Coordinator to
|
| + // handle errors on !Complete streams.
|
| + if t.Complete {
|
| + return err
|
| + }
|
| +
|
| + // Report that there was an archival error to the Coordinator's
|
| + // ArchiveStream endpoint. It may accept the archival or respond with a
|
| + // FailedPrecondition error code indicating that this archival run should
|
| + // be considered a failure, in which case we will refrain from deleting our
|
| + // task and try again.
|
| + task.ar.Error = true
|
| }
|
| log.Fields{
|
| "streamURL": task.ar.StreamUrl,
|
| @@ -124,6 +136,7 @@ func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
|
| "dataURL": task.ar.DataUrl,
|
| "terminalIndex": task.ar.TerminalIndex,
|
| "complete": task.ar.Complete,
|
| + "hadError": task.ar.Error,
|
| }.Debugf(c, "Finished archive construction.")
|
|
|
| if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
|
| @@ -151,6 +164,10 @@ type archiveTask struct {
|
| // Coordinator State. Upon success, the State will be updated with the result
|
| // of the archival operation.
|
| func (t *archiveTask) archive(c context.Context) (err error) {
|
| + // Minimal ArchiveRequest parameters.
|
| + t.ar.Path = t.Path
|
| + t.ar.TerminalIndex = -1
|
| +
|
| // Generate our archival object managers.
|
| bext := t.desc.BinaryFileExt
|
| if bext == "" {
|
| @@ -271,7 +288,6 @@ func (t *archiveTask) archive(c context.Context) (err error) {
|
| }
|
|
|
| // Update our state with archival results.
|
| - t.ar.Path = t.Path
|
| t.ar.StreamSize = streamO.Count()
|
| t.ar.IndexSize = indexO.Count()
|
| t.ar.DataSize = dataO.Count()
|
|
|