| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package service | 5 package service |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "flag" | 8 "flag" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "net/url" | 11 "net/url" |
| 12 "os" | 12 "os" |
| 13 "os/signal" | 13 "os/signal" |
| 14 "path/filepath" | 14 "path/filepath" |
| 15 "runtime/pprof" | 15 "runtime/pprof" |
| 16 "sync/atomic" | 16 "sync/atomic" |
| 17 "time" | 17 "time" |
| 18 | 18 |
| 19 "github.com/luci/luci-go/appengine/gaesettings" |
| 19 "github.com/luci/luci-go/client/authcli" | 20 "github.com/luci/luci-go/client/authcli" |
| 20 "github.com/luci/luci-go/common/auth" | 21 "github.com/luci/luci-go/common/auth" |
| 21 "github.com/luci/luci-go/common/clock/clockflag" | 22 "github.com/luci/luci-go/common/clock/clockflag" |
| 22 "github.com/luci/luci-go/common/config/impl/filesystem" | 23 "github.com/luci/luci-go/common/config/impl/filesystem" |
| 23 "github.com/luci/luci-go/common/data/caching/proccache" | 24 "github.com/luci/luci-go/common/data/caching/proccache" |
| 24 "github.com/luci/luci-go/common/errors" | 25 "github.com/luci/luci-go/common/errors" |
| 25 "github.com/luci/luci-go/common/gcloud/gs" | 26 "github.com/luci/luci-go/common/gcloud/gs" |
| 26 log "github.com/luci/luci-go/common/logging" | 27 log "github.com/luci/luci-go/common/logging" |
| 27 "github.com/luci/luci-go/common/logging/gologger" | 28 "github.com/luci/luci-go/common/logging/gologger" |
| 28 "github.com/luci/luci-go/common/proto/google" | 29 "github.com/luci/luci-go/common/proto/google" |
| 29 "github.com/luci/luci-go/common/tsmon" | 30 "github.com/luci/luci-go/common/tsmon" |
| 30 "github.com/luci/luci-go/common/tsmon/target" | 31 "github.com/luci/luci-go/common/tsmon/target" |
| 31 "github.com/luci/luci-go/grpc/prpc" | 32 "github.com/luci/luci-go/grpc/prpc" |
| 32 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 33 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 33 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 34 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 34 "github.com/luci/luci-go/logdog/common/storage" | 35 "github.com/luci/luci-go/logdog/common/storage" |
| 35 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 36 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 36 "github.com/luci/luci-go/logdog/server/retryServicesClient" | 37 "github.com/luci/luci-go/logdog/server/retryServicesClient" |
| 37 "github.com/luci/luci-go/logdog/server/service/config" | 38 "github.com/luci/luci-go/logdog/server/service/config" |
| 38 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 39 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 39 "github.com/luci/luci-go/luci_config/server/cfgclient" | 40 "github.com/luci/luci-go/luci_config/server/cfgclient" |
| 40 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" | 41 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" |
| 41 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig
" | 42 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig
" |
| 42 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" | 43 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" |
| 44 "github.com/luci/luci-go/server/settings" |
| 45 |
| 46 "github.com/luci/gae/impl/cloud" |
| 43 | 47 |
| 44 "cloud.google.com/go/compute/metadata" | 48 "cloud.google.com/go/compute/metadata" |
| 49 "cloud.google.com/go/datastore" |
| 45 "golang.org/x/net/context" | 50 "golang.org/x/net/context" |
| 46 "golang.org/x/oauth2" | 51 "golang.org/x/oauth2" |
| 47 "google.golang.org/api/option" | 52 "google.golang.org/api/option" |
| 48 ) | 53 ) |
| 49 | 54 |
| 50 var ( | 55 var ( |
| 51 // ErrInvalidConfig is an error that is returned when the supplied | 56 // ErrInvalidConfig is an error that is returned when the supplied |
| 52 // configuration is invalid. | 57 // configuration is invalid. |
| 53 ErrInvalidConfig = errors.New("invalid configuration") | 58 ErrInvalidConfig = errors.New("invalid configuration") |
| 54 | 59 |
| (...skipping 30 matching lines...) Expand all Loading... |
| 85 authFlags authcli.Flags | 90 authFlags authcli.Flags |
| 86 tsMonFlags tsmon.Flags | 91 tsMonFlags tsmon.Flags |
| 87 | 92 |
| 88 coordinatorHost string | 93 coordinatorHost string |
| 89 coordinatorInsecure bool | 94 coordinatorInsecure bool |
| 90 storageCredentialJSONPath string | 95 storageCredentialJSONPath string |
| 91 cpuProfilePath string | 96 cpuProfilePath string |
| 92 heapProfilePath string | 97 heapProfilePath string |
| 93 | 98 |
| 94 // onGCE is true if we're on GCE. We probe this once during Run. | 99 // onGCE is true if we're on GCE. We probe this once during Run. |
| 95 » onGCE bool | 100 » onGCE bool |
| 101 » hasDatastore bool |
| 96 | 102 |
| 97 // killCheckInterval is the amount of time in between service configurat
ion | 103 // killCheckInterval is the amount of time in between service configurat
ion |
| 98 // checks. If set, this service will periodically reload its service | 104 // checks. If set, this service will periodically reload its service |
| 99 // configuration. If that configuration has changed, the service will ki
ll | 105 // configuration. If that configuration has changed, the service will ki
ll |
| 100 // itself. | 106 // itself. |
| 101 // | 107 // |
| 102 // Since, in production, this is running under an execution harness such
as | 108 // Since, in production, this is running under an execution harness such
as |
| 103 // Kubernetes, the service will restart and load the new configuration.
This | 109 // Kubernetes, the service will restart and load the new configuration.
This |
| 104 // is easier than implementing in-process configuration updating. | 110 // is easier than implementing in-process configuration updating. |
| 105 killCheckInterval clockflag.Duration | 111 killCheckInterval clockflag.Duration |
| (...skipping 25 matching lines...) Expand all Loading... |
| 131 | 137 |
| 132 rc := 0 | 138 rc := 0 |
| 133 if err := s.runImpl(c, f); err != nil { | 139 if err := s.runImpl(c, f); err != nil { |
| 134 log.WithError(err).Errorf(c, "Application exiting with error.") | 140 log.WithError(err).Errorf(c, "Application exiting with error.") |
| 135 rc = 1 | 141 rc = 1 |
| 136 } | 142 } |
| 137 os.Exit(rc) | 143 os.Exit(rc) |
| 138 } | 144 } |
| 139 | 145 |
| 140 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
r { | 146 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
r { |
| 147 // Probe our environment for default values. |
| 148 s.probeGCEEnvironment(c) |
| 149 |
| 150 // Install service flags and parse. |
| 141 s.addFlags(c, &s.Flags) | 151 s.addFlags(c, &s.Flags) |
| 142 if err := s.Flags.Parse(os.Args[1:]); err != nil { | 152 if err := s.Flags.Parse(os.Args[1:]); err != nil { |
| 143 log.WithError(err).Errorf(c, "Failed to parse command-line.") | 153 log.WithError(err).Errorf(c, "Failed to parse command-line.") |
| 144 return err | 154 return err |
| 145 } | 155 } |
| 146 | 156 |
| 147 // Install logging configuration. | 157 // Install logging configuration. |
| 148 c = s.loggingFlags.Set(c) | 158 c = s.loggingFlags.Set(c) |
| 149 | 159 |
| 150 if p := s.cpuProfilePath; p != "" { | 160 if p := s.cpuProfilePath; p != "" { |
| (...skipping 25 matching lines...) Expand all Loading... |
| 176 | 186 |
| 177 if err := pprof.WriteHeapProfile(fd); err != nil { | 187 if err := pprof.WriteHeapProfile(fd); err != nil { |
| 178 log.Fields{ | 188 log.Fields{ |
| 179 log.ErrorKey: err, | 189 log.ErrorKey: err, |
| 180 "path": p, | 190 "path": p, |
| 181 }.Warningf(c, "Failed to write heap profile.") | 191 }.Warningf(c, "Failed to write heap profile.") |
| 182 } | 192 } |
| 183 }() | 193 }() |
| 184 } | 194 } |
| 185 | 195 |
| 196 // Cancel our Context after we're done our run loop. |
| 197 c, cancelFunc := context.WithCancel(c) |
| 198 defer cancelFunc() |
| 199 |
| 186 // Validate the runtime environment. | 200 // Validate the runtime environment. |
| 187 if s.serviceID == "" { | 201 if s.serviceID == "" { |
| 188 return errors.New("no service ID was configured (-service-id)") | 202 return errors.New("no service ID was configured (-service-id)") |
| 189 } | 203 } |
| 190 | 204 |
| 205 // Install a cloud datastore client. This is non-fatal if it fails. |
| 206 dsClient, err := s.initDatastoreClient(c) |
| 207 if err == nil { |
| 208 defer dsClient.Close() |
| 209 |
| 210 ccfg := cloud.Config{ |
| 211 DS: dsClient, |
| 212 } |
| 213 c = ccfg.Use(c) |
| 214 c = settings.Use(c, settings.New(gaesettings.Storage{})) |
| 215 |
| 216 s.hasDatastore = true |
| 217 log.Debugf(c, "Enabled cloud datastore access.") |
| 218 } else { |
| 219 log.WithError(err).Warningf(c, "Failed to create cloud datastore
client.") |
| 220 } |
| 221 |
| 191 // Install a process-wide cache. | 222 // Install a process-wide cache. |
| 192 c = proccache.Use(c, &proccache.Cache{}) | 223 c = proccache.Use(c, &proccache.Cache{}) |
| 193 | 224 |
| 194 // Configure our signal handler. It will listen for terminating signals
and | 225 // Configure our signal handler. It will listen for terminating signals
and |
| 195 // issue a shutdown signal if one is received. | 226 // issue a shutdown signal if one is received. |
| 196 signalC := make(chan os.Signal) | 227 signalC := make(chan os.Signal) |
| 197 go func(c context.Context) { | 228 go func(c context.Context) { |
| 198 hasShutdownAlready := false | 229 hasShutdownAlready := false |
| 199 for sig := range signalC { | 230 for sig := range signalC { |
| 200 if !hasShutdownAlready { | 231 if !hasShutdownAlready { |
| (...skipping 17 matching lines...) Expand all Loading... |
| 218 | 249 |
| 219 // Initialize our tsmon library. | 250 // Initialize our tsmon library. |
| 220 c = tsmon.WithState(c, tsmon.NewState()) | 251 c = tsmon.WithState(c, tsmon.NewState()) |
| 221 | 252 |
| 222 if err := tsmon.InitializeFromFlags(c, &s.tsMonFlags); err != nil { | 253 if err := tsmon.InitializeFromFlags(c, &s.tsMonFlags); err != nil { |
| 223 log.WithError(err).Warningf(c, "Failed to initialize monitoring;
will continue without metrics.") | 254 log.WithError(err).Warningf(c, "Failed to initialize monitoring;
will continue without metrics.") |
| 224 } | 255 } |
| 225 defer tsmon.Shutdown(c) | 256 defer tsmon.Shutdown(c) |
| 226 | 257 |
| 227 // Initialize our Client instantiations. | 258 // Initialize our Client instantiations. |
| 228 var err error | |
| 229 if s.coord, err = s.initCoordinatorClient(c); err != nil { | 259 if s.coord, err = s.initCoordinatorClient(c); err != nil { |
| 230 log.WithError(err).Errorf(c, "Failed to setup Coordinator client
.") | 260 log.WithError(err).Errorf(c, "Failed to setup Coordinator client
.") |
| 231 return err | 261 return err |
| 232 } | 262 } |
| 233 | 263 |
| 234 // Initialize and install our config service and caching layers, and loa
d our | 264 // Initialize and install our config service and caching layers, and loa
d our |
| 235 // initial service config. | 265 // initial service config. |
| 236 if err := s.initConfig(&c); err != nil { | 266 if err := s.initConfig(&c); err != nil { |
| 237 log.WithError(err).Errorf(c, "Failed to setup configuration.") | 267 log.WithError(err).Errorf(c, "Failed to setup configuration.") |
| 238 return err | 268 return err |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 275 "If supplied, enable CPU profiling and write the profile here.") | 305 "If supplied, enable CPU profiling and write the profile here.") |
| 276 fs.Var(&s.killCheckInterval, "config-kill-interval", | 306 fs.Var(&s.killCheckInterval, "config-kill-interval", |
| 277 "If non-zero, poll for configuration changes and kill the applic
ation if one is detected.") | 307 "If non-zero, poll for configuration changes and kill the applic
ation if one is detected.") |
| 278 fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "", | 308 fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "", |
| 279 "(Testing) If set, load configuration from a local filesystem ro
oted here.") | 309 "(Testing) If set, load configuration from a local filesystem ro
oted here.") |
| 280 } | 310 } |
| 281 | 311 |
| 282 // probeGCEEnvironment fills in any parameters that can be probed from Google | 312 // probeGCEEnvironment fills in any parameters that can be probed from Google |
| 283 // Compute Engine metadata. | 313 // Compute Engine metadata. |
| 284 // | 314 // |
| 285 // If we're not running on GCE, this will return nil. An error will only be | 315 // If we're not running on GCE, this will do nothing. It is non-fatal if any |
| 286 // returned if an operation that is expected to work fails. | 316 // given GCE field fails to be probed. |
| 287 func (s *Service) probeGCEEnvironment(c context.Context) error { | 317 func (s *Service) probeGCEEnvironment(c context.Context) { |
| 288 s.onGCE = metadata.OnGCE() | 318 s.onGCE = metadata.OnGCE() |
| 289 if !s.onGCE { | 319 if !s.onGCE { |
| 290 » » return nil | 320 » » return |
| 291 } | 321 } |
| 292 | 322 |
| 293 // Determine our service ID from metadata. The service ID will equal the
cloud | 323 // Determine our service ID from metadata. The service ID will equal the
cloud |
| 294 // project ID. | 324 // project ID. |
| 295 if s.serviceID == "" { | 325 if s.serviceID == "" { |
| 296 var err error | 326 var err error |
| 297 if s.serviceID, err = metadata.ProjectID(); err != nil { | 327 if s.serviceID, err = metadata.ProjectID(); err != nil { |
| 298 » » » log.WithError(err).Errorf(c, "Failed to probe GCE projec
t ID.") | 328 » » » log.WithError(err).Warningf(c, "Failed to probe GCE proj
ect ID.") |
| 299 » » » return err | |
| 300 } | 329 } |
| 301 } | 330 } |
| 302 » return nil | 331 } |
| 332 |
| 333 func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, err
or) { |
| 334 » // Initialize Storage authentication. |
| 335 » tokenSource, err := s.TokenSource(c, func(o *auth.Options) { |
| 336 » » o.Scopes = []string{datastore.ScopeDatastore} |
| 337 » }) |
| 338 » if err != nil { |
| 339 » » log.WithError(err).Errorf(c, "Failed to create datastore TokenSo
urce.") |
| 340 » » return nil, err |
| 341 » } |
| 342 |
| 343 » return datastore.NewClient(c, s.serviceID, |
| 344 » » option.WithTokenSource(tokenSource)) |
| 303 } | 345 } |
| 304 | 346 |
| 305 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { | 347 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { |
| 306 if s.coordinatorHost == "" { | 348 if s.coordinatorHost == "" { |
| 307 log.Errorf(c, "Missing Coordinator URL (-coordinator).") | 349 log.Errorf(c, "Missing Coordinator URL (-coordinator).") |
| 308 return nil, ErrInvalidConfig | 350 return nil, ErrInvalidConfig |
| 309 } | 351 } |
| 310 | 352 |
| 311 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { | 353 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { |
| 312 o.Scopes = CoordinatorScopes | 354 o.Scopes = CoordinatorScopes |
| (...skipping 15 matching lines...) Expand all Loading... |
| 328 sc := logdog.NewServicesPRPCClient(&prpcClient) | 370 sc := logdog.NewServicesPRPCClient(&prpcClient) |
| 329 | 371 |
| 330 // Wrap the resulting client in a retry harness. | 372 // Wrap the resulting client in a retry harness. |
| 331 return retryServicesClient.New(sc, nil), nil | 373 return retryServicesClient.New(sc, nil), nil |
| 332 } | 374 } |
| 333 | 375 |
| 334 func (s *Service) initConfig(c *context.Context) error { | 376 func (s *Service) initConfig(c *context.Context) error { |
| 335 // Set up our in-memory config object cache. | 377 // Set up our in-memory config object cache. |
| 336 s.configCache.Lifetime = projectConfigCacheDuration | 378 s.configCache.Lifetime = projectConfigCacheDuration |
| 337 | 379 |
| 380 // Start to build our backend caching options. |
| 381 opts := config.CacheOptions{ |
| 382 CacheExpiration: projectConfigCacheDuration, |
| 383 } |
| 384 |
| 338 // If a testConfigFilePath was specified, use a mock configuration servi
ce | 385 // If a testConfigFilePath was specified, use a mock configuration servi
ce |
| 339 // that loads from a local file. | 386 // that loads from a local file. |
| 340 var p client.Provider | 387 var p client.Provider |
| 341 if s.testConfigFilePath == "" { | 388 if s.testConfigFilePath == "" { |
| 342 ccfg, err := s.coord.GetConfig(*c, &google.Empty{}) | 389 ccfg, err := s.coord.GetConfig(*c, &google.Empty{}) |
| 343 if err != nil { | 390 if err != nil { |
| 344 return err | 391 return err |
| 345 } | 392 } |
| 346 | 393 |
| 347 // Determine our config service host. | 394 // Determine our config service host. |
| (...skipping 15 matching lines...) Expand all Loading... |
| 363 if ccfg.ConfigSet == "" { | 410 if ccfg.ConfigSet == "" { |
| 364 return errors.New("coordinator does not specify a config
set") | 411 return errors.New("coordinator does not specify a config
set") |
| 365 } | 412 } |
| 366 | 413 |
| 367 log.Fields{ | 414 log.Fields{ |
| 368 "host": host, | 415 "host": host, |
| 369 }.Debugf(*c, "Using remote configuration service client.") | 416 }.Debugf(*c, "Using remote configuration service client.") |
| 370 p = &client.RemoteProvider{ | 417 p = &client.RemoteProvider{ |
| 371 Host: host, | 418 Host: host, |
| 372 } | 419 } |
| 420 |
| 421 // If using a remote config provider, enable datastore access an
d caching. |
| 422 opts.DatastoreCacheAvailable = s.hasDatastore |
| 373 } else { | 423 } else { |
| 374 // Test / Local: use filesystem config path. | 424 // Test / Local: use filesystem config path. |
| 375 ci, err := filesystem.New(s.testConfigFilePath) | 425 ci, err := filesystem.New(s.testConfigFilePath) |
| 376 if err != nil { | 426 if err != nil { |
| 377 return err | 427 return err |
| 378 } | 428 } |
| 379 p = &testconfig.Provider{Base: ci} | 429 p = &testconfig.Provider{Base: ci} |
| 380 } | 430 } |
| 381 | 431 |
| 382 // Add config caching layers. | 432 // Add config caching layers. |
| 383 opts := config.CacheOptions{ | |
| 384 CacheExpiration: projectConfigCacheDuration, | |
| 385 } | |
| 386 *c = opts.WrapBackend(*c, &client.Backend{ | 433 *c = opts.WrapBackend(*c, &client.Backend{ |
| 387 Provider: p, | 434 Provider: p, |
| 388 }) | 435 }) |
| 389 | 436 |
| 390 // Load our service configuration. | 437 // Load our service configuration. |
| 391 var meta cfgclient.Meta | 438 var meta cfgclient.Meta |
| 392 cset, path := s.ServiceConfigPath() | 439 cset, path := s.ServiceConfigPath() |
| 393 if err := cfgclient.Get(*c, cfgclient.AsService, cset, path, textproto.M
essage(&s.serviceConfig), &meta); err != nil { | 440 if err := cfgclient.Get(*c, cfgclient.AsService, cset, path, textproto.M
essage(&s.serviceConfig), &meta); err != nil { |
| 394 return errors.Annotate(err).Reason("failed to load service confi
g").Err() | 441 return errors.Annotate(err).Reason("failed to load service confi
g").Err() |
| 395 } | 442 } |
| (...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 579 // | 626 // |
| 580 // An optional permutation function can be provided to modify those Options | 627 // An optional permutation function can be provided to modify those Options |
| 581 // before the Authenticator is created. | 628 // before the Authenticator is created. |
| 582 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth
2.TokenSource, error) { | 629 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth
2.TokenSource, error) { |
| 583 a, err := s.Authenticator(c, f) | 630 a, err := s.Authenticator(c, f) |
| 584 if err != nil { | 631 if err != nil { |
| 585 return nil, err | 632 return nil, err |
| 586 } | 633 } |
| 587 return a.TokenSource() | 634 return a.TokenSource() |
| 588 } | 635 } |
| OLD | NEW |