| Index: server/internal/logdog/archivist/archivist.go
|
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
|
| index 1d79af1d0225532985066c380010d92f0ad693f4..ca0a4c2d05f7175f6132bc49c17a628c0676481a 100644
|
| --- a/server/internal/logdog/archivist/archivist.go
|
| +++ b/server/internal/logdog/archivist/archivist.go
|
| @@ -5,7 +5,10 @@
|
| package archivist
|
|
|
| import (
|
| + "bytes"
|
| + "encoding/hex"
|
| "fmt"
|
| + "io"
|
|
|
| "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| @@ -13,12 +16,29 @@ import (
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/parallel"
|
| "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| "github.com/luci/luci-go/server/logdog/archive"
|
| "github.com/luci/luci-go/server/logdog/storage"
|
| "golang.org/x/net/context"
|
| )
|
|
|
| +// Task is a single archive task.
|
| +type Task interface {
|
| + // UniqueID returns a task-unique value. Other tasks, and other retries of
|
| + // this task, should (try to) not reuse this ID.
|
| + UniqueID() string
|
| +
|
| + // Task is the archive task to execute.
|
| + Task() *logdog.ArchiveTask
|
| +
|
| + // AssertLease asserts that the lease for this Task is still held.
|
| + //
|
| + // On failure, it will return an error. If successful, the Archivist may
|
| + // assume that it holds the lease longer.
|
| + AssertLease(context.Context) error
|
| +}
|
| +
|
| // Archivist is a stateless configuration capable of archiving individual log
|
| // streams.
|
| type Archivist struct {
|
| @@ -26,16 +46,18 @@ type Archivist struct {
|
| // endpoint.
|
| Service logdog.ServicesClient
|
|
|
| - // Storage is the intermediate storage instance to use to pull log entries for
|
| - // archival.
|
| + // Storage is the archival source Storage instance.
|
| Storage storage.Storage
|
| -
|
| // GSClient is the Google Storage client to for archive generation.
|
| GSClient gs.Client
|
|
|
| // GSBase is the base Google Storage path. This includes the bucket name
|
| // and any associated path.
|
| GSBase gs.Path
|
| + // GSStagingBase is the base Google Storage path for archive staging. This
|
| + // includes the bucket name and any associated path.
|
| + GSStagingBase gs.Path
|
| +
|
| // PrefixIndexRange is the maximum number of stream indexes in between index
|
| // entries. See archive.Manifest for more information.
|
| StreamIndexRange int
|
| @@ -52,376 +74,455 @@ type Archivist struct {
|
| const storageBufferSize = types.MaxLogEntryDataSize * 64
|
|
|
| // ArchiveTask processes and executes a single log stream archive task.
|
| -func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error {
|
| - var task logdog.ArchiveTask
|
| - if err := proto.Unmarshal(desc, &task); err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to decode archive task.")
|
| - return err
|
| - }
|
| - return a.Archive(c, &task)
|
| -}
|
| -
|
| -// Archive archives a single log stream. If unsuccessful, an error is returned.
|
| //
|
| -// This error may be wrapped in errors.Transient if it is believed to have been
|
| -// caused by a transient failure.
|
| +// It returns true on success (delete the task) and false on failure (don't
|
| +// delete the task). The return value of true should only be used if the task
|
| +// is truly complete and acknowledged by the Coordinator.
|
| //
|
| // If the supplied Context is Done, operation may terminate before completion,
|
| // returning the Context's error.
|
| -func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
|
| +func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
|
| + delete, err := a.archiveTaskImpl(c, task)
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "delete": delete,
|
| + "path": task.Task().Path,
|
| + }.Infof(c, "Finished archive task.")
|
| + return delete
|
| +}
|
| +
|
| +// archiveTaskImpl returns the same boolean value as ArchiveTask, but includes
|
| +// an error. The error is useful for testing to assert that certain conditions
|
| +// were hit.
|
| +func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) {
|
| + at := task.Task()
|
| + log.Fields{
|
| + "path": at.Path,
|
| + }.Debugf(c, "Received archival task.")
|
| +
|
| // Load the log stream's current state. If it is already archived, we will
|
| // return an immediate success.
|
| ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
|
| - Path: t.Path,
|
| + Path: at.Path,
|
| Desc: true,
|
| })
|
| switch {
|
| case err != nil:
|
| log.WithError(err).Errorf(c, "Failed to load log stream.")
|
| - return err
|
| + return false, err
|
| +
|
| case ls.State == nil:
|
| - return errors.New("missing state")
|
| + log.Errorf(c, "Log stream did not include state.")
|
| + return false, errors.New("log stream did not include state")
|
| +
|
| + case ls.State.Purged:
|
| + log.Warningf(c, "Log stream is purged. Discarding archival request.")
|
| + return true, errors.New("log stream is purged")
|
| +
|
| + case ls.State.Archived:
|
| + log.Infof(c, "Log stream is already archived. Discarding archival request.")
|
| + return true, errors.New("log stream is archived")
|
| +
|
| + case !bytes.Equal(ls.ArchivalKey, at.Key):
|
| + if len(ls.ArchivalKey) == 0 {
|
| + // The log stream is not registering as "archive pending" state.
|
| + //
|
| + // This can happen if the eventually-consistent datastore hasn't updated
|
| + // its log stream state by the time this Pub/Sub task is received. In
|
| + // this case, we will continue retrying the task until datastore registers
|
| + // that some key is associated with it.
|
| + log.Fields{
|
| + "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKey),
|
| + "requestArchivalKey": hex.EncodeToString(at.Key),
|
| + }.Infof(c, "Archival request received before log stream has its key.")
|
| + return false, errors.New("premature archival request")
|
| + }
|
| +
|
| + // This can happen if a Pub/Sub message is dispatched during a transaction,
|
| + // but that specific transaction failed. In this case, the Pub/Sub message
|
| + // will have a key that doesn't match the key that was transactionally
|
| + // encoded, and can be discarded.
|
| + log.Fields{
|
| + "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKey),
|
| + "requestArchivalKey": hex.EncodeToString(at.Key),
|
| + }.Infof(c, "Superfluous archival request (keys do not match). Discarding.")
|
| + return true, errors.New("superfluous archival request")
|
| +
|
| case ls.State.ProtoVersion != logpb.Version:
|
| log.Fields{
|
| "protoVersion": ls.State.ProtoVersion,
|
| "expectedVersion": logpb.Version,
|
| }.Errorf(c, "Unsupported log stream protobuf version.")
|
| - return errors.New("unsupported protobuf version")
|
| - case ls.Desc == nil:
|
| - return errors.New("missing descriptor")
|
| + return false, errors.New("unsupported log stream protobuf version")
|
|
|
| - case ls.State.Purged:
|
| - log.Warningf(c, "Log stream is purged.")
|
| - return nil
|
| - case ls.State.Archived:
|
| - log.Infof(c, "Log stream is already archived.")
|
| - return nil
|
| + case ls.Desc == nil:
|
| + log.Errorf(c, "Log stream did not include a descriptor.")
|
| + return false, errors.New("log stream did not include a descriptor")
|
| }
|
|
|
| - // Deserialize and validate the descriptor protobuf.
|
| - var desc logpb.LogStreamDescriptor
|
| - if err := proto.Unmarshal(ls.Desc, &desc); err != nil {
|
| + // If the archival request is younger than the settle delay, kick it back to
|
| + // retry later.
|
| + age := ls.Age.Duration()
|
| + if age < at.SettleDelay.Duration() {
|
| log.Fields{
|
| - log.ErrorKey: err,
|
| - "protoVersion": ls.State.ProtoVersion,
|
| - }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
|
| - return err
|
| + "age": age,
|
| + "settleDelay": at.SettleDelay.Duration(),
|
| + }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.")
|
| + return false, errors.New("log stream is within settle delay")
|
| }
|
|
|
| - task := &archiveTask{
|
| - Archivist: a,
|
| - ArchiveTask: t,
|
| - ls: ls,
|
| - desc: &desc,
|
| + // Are we required to archive a complete log stream?
|
| + complete := (age <= at.CompletePeriod.Duration())
|
| + if complete && ls.State.TerminalIndex < 0 {
|
| + log.Warningf(c, "Cannot archive complete stream with no terminal index.")
|
| + return false, errors.New("completeness required, but stream has no terminal index")
|
| }
|
| - if err := task.archive(c); err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to perform archival operation.")
|
| - return err
|
| +
|
| + ar := logdog.ArchiveStreamRequest{
|
| + Path: at.Path,
|
| }
|
| +
|
| + // Archive to staging.
|
| + //
|
| + // If a non-transient failure occurs here, we will report it to the Archivist
|
| + // under the assumption that it will continue occurring.
|
| + //
|
| + // We will handle error creating the plan and executing the plan in the same
|
| + // switch statement below.
|
| + staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID())
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
|
| + } else {
|
| + err = staged.stage(c, complete)
|
| + }
|
| +
|
| + switch {
|
| + case errors.IsTransient(err):
|
| + // If this is a transient error, exit immediately and do not delete the
|
| + // archival task.
|
| + log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.")
|
| + return false, err
|
| +
|
| + case err != nil:
|
| + // This is a non-transient error, so we are confident that any future
|
| + // Archival will also encounter this error. We will mark this archival
|
| + // as an error and report it to the Coordinator.
|
| + log.WithError(err).Errorf(c, "Archival failed with non-transient error.")
|
| + ar.Error = err.Error()
|
| + if ar.Error == "" {
|
| + // This needs to be non-nil, so if our acutal error has an empty string,
|
| + // fill in a generic message.
|
| + ar.Error = "archival error"
|
| + }
|
| +
|
| + default:
|
| + // In case something fails, clean up our staged archival (best effort).
|
| + defer staged.cleanup(c)
|
| +
|
| + // Finalize the archival. First, extend our lease to confirm that we still
|
| + // hold it.
|
| + if err := task.AssertLease(c); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to extend task lease before finalizing.")
|
| + return false, err
|
| + }
|
| +
|
| + // Finalize the archival.
|
| + if err := staged.finalize(c, a.GSClient, &ar); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to finalize archival.")
|
| + return false, err
|
| + }
|
| + }
|
| +
|
| log.Fields{
|
| - "streamURL": task.ar.StreamUrl,
|
| - "indexURL": task.ar.IndexUrl,
|
| - "dataURL": task.ar.DataUrl,
|
| - "terminalIndex": task.ar.TerminalIndex,
|
| - "complete": task.ar.Complete,
|
| - }.Debugf(c, "Finished archive construction.")
|
| -
|
| - if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to mark log stream as archived.")
|
| - return err
|
| + "streamURL": ar.StreamUrl,
|
| + "indexURL": ar.IndexUrl,
|
| + "dataURL": ar.DataUrl,
|
| + "terminalIndex": ar.TerminalIndex,
|
| + "logEntryCount": ar.LogEntryCount,
|
| + "hadError": ar.Error,
|
| + "complete": ar.Complete(),
|
| + }.Debugf(c, "Finished archival round. Reporting archive state.")
|
| +
|
| + // Extend the lease again to confirm that we still hold it.
|
| + if err := task.AssertLease(c); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to extend task lease before reporting.")
|
| + return false, err
|
| }
|
| - return nil
|
| +
|
| + if _, err := a.Service.ArchiveStream(c, &ar); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to report archive state.")
|
| + return false, err
|
| + }
|
| +
|
| + // Archival is complete and acknowledged by Coordinator. Consume the archival
|
| + // task.
|
| + return true, nil
|
| }
|
|
|
| -// archiveTask is the set of parameters for a single archival.
|
| -type archiveTask struct {
|
| - *Archivist
|
| - *logdog.ArchiveTask
|
| +func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
|
| + *stagedArchival, error) {
|
| + sa := stagedArchival{
|
| + Archivist: a,
|
| + path: path,
|
|
|
| - // ls is the log stream state.
|
| - ls *logdog.LoadStreamResponse
|
| - // desc is the unmarshaled log stream descriptor.
|
| - desc *logpb.LogStreamDescriptor
|
| + terminalIndex: ls.State.TerminalIndex,
|
| + }
|
|
|
| - // ar will be populated during archive construction.
|
| - ar logdog.ArchiveStreamRequest
|
| -}
|
| + // Deserialize and validate the descriptor protobuf. If this fails, it is a
|
| + // non-transient error.
|
| + if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "protoVersion": ls.State.ProtoVersion,
|
| + }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
|
| + return nil, err
|
| + }
|
|
|
| -// archiveState performs the archival operation on a stream described by a
|
| -// Coordinator State. Upon success, the State will be updated with the result
|
| -// of the archival operation.
|
| -func (t *archiveTask) archive(c context.Context) (err error) {
|
| - // Generate our archival object managers.
|
| - bext := t.desc.BinaryFileExt
|
| + bext := sa.desc.BinaryFileExt
|
| if bext == "" {
|
| bext = "bin"
|
| }
|
|
|
| - path := t.Path
|
| - var streamO, indexO, dataO *gsObject
|
| - streamO, err = t.newGSObject(c, path, "logstream.entries")
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create log object.")
|
| - return
|
| - }
|
| + // Construct our staged archival paths.
|
| + sa.stream = a.makeStagingPaths(path, "logstream.entries", uid)
|
| + sa.index = a.makeStagingPaths(path, "logstream.index", uid)
|
| + sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid)
|
| + return &sa, nil
|
| +}
|
|
|
| - indexO, err = t.newGSObject(c, path, "logstream.index")
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create index object.")
|
| - return
|
| +// makeStagingPaths returns a stagingPaths instance for the given path and
|
| +// file name. It incorporates a unique ID into the staging name to differentiate
|
| +// it from other staging paths for the same path/name.
|
| +func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths {
|
| + return stagingPaths{
|
| + staged: a.GSStagingBase.Concat(string(path), uid, name),
|
| + final: a.GSBase.Concat(string(path), name),
|
| }
|
| +}
|
|
|
| - dataO, err = t.newGSObject(c, path, fmt.Sprintf("data.%s", bext))
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create data object.")
|
| - return
|
| - }
|
| +type stagedArchival struct {
|
| + *Archivist
|
| +
|
| + path types.StreamPath
|
| + desc logpb.LogStreamDescriptor
|
|
|
| - // Load the URLs into our state.
|
| - t.ar.StreamUrl = streamO.url
|
| - t.ar.IndexUrl = indexO.url
|
| - t.ar.DataUrl = dataO.url
|
| + stream stagingPaths
|
| + streamSize int64
|
| +
|
| + index stagingPaths
|
| + indexSize int64
|
| +
|
| + data stagingPaths
|
| + dataSize int64
|
| +
|
| + finalized bool
|
| + terminalIndex int64
|
| + logEntryCount int64
|
| +}
|
|
|
| +// stage executes the archival process, archiving to the staged storage paths.
|
| +//
|
| +// If stage fails, it may return a transient error.
|
| +func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
|
| log.Fields{
|
| - "streamURL": t.ar.StreamUrl,
|
| - "indexURL": t.ar.IndexUrl,
|
| - "dataURL": t.ar.DataUrl,
|
| - }.Infof(c, "Archiving log stream...")
|
| -
|
| - // We want to try and delete any GS objects that were created during a failed
|
| - // archival attempt.
|
| - deleteOnFail := func(o *gsObject) {
|
| - if o == nil || err == nil {
|
| - return
|
| + "streamURL": sa.stream.staged,
|
| + "indexURL": sa.index.staged,
|
| + "dataURL": sa.data.staged,
|
| + }.Debugf(c, "Staging log stream...")
|
| +
|
| + // Group any transient errors that occur during cleanup. If we aren't
|
| + // returning a non-transient error, return a transient "terr".
|
| + var terr errors.MultiError
|
| + defer func() {
|
| + if err == nil && len(terr) > 0 {
|
| + err = errors.WrapTransient(terr)
|
| + }
|
| + }()
|
| +
|
| + // Close our writers on exit. If any of them fail to close, mark the archival
|
| + // as a transient failure.
|
| + closeWriter := func(closer io.Closer, path gs.Path) {
|
| + // Close the Writer. If this results in an error, append it to our transient
|
| + // error MultiError.
|
| + if ierr := closer.Close(); ierr != nil {
|
| + terr = append(terr, ierr)
|
| }
|
| - if ierr := o.delete(); ierr != nil {
|
| +
|
| + // If we have an archival error, also delete the path associated with this
|
| + // stream. This is a non-fatal failure, since we've already hit a fatal
|
| + // one.
|
| + if err != nil || len(terr) > 0 {
|
| + if ierr := sa.GSClient.Delete(path); ierr != nil {
|
| + log.Fields{
|
| + log.ErrorKey: ierr,
|
| + "path": path,
|
| + }.Warningf(c, "Failed to delete stream on error.")
|
| + }
|
| + }
|
| + }
|
| +
|
| + // createWriter is a shorthand function for creating a writer to a path and
|
| + // reporting an error if it failed.
|
| + createWriter := func(p gs.Path) (gs.Writer, error) {
|
| + w, ierr := sa.GSClient.NewWriter(p)
|
| + if ierr != nil {
|
| log.Fields{
|
| log.ErrorKey: ierr,
|
| - "url": o.url,
|
| - }.Warningf(c, "Failed to clean-up GS object on failure.")
|
| + "path": p,
|
| + }.Errorf(c, "Failed to create writer.")
|
| + return nil, ierr
|
| }
|
| + return w, nil
|
| }
|
| - defer deleteOnFail(streamO)
|
| - defer deleteOnFail(indexO)
|
| - defer deleteOnFail(dataO)
|
| -
|
| - // Close our GS object managers on exit. If any of them fail to close, marh
|
| - // the archival as a failure.
|
| - closeOM := func(o *gsObject) {
|
| - if o == nil {
|
| - return
|
| - }
|
| - if ierr := o.Close(); ierr != nil {
|
| - err = ierr
|
| - }
|
| +
|
| + var streamWriter, indexWriter, dataWriter gs.Writer
|
| + if streamWriter, err = createWriter(sa.stream.staged); err != nil {
|
| + return
|
| + }
|
| + defer closeWriter(streamWriter, sa.stream.staged)
|
| +
|
| + if indexWriter, err = createWriter(sa.index.staged); err != nil {
|
| + return err
|
| + }
|
| + defer closeWriter(indexWriter, sa.index.staged)
|
| +
|
| + if dataWriter, err = createWriter(sa.data.staged); err != nil {
|
| + return err
|
| }
|
| - defer closeOM(streamO)
|
| - defer closeOM(indexO)
|
| - defer closeOM(dataO)
|
| + defer closeWriter(dataWriter, sa.data.staged)
|
|
|
| // Read our log entries from intermediate storage.
|
| ss := storageSource{
|
| Context: c,
|
| - st: t.Storage,
|
| - path: types.StreamPath(t.Path),
|
| - contiguous: t.Complete,
|
| - terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex),
|
| + st: sa.Storage,
|
| + path: sa.path,
|
| + contiguous: complete,
|
| + terminalIndex: types.MessageIndex(sa.terminalIndex),
|
| lastIndex: -1,
|
| }
|
|
|
| m := archive.Manifest{
|
| - Desc: t.desc,
|
| + Desc: &sa.desc,
|
| Source: &ss,
|
| - LogWriter: streamO,
|
| - IndexWriter: indexO,
|
| - DataWriter: dataO,
|
| - StreamIndexRange: t.StreamIndexRange,
|
| - PrefixIndexRange: t.PrefixIndexRange,
|
| - ByteRange: t.ByteRange,
|
| + LogWriter: streamWriter,
|
| + IndexWriter: indexWriter,
|
| + DataWriter: dataWriter,
|
| + StreamIndexRange: sa.StreamIndexRange,
|
| + PrefixIndexRange: sa.PrefixIndexRange,
|
| + ByteRange: sa.ByteRange,
|
|
|
| Logger: log.Get(c),
|
| }
|
| - err = archive.Archive(m)
|
| - if err != nil {
|
| + if err = archive.Archive(m); err != nil {
|
| log.WithError(err).Errorf(c, "Failed to archive log stream.")
|
| return
|
| }
|
|
|
| - t.ar.TerminalIndex = int64(ss.lastIndex)
|
| - if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex {
|
| - // Fail, if we were requested to archive only the complete log.
|
| - if t.Complete {
|
| - log.Fields{
|
| - "terminalIndex": tidx,
|
| - "lastIndex": t.ar.TerminalIndex,
|
| - }.Errorf(c, "Log stream archival stopped prior to terminal index.")
|
| - return errors.New("stream finished short of terminal index")
|
| - }
|
| + if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) {
|
| + // Fail if we were requested to archive only the complete log. We consider
|
| + // this a transient error with the expectation that the missing entries will
|
| + // show up in future retries.
|
| + switch {
|
| + case complete && ss.hasMissingEntries:
|
| + log.Errorf(c, "Log stream has missing entries, but completeness is required.")
|
| + err = errors.WrapTransient(errors.New("stream has missing entries"))
|
| + return
|
|
|
| - if t.ar.TerminalIndex < 0 {
|
| + case ss.logEntryCount == 0:
|
| // If our last log index was <0, then no logs were archived.
|
| log.Warningf(c, "No log entries were archived.")
|
| - } else {
|
| +
|
| + default:
|
| // Update our terminal index.
|
| log.Fields{
|
| - "from": tidx,
|
| - "to": t.ar.TerminalIndex,
|
| - }.Infof(c, "Updated log stream terminal index.")
|
| + "terminalIndex": ss.lastIndex,
|
| + "logEntryCount": ss.logEntryCount,
|
| + "hasMissingEntries": ss.hasMissingEntries,
|
| + }.Debugf(c, "Finished archiving log stream.")
|
| }
|
| }
|
|
|
| // Update our state with archival results.
|
| - t.ar.Path = t.Path
|
| - t.ar.StreamSize = streamO.Count()
|
| - t.ar.IndexSize = indexO.Count()
|
| - t.ar.DataSize = dataO.Count()
|
| - t.ar.Complete = !ss.hasMissingEntries
|
| + sa.terminalIndex = int64(ss.lastIndex)
|
| + sa.logEntryCount = ss.logEntryCount
|
| + sa.stream.count = streamWriter.Count()
|
| + sa.index.count = indexWriter.Count()
|
| + sa.data.count = dataWriter.Count()
|
| return
|
| }
|
|
|
| -func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) {
|
| - p := t.GSBase.Concat(path, name)
|
| - o := gsObject{
|
| - gs: t.GSClient,
|
| - bucket: p.Bucket(),
|
| - path: p.Filename(),
|
| - }
|
| -
|
| - // Build our GS URL. Note that since buildGSPath joins with "/", the initial
|
| - // token, "gs:/", will become "gs://".
|
| - o.url = string(p)
|
| -
|
| - var err error
|
| - o.Writer, err = t.GSClient.NewWriter(o.bucket, o.path)
|
| - if err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "url": o.url,
|
| - }.Errorf(c, "Failed to create Writer.")
|
| - return nil, err
|
| - }
|
| -
|
| - // Delete any existing object at this path.
|
| - if err := o.delete(); err != nil {
|
| - closeErr := o.Close()
|
| -
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "closeErr": closeErr,
|
| - "url": o.url,
|
| - }.Errorf(c, "Could not delete object during creation.")
|
| - return nil, err
|
| - }
|
| - return &o, nil
|
| -}
|
| -
|
| -// gsObjectManger wraps a gsObject instance with metadata.
|
| -type gsObject struct {
|
| - gs.Writer
|
| -
|
| - // gs is the Client instance.
|
| - gs gs.Client
|
| - // bucket is the name of the object's bucket.
|
| - bucket string
|
| - // path is the bucket-relative path of the object.
|
| - path string
|
| - // url is the Google Storage URL (gs://) of this object.
|
| - url string
|
| +type stagingPaths struct {
|
| + staged gs.Path
|
| + final gs.Path
|
| + count int64
|
| }
|
|
|
| -func (o *gsObject) delete() error {
|
| - return o.gs.Delete(o.bucket, o.path)
|
| +func (d *stagingPaths) clearStaged() {
|
| + d.staged = ""
|
| }
|
|
|
| -// storageSource is an archive.LogEntrySource that pulls log entries from
|
| -// intermediate storage via its storage.Storage instance.
|
| -type storageSource struct {
|
| - context.Context
|
| -
|
| - st storage.Storage // the storage instance to read from
|
| - path types.StreamPath // the path of the log stream
|
| - contiguous bool // if true, enforce contiguous entries
|
| - terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
|
| -
|
| - buf []*logpb.LogEntry
|
| - lastIndex types.MessageIndex
|
| - hasMissingEntries bool // true if some log entries were missing.
|
| -}
|
| -
|
| -func (s *storageSource) bufferEntries(start types.MessageIndex) error {
|
| - bytes := 0
|
| -
|
| - req := storage.GetRequest{
|
| - Path: s.path,
|
| - Index: start,
|
| - }
|
| - return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool {
|
| - le := logpb.LogEntry{}
|
| - if err := proto.Unmarshal(d, &le); err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "streamIndex": idx,
|
| - }.Errorf(s, "Failed to unmarshal LogEntry.")
|
| - return false
|
| - }
|
| - s.buf = append(s.buf, &le)
|
| -
|
| - // Stop loading if we've reached or exceeded our buffer size.
|
| - bytes += len(d)
|
| - return bytes < storageBufferSize
|
| - })
|
| -}
|
| +func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logdog.ArchiveStreamRequest) error {
|
| + err := parallel.FanOutIn(func(taskC chan<- func() error) {
|
| + for _, d := range sa.getStagingPaths() {
|
| + d := d
|
|
|
| -func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
|
| - if len(s.buf) == 0 {
|
| - s.buf = s.buf[:0]
|
| - if err := s.bufferEntries(s.lastIndex + 1); err != nil {
|
| - if err == storage.ErrDoesNotExist {
|
| - log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
|
| - return nil, archive.ErrEndOfStream
|
| + // Don't copy zero-sized streams.
|
| + if d.count == 0 {
|
| + continue
|
| }
|
|
|
| - log.WithError(err).Errorf(s, "Failed to retrieve log stream from storage.")
|
| - return nil, err
|
| + taskC <- func() error {
|
| + if err := client.Rename(d.staged, d.final); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "stagedPath": d.staged,
|
| + "finalPath": d.final,
|
| + }.Errorf(c, "Failed to rename GS object.")
|
| + return err
|
| + }
|
| +
|
| + // Clear the staged value to indicate that it no longer exists.
|
| + d.clearStaged()
|
| + return nil
|
| + }
|
| }
|
| + })
|
| + if err != nil {
|
| + return err
|
| }
|
|
|
| - if len(s.buf) == 0 {
|
| - log.Fields{
|
| - "lastIndex": s.lastIndex,
|
| - }.Debugf(s, "Encountered end of stream.")
|
| - return nil, archive.ErrEndOfStream
|
| - }
|
| -
|
| - var le *logpb.LogEntry
|
| - le, s.buf = s.buf[0], s.buf[1:]
|
| + ar.TerminalIndex = sa.terminalIndex
|
| + ar.LogEntryCount = sa.logEntryCount
|
| + ar.StreamUrl = string(sa.stream.final)
|
| + ar.StreamSize = sa.stream.count
|
| + ar.IndexUrl = string(sa.index.final)
|
| + ar.IndexSize = sa.index.count
|
| + ar.DataUrl = string(sa.data.final)
|
| + ar.DataSize = sa.data.count
|
| + return nil
|
| +}
|
|
|
| - // If we're enforcing a contiguous log stream, error if this LogEntry is not
|
| - // contiguous.
|
| - sidx := types.MessageIndex(le.StreamIndex)
|
| - nidx := (s.lastIndex + 1)
|
| - if sidx != nidx {
|
| - s.hasMissingEntries = true
|
| +func (sa *stagedArchival) cleanup(c context.Context) {
|
| + for _, d := range sa.getStagingPaths() {
|
| + if d.staged == "" {
|
| + continue
|
| + }
|
|
|
| - if s.contiguous {
|
| + if err := sa.GSClient.Delete(d.staged); err != nil {
|
| log.Fields{
|
| - "index": sidx,
|
| - "nextIndex": nidx,
|
| - }.Errorf(s, "Non-contiguous log stream while enforcing.")
|
| - return nil, errors.New("non-contiguous log stream")
|
| + log.ErrorKey: err,
|
| + "path": d.staged,
|
| + }.Warningf(c, "Failed to clean up staged path.")
|
| }
|
| - }
|
|
|
| - // If we're enforcing a maximum terminal index, return end of stream if this
|
| - // LogEntry exceeds that index.
|
| - if s.terminalIndex >= 0 && sidx > s.terminalIndex {
|
| - log.Fields{
|
| - "index": sidx,
|
| - "terminalIndex": s.terminalIndex,
|
| - }.Warningf(s, "Discarding log entries beyond expected terminal index.")
|
| - return nil, archive.ErrEndOfStream
|
| + d.clearStaged()
|
| }
|
| +}
|
|
|
| - s.lastIndex = sidx
|
| - return le, nil
|
| +func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
|
| + return []*stagingPaths{
|
| + &sa.stream,
|
| + &sa.index,
|
| + &sa.data,
|
| + }
|
| }
|
|
|