| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 "github.com/luci/luci-go/common/auth" | 10 "github.com/luci/luci-go/common/auth" |
| 11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/config" |
| 12 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/gcloud/gs" | 14 "github.com/luci/luci-go/common/gcloud/gs" |
| 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" | 17 "github.com/luci/luci-go/common/parallel" |
| 18 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| 17 "github.com/luci/luci-go/common/tsmon/distribution" | 19 "github.com/luci/luci-go/common/tsmon/distribution" |
| 18 "github.com/luci/luci-go/common/tsmon/field" | 20 "github.com/luci/luci-go/common/tsmon/field" |
| 19 "github.com/luci/luci-go/common/tsmon/metric" | 21 "github.com/luci/luci-go/common/tsmon/metric" |
| 20 "github.com/luci/luci-go/server/internal/logdog/archivist" | 22 "github.com/luci/luci-go/server/internal/logdog/archivist" |
| 21 "github.com/luci/luci-go/server/internal/logdog/service" | 23 "github.com/luci/luci-go/server/internal/logdog/service" |
| 22 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 23 "google.golang.org/cloud" | 25 "google.golang.org/cloud" |
| 24 "google.golang.org/cloud/pubsub" | 26 "google.golang.org/cloud/pubsub" |
| 25 ) | 27 ) |
| 26 | 28 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 52 // run is the main execution function. | 54 // run is the main execution function. |
| 53 func (a *application) runArchivist(c context.Context) error { | 55 func (a *application) runArchivist(c context.Context) error { |
| 54 cfg := a.Config() | 56 cfg := a.Config() |
| 55 | 57 |
| 56 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() | 58 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() |
| 57 switch { | 59 switch { |
| 58 case coordCfg == nil: | 60 case coordCfg == nil: |
| 59 fallthrough | 61 fallthrough |
| 60 | 62 |
| 61 case acfg == nil: | 63 case acfg == nil: |
| 62 » » return errors.New("missing Archivist configuration") | 64 » » return errors.New("missing required config: archivist") |
| 63 case acfg.GsStagingBucket == "": | 65 case acfg.GsStagingBucket == "": |
| 64 » » return errors.New("missing archive staging GS bucket") | 66 » » return errors.New("missing required config: archivist.gs_staging
_bucket") |
| 65 » } | |
| 66 | |
| 67 » // Construct and validate our GS bases. | |
| 68 » gsBase := gs.Path(acfg.GsStagingBucket) | |
| 69 » if gsBase.Bucket() == "" { | |
| 70 » » log.Fields{ | |
| 71 » » » "value": gsBase, | |
| 72 » » }.Errorf(c, "Google Storage base does not include a bucket name.
") | |
| 73 » » return errors.New("invalid Google Storage base") | |
| 74 » } | |
| 75 | |
| 76 » gsStagingBase := gs.Path(acfg.GsStagingBucket).Concat("staging") | |
| 77 » if gsStagingBase.Bucket() == "" { | |
| 78 » » log.Fields{ | |
| 79 » » » "value": gsStagingBase, | |
| 80 » » }.Errorf(c, "Google Storage staging base does not include a buck
et name.") | |
| 81 » » return errors.New("invalid Google Storage staging base") | |
| 82 } | 67 } |
| 83 | 68 |
| 84 // Initialize Pub/Sub client. | 69 // Initialize Pub/Sub client. |
| 85 // | 70 // |
| 86 // We will initialize both an authenticated Client instance and an | 71 // We will initialize both an authenticated Client instance and an |
| 87 // authenticated Context, since we need the latter for raw ACK deadline | 72 // authenticated Context, since we need the latter for raw ACK deadline |
| 88 // updates. | 73 // updates. |
| 89 taskSub := gcps.Subscription(acfg.Subscription) | 74 taskSub := gcps.Subscription(acfg.Subscription) |
| 90 if err := taskSub.Validate(); err != nil { | 75 if err := taskSub.Validate(); err != nil { |
| 91 log.Fields{ | 76 log.Fields{ |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 130 | 115 |
| 131 // Initialize our Google Storage client. | 116 // Initialize our Google Storage client. |
| 132 gsClient, err := a.GSClient(c) | 117 gsClient, err := a.GSClient(c) |
| 133 if err != nil { | 118 if err != nil { |
| 134 log.WithError(err).Errorf(c, "Failed to get Google Storage clien
t.") | 119 log.WithError(err).Errorf(c, "Failed to get Google Storage clien
t.") |
| 135 return err | 120 return err |
| 136 } | 121 } |
| 137 defer gsClient.Close() | 122 defer gsClient.Close() |
| 138 | 123 |
| 139 ar := archivist.Archivist{ | 124 ar := archivist.Archivist{ |
| 140 » » Service: a.Coordinator(), | 125 » » Service: a.Coordinator(), |
| 141 » » Storage: st, | 126 » » SettingsLoader: a.GetSettingsLoader(acfg), |
| 142 » » GSClient: gsClient, | 127 » » Storage: st, |
| 143 | 128 » » GSClient: gsClient, |
| 144 » » GSBase: gsBase, | |
| 145 » » GSStagingBase: gsStagingBase, | |
| 146 » } | |
| 147 » if ic := acfg.ArchiveIndexConfig; ic != nil { | |
| 148 » » ar.StreamIndexRange = int(ic.StreamRange) | |
| 149 » » ar.PrefixIndexRange = int(ic.PrefixRange) | |
| 150 » » ar.ByteRange = int(ic.ByteRange) | |
| 151 } | 129 } |
| 152 | 130 |
| 153 tasks := int(acfg.Tasks) | 131 tasks := int(acfg.Tasks) |
| 154 if tasks <= 0 { | 132 if tasks <= 0 { |
| 155 tasks = 1 | 133 tasks = 1 |
| 156 } | 134 } |
| 157 | 135 |
| 158 log.Fields{ | 136 log.Fields{ |
| 159 "subscription": taskSub, | 137 "subscription": taskSub, |
| 160 "tasks": tasks, | 138 "tasks": tasks, |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 231 clock.Sleep(c, subscriptionErrorDelay) | 209 clock.Sleep(c, subscriptionErrorDelay) |
| 232 continue | 210 continue |
| 233 } | 211 } |
| 234 } | 212 } |
| 235 })) | 213 })) |
| 236 | 214 |
| 237 log.Debugf(c, "Archivist finished.") | 215 log.Debugf(c, "Archivist finished.") |
| 238 return nil | 216 return nil |
| 239 } | 217 } |
| 240 | 218 |
| 219 // GetSettingsLoader is an archivist.SettingsLoader implementation that merges |
| 220 // global and project-specific settings. |
| 221 // |
| 222 // The resulting settings object will be verified by the Archivist. |
| 223 func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.Set
tingsLoader { |
| 224 serviceID := a.ServiceID() |
| 225 |
| 226 return func(c context.Context, proj config.ProjectName) (*archivist.Sett
ings, error) { |
| 227 // Fold in our project-specific configuration, if valid. |
| 228 pcfg, err := a.ProjectConfig(c, proj) |
| 229 if err != nil { |
| 230 log.Fields{ |
| 231 log.ErrorKey: err, |
| 232 "project": proj, |
| 233 }.Errorf(c, "Failed to fetch project configuration.") |
| 234 return nil, err |
| 235 } |
| 236 |
| 237 indexParam := func(get func(ic *svcconfig.ArchiveIndexConfig) in
t32) int { |
| 238 if ic := pcfg.ArchiveIndexConfig; ic != nil { |
| 239 if v := get(ic); v > 0 { |
| 240 return int(v) |
| 241 } |
| 242 } |
| 243 |
| 244 if ic := acfg.ArchiveIndexConfig; ic != nil { |
| 245 if v := get(ic); v > 0 { |
| 246 return int(v) |
| 247 } |
| 248 } |
| 249 |
| 250 return 0 |
| 251 } |
| 252 |
| 253 // Load our base settings. |
| 254 // |
| 255 // Archival bases are: |
| 256 // Staging: gs://<services:gs_staging_bucket>/<project-id>/... |
| 257 // Archive: gs://<project:archive_gs_bucket>/<project-id>/... |
| 258 st := archivist.Settings{ |
| 259 GSBase: gs.MakePath(pcfg.ArchiveGsBucket, "").Con
cat(serviceID), |
| 260 GSStagingBase: gs.MakePath(acfg.GsStagingBucket, "").Con
cat(serviceID), |
| 261 |
| 262 IndexStreamRange: indexParam(func(ic *svcconfig.ArchiveI
ndexConfig) int32 { return ic.StreamRange }), |
| 263 IndexPrefixRange: indexParam(func(ic *svcconfig.ArchiveI
ndexConfig) int32 { return ic.PrefixRange }), |
| 264 IndexByteRange: indexParam(func(ic *svcconfig.ArchiveI
ndexConfig) int32 { return ic.ByteRange }), |
| 265 AlwaysRender: (acfg.RenderAllStreams || pcfg.RenderA
llStreams), |
| 266 } |
| 267 |
| 268 // Fold project settings into loaded ones. |
| 269 return &st, nil |
| 270 } |
| 271 } |
| 272 |
| 241 // Entry point. | 273 // Entry point. |
| 242 func main() { | 274 func main() { |
| 243 a := application{ | 275 a := application{ |
| 244 Service: service.Service{ | 276 Service: service.Service{ |
| 245 Name: "archivist", | 277 Name: "archivist", |
| 246 }, | 278 }, |
| 247 } | 279 } |
| 248 a.Run(context.Background(), a.runArchivist) | 280 a.Run(context.Background(), a.runArchivist) |
| 249 } | 281 } |
| OLD | NEW |