| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package service | 5 package service |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "flag" | 8 "flag" |
| 9 "fmt" | |
| 10 "net/http" | 9 "net/http" |
| 11 "net/url" | 10 "net/url" |
| 12 "os" | 11 "os" |
| 13 "os/signal" | 12 "os/signal" |
| 14 "path/filepath" | 13 "path/filepath" |
| 15 "runtime/pprof" | 14 "runtime/pprof" |
| 15 "sort" |
| 16 "strings" |
| 17 "sync" |
| 16 "sync/atomic" | 18 "sync/atomic" |
| 17 "time" | 19 "time" |
| 18 | 20 |
| 19 "github.com/luci/luci-go/appengine/gaesettings" | 21 "github.com/luci/luci-go/appengine/gaesettings" |
| 20 "github.com/luci/luci-go/client/authcli" | 22 "github.com/luci/luci-go/client/authcli" |
| 21 » "github.com/luci/luci-go/common/auth" | 23 » commonAuth "github.com/luci/luci-go/common/auth" |
| 22 "github.com/luci/luci-go/common/clock/clockflag" | 24 "github.com/luci/luci-go/common/clock/clockflag" |
| 23 "github.com/luci/luci-go/common/config/impl/filesystem" | 25 "github.com/luci/luci-go/common/config/impl/filesystem" |
| 24 "github.com/luci/luci-go/common/data/caching/proccache" | 26 "github.com/luci/luci-go/common/data/caching/proccache" |
| 25 "github.com/luci/luci-go/common/errors" | 27 "github.com/luci/luci-go/common/errors" |
| 26 "github.com/luci/luci-go/common/gcloud/gs" | 28 "github.com/luci/luci-go/common/gcloud/gs" |
| 29 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 27 log "github.com/luci/luci-go/common/logging" | 30 log "github.com/luci/luci-go/common/logging" |
| 28 "github.com/luci/luci-go/common/logging/gologger" | 31 "github.com/luci/luci-go/common/logging/gologger" |
| 29 "github.com/luci/luci-go/common/proto/google" | 32 "github.com/luci/luci-go/common/proto/google" |
| 30 "github.com/luci/luci-go/common/tsmon" | 33 "github.com/luci/luci-go/common/tsmon" |
| 31 "github.com/luci/luci-go/common/tsmon/target" | 34 "github.com/luci/luci-go/common/tsmon/target" |
| 32 "github.com/luci/luci-go/grpc/prpc" | 35 "github.com/luci/luci-go/grpc/prpc" |
| 33 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 36 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 34 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 37 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 35 "github.com/luci/luci-go/logdog/common/storage" | 38 "github.com/luci/luci-go/logdog/common/storage" |
| 36 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 39 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 37 "github.com/luci/luci-go/logdog/server/retryServicesClient" | 40 "github.com/luci/luci-go/logdog/server/retryServicesClient" |
| 38 "github.com/luci/luci-go/logdog/server/service/config" | 41 "github.com/luci/luci-go/logdog/server/service/config" |
| 39 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 42 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 40 "github.com/luci/luci-go/luci_config/server/cfgclient" | 43 "github.com/luci/luci-go/luci_config/server/cfgclient" |
| 41 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" | 44 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" |
| 42 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig
" | 45 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig
" |
| 43 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" | 46 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" |
| 47 serverAuth "github.com/luci/luci-go/server/auth" |
| 44 "github.com/luci/luci-go/server/settings" | 48 "github.com/luci/luci-go/server/settings" |
| 45 | 49 |
| 46 "github.com/luci/gae/impl/cloud" | 50 "github.com/luci/gae/impl/cloud" |
| 47 | 51 |
| 48 "cloud.google.com/go/compute/metadata" | 52 "cloud.google.com/go/compute/metadata" |
| 49 "cloud.google.com/go/datastore" | 53 "cloud.google.com/go/datastore" |
| 54 "cloud.google.com/go/pubsub" |
| 50 "golang.org/x/net/context" | 55 "golang.org/x/net/context" |
| 51 "golang.org/x/oauth2" | |
| 52 "google.golang.org/api/option" | 56 "google.golang.org/api/option" |
| 53 ) | 57 ) |
| 54 | 58 |
| 55 var ( | 59 var ( |
| 56 // ErrInvalidConfig is an error that is returned when the supplied | 60 // ErrInvalidConfig is an error that is returned when the supplied |
| 57 // configuration is invalid. | 61 // configuration is invalid. |
| 58 ErrInvalidConfig = errors.New("invalid configuration") | 62 ErrInvalidConfig = errors.New("invalid configuration") |
| 59 | 63 |
| 60 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor | 64 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor |
| 61 // client. | 65 // client. |
| 62 CoordinatorScopes = []string{ | 66 CoordinatorScopes = []string{ |
| 63 » » auth.OAuthScopeEmail, | 67 » » commonAuth.OAuthScopeEmail, |
| 64 } | 68 } |
| 65 ) | 69 ) |
| 66 | 70 |
| 67 // projectConfigCacheDuration is the amount of time to cache a project's | 71 // projectConfigCacheDuration is the amount of time to cache a project's |
| 68 // configuration before reloading. | 72 // configuration before reloading. |
| 69 const ( | 73 const ( |
| 70 projectConfigCacheDuration = 30 * time.Minute | 74 projectConfigCacheDuration = 30 * time.Minute |
| 71 | 75 |
| 72 // minAuthTokenLifetime is the amount of time that an access token has b
efore | 76 // minAuthTokenLifetime is the amount of time that an access token has b
efore |
| 73 // expiring. | 77 // expiring. |
| 74 minAuthTokenLifetime = 2 * time.Minute | 78 minAuthTokenLifetime = 2 * time.Minute |
| 79 |
| 80 // authCacheSize is the maximum number of elements to store in the auth |
| 81 // global cache LRU. |
| 82 // |
| 83 // We don't expect to load too many different authentication tokens, so
a |
| 84 // relatively low number should be fine here. |
| 85 authCacheSize = 128 |
| 75 ) | 86 ) |
| 76 | 87 |
| 77 // Service is a base class full of common LogDog service application parameters. | 88 // Service is a base class full of common LogDog service application parameters. |
| 78 type Service struct { | 89 type Service struct { |
| 79 // Name is the name of this service. It is used for logging, metrics, an
d | 90 // Name is the name of this service. It is used for logging, metrics, an
d |
| 80 // user agent string generation. | 91 // user agent string generation. |
| 81 // | 92 // |
| 82 // If empty, a service name will be inferred from the command-line argum
ents. | 93 // If empty, a service name will be inferred from the command-line argum
ents. |
| 83 Name string | 94 Name string |
| 84 // Flags is the set of flags that will be used by the Service. | 95 // Flags is the set of flags that will be used by the Service. |
| 85 Flags flag.FlagSet | 96 Flags flag.FlagSet |
| 86 | 97 |
| 87 shutdownFunc atomic.Value | 98 shutdownFunc atomic.Value |
| 88 | 99 |
| 89 loggingFlags log.Config | 100 loggingFlags log.Config |
| 90 authFlags authcli.Flags | 101 authFlags authcli.Flags |
| 91 tsMonFlags tsmon.Flags | 102 tsMonFlags tsmon.Flags |
| 92 | 103 |
| 93 » coordinatorHost string | 104 » coordinatorHost string |
| 94 » coordinatorInsecure bool | 105 » coordinatorInsecure bool |
| 95 » storageCredentialJSONPath string | 106 » cpuProfilePath string |
| 96 » cpuProfilePath string | 107 » heapProfilePath string |
| 97 » heapProfilePath string | |
| 98 | 108 |
| 99 // onGCE is true if we're on GCE. We probe this once during Run. | 109 // onGCE is true if we're on GCE. We probe this once during Run. |
| 100 onGCE bool | 110 onGCE bool |
| 101 hasDatastore bool | 111 hasDatastore bool |
| 102 | 112 |
| 103 // killCheckInterval is the amount of time in between service configurat
ion | 113 // killCheckInterval is the amount of time in between service configurat
ion |
| 104 // checks. If set, this service will periodically reload its service | 114 // checks. If set, this service will periodically reload its service |
| 105 // configuration. If that configuration has changed, the service will ki
ll | 115 // configuration. If that configuration has changed, the service will ki
ll |
| 106 // itself. | 116 // itself. |
| 107 // | 117 // |
| 108 // Since, in production, this is running under an execution harness such
as | 118 // Since, in production, this is running under an execution harness such
as |
| 109 // Kubernetes, the service will restart and load the new configuration.
This | 119 // Kubernetes, the service will restart and load the new configuration.
This |
| 110 // is easier than implementing in-process configuration updating. | 120 // is easier than implementing in-process configuration updating. |
| 111 killCheckInterval clockflag.Duration | 121 killCheckInterval clockflag.Duration |
| 112 // testConfigFilePath is the path to a local configuration service files
ystem | 122 // testConfigFilePath is the path to a local configuration service files
ystem |
| 113 // (impl/filesystem) root. This is used for testing. | 123 // (impl/filesystem) root. This is used for testing. |
| 114 testConfigFilePath string | 124 testConfigFilePath string |
| 115 // serviceConfig is the cached service configuration. | 125 // serviceConfig is the cached service configuration. |
| 116 serviceConfig svcconfig.Config | 126 serviceConfig svcconfig.Config |
| 117 configCache config.MessageCache | 127 configCache config.MessageCache |
| 118 | 128 |
| 119 // serviceID is the cloud project ID, which is also this service's uniqu
e | 129 // serviceID is the cloud project ID, which is also this service's uniqu
e |
| 120 // ID. This can be specified by flag or, if on GCE, will automatically b
e | 130 // ID. This can be specified by flag or, if on GCE, will automatically b
e |
| 121 // probed from metadata. | 131 // probed from metadata. |
| 122 serviceID string | 132 serviceID string |
| 123 | 133 |
| 124 coord logdog.ServicesClient | 134 coord logdog.ServicesClient |
| 135 |
| 136 // authCache is a cache of instantiated Authenticator instances, keyed o
n |
| 137 // sorted NULL-delimited scope strings (see authenticatorForScopes). |
| 138 authCacheLock sync.RWMutex |
| 139 authCache map[string]*commonAuth.Authenticator |
| 125 } | 140 } |
| 126 | 141 |
| 127 // Run performs service-wide initialization and invokes the specified run | 142 // Run performs service-wide initialization and invokes the specified run |
| 128 // function. | 143 // function. |
| 129 func (s *Service) Run(c context.Context, f func(context.Context) error) { | 144 func (s *Service) Run(c context.Context, f func(context.Context) error) { |
| 130 c = gologger.StdConfig.Use(c) | 145 c = gologger.StdConfig.Use(c) |
| 131 | 146 |
| 132 // If a service name isn't specified, default to the base of the current | 147 // If a service name isn't specified, default to the base of the current |
| 133 // executable. | 148 // executable. |
| 134 if s.Name == "" { | 149 if s.Name == "" { |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 195 | 210 |
| 196 // Cancel our Context after we're done our run loop. | 211 // Cancel our Context after we're done our run loop. |
| 197 c, cancelFunc := context.WithCancel(c) | 212 c, cancelFunc := context.WithCancel(c) |
| 198 defer cancelFunc() | 213 defer cancelFunc() |
| 199 | 214 |
| 200 // Validate the runtime environment. | 215 // Validate the runtime environment. |
| 201 if s.serviceID == "" { | 216 if s.serviceID == "" { |
| 202 return errors.New("no service ID was configured (-service-id)") | 217 return errors.New("no service ID was configured (-service-id)") |
| 203 } | 218 } |
| 204 | 219 |
| 220 // Install our authentication service. |
| 221 c = s.withAuthService(c) |
| 222 |
| 205 // Install a cloud datastore client. This is non-fatal if it fails. | 223 // Install a cloud datastore client. This is non-fatal if it fails. |
| 206 dsClient, err := s.initDatastoreClient(c) | 224 dsClient, err := s.initDatastoreClient(c) |
| 207 if err == nil { | 225 if err == nil { |
| 208 defer dsClient.Close() | 226 defer dsClient.Close() |
| 209 | 227 |
| 210 ccfg := cloud.Config{ | 228 ccfg := cloud.Config{ |
| 211 DS: dsClient, | 229 DS: dsClient, |
| 212 } | 230 } |
| 213 c = ccfg.Use(c) | 231 c = ccfg.Use(c) |
| 214 c = settings.Use(c, settings.New(gaesettings.Storage{})) | 232 c = settings.Use(c, settings.New(gaesettings.Storage{})) |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 280 s.loggingFlags.Level = log.Warning | 298 s.loggingFlags.Level = log.Warning |
| 281 s.loggingFlags.AddFlags(fs) | 299 s.loggingFlags.AddFlags(fs) |
| 282 | 300 |
| 283 // Initialize tsmon flags. | 301 // Initialize tsmon flags. |
| 284 s.tsMonFlags = tsmon.NewFlags() | 302 s.tsMonFlags = tsmon.NewFlags() |
| 285 s.tsMonFlags.Flush = tsmon.FlushAuto | 303 s.tsMonFlags.Flush = tsmon.FlushAuto |
| 286 s.tsMonFlags.Target.TargetType = target.TaskType | 304 s.tsMonFlags.Target.TargetType = target.TaskType |
| 287 s.tsMonFlags.Target.TaskJobName = s.Name | 305 s.tsMonFlags.Target.TaskJobName = s.Name |
| 288 s.tsMonFlags.Register(fs) | 306 s.tsMonFlags.Register(fs) |
| 289 | 307 |
| 290 » s.authFlags.Register(fs, auth.Options{}) | 308 » s.authFlags.Register(fs, commonAuth.Options{}) |
| 291 | 309 |
| 292 fs.StringVar(&s.serviceID, "service-id", "", | 310 fs.StringVar(&s.serviceID, "service-id", "", |
| 293 "Specify the service ID that this instance is supporting. If emp
ty, the service ID "+ | 311 "Specify the service ID that this instance is supporting. If emp
ty, the service ID "+ |
| 294 "will attempt to be resolved by probing the local enviro
nment. This probably will match the "+ | 312 "will attempt to be resolved by probing the local enviro
nment. This probably will match the "+ |
| 295 "App ID of the Coordinator.") | 313 "App ID of the Coordinator.") |
| 296 fs.StringVar(&s.coordinatorHost, "coordinator", "", | 314 fs.StringVar(&s.coordinatorHost, "coordinator", "", |
| 297 "The Coordinator service's [host][:port].") | 315 "The Coordinator service's [host][:port].") |
| 298 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, | 316 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, |
| 299 "Connect to Coordinator over HTTP (instead of HTTPS).") | 317 "Connect to Coordinator over HTTP (instead of HTTPS).") |
| 300 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", | |
| 301 "If supplied, the path of a JSON credential file to load and use
for storage operations.") | |
| 302 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", | 318 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", |
| 303 "If supplied, enable CPU profiling and write the profile here.") | 319 "If supplied, enable CPU profiling and write the profile here.") |
| 304 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", | 320 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", |
| 305 "If supplied, enable CPU profiling and write the profile here.") | 321 "If supplied, enable CPU profiling and write the profile here.") |
| 306 fs.Var(&s.killCheckInterval, "config-kill-interval", | 322 fs.Var(&s.killCheckInterval, "config-kill-interval", |
| 307 "If non-zero, poll for configuration changes and kill the applic
ation if one is detected.") | 323 "If non-zero, poll for configuration changes and kill the applic
ation if one is detected.") |
| 308 fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "", | 324 fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "", |
| 309 "(Testing) If set, load configuration from a local filesystem ro
oted here.") | 325 "(Testing) If set, load configuration from a local filesystem ro
oted here.") |
| 310 } | 326 } |
| 311 | 327 |
| (...skipping 12 matching lines...) Expand all Loading... |
| 324 // project ID. | 340 // project ID. |
| 325 if s.serviceID == "" { | 341 if s.serviceID == "" { |
| 326 var err error | 342 var err error |
| 327 if s.serviceID, err = metadata.ProjectID(); err != nil { | 343 if s.serviceID, err = metadata.ProjectID(); err != nil { |
| 328 log.WithError(err).Warningf(c, "Failed to probe GCE proj
ect ID.") | 344 log.WithError(err).Warningf(c, "Failed to probe GCE proj
ect ID.") |
| 329 } | 345 } |
| 330 } | 346 } |
| 331 } | 347 } |
| 332 | 348 |
| 333 func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, err
or) { | 349 func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, err
or) { |
| 334 // Initialize Storage authentication. | |
| 335 tokenSource, err := s.TokenSource(c, func(o *auth.Options) { | |
| 336 o.Scopes = []string{datastore.ScopeDatastore} | |
| 337 }) | |
| 338 if err != nil { | |
| 339 log.WithError(err).Errorf(c, "Failed to create datastore TokenSo
urce.") | |
| 340 return nil, err | |
| 341 } | |
| 342 | |
| 343 return datastore.NewClient(c, s.serviceID, | 350 return datastore.NewClient(c, s.serviceID, |
| 344 » » option.WithTokenSource(tokenSource)) | 351 » » option.WithUserAgent(s.getUserAgent()), |
| 352 » » option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, datast
ore.ScopeDatastore))) |
| 345 } | 353 } |
| 346 | 354 |
| 347 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { | 355 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { |
| 348 if s.coordinatorHost == "" { | 356 if s.coordinatorHost == "" { |
| 349 log.Errorf(c, "Missing Coordinator URL (-coordinator).") | 357 log.Errorf(c, "Missing Coordinator URL (-coordinator).") |
| 350 return nil, ErrInvalidConfig | 358 return nil, ErrInvalidConfig |
| 351 } | 359 } |
| 352 | 360 |
| 353 » httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { | 361 » transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serve
rAuth.WithScopes(CoordinatorScopes...)) |
| 354 » » o.Scopes = CoordinatorScopes | |
| 355 » }) | |
| 356 if err != nil { | 362 if err != nil { |
| 357 » » log.WithError(err).Errorf(c, "Failed to create authenticated cli
ent.") | 363 » » log.Errorf(c, "Failed to create authenticated transport for Coor
dinator client.") |
| 358 return nil, err | 364 return nil, err |
| 359 } | 365 } |
| 360 | 366 |
| 361 prpcClient := prpc.Client{ | 367 prpcClient := prpc.Client{ |
| 362 » » C: httpClient, | 368 » » C: &http.Client{ |
| 369 » » » Transport: transport, |
| 370 » » }, |
| 363 Host: s.coordinatorHost, | 371 Host: s.coordinatorHost, |
| 364 Options: prpc.DefaultOptions(), | 372 Options: prpc.DefaultOptions(), |
| 365 } | 373 } |
| 366 prpcClient.Options.UserAgent = fmt.Sprintf("%s/%s", s.Name, prpc.Default
UserAgent) | |
| 367 if s.coordinatorInsecure { | 374 if s.coordinatorInsecure { |
| 368 prpcClient.Options.Insecure = true | 375 prpcClient.Options.Insecure = true |
| 369 } | 376 } |
| 370 sc := logdog.NewServicesPRPCClient(&prpcClient) | 377 sc := logdog.NewServicesPRPCClient(&prpcClient) |
| 371 | 378 |
| 372 // Wrap the resulting client in a retry harness. | 379 // Wrap the resulting client in a retry harness. |
| 373 return retryServicesClient.New(sc, nil), nil | 380 return retryServicesClient.New(sc, nil), nil |
| 374 } | 381 } |
| 375 | 382 |
| 376 func (s *Service) initConfig(c *context.Context) error { | 383 func (s *Service) initConfig(c *context.Context) error { |
| (...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 528 log.Errorf(c, "Missing storage configuration.") | 535 log.Errorf(c, "Missing storage configuration.") |
| 529 return nil, ErrInvalidConfig | 536 return nil, ErrInvalidConfig |
| 530 } | 537 } |
| 531 | 538 |
| 532 btcfg := cfg.GetStorage().GetBigtable() | 539 btcfg := cfg.GetStorage().GetBigtable() |
| 533 if btcfg == nil { | 540 if btcfg == nil { |
| 534 log.Errorf(c, "Missing BigTable storage configuration") | 541 log.Errorf(c, "Missing BigTable storage configuration") |
| 535 return nil, ErrInvalidConfig | 542 return nil, ErrInvalidConfig |
| 536 } | 543 } |
| 537 | 544 |
| 538 » // Initialize Storage authentication. | 545 » // Initialize RPC credentials. |
| 539 » tokenSource, err := s.TokenSource(c, func(o *auth.Options) { | |
| 540 » » o.Scopes = bigtable.StorageScopes | |
| 541 » » if s.storageCredentialJSONPath != "" { | |
| 542 » » » o.ServiceAccountJSONPath = s.storageCredentialJSONPath | |
| 543 » » } | |
| 544 » }) | |
| 545 » if err != nil { | |
| 546 » » log.WithError(err).Errorf(c, "Failed to create BigTable TokenSou
rce.") | |
| 547 » » return nil, err | |
| 548 » } | |
| 549 | |
| 550 bt, err := bigtable.New(c, bigtable.Options{ | 546 bt, err := bigtable.New(c, bigtable.Options{ |
| 551 Project: btcfg.Project, | 547 Project: btcfg.Project, |
| 552 Instance: btcfg.Instance, | 548 Instance: btcfg.Instance, |
| 553 LogTable: btcfg.LogTableName, | 549 LogTable: btcfg.LogTableName, |
| 554 ClientOptions: []option.ClientOption{ | 550 ClientOptions: []option.ClientOption{ |
| 555 » » » option.WithTokenSource(tokenSource), | 551 » » » option.WithUserAgent(s.getUserAgent()), |
| 552 » » » option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c
, bigtable.StorageScopes...)), |
| 556 }, | 553 }, |
| 557 }) | 554 }) |
| 558 if err != nil { | 555 if err != nil { |
| 559 return nil, err | 556 return nil, err |
| 560 } | 557 } |
| 561 return bt, nil | 558 return bt, nil |
| 562 } | 559 } |
| 563 | 560 |
| 564 // GSClient returns an authenticated Google Storage client instance. | 561 // GSClient returns an authenticated Google Storage client instance. |
| 565 func (s *Service) GSClient(c context.Context) (gs.Client, error) { | 562 func (s *Service) GSClient(c context.Context) (gs.Client, error) { |
| 566 » rt, err := s.AuthenticatedTransport(c, func(o *auth.Options) { | 563 » // Get an Authenticator bound to the token scopes that we need for |
| 567 » » o.Scopes = gs.ReadWriteScopes | 564 » // authenticated Cloud Storage access. |
| 568 » }) | 565 » transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serve
rAuth.WithScopes(gs.ReadWriteScopes...)) |
| 569 if err != nil { | 566 if err != nil { |
| 570 » » log.WithError(err).Errorf(c, "Failed to create authenticated GS
transport.") | 567 » » log.WithError(err).Errorf(c, "Failed to create authenticated tra
nsport for Google Storage client.") |
| 571 return nil, err | 568 return nil, err |
| 572 } | 569 } |
| 573 | 570 |
| 574 » client, err := gs.NewProdClient(c, rt) | 571 » client, err := gs.NewProdClient(c, transport) |
| 575 if err != nil { | 572 if err != nil { |
| 576 log.WithError(err).Errorf(c, "Failed to create Google Storage cl
ient.") | 573 log.WithError(err).Errorf(c, "Failed to create Google Storage cl
ient.") |
| 577 return nil, err | 574 return nil, err |
| 578 } | 575 } |
| 579 return client, nil | 576 return client, nil |
| 580 } | 577 } |
| 581 | 578 |
| 582 // Authenticator returns an Authenticator instance. The Authenticator is | 579 // PubSubSubscriberClient returns a Pub/Sub client instance that is |
| 583 // configured from a base set of Authenticator Options. | 580 // authenticated with Pub/Sub subscriber scopes. |
| 584 // | 581 func (s *Service) PubSubSubscriberClient(c context.Context, projectID string) (*
pubsub.Client, error) { |
| 585 // An optional permutation function can be provided to modify those Options | 582 » return pubsub.NewClient(c, projectID, |
| 586 // before the Authenticator is created. | 583 » » option.WithUserAgent(s.getUserAgent()), |
| 587 func (s *Service) Authenticator(c context.Context, f func(o *auth.Options)) (*au
th.Authenticator, error) { | 584 » » option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, gcps.S
ubscriberScopes...))) |
| 585 } |
| 586 |
| 587 func (s *Service) unauthenticatedTransport() http.RoundTripper { |
| 588 » smt := serviceModifyingTransport{ |
| 589 » » userAgent: s.getUserAgent(), |
| 590 » } |
| 591 » return smt.roundTripper(nil) |
| 592 } |
| 593 |
| 594 func (s *Service) getUserAgent() string { return s.Name + " / " + s.serviceID } |
| 595 |
| 596 // withAuthService configures service-wide authentication and installs it into |
| 597 // the supplied Context. |
| 598 func (s *Service) withAuthService(c context.Context) context.Context { |
| 599 » return serverAuth.SetConfig(c, serverAuth.Config{ |
| 600 » » DBProvider: nil, // We don't need to store an auth DB. |
| 601 » » Signer: nil, // We don't need to sign anything. |
| 602 » » AccessTokenProvider: func(ic context.Context, scopes []string) (
commonAuth.Token, error) { |
| 603 » » » // Create a new Authenticator for the supplied scopes. |
| 604 » » » // |
| 605 » » » // Pass our outer Context, since we don't want the cache
d Authenticator |
| 606 » » » // instance to be permanently bound to the inner Context
. |
| 607 » » » a, err := s.authenticatorForScopes(c, scopes) |
| 608 » » » if err != nil { |
| 609 » » » » return commonAuth.Token{}, err |
| 610 » » » } |
| 611 » » » return a.GetAccessToken(minAuthTokenLifetime) |
| 612 » » }, |
| 613 » » AnonymousTransport: func(ic context.Context) http.RoundTripper { |
| 614 » » » return s.unauthenticatedTransport() |
| 615 » » }, |
| 616 » » Cache: serverAuth.MemoryCache(authCacheSize), |
| 617 » }) |
| 618 } |
| 619 |
| 620 func (s *Service) authenticatorForScopes(c context.Context, scopes []string) (*c
ommonAuth.Authenticator, error) { |
| 621 » sort.Strings(scopes) |
| 622 » key := strings.Join(scopes, "\x00") |
| 623 |
| 624 » // First, check holding read lock. |
| 625 » s.authCacheLock.RLock() |
| 626 » a := s.authCache[key] |
| 627 » s.authCacheLock.RUnlock() |
| 628 |
| 629 » if a != nil { |
| 630 » » return a, nil |
| 631 » } |
| 632 |
| 633 » // No authenticator yet, check again with write lock. |
| 634 » s.authCacheLock.Lock() |
| 635 » defer s.authCacheLock.Unlock() |
| 636 |
| 637 » if a = s.authCache[key]; a != nil { |
| 638 » » // One was created in between locking! |
| 639 » » return a, nil |
| 640 » } |
| 641 |
| 642 » // Create a new Authenticator. |
| 588 authOpts, err := s.authFlags.Options() | 643 authOpts, err := s.authFlags.Options() |
| 589 if err != nil { | 644 if err != nil { |
| 590 return nil, ErrInvalidConfig | 645 return nil, ErrInvalidConfig |
| 591 } | 646 } |
| 592 » if f != nil { | 647 » authOpts.Scopes = append([]string(nil), scopes...) |
| 593 » » f(&authOpts) | 648 » authOpts.Transport = s.unauthenticatedTransport() |
| 649 |
| 650 » a = commonAuth.NewAuthenticator(c, commonAuth.SilentLogin, authOpts) |
| 651 » if s.authCache == nil { |
| 652 » » s.authCache = make(map[string]*commonAuth.Authenticator) |
| 594 } | 653 } |
| 595 » return auth.NewAuthenticator(c, auth.SilentLogin, authOpts), nil | 654 » s.authCache[key] = a |
| 655 » return a, nil |
| 596 } | 656 } |
| 597 | |
| 598 // AuthenticatedTransport returns an authenticated http.RoundTripper transport. | |
| 599 // The transport is configured from a base set of Authenticator Options. | |
| 600 // | |
| 601 // An optional permutation function can be provided to modify those Options | |
| 602 // before the Authenticator is created. | |
| 603 func (s *Service) AuthenticatedTransport(c context.Context, f func(o *auth.Optio
ns)) (http.RoundTripper, error) { | |
| 604 a, err := s.Authenticator(c, f) | |
| 605 if err != nil { | |
| 606 return nil, err | |
| 607 } | |
| 608 return a.Transport() | |
| 609 } | |
| 610 | |
| 611 // AuthenticatedClient returns an authenticated http.Client. The Client is | |
| 612 // configured from a base set of Authenticator Options. | |
| 613 // | |
| 614 // An optional permutation function can be provided to modify those Options | |
| 615 // before the Authenticator is created. | |
| 616 func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options)
) (*http.Client, error) { | |
| 617 a, err := s.Authenticator(c, f) | |
| 618 if err != nil { | |
| 619 return nil, err | |
| 620 } | |
| 621 return a.Client() | |
| 622 } | |
| 623 | |
| 624 // TokenSource returns oauth2.TokenSource configured from a base set of | |
| 625 // Authenticator Options. | |
| 626 // | |
| 627 // An optional permutation function can be provided to modify those Options | |
| 628 // before the Authenticator is created. | |
| 629 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth
2.TokenSource, error) { | |
| 630 a, err := s.Authenticator(c, f) | |
| 631 if err != nil { | |
| 632 return nil, err | |
| 633 } | |
| 634 return a.TokenSource() | |
| 635 } | |
| OLD | NEW |