Chromium Code Reviews

Index: server/internal/logdog/archivist/archivist.go
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
index 7cad37ee68dac5e57ddaee53365b3e23f7a35d7f..09632d8e6d6fdbb514d32813a2aa049b1e30740b 100644
--- a/server/internal/logdog/archivist/archivist.go
+++ b/server/internal/logdog/archivist/archivist.go
@@ -104,6 +104,34 @@ type Task interface {
AssertLease(context.Context) error
}
+// Settings retrieve the settings for an archival.
nodir (2016/05/19 15:56:29):
    s/retrieve/store
    SettingsLoader retrieves them; Se…

dnj (Google) (2016/05/19 16:34:51):
    Done.
+type Settings struct {
+ // GSBase is the base Google Storage path. This includes the bucket name
+ // and any associated path.
+ GSBase gs.Path
+ // GSStagingBase is the base Google Storage path for archive staging. This
+ // includes the bucket name and any associated path.
+ GSStagingBase gs.Path
nodir (2016/05/19 15:56:29):
    rebase?

dnj (Google) (2016/05/19 16:34:51):
    Acknowledged.
+
+ // AlwaysCreateBinary if true, means that a binary should be archived
nodir (2016/05/19 15:56:29):
    nit: comma before if?

dnj (Google) (2016/05/19 16:34:51):
    Done.
+ // regardless of whether a specific binary file extension has been supplied
+ // with the log stream.
+ AlwaysCreateBinary bool
+
+ // IndexStreamRange is the maximum number of stream indexes in between index
+ // entries. See archive.Manifest for more information.
+ IndexStreamRange int
+ // IndexPrefixRange is the maximum number of prefix indexes in between index
+ // entries. See archive.Manifest for more information.
+ IndexPrefixRange int
+ // IndexByteRange is the maximum number of stream data bytes in between index
+ // entries. See archive.Manifest for more information.
+ IndexByteRange int
+}
+
+// SettingsLoader returns archival Settings for a given project.
+type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error)
+
// Archivist is a stateless configuration capable of archiving individual log
// streams.
type Archivist struct {
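For reference, a SettingsLoader is just a closure over whatever configuration source a deployment uses. A minimal sketch of one that serves fixed Settings for every project, assuming the archivist package's existing imports (context, config, gs); the helper name and the literal range values are illustrative, not part of this change:

// fixedSettingsLoader is a hypothetical helper that returns the same
// Settings for every project. A real loader would consult per-project
// configuration instead of hardcoded values.
func fixedSettingsLoader(base, staging gs.Path) SettingsLoader {
	return func(c context.Context, proj config.ProjectName) (*Settings, error) {
		return &Settings{
			GSBase:           base,
			GSStagingBase:    staging,
			IndexStreamRange: 1024,
			IndexPrefixRange: 256,
			IndexByteRange:   16 * 1024 * 1024,
		}, nil
	}
}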
@@ -111,27 +139,13 @@ type Archivist struct {
// endpoint.
Service logdog.ServicesClient
+ // SettingsLoader loads archival settings for a specific project.
+ SettingsLoader SettingsLoader
+
// Storage is the archival source Storage instance.
Storage storage.Storage
// GSClient is the Google Storage client to use for archive generation.
GSClient gs.Client
-
- // GSBase is the base Google Storage path. This includes the bucket name
- // and any associated path.
- GSBase gs.Path
- // GSStagingBase is the base Google Storage path for archive staging. This
- // includes the bucket name and any associated path.
- GSStagingBase gs.Path
-
- // PrefixIndexRange is the maximum number of stream indexes in between index
- // entries. See archive.Manifest for more information.
- StreamIndexRange int
- // PrefixIndexRange is the maximum number of prefix indexes in between index
- // entries. See archive.Manifest for more information.
- PrefixIndexRange int
- // ByteRange is the maximum number of stream data bytes in between index
- // entries. See archive.Manifest for more information.
- ByteRange int
}

// storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
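With the Google Storage bases and index ranges moved into Settings, constructing an Archivist reduces to wiring up four dependencies. A sketch reusing the hypothetical fixedSettingsLoader above; svc, st, and gsClient stand in for the service's real clients:

a := &Archivist{
	Service:        svc,
	SettingsLoader: fixedSettingsLoader(gsBase, gsStagingBase),
	Storage:        st,
	GSClient:       gsClient,
}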
@@ -252,13 +266,23 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
return statusErr(errors.New("log stream is within settle delay"))
}
+ // Load archival settings for this project.
+ settings, err := a.loadSettings(c, config.ProjectName(at.Project))
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": at.Project,
+ }.Errorf(c, "Failed to load settings for project.")
+ return err
+ }
+
ar := logdog.ArchiveStreamRequest{
Project: at.Project,
Id: at.Id,
}
// Build our staged archival plan. This doesn't actually do any archiving.
- staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), ls, task.UniqueID())
+ staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID())
if err != nil {
log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
return err
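Because settings are now resolved per task, a loader can refuse projects it does not recognize, and the task fails here before any staging work happens. A sketch of such a loader, with the map-based registry assumed purely for illustration:

// registrySettingsLoader is a hypothetical loader backed by a static map.
// For unknown projects it returns an error, which archiveTaskImpl logs and
// propagates, failing the archival task.
func registrySettingsLoader(reg map[config.ProjectName]*Settings) SettingsLoader {
	return func(c context.Context, proj config.ProjectName) (*Settings, error) {
		if st, ok := reg[proj]; ok {
			return st, nil
		}
		return nil, errors.New("no archival settings registered for project")
	}
}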
@@ -355,10 +379,41 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
return nil
}
+// loadSettings loads and validates archival settings.
+func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) {
+ if a.SettingsLoader == nil {
+ panic("no settings loader configured")
nodir (2016/05/19 15:56:29):
    I am glad to see you didn't ban panics altogether

dnj (Google) (2016/05/19 16:34:51):
    IMO panics have their place. I think explicit erro…
+ }
+
+ st, err := a.SettingsLoader(c, project)
+ switch {
+ case err != nil:
+ return nil, err
+
+ case st.GSBase.Bucket() == "":
+ log.Fields{
+ log.ErrorKey: err,
+ "gsBase": st.GSBase,
+ }.Errorf(c, "Invalid storage base.")
+ return nil, errors.New("invalid storage base")
+
+ case st.GSStagingBase.Bucket() == "":
+ log.Fields{
+ log.ErrorKey: err,
+ "gsStagingBase": st.GSStagingBase,
+ }.Errorf(c, "Invalid storage staging base.")
+ return nil, errors.New("invalid storage staging base")
+
+ default:
+ return st, nil
+ }
+}
+
func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName,
- ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
+ st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
sa := stagedArchival{
Archivist: a,
+ Settings: st,
project: project,
terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
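loadSettings accepts whatever the loader returns but rejects any Settings whose base paths lack a bucket. Assuming gs.Path is a string-derived type, as its use with Bucket and Concat in this diff suggests, the validation can be exercised like this (bucket names invented for illustration):

st := &Settings{
	GSBase:        gs.Path("gs://archive-bucket/logs"),
	GSStagingBase: gs.Path(""), // no bucket: loadSettings rejects this
}
// st.GSBase.Bucket() yields "archive-bucket" and passes the first check,
// while st.GSStagingBase.Bucket() is empty and trips the
// "invalid storage staging base" branch above, failing the task with an
// explicit error rather than a panic.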
@@ -389,6 +444,7 @@ func (a *Archivist) makeStagedArchival(c context.Context, project config.Project
type stagedArchival struct {
*Archivist
+ *Settings
project config.ProjectName
path types.StreamPath
@@ -407,15 +463,18 @@ type stagedArchival struct {
// file name. It incorporates a unique ID into the staging name to differentiate
// it from other staging paths for the same path/name.
func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths {
- // TODO(dnj): This won't be necessary when empty project is invalid.
- project := string(sa.project)
- if project == "" {
- project = "_"
+ proj := string(sa.project)
+ // TODO (dnj): When empty projects are disallowed, remove this.
+ if proj == "" {
+ proj = "_"
}
+ // Either of these paths may be shared between projects. To enforce
+ // an absence of conflicts, we will insert the project name as part of the
+ // path.
return stagingPaths{
- staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name),
- final: sa.GSBase.Concat(proj, string(sa.path), name),
+ staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name),
+ final: sa.GSBase.Concat(proj, string(sa.path), name),
}
}
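To make the resulting layout concrete, and assuming Concat joins its segments with slashes, the two paths expand roughly as:

    staged: <GSStagingBase>/<project>/<stream path>/<uid>/<name>
    final:  <GSBase>/<project>/<stream path>/<name>

The project segment keeps bases that are shared between projects collision-free, and the uid segment keeps concurrent archival attempts for the same stream from clobbering each other's staged files.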
@@ -546,9 +605,9 @@ func (sa *stagedArchival) stage(c context.Context) (err error) {
LogWriter: streamWriter,
IndexWriter: indexWriter,
DataWriter: dataWriter,
- StreamIndexRange: sa.StreamIndexRange,
- PrefixIndexRange: sa.PrefixIndexRange,
- ByteRange: sa.ByteRange,
+ StreamIndexRange: sa.IndexStreamRange,
+ PrefixIndexRange: sa.IndexPrefixRange,
+ ByteRange: sa.IndexByteRange,
Logger: log.Get(c),
}