| Index: server/cmd/logdog_archivist/main.go
|
| diff --git a/server/cmd/logdog_archivist/main.go b/server/cmd/logdog_archivist/main.go
|
| index 1ecd7d33563c3957a5cff5ef8fd0f1e70f92f98c..d00cb5cfc1dc47348130980ed4e7a413ae882046 100644
|
| --- a/server/cmd/logdog_archivist/main.go
|
| +++ b/server/cmd/logdog_archivist/main.go
|
| @@ -9,11 +9,13 @@ import (
|
|
|
| "github.com/luci/luci-go/common/auth"
|
| "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| gcps "github.com/luci/luci-go/common/gcloud/pubsub"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/parallel"
|
| + "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| "github.com/luci/luci-go/common/tsmon/distribution"
|
| "github.com/luci/luci-go/common/tsmon/field"
|
| "github.com/luci/luci-go/common/tsmon/metric"
|
| @@ -59,26 +61,9 @@ func (a *application) runArchivist(c context.Context) error {
|
| fallthrough
|
|
|
| case acfg == nil:
|
| - return errors.New("missing Archivist configuration")
|
| + return errors.New("missing required config: archivist")
|
| case acfg.GsStagingBucket == "":
|
| - return errors.New("missing archive staging GS bucket")
|
| - }
|
| -
|
| - // Construct and validate our GS bases.
|
| - gsBase := gs.Path(acfg.GsStagingBucket)
|
| - if gsBase.Bucket() == "" {
|
| - log.Fields{
|
| - "value": gsBase,
|
| - }.Errorf(c, "Google Storage base does not include a bucket name.")
|
| - return errors.New("invalid Google Storage base")
|
| - }
|
| -
|
| - gsStagingBase := gs.Path(acfg.GsStagingBucket).Concat("staging")
|
| - if gsStagingBase.Bucket() == "" {
|
| - log.Fields{
|
| - "value": gsStagingBase,
|
| - }.Errorf(c, "Google Storage staging base does not include a bucket name.")
|
| - return errors.New("invalid Google Storage staging base")
|
| + return errors.New("missing required config: archivist.gs_staging_bucket")
|
| }
|
|
|
| // Initialize Pub/Sub client.
|
| @@ -137,17 +122,10 @@ func (a *application) runArchivist(c context.Context) error {
|
| defer gsClient.Close()
|
|
|
| ar := archivist.Archivist{
|
| - Service: a.Coordinator(),
|
| - Storage: st,
|
| - GSClient: gsClient,
|
| -
|
| - GSBase: gsBase,
|
| - GSStagingBase: gsStagingBase,
|
| - }
|
| - if ic := acfg.ArchiveIndexConfig; ic != nil {
|
| - ar.StreamIndexRange = int(ic.StreamRange)
|
| - ar.PrefixIndexRange = int(ic.PrefixRange)
|
| - ar.ByteRange = int(ic.ByteRange)
|
| + Service: a.Coordinator(),
|
| + SettingsLoader: a.GetSettingsLoader(acfg),
|
| + Storage: st,
|
| + GSClient: gsClient,
|
| }
|
|
|
| tasks := int(acfg.Tasks)
|
| @@ -238,6 +216,60 @@ func (a *application) runArchivist(c context.Context) error {
|
| return nil
|
| }
|
|
|
| +// GetSettingsLoader is an archivist.SettingsLoader implementation that merges
|
| +// global and project-specific settings.
|
| +//
|
| +// The resulting settings object will be verified by the Archivist.
|
| +func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.SettingsLoader {
|
| + serviceID := a.ServiceID()
|
| +
|
| + return func(c context.Context, proj config.ProjectName) (*archivist.Settings, error) {
|
| + // Fold in our project-specific configuration, if valid.
|
| + pcfg, err := a.ProjectConfig(c, proj)
|
| + if err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "project": proj,
|
| + }.Errorf(c, "Failed to fetch project configuration.")
|
| + return nil, err
|
| + }
|
| +
|
| + indexParam := func(get func(ic *svcconfig.ArchiveIndexConfig) int32) int {
|
| + if ic := pcfg.ArchiveIndexConfig; ic != nil {
|
| + if v := get(ic); v > 0 {
|
| + return int(v)
|
| + }
|
| + }
|
| +
|
| + if ic := acfg.ArchiveIndexConfig; ic != nil {
|
| + if v := get(ic); v > 0 {
|
| + return int(v)
|
| + }
|
| + }
|
| +
|
| + return 0
|
| + }
|
| +
|
| + // Load our base settings.
|
| + //
|
| + // Archival bases are:
|
| + // Staging: gs://<services:gs_staging_bucket>/<project-id>/...
|
| + // Archive: gs://<project:archive_gs_bucket>/<project-id>/...
|
| + st := archivist.Settings{
|
| + GSBase: gs.MakePath(pcfg.ArchiveGsBucket, "").Concat(serviceID),
|
| + GSStagingBase: gs.MakePath(acfg.GsStagingBucket, "").Concat(serviceID),
|
| +
|
| + IndexStreamRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.StreamRange }),
|
| + IndexPrefixRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.PrefixRange }),
|
| + IndexByteRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.ByteRange }),
|
| + AlwaysRender: (acfg.RenderAllStreams || pcfg.RenderAllStreams),
|
| + }
|
| +
|
| + // Fold project settings into loaded ones.
|
| + return &st, nil
|
| + }
|
| +}
|
| +
|
| // Entry point.
|
| func main() {
|
| a := application{
|
|
|