Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(753)

Side by Side Diff: server/cmd/logdog_archivist/main.go

Issue 1968063003: LogDog: Use per-project settings for archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-useconfig
Patch Set: Updated patchset dependency Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 "github.com/luci/luci-go/common/auth" 10 "github.com/luci/luci-go/common/auth"
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/config"
12 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/gcloud/gs" 14 "github.com/luci/luci-go/common/gcloud/gs"
14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel" 17 "github.com/luci/luci-go/common/parallel"
18 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
17 "github.com/luci/luci-go/common/tsmon/distribution" 19 "github.com/luci/luci-go/common/tsmon/distribution"
18 "github.com/luci/luci-go/common/tsmon/field" 20 "github.com/luci/luci-go/common/tsmon/field"
19 "github.com/luci/luci-go/common/tsmon/metric" 21 "github.com/luci/luci-go/common/tsmon/metric"
20 "github.com/luci/luci-go/server/internal/logdog/archivist" 22 "github.com/luci/luci-go/server/internal/logdog/archivist"
21 "github.com/luci/luci-go/server/internal/logdog/service" 23 "github.com/luci/luci-go/server/internal/logdog/service"
22 "golang.org/x/net/context" 24 "golang.org/x/net/context"
23 "google.golang.org/cloud" 25 "google.golang.org/cloud"
24 "google.golang.org/cloud/pubsub" 26 "google.golang.org/cloud/pubsub"
25 ) 27 )
26 28
(...skipping 25 matching lines...) Expand all
52 // run is the main execution function. 54 // run is the main execution function.
53 func (a *application) runArchivist(c context.Context) error { 55 func (a *application) runArchivist(c context.Context) error {
54 cfg := a.Config() 56 cfg := a.Config()
55 57
56 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist() 58 coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist()
57 switch { 59 switch {
58 case coordCfg == nil: 60 case coordCfg == nil:
59 fallthrough 61 fallthrough
60 62
61 case acfg == nil: 63 case acfg == nil:
62 » » return errors.New("missing Archivist configuration") 64 » » return errors.New("missing required config: archivist")
63 case acfg.GsStagingBucket == "": 65 case acfg.GsStagingBucket == "":
64 » » return errors.New("missing archive staging GS bucket") 66 » » return errors.New("missing required config: archivist.gs_staging _bucket")
65 » }
66
67 » // Construct and validate our GS bases.
68 » gsBase := gs.Path(acfg.GsStagingBucket)
69 » if gsBase.Bucket() == "" {
70 » » log.Fields{
71 » » » "value": gsBase,
72 » » }.Errorf(c, "Google Storage base does not include a bucket name. ")
73 » » return errors.New("invalid Google Storage base")
74 » }
75
76 » gsStagingBase := gs.Path(acfg.GsStagingBucket).Concat("staging")
77 » if gsStagingBase.Bucket() == "" {
78 » » log.Fields{
79 » » » "value": gsStagingBase,
80 » » }.Errorf(c, "Google Storage staging base does not include a buck et name.")
81 » » return errors.New("invalid Google Storage staging base")
82 } 67 }
83 68
84 // Initialize Pub/Sub client. 69 // Initialize Pub/Sub client.
85 // 70 //
86 // We will initialize both an authenticated Client instance and an 71 // We will initialize both an authenticated Client instance and an
87 // authenticated Context, since we need the latter for raw ACK deadline 72 // authenticated Context, since we need the latter for raw ACK deadline
88 // updates. 73 // updates.
89 taskSub := gcps.Subscription(acfg.Subscription) 74 taskSub := gcps.Subscription(acfg.Subscription)
90 if err := taskSub.Validate(); err != nil { 75 if err := taskSub.Validate(); err != nil {
91 log.Fields{ 76 log.Fields{
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
130 115
131 // Initialize our Google Storage client. 116 // Initialize our Google Storage client.
132 gsClient, err := a.GSClient(c) 117 gsClient, err := a.GSClient(c)
133 if err != nil { 118 if err != nil {
134 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.") 119 log.WithError(err).Errorf(c, "Failed to get Google Storage clien t.")
135 return err 120 return err
136 } 121 }
137 defer gsClient.Close() 122 defer gsClient.Close()
138 123
139 ar := archivist.Archivist{ 124 ar := archivist.Archivist{
140 » » Service: a.Coordinator(), 125 » » Service: a.Coordinator(),
141 » » Storage: st, 126 » » SettingsLoader: a.GetSettingsLoader(acfg),
142 » » GSClient: gsClient, 127 » » Storage: st,
143 128 » » GSClient: gsClient,
144 » » GSBase: gsBase,
145 » » GSStagingBase: gsStagingBase,
146 » }
147 » if ic := acfg.ArchiveIndexConfig; ic != nil {
148 » » ar.StreamIndexRange = int(ic.StreamRange)
149 » » ar.PrefixIndexRange = int(ic.PrefixRange)
150 » » ar.ByteRange = int(ic.ByteRange)
151 } 129 }
152 130
153 tasks := int(acfg.Tasks) 131 tasks := int(acfg.Tasks)
154 if tasks <= 0 { 132 if tasks <= 0 {
155 tasks = 1 133 tasks = 1
156 } 134 }
157 135
158 log.Fields{ 136 log.Fields{
159 "subscription": taskSub, 137 "subscription": taskSub,
160 "tasks": tasks, 138 "tasks": tasks,
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
231 clock.Sleep(c, subscriptionErrorDelay) 209 clock.Sleep(c, subscriptionErrorDelay)
232 continue 210 continue
233 } 211 }
234 } 212 }
235 })) 213 }))
236 214
237 log.Debugf(c, "Archivist finished.") 215 log.Debugf(c, "Archivist finished.")
238 return nil 216 return nil
239 } 217 }
240 218
219 // GetSettingsLoader is an archivist.SettingsLoader implementation that merges
220 // global and project-specific settings.
221 //
222 // The resulting settings object will be verified by the Archivist.
223 func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.Set tingsLoader {
224 serviceID := a.ServiceID()
225
226 return func(c context.Context, proj config.ProjectName) (*archivist.Sett ings, error) {
227 // Fold in our project-specific configuration, if valid.
228 pcfg, err := a.ProjectConfig(c, proj)
229 if err != nil {
230 log.Fields{
231 log.ErrorKey: err,
232 "project": proj,
233 }.Errorf(c, "Failed to fetch project configuration.")
234 return nil, err
235 }
236
237 indexParam := func(get func(ic *svcconfig.ArchiveIndexConfig) in t32) int {
238 if ic := pcfg.ArchiveIndexConfig; ic != nil {
239 if v := get(ic); v > 0 {
240 return int(v)
241 }
242 }
243
244 if ic := acfg.ArchiveIndexConfig; ic != nil {
245 if v := get(ic); v > 0 {
246 return int(v)
247 }
248 }
249
250 return 0
251 }
252
253 // Load our base settings.
254 //
255 // Archival bases are:
256 // Staging: gs://<services:gs_staging_bucket>/<project-id>/...
257 // Archive: gs://<project:archive_gs_bucket>/<project-id>/...
258 st := archivist.Settings{
259 GSBase: gs.MakePath(pcfg.ArchiveGsBucket, "").Con cat(serviceID),
260 GSStagingBase: gs.MakePath(acfg.GsStagingBucket, "").Con cat(serviceID),
261
262 IndexStreamRange: indexParam(func(ic *svcconfig.ArchiveI ndexConfig) int32 { return ic.StreamRange }),
263 IndexPrefixRange: indexParam(func(ic *svcconfig.ArchiveI ndexConfig) int32 { return ic.PrefixRange }),
264 IndexByteRange: indexParam(func(ic *svcconfig.ArchiveI ndexConfig) int32 { return ic.ByteRange }),
265 AlwaysRender: (acfg.RenderAllStreams || pcfg.RenderA llStreams),
266 }
267
268 // Fold project settings into loaded ones.
269 return &st, nil
270 }
271 }
272
241 // Entry point. 273 // Entry point.
242 func main() { 274 func main() {
243 a := application{ 275 a := application{
244 Service: service.Service{ 276 Service: service.Service{
245 Name: "archivist", 277 Name: "archivist",
246 }, 278 },
247 } 279 }
248 a.Run(context.Background(), a.runArchivist) 280 a.Run(context.Background(), a.runArchivist)
249 } 281 }
OLDNEW
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698