Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(995)

Unified Diff: server/internal/logdog/archivist/archivist.go

Issue 1968063003: LogDog: Use per-project settings for archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-useconfig
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/archivist/archivist.go
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
index 7cad37ee68dac5e57ddaee53365b3e23f7a35d7f..09632d8e6d6fdbb514d32813a2aa049b1e30740b 100644
--- a/server/internal/logdog/archivist/archivist.go
+++ b/server/internal/logdog/archivist/archivist.go
@@ -104,6 +104,34 @@ type Task interface {
AssertLease(context.Context) error
}
+// Settings retrieve the settings for an archival.
nodir 2016/05/19 15:56:29 s/retrieve/store SettingsLoader retrieves them; Se
dnj (Google) 2016/05/19 16:34:51 Done.
+type Settings struct {
+ // GSBase is the base Google Storage path. This includes the bucket name
+ // and any associated path.
+ GSBase gs.Path
+ // GSStagingBase is the base Google Storage path for archive staging. This
+ // includes the bucket name and any associated path.
+ GSStagingBase gs.Path
nodir 2016/05/19 15:56:29 rebase?
dnj (Google) 2016/05/19 16:34:51 Acknowledged.
+
+ // AlwaysCreateBinary if true, means that a binary should be archived
nodir 2016/05/19 15:56:29 nit: comma before if?
dnj (Google) 2016/05/19 16:34:51 Done.
+ // regardless of whether a specific binary file extension has been supplied
+ // with the log stream.
+ AlwaysCreateBinary bool
+
+ // IndexStreamRange is the maximum number of stream indexes in between index
+ // entries. See archive.Manifest for more information.
+ IndexStreamRange int
+ // IndexPrefixRange is the maximum number of prefix indexes in between index
+ // entries. See archive.Manifest for more information.
+ IndexPrefixRange int
+ // IndexByteRange is the maximum number of stream data bytes in between index
+ // entries. See archive.Manifest for more information.
+ IndexByteRange int
+}
+
+// SettingsLoader returns archival Settings for a given project.
+type SettingsLoader func(context.Context, config.ProjectName) (*Settings, error)
+
// Archivist is a stateless configuration capable of archiving individual log
// streams.
type Archivist struct {
@@ -111,27 +139,13 @@ type Archivist struct {
// endpoint.
Service logdog.ServicesClient
+ // SettingsLoader loads archival settings for a specific project.
+ SettingsLoader SettingsLoader
+
// Storage is the archival source Storage instance.
Storage storage.Storage
// GSClient is the Google Storage client to for archive generation.
GSClient gs.Client
-
- // GSBase is the base Google Storage path. This includes the bucket name
- // and any associated path.
- GSBase gs.Path
- // GSStagingBase is the base Google Storage path for archive staging. This
- // includes the bucket name and any associated path.
- GSStagingBase gs.Path
-
- // PrefixIndexRange is the maximum number of stream indexes in between index
- // entries. See archive.Manifest for more information.
- StreamIndexRange int
- // PrefixIndexRange is the maximum number of prefix indexes in between index
- // entries. See archive.Manifest for more information.
- PrefixIndexRange int
- // ByteRange is the maximum number of stream data bytes in between index
- // entries. See archive.Manifest for more information.
- ByteRange int
}
// storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
@@ -252,13 +266,23 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
return statusErr(errors.New("log stream is within settle delay"))
}
+ // Load archival settings for this project.
+ settings, err := a.loadSettings(c, config.ProjectName(at.Project))
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": at.Project,
+ }.Errorf(c, "Failed to load settings for project.")
+ return err
+ }
+
ar := logdog.ArchiveStreamRequest{
Project: at.Project,
Id: at.Id,
}
// Build our staged archival plan. This doesn't actually do any archiving.
- staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), ls, task.UniqueID())
+ staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), settings, ls, task.UniqueID())
if err != nil {
log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
return err
@@ -355,10 +379,41 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
return nil
}
+// loadSettings loads and validates archival settings.
+func (a *Archivist) loadSettings(c context.Context, project config.ProjectName) (*Settings, error) {
+ if a.SettingsLoader == nil {
+ panic("no settings loader configured")
nodir 2016/05/19 15:56:29 I am glad to see you didn't ban panics altogether
dnj (Google) 2016/05/19 16:34:51 IMO panics have their place. I think explicit erro
+ }
+
+ st, err := a.SettingsLoader(c, project)
+ switch {
+ case err != nil:
+ return nil, err
+
+ case st.GSBase.Bucket() == "":
+ log.Fields{
+ log.ErrorKey: err,
+ "gsBase": st.GSBase,
+ }.Errorf(c, "Invalid storage base.")
+ return nil, errors.New("invalid storage base")
+
+ case st.GSStagingBase.Bucket() == "":
+ log.Fields{
+ log.ErrorKey: err,
+ "gsStagingBase": st.GSStagingBase,
+ }.Errorf(c, "Invalid storage staging base.")
+ return nil, errors.New("invalid storage staging base")
+
+ default:
+ return st, nil
+ }
+}
+
func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName,
- ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
+ st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
sa := stagedArchival{
Archivist: a,
+ Settings: st,
project: project,
terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
@@ -389,6 +444,7 @@ func (a *Archivist) makeStagedArchival(c context.Context, project config.Project
type stagedArchival struct {
*Archivist
+ *Settings
project config.ProjectName
path types.StreamPath
@@ -407,15 +463,18 @@ type stagedArchival struct {
// file name. It incorporates a unique ID into the staging name to differentiate
// it from other staging paths for the same path/name.
func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths {
- // TODO(dnj): This won't be necessary when empty project is invalid.
- project := string(sa.project)
- if project == "" {
- project = "_"
+ proj := string(sa.project)
+ // TODO (dnj): When empty projects are disallowed, remove this.
+ if proj == "" {
+ proj = "_"
}
+ // Either of these paths may be shared between projects. To enforce
+ // an absence of conflicts, we will insert the project name as part of the
+ // path.
return stagingPaths{
- staged: sa.GSStagingBase.Concat(project, string(sa.path), uid, name),
- final: sa.GSBase.Concat(project, string(sa.path), name),
+ staged: sa.GSStagingBase.Concat(proj, string(sa.path), uid, name),
+ final: sa.GSBase.Concat(proj, string(sa.path), name),
}
}
@@ -546,9 +605,9 @@ func (sa *stagedArchival) stage(c context.Context) (err error) {
LogWriter: streamWriter,
IndexWriter: indexWriter,
DataWriter: dataWriter,
- StreamIndexRange: sa.StreamIndexRange,
- PrefixIndexRange: sa.PrefixIndexRange,
- ByteRange: sa.ByteRange,
+ StreamIndexRange: sa.IndexStreamRange,
+ PrefixIndexRange: sa.IndexPrefixRange,
+ ByteRange: sa.IndexByteRange,
Logger: log.Get(c),
}

Powered by Google App Engine
This is Rietveld 408576698