OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package main | 5 package main |
6 | 6 |
7 import ( | 7 import ( |
8 "io" | 8 "io" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/luci/luci-go/common/auth" | 11 "github.com/luci/luci-go/common/auth" |
12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
13 "github.com/luci/luci-go/common/config" | |
13 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
14 "github.com/luci/luci-go/common/gcloud/gs" | 15 "github.com/luci/luci-go/common/gcloud/gs" |
15 "github.com/luci/luci-go/common/gcloud/pubsub" | 16 "github.com/luci/luci-go/common/gcloud/pubsub" |
16 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
17 "github.com/luci/luci-go/common/parallel" | 18 "github.com/luci/luci-go/common/parallel" |
19 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | |
18 "github.com/luci/luci-go/common/tsmon/distribution" | 20 "github.com/luci/luci-go/common/tsmon/distribution" |
19 "github.com/luci/luci-go/common/tsmon/field" | 21 "github.com/luci/luci-go/common/tsmon/field" |
20 "github.com/luci/luci-go/common/tsmon/metric" | 22 "github.com/luci/luci-go/common/tsmon/metric" |
21 "github.com/luci/luci-go/server/internal/logdog/archivist" | 23 "github.com/luci/luci-go/server/internal/logdog/archivist" |
22 "github.com/luci/luci-go/server/internal/logdog/service" | 24 "github.com/luci/luci-go/server/internal/logdog/service" |
23 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
24 "google.golang.org/cloud" | 26 "google.golang.org/cloud" |
25 gcps "google.golang.org/cloud/pubsub" | 27 gcps "google.golang.org/cloud/pubsub" |
26 ) | 28 ) |
27 | 29 |
(...skipping 26 matching lines...) Expand all Loading... | |
54 func (a *application) runArchivist(c context.Context) error { | 56 func (a *application) runArchivist(c context.Context) error { |
55 cfg := a.Config() | 57 cfg := a.Config() |
56 | 58 |
57 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() | 59 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() |
58 switch { | 60 switch { |
59 case coordCfg == nil: | 61 case coordCfg == nil: |
60 fallthrough | 62 fallthrough |
61 | 63 |
62 case acfg == nil: | 64 case acfg == nil: |
63 return errors.New("missing Archivist configuration") | 65 return errors.New("missing Archivist configuration") |
64 case acfg.GsBase == "": | |
65 return errors.New("missing archive GS bucket") | |
66 case acfg.GsStagingBase == "": | 66 case acfg.GsStagingBase == "": |
67 return errors.New("missing archive staging GS bucket") | 67 return errors.New("missing archive staging GS bucket") |
68 } | 68 } |
69 | 69 |
70 // Construct and validate our GS bases. | |
71 gsBase := gs.Path(acfg.GsBase) | |
72 if gsBase.Bucket() == "" { | |
73 log.Fields{ | |
74 "value": gsBase, | |
75 }.Errorf(c, "Google Storage base does not include a bucket name. ") | |
76 return errors.New("invalid Google Storage base") | |
77 } | |
78 | |
79 gsStagingBase := gs.Path(acfg.GsStagingBase) | |
80 if gsStagingBase.Bucket() == "" { | |
81 log.Fields{ | |
82 "value": gsStagingBase, | |
83 }.Errorf(c, "Google Storage staging base does not include a buck et name.") | |
84 return errors.New("invalid Google Storage staging base") | |
85 } | |
86 | |
87 // Initialize Pub/Sub client. | 70 // Initialize Pub/Sub client. |
88 // | 71 // |
89 // We will initialize both an authenticated Client instance and an | 72 // We will initialize both an authenticated Client instance and an |
90 // authenticated Context, since we need the latter for raw ACK deadline | 73 // authenticated Context, since we need the latter for raw ACK deadline |
91 // updates. | 74 // updates. |
92 taskSub := pubsub.Subscription(acfg.Subscription) | 75 taskSub := pubsub.Subscription(acfg.Subscription) |
93 if err := taskSub.Validate(); err != nil { | 76 if err := taskSub.Validate(); err != nil { |
94 log.Fields{ | 77 log.Fields{ |
95 log.ErrorKey: err, | 78 log.ErrorKey: err, |
96 "value": taskSub, | 79 "value": taskSub, |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
136 if err != nil { | 119 if err != nil { |
137 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.") | 120 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.") |
138 return err | 121 return err |
139 } | 122 } |
140 defer gsClient.Close() | 123 defer gsClient.Close() |
141 | 124 |
142 ar := archivist.Archivist{ | 125 ar := archivist.Archivist{ |
143 Service: a.Coordinator(), | 126 Service: a.Coordinator(), |
144 Storage: st, | 127 Storage: st, |
145 GSClient: gsClient, | 128 GSClient: gsClient, |
146 | |
147 GSBase: gsBase, | |
148 GSStagingBase: gsStagingBase, | |
149 StreamIndexRange: int(acfg.IndexStreamRange), | |
150 PrefixIndexRange: int(acfg.IndexPrefixRange), | |
151 ByteRange: int(acfg.IndexByteRange), | |
152 } | 129 } |
153 | 130 |
154 tasks := int(acfg.Tasks) | 131 tasks := int(acfg.Tasks) |
155 if tasks <= 0 { | 132 if tasks <= 0 { |
156 tasks = 1 | 133 tasks = 1 |
157 } | 134 } |
158 | 135 |
159 log.Fields{ | 136 log.Fields{ |
160 "subscription": taskSub, | 137 "subscription": taskSub, |
161 "tasks": tasks, | 138 "tasks": tasks, |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
232 clock.Sleep(c, subscriptionErrorDelay) | 209 clock.Sleep(c, subscriptionErrorDelay) |
233 continue | 210 continue |
234 } | 211 } |
235 } | 212 } |
236 })) | 213 })) |
237 | 214 |
238 log.Debugf(c, "Archivist finished.") | 215 log.Debugf(c, "Archivist finished.") |
239 return nil | 216 return nil |
240 } | 217 } |
241 | 218 |
219 // GetSettingsLoader is an archivist.SettingsLoader implementation that merges | |
220 // global and project-specific settings. | |
221 // | |
222 // The resulting settings object will be verified by the Archivist. | |
223 func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.Set tingsLoader { | |
224 return func(c context.Context, proj config.ProjectName) (*archivist.Sett ings, error) { | |
225 // Load our Archivist-wide settings. | |
226 st := archivist.Settings{ | |
227 GSBase: gs.Path(acfg.GsBase), | |
228 GSStagingBase: gs.Path(acfg.GsStagingBase), | |
nodir
2016/05/19 15:56:29
I assume this will be rebased to be use GsStagingB
dnj (Google)
2016/05/19 16:34:51
Yeah. This is weird, I did a "dependency upload" a
| |
229 | |
230 IndexStreamRange: int(acfg.IndexStreamRange), | |
231 IndexPrefixRange: int(acfg.IndexPrefixRange), | |
232 IndexByteRange: int(acfg.IndexByteRange), | |
nodir
2016/05/19 15:56:29
I assume this will be rebased to be use GsStagingB
dnj (Google)
2016/05/19 16:34:51
Acknowledged.
| |
233 } | |
234 | |
235 // Fold in our project-specific configuration, if valid. | |
236 pcfg, err := a.ProjectConfig(c, proj) | |
237 if err != nil { | |
238 log.Fields{ | |
239 log.ErrorKey: err, | |
240 "project": proj, | |
241 }.Errorf(c, "Failed to fetch project configuration.") | |
242 } | |
nodir
2016/05/19 15:56:29
return err?
dnj (Google)
2016/05/19 16:34:51
er um ... yes.
| |
243 | |
244 // Fold project settings into loaded ones. | |
245 if pcfg.ArchiveGsBase != "" { | |
246 st.GSBase = gs.Path(pcfg.ArchiveGsBase) | |
247 } | |
248 st.AlwaysCreateBinary = (acfg.AlwaysCreateBinary || pcfg.AlwaysC reateBinary) | |
249 if r := pcfg.ArchiveIndexStreamRange; r >= 0 { | |
nodir
2016/05/19 15:56:29
I still don't get this obsession to save a field v
dnj (Google)
2016/05/19 16:34:51
In this case, I think it is less wordy and therefo
| |
250 st.IndexStreamRange = int(r) | |
251 } | |
252 if r := pcfg.ArchiveIndexPrefixRange; r >= 0 { | |
253 st.IndexPrefixRange = int(r) | |
254 } | |
255 if r := pcfg.ArchiveIndexByteRange; r >= 0 { | |
256 st.IndexByteRange = int(r) | |
257 } | |
258 return &st, nil | |
259 } | |
260 } | |
261 | |
242 // Entry point. | 262 // Entry point. |
243 func main() { | 263 func main() { |
244 a := application{ | 264 a := application{ |
245 Service: service.Service{ | 265 Service: service.Service{ |
246 Name: "archivist", | 266 Name: "archivist", |
247 }, | 267 }, |
248 } | 268 } |
249 a.Run(context.Background(), a.runArchivist) | 269 a.Run(context.Background(), a.runArchivist) |
250 } | 270 } |
OLD | NEW |