| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package service | 5 package service |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "flag" | 8 "flag" |
| 10 "fmt" | 9 "fmt" |
| 11 "net/http" | 10 "net/http" |
| 11 "net/url" |
| 12 "os" | 12 "os" |
| 13 "os/signal" | 13 "os/signal" |
| 14 "path/filepath" | 14 "path/filepath" |
| 15 "runtime/pprof" | 15 "runtime/pprof" |
| 16 "sync/atomic" | 16 "sync/atomic" |
| 17 "time" | 17 "time" |
| 18 | 18 |
| 19 "github.com/luci/luci-go/client/authcli" | 19 "github.com/luci/luci-go/client/authcli" |
| 20 "github.com/luci/luci-go/common/auth" | 20 "github.com/luci/luci-go/common/auth" |
| 21 "github.com/luci/luci-go/common/clock/clockflag" |
| 22 "github.com/luci/luci-go/common/config/impl/filesystem" |
| 21 "github.com/luci/luci-go/common/data/caching/proccache" | 23 "github.com/luci/luci-go/common/data/caching/proccache" |
| 24 "github.com/luci/luci-go/common/errors" |
| 22 "github.com/luci/luci-go/common/gcloud/gs" | 25 "github.com/luci/luci-go/common/gcloud/gs" |
| 23 log "github.com/luci/luci-go/common/logging" | 26 log "github.com/luci/luci-go/common/logging" |
| 24 "github.com/luci/luci-go/common/logging/gologger" | 27 "github.com/luci/luci-go/common/logging/gologger" |
| 28 "github.com/luci/luci-go/common/proto/google" |
| 25 "github.com/luci/luci-go/common/tsmon" | 29 "github.com/luci/luci-go/common/tsmon" |
| 26 "github.com/luci/luci-go/common/tsmon/target" | 30 "github.com/luci/luci-go/common/tsmon/target" |
| 27 "github.com/luci/luci-go/grpc/prpc" | 31 "github.com/luci/luci-go/grpc/prpc" |
| 28 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 32 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 29 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 33 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 30 "github.com/luci/luci-go/logdog/common/storage" | 34 "github.com/luci/luci-go/logdog/common/storage" |
| 31 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 35 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 32 "github.com/luci/luci-go/logdog/server/retryServicesClient" | 36 "github.com/luci/luci-go/logdog/server/retryServicesClient" |
| 33 "github.com/luci/luci-go/logdog/server/service/config" | 37 "github.com/luci/luci-go/logdog/server/service/config" |
| 34 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 38 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 39 "github.com/luci/luci-go/luci_config/server/cfgclient" |
| 40 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" |
| 41 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig
" |
| 42 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" |
| 35 | 43 |
| 36 "cloud.google.com/go/compute/metadata" | 44 "cloud.google.com/go/compute/metadata" |
| 37 "golang.org/x/net/context" | 45 "golang.org/x/net/context" |
| 38 "golang.org/x/oauth2" | 46 "golang.org/x/oauth2" |
| 39 "google.golang.org/api/option" | 47 "google.golang.org/api/option" |
| 40 ) | 48 ) |
| 41 | 49 |
| 42 var ( | 50 var ( |
| 43 // ErrInvalidConfig is an error that is returned when the supplied | 51 // ErrInvalidConfig is an error that is returned when the supplied |
| 44 // configuration is invalid. | 52 // configuration is invalid. |
| 45 ErrInvalidConfig = errors.New("invalid configuration") | 53 ErrInvalidConfig = errors.New("invalid configuration") |
| 46 | 54 |
| 47 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor | 55 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor |
| 48 // client. | 56 // client. |
| 49 CoordinatorScopes = []string{ | 57 CoordinatorScopes = []string{ |
| 50 auth.OAuthScopeEmail, | 58 auth.OAuthScopeEmail, |
| 51 } | 59 } |
| 52 ) | 60 ) |
| 53 | 61 |
| 54 // projectConfigCacheDuration is the amount of time to cache a project's | 62 // projectConfigCacheDuration is the amount of time to cache a project's |
| 55 // configuration before reloading. | 63 // configuration before reloading. |
| 56 const projectConfigCacheDuration = 30 * time.Minute | 64 const ( |
| 65 » projectConfigCacheDuration = 30 * time.Minute |
| 66 |
| 67 » // minAuthTokenLifetime is the amount of time that an access token has b
efore |
| 68 » // expiring. |
| 69 » minAuthTokenLifetime = 2 * time.Minute |
| 70 ) |
| 57 | 71 |
| 58 // Service is a base class full of common LogDog service application parameters. | 72 // Service is a base class full of common LogDog service application parameters. |
| 59 type Service struct { | 73 type Service struct { |
| 60 // Name is the name of this service. It is used for logging, metrics, an
d | 74 // Name is the name of this service. It is used for logging, metrics, an
d |
| 61 // user agent string generation. | 75 // user agent string generation. |
| 62 // | 76 // |
| 63 // If empty, a service name will be inferred from the command-line argum
ents. | 77 // If empty, a service name will be inferred from the command-line argum
ents. |
| 64 Name string | 78 Name string |
| 65 // Flags is the set of flags that will be used by the Service. | 79 // Flags is the set of flags that will be used by the Service. |
| 66 Flags flag.FlagSet | 80 Flags flag.FlagSet |
| 67 | 81 |
| 68 shutdownFunc atomic.Value | 82 shutdownFunc atomic.Value |
| 69 | 83 |
| 70 loggingFlags log.Config | 84 loggingFlags log.Config |
| 71 authFlags authcli.Flags | 85 authFlags authcli.Flags |
| 72 configFlags config.Flags | |
| 73 tsMonFlags tsmon.Flags | 86 tsMonFlags tsmon.Flags |
| 74 | 87 |
| 75 coordinatorHost string | 88 coordinatorHost string |
| 76 coordinatorInsecure bool | 89 coordinatorInsecure bool |
| 77 serviceID string | |
| 78 storageCredentialJSONPath string | 90 storageCredentialJSONPath string |
| 79 cpuProfilePath string | 91 cpuProfilePath string |
| 80 heapProfilePath string | 92 heapProfilePath string |
| 81 | 93 |
| 82 » coord logdog.ServicesClient | 94 » // onGCE is true if we're on GCE. We probe this once during Run. |
| 83 » config *config.Manager | 95 » onGCE bool |
| 84 | 96 |
| 85 » onGCE bool | 97 » // killCheckInterval is the amount of time in between service configurat
ion |
| 98 » // checks. If set, this service will periodically reload its service |
| 99 » // configuration. If that configuration has changed, the service will ki
ll |
| 100 » // itself. |
| 101 » // |
| 102 » // Since, in production, this is running under an execution harness such
as |
| 103 » // Kubernetes, the service will restart and load the new configuration.
This |
| 104 » // is easier than implementing in-process configuration updating. |
| 105 » killCheckInterval clockflag.Duration |
| 106 » // testConfigFilePath is the path to a local configuration service files
ystem |
| 107 » // (impl/filesystem) root. This is used for testing. |
| 108 » testConfigFilePath string |
| 109 » // serviceConfig is the cached service configuration. |
| 110 » serviceConfig svcconfig.Config |
| 111 |
| 112 » // serviceID is the cloud project ID, which is also this service's uniqu
e |
| 113 » // ID. This can be specified by flag or, if on GCE, will automatically b
e |
| 114 » // probed from metadata. |
| 115 » serviceID string |
| 116 |
| 117 » coord logdog.ServicesClient |
| 86 } | 118 } |
| 87 | 119 |
| 88 // Run performs service-wide initialization and invokes the specified run | 120 // Run performs service-wide initialization and invokes the specified run |
| 89 // function. | 121 // function. |
| 90 func (s *Service) Run(c context.Context, f func(context.Context) error) { | 122 func (s *Service) Run(c context.Context, f func(context.Context) error) { |
| 91 c = gologger.StdConfig.Use(c) | 123 c = gologger.StdConfig.Use(c) |
| 92 | 124 |
| 93 // If a service name isn't specified, default to the base of the current | 125 // If a service name isn't specified, default to the base of the current |
| 94 // executable. | 126 // executable. |
| 95 if s.Name == "" { | 127 if s.Name == "" { |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 | 175 |
| 144 if err := pprof.WriteHeapProfile(fd); err != nil { | 176 if err := pprof.WriteHeapProfile(fd); err != nil { |
| 145 log.Fields{ | 177 log.Fields{ |
| 146 log.ErrorKey: err, | 178 log.ErrorKey: err, |
| 147 "path": p, | 179 "path": p, |
| 148 }.Warningf(c, "Failed to write heap profile.") | 180 }.Warningf(c, "Failed to write heap profile.") |
| 149 } | 181 } |
| 150 }() | 182 }() |
| 151 } | 183 } |
| 152 | 184 |
| 153 // Are we running on a GCE intance? | |
| 154 if err := s.probeGCEEnvironment(c); err != nil { | |
| 155 log.WithError(err).Errorf(c, "Failed to probe GCE environment.") | |
| 156 return err | |
| 157 } | |
| 158 | |
| 159 // Validate the runtime environment. | 185 // Validate the runtime environment. |
| 160 if s.serviceID == "" { | 186 if s.serviceID == "" { |
| 161 » » return errors.New("no service ID was configured") | 187 » » return errors.New("no service ID was configured (-service-id)") |
| 162 } | 188 } |
| 163 | 189 |
| 190 // Install a process-wide cache. |
| 191 c = proccache.Use(c, &proccache.Cache{}) |
| 192 |
| 164 // Configure our signal handler. It will listen for terminating signals
and | 193 // Configure our signal handler. It will listen for terminating signals
and |
| 165 // issue a shutdown signal if one is received. | 194 // issue a shutdown signal if one is received. |
| 166 signalC := make(chan os.Signal) | 195 signalC := make(chan os.Signal) |
| 167 go func(c context.Context) { | 196 go func(c context.Context) { |
| 168 hasShutdownAlready := false | 197 hasShutdownAlready := false |
| 169 for sig := range signalC { | 198 for sig := range signalC { |
| 170 if !hasShutdownAlready { | 199 if !hasShutdownAlready { |
| 171 hasShutdownAlready = true | 200 hasShutdownAlready = true |
| 172 | 201 |
| 173 log.Warningf(log.SetField(c, "signal", sig), "Re
ceived close signal. Send again to terminate immediately.") | 202 log.Warningf(log.SetField(c, "signal", sig), "Re
ceived close signal. Send again to terminate immediately.") |
| (...skipping 15 matching lines...) Expand all Loading... |
| 189 // Initialize our tsmon library. | 218 // Initialize our tsmon library. |
| 190 c = tsmon.WithState(c, tsmon.NewState()) | 219 c = tsmon.WithState(c, tsmon.NewState()) |
| 191 | 220 |
| 192 if err := tsmon.InitializeFromFlags(c, &s.tsMonFlags); err != nil { | 221 if err := tsmon.InitializeFromFlags(c, &s.tsMonFlags); err != nil { |
| 193 log.WithError(err).Warningf(c, "Failed to initialize monitoring;
will continue without metrics.") | 222 log.WithError(err).Warningf(c, "Failed to initialize monitoring;
will continue without metrics.") |
| 194 } | 223 } |
| 195 defer tsmon.Shutdown(c) | 224 defer tsmon.Shutdown(c) |
| 196 | 225 |
| 197 // Initialize our Client instantiations. | 226 // Initialize our Client instantiations. |
| 198 var err error | 227 var err error |
| 199 » s.coord, err = s.initCoordinatorClient(c) | 228 » if s.coord, err = s.initCoordinatorClient(c); err != nil { |
| 200 » if err != nil { | |
| 201 log.WithError(err).Errorf(c, "Failed to setup Coordinator client
.") | 229 log.WithError(err).Errorf(c, "Failed to setup Coordinator client
.") |
| 202 return err | 230 return err |
| 203 } | 231 } |
| 204 | 232 |
| 205 » s.config, err = s.initConfig(c) | 233 » // Initialize and install our config service and caching layers, and loa
d our |
| 206 » if err != nil { | 234 » // initial service config. |
| 235 » if err := s.initConfig(&c); err != nil { |
| 207 log.WithError(err).Errorf(c, "Failed to setup configuration.") | 236 log.WithError(err).Errorf(c, "Failed to setup configuration.") |
| 208 return err | 237 return err |
| 209 } | 238 } |
| 210 defer s.config.Close() | |
| 211 | 239 |
| 240 // Clear our shutdown function on termination. |
| 212 defer s.SetShutdownFunc(nil) | 241 defer s.SetShutdownFunc(nil) |
| 213 | 242 |
| 214 // Install a process-wide cache. | |
| 215 c = proccache.Use(c, &proccache.Cache{}) | |
| 216 | |
| 217 // Run main service function. | 243 // Run main service function. |
| 218 return f(c) | 244 return f(c) |
| 219 } | 245 } |
| 220 | 246 |
| 221 func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) { | 247 func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) { |
| 222 // Initialize logging flags. | 248 // Initialize logging flags. |
| 223 s.loggingFlags.Level = log.Warning | 249 s.loggingFlags.Level = log.Warning |
| 224 s.loggingFlags.AddFlags(fs) | 250 s.loggingFlags.AddFlags(fs) |
| 225 | 251 |
| 226 // Initialize tsmon flags. | 252 // Initialize tsmon flags. |
| 227 s.tsMonFlags = tsmon.NewFlags() | 253 s.tsMonFlags = tsmon.NewFlags() |
| 228 s.tsMonFlags.Flush = tsmon.FlushAuto | 254 s.tsMonFlags.Flush = tsmon.FlushAuto |
| 229 s.tsMonFlags.Target.TargetType = target.TaskType | 255 s.tsMonFlags.Target.TargetType = target.TaskType |
| 230 s.tsMonFlags.Target.TaskJobName = s.Name | 256 s.tsMonFlags.Target.TaskJobName = s.Name |
| 231 s.tsMonFlags.Register(fs) | 257 s.tsMonFlags.Register(fs) |
| 232 | 258 |
| 233 s.authFlags.Register(fs, auth.Options{}) | 259 s.authFlags.Register(fs, auth.Options{}) |
| 234 s.configFlags.AddToFlagSet(fs) | |
| 235 | 260 |
| 236 fs.StringVar(&s.serviceID, "service-id", "", | 261 fs.StringVar(&s.serviceID, "service-id", "", |
| 237 "Specify the service ID that this instance is supporting. If emp
ty, the service ID "+ | 262 "Specify the service ID that this instance is supporting. If emp
ty, the service ID "+ |
| 238 "will attempt to be resolved by probing the local enviro
nment. This probably will match the "+ | 263 "will attempt to be resolved by probing the local enviro
nment. This probably will match the "+ |
| 239 "App ID of the Coordinator.") | 264 "App ID of the Coordinator.") |
| 240 fs.StringVar(&s.coordinatorHost, "coordinator", "", | 265 fs.StringVar(&s.coordinatorHost, "coordinator", "", |
| 241 "The Coordinator service's [host][:port].") | 266 "The Coordinator service's [host][:port].") |
| 242 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, | 267 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, |
| 243 "Connect to Coordinator over HTTP (instead of HTTPS).") | 268 "Connect to Coordinator over HTTP (instead of HTTPS).") |
| 244 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", | 269 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", |
| 245 "If supplied, the path of a JSON credential file to load and use
for storage operations.") | 270 "If supplied, the path of a JSON credential file to load and use
for storage operations.") |
| 246 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", | 271 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", |
| 247 "If supplied, enable CPU profiling and write the profile here.") | 272 "If supplied, enable CPU profiling and write the profile here.") |
| 248 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", | 273 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", |
| 249 "If supplied, enable CPU profiling and write the profile here.") | 274 "If supplied, enable CPU profiling and write the profile here.") |
| 275 fs.Var(&s.killCheckInterval, "config-kill-interval", |
| 276 "If non-zero, poll for configuration changes and kill the applic
ation if one is detected.") |
| 277 fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "", |
| 278 "(Testing) If set, load configuration from a local filesystem ro
oted here.") |
| 250 } | 279 } |
| 251 | 280 |
| 252 // probeGCEEnvironment fills in any parameters that can be probed from Google | 281 // probeGCEEnvironment fills in any parameters that can be probed from Google |
| 253 // Compute Engine metadata. | 282 // Compute Engine metadata. |
| 254 // | 283 // |
| 255 // If we're not running on GCE, this will return nil. An error will only be | 284 // If we're not running on GCE, this will return nil. An error will only be |
| 256 // returned if an operation that is expected to work fails. | 285 // returned if an operation that is expected to work fails. |
| 257 func (s *Service) probeGCEEnvironment(c context.Context) error { | 286 func (s *Service) probeGCEEnvironment(c context.Context) error { |
| 258 s.onGCE = metadata.OnGCE() | 287 s.onGCE = metadata.OnGCE() |
| 259 if !s.onGCE { | 288 if !s.onGCE { |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 294 prpcClient.Options.UserAgent = fmt.Sprintf("%s/%s", s.Name, prpc.Default
UserAgent) | 323 prpcClient.Options.UserAgent = fmt.Sprintf("%s/%s", s.Name, prpc.Default
UserAgent) |
| 295 if s.coordinatorInsecure { | 324 if s.coordinatorInsecure { |
| 296 prpcClient.Options.Insecure = true | 325 prpcClient.Options.Insecure = true |
| 297 } | 326 } |
| 298 sc := logdog.NewServicesPRPCClient(&prpcClient) | 327 sc := logdog.NewServicesPRPCClient(&prpcClient) |
| 299 | 328 |
| 300 // Wrap the resulting client in a retry harness. | 329 // Wrap the resulting client in a retry harness. |
| 301 return retryServicesClient.New(sc, nil), nil | 330 return retryServicesClient.New(sc, nil), nil |
| 302 } | 331 } |
| 303 | 332 |
| 304 func (s *Service) initConfig(c context.Context) (*config.Manager, error) { | 333 func (s *Service) initConfig(c *context.Context) error { |
| 305 » rt, err := s.AuthenticatedTransport(c, nil) | 334 » opts := config.CacheOptions{ |
| 306 » if err != nil { | 335 » » CacheExpiration: projectConfigCacheDuration, |
| 307 » » log.WithError(err).Errorf(c, "Failed to create config client.") | |
| 308 » » return nil, err | |
| 309 } | 336 } |
| 310 | 337 |
| 311 » s.configFlags.RoundTripper = rt | 338 » // If a testConfigFilePath was specified, use a mock configuration servi
ce |
| 312 » o, err := s.configFlags.CoordinatorOptions(c, s.coord) | 339 » // that loads from a local file. |
| 313 » if err != nil { | 340 » var p client.Provider |
| 314 » » log.WithError(err).Errorf(c, "Failed to load configuration param
eters.") | 341 » if s.testConfigFilePath == "" { |
| 315 » » return nil, err | 342 » » ccfg, err := s.coord.GetConfig(*c, &google.Empty{}) |
| 343 » » if err != nil { |
| 344 » » » return err |
| 345 » » } |
| 346 |
| 347 » » // Determine our config service host. |
| 348 » » // |
| 349 » » // Older Coordinator instances may provide the full URL instead
of the host, |
| 350 » » // in which case we will extract the host from the URL. |
| 351 » » host := ccfg.ConfigServiceHost |
| 352 » » if host == "" { |
| 353 » » » if ccfg.ConfigServiceUrl == "" { |
| 354 » » » » return errors.New("coordinator does not specify
a config service") |
| 355 » » » } |
| 356 » » » u, err := url.Parse(ccfg.ConfigServiceUrl) |
| 357 » » » if err != nil { |
| 358 » » » » return errors.Annotate(err).Reason("failed to pa
rse config service URL").Err() |
| 359 » » » } |
| 360 » » » host = u.Host |
| 361 » » } |
| 362 |
| 363 » » if ccfg.ConfigSet == "" { |
| 364 » » » return errors.New("coordinator does not specify a config
set") |
| 365 » » } |
| 366 |
| 367 » » log.Fields{ |
| 368 » » » "host": host, |
| 369 » » }.Debugf(*c, "Using remote configuration service client.") |
| 370 » » p = &client.RemoteProvider{ |
| 371 » » » Host: host, |
| 372 » » } |
| 373 » } else { |
| 374 » » // Test / Local: use filesystem config path. |
| 375 » » ci, err := filesystem.New(s.testConfigFilePath) |
| 376 » » if err != nil { |
| 377 » » » return err |
| 378 » » } |
| 379 » » p = &testconfig.Provider{Base: ci} |
| 316 } | 380 } |
| 317 o.ServiceID = s.serviceID | |
| 318 o.ProjectConfigCacheDuration = projectConfigCacheDuration | |
| 319 o.KillFunc = s.shutdown | |
| 320 | 381 |
| 321 » return config.NewManager(c, *o) | 382 » // Add config caching layers. |
| 383 » *c = opts.WrapBackend(*c, &client.Backend{ |
| 384 » » Provider: p, |
| 385 » }) |
| 386 |
| 387 » // Load our service configuration. |
| 388 » var meta cfgclient.Meta |
| 389 » cset, path := s.ServiceConfigPath() |
| 390 » if err := cfgclient.Get(*c, cfgclient.AsService, cset, path, textproto.M
essage(&s.serviceConfig), &meta); err != nil { |
| 391 » » return errors.Annotate(err).Reason("failed to load service confi
g").Err() |
| 392 » } |
| 393 |
| 394 » // Create a poller for our service config. |
| 395 » if s.killCheckInterval > 0 { |
| 396 » » pollerC, pollerCancelFunc := context.WithCancel(*c) |
| 397 |
| 398 » » poller := config.ChangePoller{ |
| 399 » » » ConfigSet: cset, |
| 400 » » » Path: path, |
| 401 » » » Period: time.Duration(s.killCheckInterval), |
| 402 » » » OnChange: func() { |
| 403 » » » » // When a configuration change is detected, stop
future polling and call |
| 404 » » » » // our shutdown function. |
| 405 » » » » pollerCancelFunc() |
| 406 » » » » s.shutdown() |
| 407 » » » }, |
| 408 » » » ContentHash: meta.ContentHash, |
| 409 » » } |
| 410 » » go poller.Run(pollerC) |
| 411 » } |
| 412 » return nil |
| 413 } |
| 414 |
| 415 // ServiceConfigPath returns the ConfigSet and path to the current service's |
| 416 // configuration. |
| 417 func (s *Service) ServiceConfigPath() (cfgtypes.ConfigSet, string) { |
| 418 » return cfgtypes.ServiceConfigSet(s.serviceID), svcconfig.ServiceConfigPa
th |
| 419 } |
| 420 |
| 421 // ServiceConfig returns the configuration data for the current service. |
| 422 func (s *Service) ServiceConfig() *svcconfig.Config { return &s.serviceConfig } |
| 423 |
| 424 // ProjectConfigPath returns the ConfigSet and path to the current service's |
| 425 // project configuration for proj. |
| 426 func (s *Service) ProjectConfigPath(proj cfgtypes.ProjectName) (cfgtypes.ConfigS
et, string) { |
| 427 » return cfgtypes.ProjectConfigSet(proj), svcconfig.ProjectConfigPath(s.se
rviceID) |
| 428 } |
| 429 |
| 430 // ProjectConfig returns the current service's project configuration for proj. |
| 431 func (s *Service) ProjectConfig(c context.Context, proj cfgtypes.ProjectName) (*
svcconfig.ProjectConfig, error) { |
| 432 » cset, path := s.ProjectConfigPath(proj) |
| 433 |
| 434 » var pcfg svcconfig.ProjectConfig |
| 435 » if err := cfgclient.Get(c, cfgclient.AsService, cset, path, textproto.Me
ssage(&pcfg), nil); err != nil { |
| 436 » » return nil, errors.Annotate(err).Reason("failed to load project
config from %(cset)s.%(path)s"). |
| 437 » » » D("cset", cset).D("path", path).Err() |
| 438 » } |
| 439 » return &pcfg, nil |
| 322 } | 440 } |
| 323 | 441 |
| 324 // SetShutdownFunc sets the service shutdown function. | 442 // SetShutdownFunc sets the service shutdown function. |
| 325 func (s *Service) SetShutdownFunc(f func()) { | 443 func (s *Service) SetShutdownFunc(f func()) { |
| 326 s.shutdownFunc.Store(f) | 444 s.shutdownFunc.Store(f) |
| 327 } | 445 } |
| 328 | 446 |
| 329 func (s *Service) shutdown() { | 447 func (s *Service) shutdown() { |
| 330 v := s.shutdownFunc.Load() | 448 v := s.shutdownFunc.Load() |
| 331 if f, ok := v.(func()); ok { | 449 if f, ok := v.(func()); ok { |
| 332 f() | 450 f() |
| 333 } else { | 451 } else { |
| 334 s.shutdownImmediately() | 452 s.shutdownImmediately() |
| 335 } | 453 } |
| 336 } | 454 } |
| 337 | 455 |
| 338 func (s *Service) shutdownImmediately() { | 456 func (s *Service) shutdownImmediately() { |
| 339 os.Exit(1) | 457 os.Exit(1) |
| 340 } | 458 } |
| 341 | 459 |
| 342 // Config returns the cached service configuration. | |
| 343 func (s *Service) Config() *svcconfig.Config { | |
| 344 return s.config.Config() | |
| 345 } | |
| 346 | |
| 347 // ProjectConfig returns the cached project configuration. | |
| 348 // | |
| 349 // If the project configuration is not available, nil will be returned. | |
| 350 func (s *Service) ProjectConfig(c context.Context, proj cfgtypes.ProjectName) (*
svcconfig.ProjectConfig, error) { | |
| 351 return s.config.ProjectConfig(c, proj) | |
| 352 } | |
| 353 | |
| 354 // Coordinator returns the cached Coordinator client. | 460 // Coordinator returns the cached Coordinator client. |
| 355 func (s *Service) Coordinator() logdog.ServicesClient { | 461 func (s *Service) Coordinator() logdog.ServicesClient { |
| 356 return s.coord | 462 return s.coord |
| 357 } | 463 } |
| 358 | 464 |
| 359 // ServiceID returns the service ID. | 465 // ServiceID returns the service ID. |
| 360 // | 466 // |
| 361 // This is synonymous with the cloud "project ID" and the AppEngine "app ID". | 467 // This is synonymous with the cloud "project ID" and the AppEngine "app ID". |
| 362 func (s *Service) ServiceID() string { | 468 func (s *Service) ServiceID() string { |
| 363 return s.serviceID | 469 return s.serviceID |
| 364 } | 470 } |
| 365 | 471 |
| 366 // IntermediateStorage instantiates the configured intermediate Storage | 472 // IntermediateStorage instantiates the configured intermediate Storage |
| 367 // instance. | 473 // instance. |
| 368 func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
) { | 474 func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
) { |
| 369 » cfg := s.config.Config() | 475 » cfg := s.ServiceConfig() |
| 370 if cfg.GetStorage() == nil { | 476 if cfg.GetStorage() == nil { |
| 371 log.Errorf(c, "Missing storage configuration.") | 477 log.Errorf(c, "Missing storage configuration.") |
| 372 return nil, ErrInvalidConfig | 478 return nil, ErrInvalidConfig |
| 373 } | 479 } |
| 374 | 480 |
| 375 btcfg := cfg.GetStorage().GetBigtable() | 481 btcfg := cfg.GetStorage().GetBigtable() |
| 376 if btcfg == nil { | 482 if btcfg == nil { |
| 377 log.Errorf(c, "Missing BigTable storage configuration") | 483 log.Errorf(c, "Missing BigTable storage configuration") |
| 378 return nil, ErrInvalidConfig | 484 return nil, ErrInvalidConfig |
| 379 } | 485 } |
| (...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 469 // | 575 // |
| 470 // An optional permutation function can be provided to modify those Options | 576 // An optional permutation function can be provided to modify those Options |
| 471 // before the Authenticator is created. | 577 // before the Authenticator is created. |
| 472 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth
2.TokenSource, error) { | 578 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth
2.TokenSource, error) { |
| 473 a, err := s.Authenticator(c, f) | 579 a, err := s.Authenticator(c, f) |
| 474 if err != nil { | 580 if err != nil { |
| 475 return nil, err | 581 return nil, err |
| 476 } | 582 } |
| 477 return a.TokenSource() | 583 return a.TokenSource() |
| 478 } | 584 } |
| OLD | NEW |