Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: server/cmd/logdog_archivist/main.go

Issue 1968063003: LogDog: Use per-project settings for archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-useconfig
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "io" 8 "io"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/common/auth" 11 "github.com/luci/luci-go/common/auth"
12 "github.com/luci/luci-go/common/clock" 12 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/config"
13 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
14 "github.com/luci/luci-go/common/gcloud/gs" 15 "github.com/luci/luci-go/common/gcloud/gs"
15 "github.com/luci/luci-go/common/gcloud/pubsub" 16 "github.com/luci/luci-go/common/gcloud/pubsub"
16 log "github.com/luci/luci-go/common/logging" 17 log "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/common/parallel" 18 "github.com/luci/luci-go/common/parallel"
19 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
18 "github.com/luci/luci-go/common/tsmon/distribution" 20 "github.com/luci/luci-go/common/tsmon/distribution"
19 "github.com/luci/luci-go/common/tsmon/field" 21 "github.com/luci/luci-go/common/tsmon/field"
20 "github.com/luci/luci-go/common/tsmon/metric" 22 "github.com/luci/luci-go/common/tsmon/metric"
21 "github.com/luci/luci-go/server/internal/logdog/archivist" 23 "github.com/luci/luci-go/server/internal/logdog/archivist"
22 "github.com/luci/luci-go/server/internal/logdog/service" 24 "github.com/luci/luci-go/server/internal/logdog/service"
23 "golang.org/x/net/context" 25 "golang.org/x/net/context"
24 "google.golang.org/cloud" 26 "google.golang.org/cloud"
25 gcps "google.golang.org/cloud/pubsub" 27 gcps "google.golang.org/cloud/pubsub"
26 ) 28 )
27 29
(...skipping 26 matching lines...) Expand all
54 func (a *application) runArchivist(c context.Context) error { 56 func (a *application) runArchivist(c context.Context) error {
55 cfg := a.Config() 57 cfg := a.Config()
56 58
57 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() 59 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist()
58 switch { 60 switch {
59 case coordCfg == nil: 61 case coordCfg == nil:
60 fallthrough 62 fallthrough
61 63
62 case acfg == nil: 64 case acfg == nil:
63 return errors.New("missing Archivist configuration") 65 return errors.New("missing Archivist configuration")
64 case acfg.GsBase == "":
65 return errors.New("missing archive GS bucket")
66 case acfg.GsStagingBase == "": 66 case acfg.GsStagingBase == "":
67 return errors.New("missing archive staging GS bucket") 67 return errors.New("missing archive staging GS bucket")
68 } 68 }
69 69
70 // Construct and validate our GS bases.
71 gsBase := gs.Path(acfg.GsBase)
72 if gsBase.Bucket() == "" {
73 log.Fields{
74 "value": gsBase,
75 }.Errorf(c, "Google Storage base does not include a bucket name. ")
76 return errors.New("invalid Google Storage base")
77 }
78
79 gsStagingBase := gs.Path(acfg.GsStagingBase)
80 if gsStagingBase.Bucket() == "" {
81 log.Fields{
82 "value": gsStagingBase,
83 }.Errorf(c, "Google Storage staging base does not include a buck et name.")
84 return errors.New("invalid Google Storage staging base")
85 }
86
87 // Initialize Pub/Sub client. 70 // Initialize Pub/Sub client.
88 // 71 //
89 // We will initialize both an authenticated Client instance and an 72 // We will initialize both an authenticated Client instance and an
90 // authenticated Context, since we need the latter for raw ACK deadline 73 // authenticated Context, since we need the latter for raw ACK deadline
91 // updates. 74 // updates.
92 taskSub := pubsub.Subscription(acfg.Subscription) 75 taskSub := pubsub.Subscription(acfg.Subscription)
93 if err := taskSub.Validate(); err != nil { 76 if err := taskSub.Validate(); err != nil {
94 log.Fields{ 77 log.Fields{
95 log.ErrorKey: err, 78 log.ErrorKey: err,
96 "value": taskSub, 79 "value": taskSub,
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
136 if err != nil { 119 if err != nil {
137 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.") 120 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.")
138 return err 121 return err
139 } 122 }
140 defer gsClient.Close() 123 defer gsClient.Close()
141 124
142 ar := archivist.Archivist{ 125 ar := archivist.Archivist{
143 Service: a.Coordinator(), 126 Service: a.Coordinator(),
144 Storage: st, 127 Storage: st,
145 GSClient: gsClient, 128 GSClient: gsClient,
146
147 GSBase: gsBase,
148 GSStagingBase: gsStagingBase,
149 StreamIndexRange: int(acfg.IndexStreamRange),
150 PrefixIndexRange: int(acfg.IndexPrefixRange),
151 ByteRange: int(acfg.IndexByteRange),
152 } 129 }
153 130
154 tasks := int(acfg.Tasks) 131 tasks := int(acfg.Tasks)
155 if tasks <= 0 { 132 if tasks <= 0 {
156 tasks = 1 133 tasks = 1
157 } 134 }
158 135
159 log.Fields{ 136 log.Fields{
160 "subscription": taskSub, 137 "subscription": taskSub,
161 "tasks": tasks, 138 "tasks": tasks,
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 clock.Sleep(c, subscriptionErrorDelay) 209 clock.Sleep(c, subscriptionErrorDelay)
233 continue 210 continue
234 } 211 }
235 } 212 }
236 })) 213 }))
237 214
238 log.Debugf(c, "Archivist finished.") 215 log.Debugf(c, "Archivist finished.")
239 return nil 216 return nil
240 } 217 }
241 218
219 // GetSettingsLoader is an archivist.SettingsLoader implementation that merges
220 // global and project-specific settings.
221 //
222 // The resulting settings object will be verified by the Archivist.
223 func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.Set tingsLoader {
224 return func(c context.Context, proj config.ProjectName) (*archivist.Sett ings, error) {
225 // Load our Archivist-wide settings.
226 st := archivist.Settings{
227 GSBase: gs.Path(acfg.GsBase),
228 GSStagingBase: gs.Path(acfg.GsStagingBase),
nodir 2016/05/19 15:56:29 I assume this will be rebased to be use GsStagingB
dnj (Google) 2016/05/19 16:34:51 Yeah. This is weird, I did a "dependency upload" a
229
230 IndexStreamRange: int(acfg.IndexStreamRange),
231 IndexPrefixRange: int(acfg.IndexPrefixRange),
232 IndexByteRange: int(acfg.IndexByteRange),
nodir 2016/05/19 15:56:29 I assume this will be rebased to be use GsStagingB
dnj (Google) 2016/05/19 16:34:51 Acknowledged.
233 }
234
235 // Fold in our project-specific configuration, if valid.
236 pcfg, err := a.ProjectConfig(c, proj)
237 if err != nil {
238 log.Fields{
239 log.ErrorKey: err,
240 "project": proj,
241 }.Errorf(c, "Failed to fetch project configuration.")
242 }
nodir 2016/05/19 15:56:29 return err?
dnj (Google) 2016/05/19 16:34:51 er um ... yes.
243
244 // Fold project settings into loaded ones.
245 if pcfg.ArchiveGsBase != "" {
246 st.GSBase = gs.Path(pcfg.ArchiveGsBase)
247 }
248 st.AlwaysCreateBinary = (acfg.AlwaysCreateBinary || pcfg.AlwaysC reateBinary)
249 if r := pcfg.ArchiveIndexStreamRange; r >= 0 {
nodir 2016/05/19 15:56:29 I still don't get this obsession to save a field v
dnj (Google) 2016/05/19 16:34:51 In this case, I think it is less wordy and therefo
250 st.IndexStreamRange = int(r)
251 }
252 if r := pcfg.ArchiveIndexPrefixRange; r >= 0 {
253 st.IndexPrefixRange = int(r)
254 }
255 if r := pcfg.ArchiveIndexByteRange; r >= 0 {
256 st.IndexByteRange = int(r)
257 }
258 return &st, nil
259 }
260 }
261
242 // Entry point. 262 // Entry point.
243 func main() { 263 func main() {
244 a := application{ 264 a := application{
245 Service: service.Service{ 265 Service: service.Service{
246 Name: "archivist", 266 Name: "archivist",
247 }, 267 },
248 } 268 }
249 a.Run(context.Background(), a.runArchivist) 269 a.Run(context.Background(), a.runArchivist)
250 } 270 }
OLDNEW
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist.go » ('j') | server/internal/logdog/archivist/archivist.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698