Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: logdog/server/service/service.go

Issue 2648433007: LogDog: Use datastore config cache when available. (Closed)
Patch Set: Comments. Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/server/service/config/opts.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package service 5 package service
6 6
7 import ( 7 import (
8 "flag" 8 "flag"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
11 "net/url" 11 "net/url"
12 "os" 12 "os"
13 "os/signal" 13 "os/signal"
14 "path/filepath" 14 "path/filepath"
15 "runtime/pprof" 15 "runtime/pprof"
16 "sync/atomic" 16 "sync/atomic"
17 "time" 17 "time"
18 18
19 "github.com/luci/luci-go/appengine/gaesettings"
19 "github.com/luci/luci-go/client/authcli" 20 "github.com/luci/luci-go/client/authcli"
20 "github.com/luci/luci-go/common/auth" 21 "github.com/luci/luci-go/common/auth"
21 "github.com/luci/luci-go/common/clock/clockflag" 22 "github.com/luci/luci-go/common/clock/clockflag"
22 "github.com/luci/luci-go/common/config/impl/filesystem" 23 "github.com/luci/luci-go/common/config/impl/filesystem"
23 "github.com/luci/luci-go/common/data/caching/proccache" 24 "github.com/luci/luci-go/common/data/caching/proccache"
24 "github.com/luci/luci-go/common/errors" 25 "github.com/luci/luci-go/common/errors"
25 "github.com/luci/luci-go/common/gcloud/gs" 26 "github.com/luci/luci-go/common/gcloud/gs"
26 log "github.com/luci/luci-go/common/logging" 27 log "github.com/luci/luci-go/common/logging"
27 "github.com/luci/luci-go/common/logging/gologger" 28 "github.com/luci/luci-go/common/logging/gologger"
28 "github.com/luci/luci-go/common/proto/google" 29 "github.com/luci/luci-go/common/proto/google"
29 "github.com/luci/luci-go/common/tsmon" 30 "github.com/luci/luci-go/common/tsmon"
30 "github.com/luci/luci-go/common/tsmon/target" 31 "github.com/luci/luci-go/common/tsmon/target"
31 "github.com/luci/luci-go/grpc/prpc" 32 "github.com/luci/luci-go/grpc/prpc"
32 "github.com/luci/luci-go/logdog/api/config/svcconfig" 33 "github.com/luci/luci-go/logdog/api/config/svcconfig"
33 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 34 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
34 "github.com/luci/luci-go/logdog/common/storage" 35 "github.com/luci/luci-go/logdog/common/storage"
35 "github.com/luci/luci-go/logdog/common/storage/bigtable" 36 "github.com/luci/luci-go/logdog/common/storage/bigtable"
36 "github.com/luci/luci-go/logdog/server/retryServicesClient" 37 "github.com/luci/luci-go/logdog/server/retryServicesClient"
37 "github.com/luci/luci-go/logdog/server/service/config" 38 "github.com/luci/luci-go/logdog/server/service/config"
38 "github.com/luci/luci-go/luci_config/common/cfgtypes" 39 "github.com/luci/luci-go/luci_config/common/cfgtypes"
39 "github.com/luci/luci-go/luci_config/server/cfgclient" 40 "github.com/luci/luci-go/luci_config/server/cfgclient"
40 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" 41 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client"
41 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig " 42 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig "
42 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" 43 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto"
44 "github.com/luci/luci-go/server/settings"
45
46 "github.com/luci/gae/impl/cloud"
43 47
44 "cloud.google.com/go/compute/metadata" 48 "cloud.google.com/go/compute/metadata"
49 "cloud.google.com/go/datastore"
45 "golang.org/x/net/context" 50 "golang.org/x/net/context"
46 "golang.org/x/oauth2" 51 "golang.org/x/oauth2"
47 "google.golang.org/api/option" 52 "google.golang.org/api/option"
48 ) 53 )
49 54
50 var ( 55 var (
51 // ErrInvalidConfig is an error that is returned when the supplied 56 // ErrInvalidConfig is an error that is returned when the supplied
52 // configuration is invalid. 57 // configuration is invalid.
53 ErrInvalidConfig = errors.New("invalid configuration") 58 ErrInvalidConfig = errors.New("invalid configuration")
54 59
(...skipping 30 matching lines...) Expand all
85 authFlags authcli.Flags 90 authFlags authcli.Flags
86 tsMonFlags tsmon.Flags 91 tsMonFlags tsmon.Flags
87 92
88 coordinatorHost string 93 coordinatorHost string
89 coordinatorInsecure bool 94 coordinatorInsecure bool
90 storageCredentialJSONPath string 95 storageCredentialJSONPath string
91 cpuProfilePath string 96 cpuProfilePath string
92 heapProfilePath string 97 heapProfilePath string
93 98
94 // onGCE is true if we're on GCE. We probe this once during Run. 99 // onGCE is true if we're on GCE. We probe this once during Run.
95 » onGCE bool 100 » onGCE bool
101 » hasDatastore bool
96 102
97 // killCheckInterval is the amount of time in between service configurat ion 103 // killCheckInterval is the amount of time in between service configurat ion
98 // checks. If set, this service will periodically reload its service 104 // checks. If set, this service will periodically reload its service
99 // configuration. If that configuration has changed, the service will ki ll 105 // configuration. If that configuration has changed, the service will ki ll
100 // itself. 106 // itself.
101 // 107 //
102 // Since, in production, this is running under an execution harness such as 108 // Since, in production, this is running under an execution harness such as
103 // Kubernetes, the service will restart and load the new configuration. This 109 // Kubernetes, the service will restart and load the new configuration. This
104 // is easier than implementing in-process configuration updating. 110 // is easier than implementing in-process configuration updating.
105 killCheckInterval clockflag.Duration 111 killCheckInterval clockflag.Duration
(...skipping 25 matching lines...) Expand all
131 137
132 rc := 0 138 rc := 0
133 if err := s.runImpl(c, f); err != nil { 139 if err := s.runImpl(c, f); err != nil {
134 log.WithError(err).Errorf(c, "Application exiting with error.") 140 log.WithError(err).Errorf(c, "Application exiting with error.")
135 rc = 1 141 rc = 1
136 } 142 }
137 os.Exit(rc) 143 os.Exit(rc)
138 } 144 }
139 145
140 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro r { 146 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro r {
147 // Probe our environment for default values.
148 s.probeGCEEnvironment(c)
149
150 // Install service flags and parse.
141 s.addFlags(c, &s.Flags) 151 s.addFlags(c, &s.Flags)
142 if err := s.Flags.Parse(os.Args[1:]); err != nil { 152 if err := s.Flags.Parse(os.Args[1:]); err != nil {
143 log.WithError(err).Errorf(c, "Failed to parse command-line.") 153 log.WithError(err).Errorf(c, "Failed to parse command-line.")
144 return err 154 return err
145 } 155 }
146 156
147 // Install logging configuration. 157 // Install logging configuration.
148 c = s.loggingFlags.Set(c) 158 c = s.loggingFlags.Set(c)
149 159
150 if p := s.cpuProfilePath; p != "" { 160 if p := s.cpuProfilePath; p != "" {
(...skipping 25 matching lines...) Expand all
176 186
177 if err := pprof.WriteHeapProfile(fd); err != nil { 187 if err := pprof.WriteHeapProfile(fd); err != nil {
178 log.Fields{ 188 log.Fields{
179 log.ErrorKey: err, 189 log.ErrorKey: err,
180 "path": p, 190 "path": p,
181 }.Warningf(c, "Failed to write heap profile.") 191 }.Warningf(c, "Failed to write heap profile.")
182 } 192 }
183 }() 193 }()
184 } 194 }
185 195
196 // Cancel our Context after we're done our run loop.
197 c, cancelFunc := context.WithCancel(c)
198 defer cancelFunc()
199
186 // Validate the runtime environment. 200 // Validate the runtime environment.
187 if s.serviceID == "" { 201 if s.serviceID == "" {
188 return errors.New("no service ID was configured (-service-id)") 202 return errors.New("no service ID was configured (-service-id)")
189 } 203 }
190 204
205 // Install a cloud datastore client. This is non-fatal if it fails.
206 dsClient, err := s.initDatastoreClient(c)
207 if err == nil {
208 defer dsClient.Close()
209
210 ccfg := cloud.Config{
211 DS: dsClient,
212 }
213 c = ccfg.Use(c)
214 c = settings.Use(c, settings.New(gaesettings.Storage{}))
215
216 s.hasDatastore = true
217 log.Debugf(c, "Enabled cloud datastore access.")
218 } else {
219 log.WithError(err).Warningf(c, "Failed to create cloud datastore client.")
220 }
221
191 // Install a process-wide cache. 222 // Install a process-wide cache.
192 c = proccache.Use(c, &proccache.Cache{}) 223 c = proccache.Use(c, &proccache.Cache{})
193 224
194 // Configure our signal handler. It will listen for terminating signals and 225 // Configure our signal handler. It will listen for terminating signals and
195 // issue a shutdown signal if one is received. 226 // issue a shutdown signal if one is received.
196 signalC := make(chan os.Signal) 227 signalC := make(chan os.Signal)
197 go func(c context.Context) { 228 go func(c context.Context) {
198 hasShutdownAlready := false 229 hasShutdownAlready := false
199 for sig := range signalC { 230 for sig := range signalC {
200 if !hasShutdownAlready { 231 if !hasShutdownAlready {
(...skipping 17 matching lines...) Expand all
218 249
219 // Initialize our tsmon library. 250 // Initialize our tsmon library.
220 c = tsmon.WithState(c, tsmon.NewState()) 251 c = tsmon.WithState(c, tsmon.NewState())
221 252
222 if err := tsmon.InitializeFromFlags(c, &s.tsMonFlags); err != nil { 253 if err := tsmon.InitializeFromFlags(c, &s.tsMonFlags); err != nil {
223 log.WithError(err).Warningf(c, "Failed to initialize monitoring; will continue without metrics.") 254 log.WithError(err).Warningf(c, "Failed to initialize monitoring; will continue without metrics.")
224 } 255 }
225 defer tsmon.Shutdown(c) 256 defer tsmon.Shutdown(c)
226 257
227 // Initialize our Client instantiations. 258 // Initialize our Client instantiations.
228 var err error
229 if s.coord, err = s.initCoordinatorClient(c); err != nil { 259 if s.coord, err = s.initCoordinatorClient(c); err != nil {
230 log.WithError(err).Errorf(c, "Failed to setup Coordinator client .") 260 log.WithError(err).Errorf(c, "Failed to setup Coordinator client .")
231 return err 261 return err
232 } 262 }
233 263
234 // Initialize and install our config service and caching layers, and loa d our 264 // Initialize and install our config service and caching layers, and loa d our
235 // initial service config. 265 // initial service config.
236 if err := s.initConfig(&c); err != nil { 266 if err := s.initConfig(&c); err != nil {
237 log.WithError(err).Errorf(c, "Failed to setup configuration.") 267 log.WithError(err).Errorf(c, "Failed to setup configuration.")
238 return err 268 return err
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
275 "If supplied, enable CPU profiling and write the profile here.") 305 "If supplied, enable CPU profiling and write the profile here.")
276 fs.Var(&s.killCheckInterval, "config-kill-interval", 306 fs.Var(&s.killCheckInterval, "config-kill-interval",
277 "If non-zero, poll for configuration changes and kill the applic ation if one is detected.") 307 "If non-zero, poll for configuration changes and kill the applic ation if one is detected.")
278 fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "", 308 fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "",
279 "(Testing) If set, load configuration from a local filesystem ro oted here.") 309 "(Testing) If set, load configuration from a local filesystem ro oted here.")
280 } 310 }
281 311
282 // probeGCEEnvironment fills in any parameters that can be probed from Google 312 // probeGCEEnvironment fills in any parameters that can be probed from Google
283 // Compute Engine metadata. 313 // Compute Engine metadata.
284 // 314 //
285 // If we're not running on GCE, this will return nil. An error will only be 315 // If we're not running on GCE, this will do nothing. It is non-fatal if any
286 // returned if an operation that is expected to work fails. 316 // given GCE field fails to be probed.
287 func (s *Service) probeGCEEnvironment(c context.Context) error { 317 func (s *Service) probeGCEEnvironment(c context.Context) {
288 s.onGCE = metadata.OnGCE() 318 s.onGCE = metadata.OnGCE()
289 if !s.onGCE { 319 if !s.onGCE {
290 » » return nil 320 » » return
291 } 321 }
292 322
293 // Determine our service ID from metadata. The service ID will equal the cloud 323 // Determine our service ID from metadata. The service ID will equal the cloud
294 // project ID. 324 // project ID.
295 if s.serviceID == "" { 325 if s.serviceID == "" {
296 var err error 326 var err error
297 if s.serviceID, err = metadata.ProjectID(); err != nil { 327 if s.serviceID, err = metadata.ProjectID(); err != nil {
298 » » » log.WithError(err).Errorf(c, "Failed to probe GCE projec t ID.") 328 » » » log.WithError(err).Warningf(c, "Failed to probe GCE proj ect ID.")
299 » » » return err
300 } 329 }
301 } 330 }
302 » return nil 331 }
332
333 func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, err or) {
334 » // Initialize Storage authentication.
335 » tokenSource, err := s.TokenSource(c, func(o *auth.Options) {
336 » » o.Scopes = []string{datastore.ScopeDatastore}
337 » })
338 » if err != nil {
339 » » log.WithError(err).Errorf(c, "Failed to create datastore TokenSo urce.")
340 » » return nil, err
341 » }
342
343 » return datastore.NewClient(c, s.serviceID,
344 » » option.WithTokenSource(tokenSource))
303 } 345 }
304 346
305 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien t, error) { 347 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien t, error) {
306 if s.coordinatorHost == "" { 348 if s.coordinatorHost == "" {
307 log.Errorf(c, "Missing Coordinator URL (-coordinator).") 349 log.Errorf(c, "Missing Coordinator URL (-coordinator).")
308 return nil, ErrInvalidConfig 350 return nil, ErrInvalidConfig
309 } 351 }
310 352
311 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { 353 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) {
312 o.Scopes = CoordinatorScopes 354 o.Scopes = CoordinatorScopes
(...skipping 15 matching lines...) Expand all
328 sc := logdog.NewServicesPRPCClient(&prpcClient) 370 sc := logdog.NewServicesPRPCClient(&prpcClient)
329 371
330 // Wrap the resulting client in a retry harness. 372 // Wrap the resulting client in a retry harness.
331 return retryServicesClient.New(sc, nil), nil 373 return retryServicesClient.New(sc, nil), nil
332 } 374 }
333 375
334 func (s *Service) initConfig(c *context.Context) error { 376 func (s *Service) initConfig(c *context.Context) error {
335 // Set up our in-memory config object cache. 377 // Set up our in-memory config object cache.
336 s.configCache.Lifetime = projectConfigCacheDuration 378 s.configCache.Lifetime = projectConfigCacheDuration
337 379
380 // Start to build our backend caching options.
381 opts := config.CacheOptions{
382 CacheExpiration: projectConfigCacheDuration,
383 }
384
338 // If a testConfigFilePath was specified, use a mock configuration servi ce 385 // If a testConfigFilePath was specified, use a mock configuration servi ce
339 // that loads from a local file. 386 // that loads from a local file.
340 var p client.Provider 387 var p client.Provider
341 if s.testConfigFilePath == "" { 388 if s.testConfigFilePath == "" {
342 ccfg, err := s.coord.GetConfig(*c, &google.Empty{}) 389 ccfg, err := s.coord.GetConfig(*c, &google.Empty{})
343 if err != nil { 390 if err != nil {
344 return err 391 return err
345 } 392 }
346 393
347 // Determine our config service host. 394 // Determine our config service host.
(...skipping 15 matching lines...) Expand all
363 if ccfg.ConfigSet == "" { 410 if ccfg.ConfigSet == "" {
364 return errors.New("coordinator does not specify a config set") 411 return errors.New("coordinator does not specify a config set")
365 } 412 }
366 413
367 log.Fields{ 414 log.Fields{
368 "host": host, 415 "host": host,
369 }.Debugf(*c, "Using remote configuration service client.") 416 }.Debugf(*c, "Using remote configuration service client.")
370 p = &client.RemoteProvider{ 417 p = &client.RemoteProvider{
371 Host: host, 418 Host: host,
372 } 419 }
420
421 // If using a remote config provider, enable datastore access an d caching.
422 opts.DatastoreCacheAvailable = s.hasDatastore
373 } else { 423 } else {
374 // Test / Local: use filesystem config path. 424 // Test / Local: use filesystem config path.
375 ci, err := filesystem.New(s.testConfigFilePath) 425 ci, err := filesystem.New(s.testConfigFilePath)
376 if err != nil { 426 if err != nil {
377 return err 427 return err
378 } 428 }
379 p = &testconfig.Provider{Base: ci} 429 p = &testconfig.Provider{Base: ci}
380 } 430 }
381 431
382 // Add config caching layers. 432 // Add config caching layers.
383 opts := config.CacheOptions{
384 CacheExpiration: projectConfigCacheDuration,
385 }
386 *c = opts.WrapBackend(*c, &client.Backend{ 433 *c = opts.WrapBackend(*c, &client.Backend{
387 Provider: p, 434 Provider: p,
388 }) 435 })
389 436
390 // Load our service configuration. 437 // Load our service configuration.
391 var meta cfgclient.Meta 438 var meta cfgclient.Meta
392 cset, path := s.ServiceConfigPath() 439 cset, path := s.ServiceConfigPath()
393 if err := cfgclient.Get(*c, cfgclient.AsService, cset, path, textproto.M essage(&s.serviceConfig), &meta); err != nil { 440 if err := cfgclient.Get(*c, cfgclient.AsService, cset, path, textproto.M essage(&s.serviceConfig), &meta); err != nil {
394 return errors.Annotate(err).Reason("failed to load service confi g").Err() 441 return errors.Annotate(err).Reason("failed to load service confi g").Err()
395 } 442 }
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after
579 // 626 //
580 // An optional permutation function can be provided to modify those Options 627 // An optional permutation function can be provided to modify those Options
581 // before the Authenticator is created. 628 // before the Authenticator is created.
582 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth 2.TokenSource, error) { 629 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth 2.TokenSource, error) {
583 a, err := s.Authenticator(c, f) 630 a, err := s.Authenticator(c, f)
584 if err != nil { 631 if err != nil {
585 return nil, err 632 return nil, err
586 } 633 }
587 return a.TokenSource() 634 return a.TokenSource()
588 } 635 }
OLDNEW
« no previous file with comments | « logdog/server/service/config/opts.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698