Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "io" | 8 "io" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/auth" | 11 "github.com/luci/luci-go/common/auth" |
| 12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/config" | |
| 13 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 14 "github.com/luci/luci-go/common/gcloud/gs" | 15 "github.com/luci/luci-go/common/gcloud/gs" |
| 15 "github.com/luci/luci-go/common/gcloud/pubsub" | 16 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 16 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/parallel" | 18 "github.com/luci/luci-go/common/parallel" |
| 19 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | |
| 18 "github.com/luci/luci-go/common/tsmon/distribution" | 20 "github.com/luci/luci-go/common/tsmon/distribution" |
| 19 "github.com/luci/luci-go/common/tsmon/field" | 21 "github.com/luci/luci-go/common/tsmon/field" |
| 20 "github.com/luci/luci-go/common/tsmon/metric" | 22 "github.com/luci/luci-go/common/tsmon/metric" |
| 21 "github.com/luci/luci-go/server/internal/logdog/archivist" | 23 "github.com/luci/luci-go/server/internal/logdog/archivist" |
| 22 "github.com/luci/luci-go/server/internal/logdog/service" | 24 "github.com/luci/luci-go/server/internal/logdog/service" |
| 23 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
| 24 "google.golang.org/cloud" | 26 "google.golang.org/cloud" |
| 25 gcps "google.golang.org/cloud/pubsub" | 27 gcps "google.golang.org/cloud/pubsub" |
| 26 ) | 28 ) |
| 27 | 29 |
| (...skipping 26 matching lines...) Expand all Loading... | |
| 54 func (a *application) runArchivist(c context.Context) error { | 56 func (a *application) runArchivist(c context.Context) error { |
| 55 cfg := a.Config() | 57 cfg := a.Config() |
| 56 | 58 |
| 57 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() | 59 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() |
| 58 switch { | 60 switch { |
| 59 case coordCfg == nil: | 61 case coordCfg == nil: |
| 60 fallthrough | 62 fallthrough |
| 61 | 63 |
| 62 case acfg == nil: | 64 case acfg == nil: |
| 63 return errors.New("missing Archivist configuration") | 65 return errors.New("missing Archivist configuration") |
| 64 case acfg.GsBase == "": | |
| 65 return errors.New("missing archive GS bucket") | |
| 66 case acfg.GsStagingBase == "": | 66 case acfg.GsStagingBase == "": |
| 67 return errors.New("missing archive staging GS bucket") | 67 return errors.New("missing archive staging GS bucket") |
| 68 } | 68 } |
| 69 | 69 |
| 70 // Construct and validate our GS bases. | |
| 71 gsBase := gs.Path(acfg.GsBase) | |
| 72 if gsBase.Bucket() == "" { | |
| 73 log.Fields{ | |
| 74 "value": gsBase, | |
| 75 }.Errorf(c, "Google Storage base does not include a bucket name. ") | |
| 76 return errors.New("invalid Google Storage base") | |
| 77 } | |
| 78 | |
| 79 gsStagingBase := gs.Path(acfg.GsStagingBase) | |
| 80 if gsStagingBase.Bucket() == "" { | |
| 81 log.Fields{ | |
| 82 "value": gsStagingBase, | |
| 83 }.Errorf(c, "Google Storage staging base does not include a bucket name.") | |
| 84 return errors.New("invalid Google Storage staging base") | |
| 85 } | |
| 86 | |
| 87 // Initialize Pub/Sub client. | 70 // Initialize Pub/Sub client. |
| 88 // | 71 // |
| 89 // We will initialize both an authenticated Client instance and an | 72 // We will initialize both an authenticated Client instance and an |
| 90 // authenticated Context, since we need the latter for raw ACK deadline | 73 // authenticated Context, since we need the latter for raw ACK deadline |
| 91 // updates. | 74 // updates. |
| 92 taskSub := pubsub.Subscription(acfg.Subscription) | 75 taskSub := pubsub.Subscription(acfg.Subscription) |
| 93 if err := taskSub.Validate(); err != nil { | 76 if err := taskSub.Validate(); err != nil { |
| 94 log.Fields{ | 77 log.Fields{ |
| 95 log.ErrorKey: err, | 78 log.ErrorKey: err, |
| 96 "value": taskSub, | 79 "value": taskSub, |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 136 if err != nil { | 119 if err != nil { |
| 137 log.WithError(err).Errorf(c, "Failed to get Google Storage client.") | 120 log.WithError(err).Errorf(c, "Failed to get Google Storage client.") |
| 138 return err | 121 return err |
| 139 } | 122 } |
| 140 defer gsClient.Close() | 123 defer gsClient.Close() |
| 141 | 124 |
| 142 ar := archivist.Archivist{ | 125 ar := archivist.Archivist{ |
| 143 Service: a.Coordinator(), | 126 Service: a.Coordinator(), |
| 144 Storage: st, | 127 Storage: st, |
| 145 GSClient: gsClient, | 128 GSClient: gsClient, |
| 146 | |
| 147 GSBase: gsBase, | |
| 148 GSStagingBase: gsStagingBase, | |
| 149 StreamIndexRange: int(acfg.IndexStreamRange), | |
| 150 PrefixIndexRange: int(acfg.IndexPrefixRange), | |
| 151 ByteRange: int(acfg.IndexByteRange), | |
| 152 } | 129 } |
| 153 | 130 |
| 154 tasks := int(acfg.Tasks) | 131 tasks := int(acfg.Tasks) |
| 155 if tasks <= 0 { | 132 if tasks <= 0 { |
| 156 tasks = 1 | 133 tasks = 1 |
| 157 } | 134 } |
| 158 | 135 |
| 159 log.Fields{ | 136 log.Fields{ |
| 160 "subscription": taskSub, | 137 "subscription": taskSub, |
| 161 "tasks": tasks, | 138 "tasks": tasks, |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 232 clock.Sleep(c, subscriptionErrorDelay) | 209 clock.Sleep(c, subscriptionErrorDelay) |
| 233 continue | 210 continue |
| 234 } | 211 } |
| 235 } | 212 } |
| 236 })) | 213 })) |
| 237 | 214 |
| 238 log.Debugf(c, "Archivist finished.") | 215 log.Debugf(c, "Archivist finished.") |
| 239 return nil | 216 return nil |
| 240 } | 217 } |
| 241 | 218 |
| 219 // GetSettingsLoader is an archivist.SettingsLoader implementation that merges | |
| 220 // global and project-specific settings. | |
| 221 // | |
| 222 // The resulting settings object will be verified by the Archivist. | |
| 223 func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.Set tingsLoader { | |
| 224 return func(c context.Context, proj config.ProjectName) (*archivist.Settings, error) { | |
| 225 // Load our Archivist-wide settings. | |
| 226 st := archivist.Settings{ | |
| 227 GSBase: gs.Path(acfg.GsBase), | |
| 228 GSStagingBase: gs.Path(acfg.GsStagingBase), | |
|
nodir
2016/05/19 15:56:29
I assume this will be rebased to be use GsStagingB
dnj (Google)
2016/05/19 16:34:51
Yeah. This is weird, I did a "dependency upload" a
| |
| 229 | |
| 230 IndexStreamRange: int(acfg.IndexStreamRange), | |
| 231 IndexPrefixRange: int(acfg.IndexPrefixRange), | |
| 232 IndexByteRange: int(acfg.IndexByteRange), | |
|
nodir
2016/05/19 15:56:29
I assume this will be rebased to be use GsStagingB
dnj (Google)
2016/05/19 16:34:51
Acknowledged.
| |
| 233 } | |
| 234 | |
| 235 // Fold in our project-specific configuration, if valid. | |
| 236 pcfg, err := a.ProjectConfig(c, proj) | |
| 237 if err != nil { | |
| 238 log.Fields{ | |
| 239 log.ErrorKey: err, | |
| 240 "project": proj, | |
| 241 }.Errorf(c, "Failed to fetch project configuration.") | |
| 242 } | |
|
nodir
2016/05/19 15:56:29
return err?
dnj (Google)
2016/05/19 16:34:51
er um ... yes.
| |
| 243 | |
| 244 // Fold project settings into loaded ones. | |
| 245 if pcfg.ArchiveGsBase != "" { | |
| 246 st.GSBase = gs.Path(pcfg.ArchiveGsBase) | |
| 247 } | |
| 248 st.AlwaysCreateBinary = (acfg.AlwaysCreateBinary || pcfg.AlwaysCreateBinary) | |
| 249 if r := pcfg.ArchiveIndexStreamRange; r >= 0 { | |
|
nodir
2016/05/19 15:56:29
I still don't get this obsession to save a field v
dnj (Google)
2016/05/19 16:34:51
In this case, I think it is less wordy and therefo
| |
| 250 st.IndexStreamRange = int(r) | |
| 251 } | |
| 252 if r := pcfg.ArchiveIndexPrefixRange; r >= 0 { | |
| 253 st.IndexPrefixRange = int(r) | |
| 254 } | |
| 255 if r := pcfg.ArchiveIndexByteRange; r >= 0 { | |
| 256 st.IndexByteRange = int(r) | |
| 257 } | |
| 258 return &st, nil | |
| 259 } | |
| 260 } | |
| 261 | |
| 242 // Entry point. | 262 // Entry point. |
| 243 func main() { | 263 func main() { |
| 244 a := application{ | 264 a := application{ |
| 245 Service: service.Service{ | 265 Service: service.Service{ |
| 246 Name: "archivist", | 266 Name: "archivist", |
| 247 }, | 267 }, |
| 248 } | 268 } |
| 249 a.Run(context.Background(), a.runArchivist) | 269 a.Run(context.Background(), a.runArchivist) |
| 250 } | 270 } |
| OLD | NEW |