Index: server/internal/logdog/archivist/archivist.go |
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go |
index 7cad37ee68dac5e57ddaee53365b3e23f7a35d7f..09632d8e6d6fdbb514d32813a2aa049b1e30740b 100644 |
--- a/server/internal/logdog/archivist/archivist.go |
+++ b/server/internal/logdog/archivist/archivist.go |
@@ -104,6 +104,34 @@ type Task interface { |
AssertLease(context.Context) error |
} |
+// Settings retrieve the settings for an archival. |
nodir
2016/05/19 15:56:29
s/retrieve/store
SettingsLoader retrieves them; Se
dnj (Google)
2016/05/19 16:34:51
Done.
|
+type Settings struct { |
+ // GSBase is the base Google Storage path. This includes the bucket name |
+ // and any associated path. |
+ GSBase gs.Path |
+ // GSStagingBase is the base Google Storage path for archive staging. This |
+ // includes the bucket name and any associated path. |
+ GSStagingBase gs.Path |
nodir
2016/05/19 15:56:29
rebase?
dnj (Google)
2016/05/19 16:34:51
Acknowledged.
|
+ |
+ // AlwaysCreateBinary if true, means that a binary should be archived |
nodir
2016/05/19 15:56:29
nit: comma before if?
dnj (Google)
2016/05/19 16:34:51
Done.
|
+ // regardless of whether a specific binary file extension has been supplied |
+ // with the log stream. |
+ AlwaysCreateBinary bool |
+ |
+ // IndexStreamRange is the maximum number of stream indexes in between index |
+ // entries. See archive.Manifest for more information. |
+ IndexStreamRange int |
+ // IndexPrefixRange is the maximum number of prefix indexes in between index |
+ // entries. See archive.Manifest for more information. |
+ IndexPrefixRange int |
+ // IndexByteRange is the maximum number of stream data bytes in between index |
+ // entries. See archive.Manifest for more information. |
+ IndexByteRange int |
+} |
+ |
+// SettingsLoader returns archival Settings for a given project. |
+type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error) |
+ |
// Archivist is a stateless configuration capable of archiving individual log |
// streams. |
type Archivist struct { |
@@ -111,27 +139,13 @@ type Archivist struct { |
// endpoint. |
Service logdog.ServicesClient |
+ // SettingsLoader loads archival settings for a specific project. |
+ SettingsLoader SettingsLoader |
+ |
// Storage is the archival source Storage instance. |
Storage storage.Storage |
// GSClient is the Google Storage client to for archive generation. |
GSClient gs.Client |
- |
- // GSBase is the base Google Storage path. This includes the bucket name |
- // and any associated path. |
- GSBase gs.Path |
- // GSStagingBase is the base Google Storage path for archive staging. This |
- // includes the bucket name and any associated path. |
- GSStagingBase gs.Path |
- |
- // PrefixIndexRange is the maximum number of stream indexes in between index |
- // entries. See archive.Manifest for more information. |
- StreamIndexRange int |
- // PrefixIndexRange is the maximum number of prefix indexes in between index |
- // entries. See archive.Manifest for more information. |
- PrefixIndexRange int |
- // ByteRange is the maximum number of stream data bytes in between index |
- // entries. See archive.Manifest for more information. |
- ByteRange int |
} |
// storageBufferSize is the size, in bytes, of the LogEntry buffer that is used |
@@ -252,13 +266,23 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error { |
return statusErr(errors.New("log stream is within settle delay")) |
} |
+ // Load archival settings for this project. |
+ settings, err := a.loadSettings(c, config.ProjectName(at.Project)) |
+ if err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "project": at.Project, |
+ }.Errorf(c, "Failed to load settings for project.") |
+ return err |
+ } |
+ |
ar := logdog.ArchiveStreamRequest{ |
Project: at.Project, |
Id: at.Id, |
} |
// Build our staged archival plan. This doesn't actually do any archiving. |
- staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), ls, task.UniqueID()) |
+ staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID()) |
if err != nil { |
log.WithError(err).Errorf(c, "Failed to create staged archival plan.") |
return err |
@@ -355,10 +379,41 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error { |
return nil |
} |
+// loadSettings loads and validates archival settings. |
+func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) { |
+ if a.SettingsLoader == nil { |
+ panic("no settings loader configured") |
nodir
2016/05/19 15:56:29
I am glad to see you didn't ban panics altogether
dnj (Google)
2016/05/19 16:34:51
IMO panics have their place. I think explicit erro
|
+ } |
+ |
+ st, err := a.SettingsLoader(c, project) |
+ switch { |
+ case err != nil: |
+ return nil, err |
+ |
+ case st.GSBase.Bucket() == "": |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "gsBase": st.GSBase, |
+ }.Errorf(c, "Invalid storage base.") |
+ return nil, errors.New("invalid storage base") |
+ |
+ case st.GSStagingBase.Bucket() == "": |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "gsStagingBase": st.GSStagingBase, |
+ }.Errorf(c, "Invalid storage staging base.") |
+ return nil, errors.New("invalid storage staging base") |
+ |
+ default: |
+ return st, nil |
+ } |
+} |
+ |
func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, |
- ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { |
+ st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { |
sa := stagedArchival{ |
Archivist: a, |
+ Settings: st, |
project: project, |
terminalIndex: types.MessageIndex(ls.State.TerminalIndex), |
@@ -389,6 +444,7 @@ func (a *Archivist) makeStagedArchival(c context.Context, project config.Project |
type stagedArchival struct { |
*Archivist |
+ *Settings |
project config.ProjectName |
path types.StreamPath |
@@ -407,15 +463,18 @@ type stagedArchival struct { |
// file name. It incorporates a unique ID into the staging name to differentiate |
// it from other staging paths for the same path/name. |
func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths { |
- // TODO(dnj): This won't be necessary when empty project is invalid. |
- project := string(sa.project) |
- if project == "" { |
- project = "_" |
+ proj := string(sa.project) |
+ // TODO (dnj): When empty projects are disallowed, remove this. |
+ if proj == "" { |
+ proj = "_" |
} |
+ // Either of these paths may be shared between projects. To enforce |
+ // an absence of conflicts, we will insert the project name as part of the |
+ // path. |
return stagingPaths{ |
- staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, name), |
- final: sa.GSBase.Concat(project, string(sa.path), name), |
+ staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name), |
+ final: sa.GSBase.Concat(proj, string(sa.path), name), |
} |
} |
@@ -546,9 +605,9 @@ func (sa *stagedArchival) stage(c context.Context) (err error) { |
LogWriter: streamWriter, |
IndexWriter: indexWriter, |
DataWriter: dataWriter, |
- StreamIndexRange: sa.StreamIndexRange, |
- PrefixIndexRange: sa.PrefixIndexRange, |
- ByteRange: sa.ByteRange, |
+ StreamIndexRange: sa.IndexStreamRange, |
+ PrefixIndexRange: sa.IndexPrefixRange, |
+ ByteRange: sa.IndexByteRange, |
Logger: log.Get(c), |
} |