| Index: server/internal/logdog/archivist/archivist.go
|
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
|
| index 7cad37ee68dac5e57ddaee53365b3e23f7a35d7f..5d4909f375af5a96fd42b4d131af8ff9c896880a 100644
|
| --- a/server/internal/logdog/archivist/archivist.go
|
| +++ b/server/internal/logdog/archivist/archivist.go
|
| @@ -104,6 +104,42 @@ type Task interface {
|
| AssertLease(context.Context) error
|
| }
|
|
|
| +// Settings defines the archival parameters for a specific archival operation.
|
| +//
|
| +// In practice, this will be formed from service and project settings.
|
| +type Settings struct {
|
| + // GSBase is the base Google Storage path. This includes the bucket name
|
| + // and any associated path.
|
| + //
|
| + // This must be unique to this archival project. In practice, it will be
|
| + // composed of the project's archival bucket and project ID.
|
| + GSBase gs.Path
|
| + // GSStagingBase is the base Google Storage path for archive staging. This
|
| + // includes the bucket name and any associated path.
|
| + //
|
| + // This must be unique to this archival project. In practice, it will be
|
| + // composed of the project's staging archival bucket and project ID.
|
| + GSStagingBase gs.Path
|
| +
|
| + // AlwaysRender, if true, means that a binary should be archived
|
| + // regardless of whether a specific binary file extension has been supplied
|
| + // with the log stream.
|
| + AlwaysRender bool
|
| +
|
| + // IndexStreamRange is the maximum number of stream indexes in between index
|
| + // entries. See archive.Manifest for more information.
|
| + IndexStreamRange int
|
| + // IndexPrefixRange is the maximum number of prefix indexes in between index
|
| + // entries. See archive.Manifest for more information.
|
| + IndexPrefixRange int
|
| + // IndexByteRange is the maximum number of stream data bytes in between index
|
| + // entries. See archive.Manifest for more information.
|
| + IndexByteRange int
|
| +}
|
| +
|
| +// SettingsLoader returns archival Settings for a given project.
|
| +type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error)
|
| +
|
| // Archivist is a stateless configuration capable of archiving individual log
|
| // streams.
|
| type Archivist struct {
|
| @@ -111,27 +147,13 @@ type Archivist struct {
|
| // endpoint.
|
| Service logdog.ServicesClient
|
|
|
| + // SettingsLoader loads archival settings for a specific project.
|
| + SettingsLoader SettingsLoader
|
| +
|
| // Storage is the archival source Storage instance.
|
| Storage storage.Storage
|
| // GSClient is the Google Storage client to for archive generation.
|
| GSClient gs.Client
|
| -
|
| - // GSBase is the base Google Storage path. This includes the bucket name
|
| - // and any associated path.
|
| - GSBase gs.Path
|
| - // GSStagingBase is the base Google Storage path for archive staging. This
|
| - // includes the bucket name and any associated path.
|
| - GSStagingBase gs.Path
|
| -
|
| - // PrefixIndexRange is the maximum number of stream indexes in between index
|
| - // entries. See archive.Manifest for more information.
|
| - StreamIndexRange int
|
| - // PrefixIndexRange is the maximum number of prefix indexes in between index
|
| - // entries. See archive.Manifest for more information.
|
| - PrefixIndexRange int
|
| - // ByteRange is the maximum number of stream data bytes in between index
|
| - // entries. See archive.Manifest for more information.
|
| - ByteRange int
|
| }
|
|
|
| // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
|
| @@ -252,13 +274,23 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
|
| return statusErr(errors.New("log stream is within settle delay"))
|
| }
|
|
|
| + // Load archival settings for this project.
|
| + settings, err := a.loadSettings(c, config.ProjectName(at.Project))
|
| + if err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "project": at.Project,
|
| + }.Errorf(c, "Failed to load settings for project.")
|
| + return err
|
| + }
|
| +
|
| ar := logdog.ArchiveStreamRequest{
|
| Project: at.Project,
|
| Id: at.Id,
|
| }
|
|
|
| // Build our staged archival plan. This doesn't actually do any archiving.
|
| - staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), ls, task.UniqueID())
|
| + staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID())
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
|
| return err
|
| @@ -355,10 +387,41 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
|
| return nil
|
| }
|
|
|
| +// loadSettings loads and validates archival settings.
|
| +func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) {
|
| + if a.SettingsLoader == nil {
|
| + panic("no settings loader configured")
|
| + }
|
| +
|
| + st, err := a.SettingsLoader(c, project)
|
| + switch {
|
| + case err != nil:
|
| + return nil, err
|
| +
|
| + case st.GSBase.Bucket() == "":
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "gsBase": st.GSBase,
|
| + }.Errorf(c, "Invalid storage base.")
|
| + return nil, errors.New("invalid storage base")
|
| +
|
| + case st.GSStagingBase.Bucket() == "":
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "gsStagingBase": st.GSStagingBase,
|
| + }.Errorf(c, "Invalid storage staging base.")
|
| + return nil, errors.New("invalid storage staging base")
|
| +
|
| + default:
|
| + return st, nil
|
| + }
|
| +}
|
| +
|
| func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName,
|
| - ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
|
| + st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
|
| sa := stagedArchival{
|
| Archivist: a,
|
| + Settings: st,
|
| project: project,
|
|
|
| terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
|
| @@ -389,6 +452,7 @@ func (a *Archivist) makeStagedArchival(c context.Context, project config.Project
|
|
|
| type stagedArchival struct {
|
| *Archivist
|
| + *Settings
|
|
|
| project config.ProjectName
|
| path types.StreamPath
|
| @@ -407,15 +471,18 @@ type stagedArchival struct {
|
| // file name. It incorporates a unique ID into the staging name to differentiate
|
| // it from other staging paths for the same path/name.
|
| func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths {
|
| - // TODO(dnj): This won't be necessary when empty project is invalid.
|
| - project := string(sa.project)
|
| - if project == "" {
|
| - project = "_"
|
| + proj := string(sa.project)
|
| + // TODO (dnj): When empty projects are disallowed, remove this.
|
| + if proj == "" {
|
| + proj = "_"
|
| }
|
|
|
| + // Either of these paths may be shared between projects. To enforce
|
| + // an absence of conflicts, we will insert the project name as part of the
|
| + // path.
|
| return stagingPaths{
|
| - staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, name),
|
| - final: sa.GSBase.Concat(project, string(sa.path), name),
|
| + staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name),
|
| + final: sa.GSBase.Concat(proj, string(sa.path), name),
|
| }
|
| }
|
|
|
| @@ -546,9 +613,9 @@ func (sa *stagedArchival) stage(c context.Context) (err error) {
|
| LogWriter: streamWriter,
|
| IndexWriter: indexWriter,
|
| DataWriter: dataWriter,
|
| - StreamIndexRange: sa.StreamIndexRange,
|
| - PrefixIndexRange: sa.PrefixIndexRange,
|
| - ByteRange: sa.ByteRange,
|
| + StreamIndexRange: sa.IndexStreamRange,
|
| + PrefixIndexRange: sa.IndexPrefixRange,
|
| + ByteRange: sa.IndexByteRange,
|
|
|
| Logger: log.Get(c),
|
| }
|
|
|