Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(160)

Unified Diff: server/cmd/logdog_archivist/main.go

Issue 1968063003: LogDog: Use per-project settings for archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-useconfig
Patch Set: Updated patchset dependency Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/cmd/logdog_archivist/main.go
diff --git a/server/cmd/logdog_archivist/main.go b/server/cmd/logdog_archivist/main.go
index 1ecd7d33563c3957a5cff5ef8fd0f1e70f92f98c..d00cb5cfc1dc47348130980ed4e7a413ae882046 100644
--- a/server/cmd/logdog_archivist/main.go
+++ b/server/cmd/logdog_archivist/main.go
@@ -9,11 +9,13 @@ import (
"github.com/luci/luci-go/common/auth"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
gcps "github.com/luci/luci-go/common/gcloud/pubsub"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/parallel"
+ "github.com/luci/luci-go/common/proto/logdog/svcconfig"
"github.com/luci/luci-go/common/tsmon/distribution"
"github.com/luci/luci-go/common/tsmon/field"
"github.com/luci/luci-go/common/tsmon/metric"
@@ -59,26 +61,9 @@ func (a *application) runArchivist(c context.Context) error {
fallthrough
case acfg == nil:
- return errors.New("missing Archivist configuration")
+ return errors.New("missing required config: archivist")
case acfg.GsStagingBucket == "":
- return errors.New("missing archive staging GS bucket")
- }
-
- // Construct and validate our GS bases.
- gsBase := gs.Path(acfg.GsStagingBucket)
- if gsBase.Bucket() == "" {
- log.Fields{
- "value": gsBase,
- }.Errorf(c, "Google Storage base does not include a bucket name.")
- return errors.New("invalid Google Storage base")
- }
-
- gsStagingBase := gs.Path(acfg.GsStagingBucket).Concat("staging")
- if gsStagingBase.Bucket() == "" {
- log.Fields{
- "value": gsStagingBase,
- }.Errorf(c, "Google Storage staging base does not include a bucket name.")
- return errors.New("invalid Google Storage staging base")
+ return errors.New("missing required config: archivist.gs_staging_bucket")
}
// Initialize Pub/Sub client.
@@ -137,17 +122,10 @@ func (a *application) runArchivist(c context.Context) error {
defer gsClient.Close()
ar := archivist.Archivist{
- Service: a.Coordinator(),
- Storage: st,
- GSClient: gsClient,
-
- GSBase: gsBase,
- GSStagingBase: gsStagingBase,
- }
- if ic := acfg.ArchiveIndexConfig; ic != nil {
- ar.StreamIndexRange = int(ic.StreamRange)
- ar.PrefixIndexRange = int(ic.PrefixRange)
- ar.ByteRange = int(ic.ByteRange)
+ Service: a.Coordinator(),
+ SettingsLoader: a.GetSettingsLoader(acfg),
+ Storage: st,
+ GSClient: gsClient,
}
tasks := int(acfg.Tasks)
@@ -238,6 +216,60 @@ func (a *application) runArchivist(c context.Context) error {
return nil
}
+// GetSettingsLoader is an archivist.SettingsLoader implementation that merges
+// global and project-specific settings.
+//
+// The resulting settings object will be verified by the Archivist.
+func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.SettingsLoader {
+ serviceID := a.ServiceID()
+
+ return func(c context.Context, proj config.ProjectName) (*archivist.Settings, error) {
+ // Fold in our project-specific configuration, if valid.
+ pcfg, err := a.ProjectConfig(c, proj)
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": proj,
+ }.Errorf(c, "Failed to fetch project configuration.")
+ return nil, err
+ }
+
+ indexParam := func(get func(ic *svcconfig.ArchiveIndexConfig) int32) int {
+ if ic := pcfg.ArchiveIndexConfig; ic != nil {
+ if v := get(ic); v > 0 {
+ return int(v)
+ }
+ }
+
+ if ic := acfg.ArchiveIndexConfig; ic != nil {
+ if v := get(ic); v > 0 {
+ return int(v)
+ }
+ }
+
+ return 0
+ }
+
+ // Load our base settings.
+ //
+ // Archival bases are:
+ // Staging: gs://<services:gs_staging_bucket>/<project-id>/...
+ // Archive: gs://<project:archive_gs_bucket>/<project-id>/...
+ st := archivist.Settings{
+ GSBase: gs.MakePath(pcfg.ArchiveGsBucket, "").Concat(serviceID),
+ GSStagingBase: gs.MakePath(acfg.GsStagingBucket, "").Concat(serviceID),
+
+ IndexStreamRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.StreamRange }),
+ IndexPrefixRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.PrefixRange }),
+ IndexByteRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.ByteRange }),
+ AlwaysRender: (acfg.RenderAllStreams || pcfg.RenderAllStreams),
+ }
+
+ // Fold project settings into loaded ones.
+ return &st, nil
+ }
+}
+
// Entry point.
func main() {
a := application{
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698