| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package service | 5 package service |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "flag" | 8 "flag" |
| 9 "fmt" | |
| 10 "net/http" | 9 "net/http" |
| 11 "net/url" | 10 "net/url" |
| 12 "os" | 11 "os" |
| 13 "os/signal" | 12 "os/signal" |
| 14 "path/filepath" | 13 "path/filepath" |
| 15 "runtime/pprof" | 14 "runtime/pprof" |
| 15 "sort" |
| 16 "strings" |
| 17 "sync" |
| 16 "sync/atomic" | 18 "sync/atomic" |
| 17 "time" | 19 "time" |
| 18 | 20 |
| 19 "github.com/luci/luci-go/appengine/gaesettings" | 21 "github.com/luci/luci-go/appengine/gaesettings" |
| 20 "github.com/luci/luci-go/client/authcli" | 22 "github.com/luci/luci-go/client/authcli" |
| 21 » "github.com/luci/luci-go/common/auth" | 23 » commonAuth "github.com/luci/luci-go/common/auth" |
| 22 "github.com/luci/luci-go/common/clock/clockflag" | 24 "github.com/luci/luci-go/common/clock/clockflag" |
| 23 "github.com/luci/luci-go/common/config/impl/filesystem" | 25 "github.com/luci/luci-go/common/config/impl/filesystem" |
| 24 "github.com/luci/luci-go/common/data/caching/proccache" | 26 "github.com/luci/luci-go/common/data/caching/proccache" |
| 25 "github.com/luci/luci-go/common/errors" | 27 "github.com/luci/luci-go/common/errors" |
| 26 "github.com/luci/luci-go/common/gcloud/gs" | 28 "github.com/luci/luci-go/common/gcloud/gs" |
| 29 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 27 log "github.com/luci/luci-go/common/logging" | 30 log "github.com/luci/luci-go/common/logging" |
| 28 "github.com/luci/luci-go/common/logging/gologger" | 31 "github.com/luci/luci-go/common/logging/gologger" |
| 29 "github.com/luci/luci-go/common/proto/google" | 32 "github.com/luci/luci-go/common/proto/google" |
| 30 "github.com/luci/luci-go/common/tsmon" | 33 "github.com/luci/luci-go/common/tsmon" |
| 31 "github.com/luci/luci-go/common/tsmon/target" | 34 "github.com/luci/luci-go/common/tsmon/target" |
| 32 "github.com/luci/luci-go/grpc/prpc" | 35 "github.com/luci/luci-go/grpc/prpc" |
| 33 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 36 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 34 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 37 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 35 "github.com/luci/luci-go/logdog/common/storage" | 38 "github.com/luci/luci-go/logdog/common/storage" |
| 36 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 39 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 37 "github.com/luci/luci-go/logdog/server/retryServicesClient" | 40 "github.com/luci/luci-go/logdog/server/retryServicesClient" |
| 38 "github.com/luci/luci-go/logdog/server/service/config" | 41 "github.com/luci/luci-go/logdog/server/service/config" |
| 39 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 42 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 40 "github.com/luci/luci-go/luci_config/server/cfgclient" | 43 "github.com/luci/luci-go/luci_config/server/cfgclient" |
| 41 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" | 44 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" |
| 42 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig
" | 45 "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig
" |
| 43 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" | 46 "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" |
| 47 serverAuth "github.com/luci/luci-go/server/auth" |
| 44 "github.com/luci/luci-go/server/settings" | 48 "github.com/luci/luci-go/server/settings" |
| 45 | 49 |
| 46 "github.com/luci/gae/impl/cloud" | 50 "github.com/luci/gae/impl/cloud" |
| 47 | 51 |
| 48 "cloud.google.com/go/compute/metadata" | 52 "cloud.google.com/go/compute/metadata" |
| 49 "cloud.google.com/go/datastore" | 53 "cloud.google.com/go/datastore" |
| 54 "cloud.google.com/go/pubsub" |
| 50 "golang.org/x/net/context" | 55 "golang.org/x/net/context" |
| 51 "golang.org/x/oauth2" | |
| 52 "google.golang.org/api/option" | 56 "google.golang.org/api/option" |
| 53 ) | 57 ) |
| 54 | 58 |
| 55 var ( | 59 var ( |
| 56 // ErrInvalidConfig is an error that is returned when the supplied | 60 // ErrInvalidConfig is an error that is returned when the supplied |
| 57 // configuration is invalid. | 61 // configuration is invalid. |
| 58 ErrInvalidConfig = errors.New("invalid configuration") | 62 ErrInvalidConfig = errors.New("invalid configuration") |
| 59 | 63 |
| 60 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor | 64 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor |
| 61 // client. | 65 // client. |
| 62 CoordinatorScopes = []string{ | 66 CoordinatorScopes = []string{ |
| 63 » » auth.OAuthScopeEmail, | 67 » » commonAuth.OAuthScopeEmail, |
| 64 } | 68 } |
| 65 ) | 69 ) |
| 66 | 70 |
| 67 // projectConfigCacheDuration is the amount of time to cache a project's | 71 // projectConfigCacheDuration is the amount of time to cache a project's |
| 68 // configuration before reloading. | 72 // configuration before reloading. |
| 69 const ( | 73 const ( |
| 70 projectConfigCacheDuration = 30 * time.Minute | 74 projectConfigCacheDuration = 30 * time.Minute |
| 71 | 75 |
| 72 // minAuthTokenLifetime is the amount of time that an access token has b
efore | 76 // minAuthTokenLifetime is the amount of time that an access token has b
efore |
| 73 // expiring. | 77 // expiring. |
| 74 minAuthTokenLifetime = 2 * time.Minute | 78 minAuthTokenLifetime = 2 * time.Minute |
| 79 |
| 80 // authGlobalCacheSize is the maximum number of elements to store in the
auth |
| 81 // global cache LRU. |
| 82 // |
| 83 // We don't expect to load too many different authentication tokens, so
a |
| 84 // relatively low number should be fine here. |
| 85 authGlobalCacheSize = 128 |
| 75 ) | 86 ) |
| 76 | 87 |
| 77 // Service is a base class full of common LogDog service application parameters. | 88 // Service is a base class full of common LogDog service application parameters. |
| 78 type Service struct { | 89 type Service struct { |
| 79 // Name is the name of this service. It is used for logging, metrics, an
d | 90 // Name is the name of this service. It is used for logging, metrics, an
d |
| 80 // user agent string generation. | 91 // user agent string generation. |
| 81 // | 92 // |
| 82 // If empty, a service name will be inferred from the command-line argum
ents. | 93 // If empty, a service name will be inferred from the command-line argum
ents. |
| 83 Name string | 94 Name string |
| 84 // Flags is the set of flags that will be used by the Service. | 95 // Flags is the set of flags that will be used by the Service. |
| 85 Flags flag.FlagSet | 96 Flags flag.FlagSet |
| 86 | 97 |
| 87 shutdownFunc atomic.Value | 98 shutdownFunc atomic.Value |
| 88 | 99 |
| 89 loggingFlags log.Config | 100 loggingFlags log.Config |
| 90 authFlags authcli.Flags | 101 authFlags authcli.Flags |
| 91 tsMonFlags tsmon.Flags | 102 tsMonFlags tsmon.Flags |
| 92 | 103 |
| 93 » coordinatorHost string | 104 » coordinatorHost string |
| 94 » coordinatorInsecure bool | 105 » coordinatorInsecure bool |
| 95 » storageCredentialJSONPath string | 106 » cpuProfilePath string |
| 96 » cpuProfilePath string | 107 » heapProfilePath string |
| 97 » heapProfilePath string | |
| 98 | 108 |
| 99 // onGCE is true if we're on GCE. We probe this exactly once. | 109 // onGCE is true if we're on GCE. We probe this exactly once. |
| 100 onGCE bool | 110 onGCE bool |
| 101 hasDatastore bool | 111 hasDatastore bool |
| 102 | 112 |
| 103 killCheckInterval clockflag.Duration | 113 killCheckInterval clockflag.Duration |
| 104 configFilePath string | 114 configFilePath string |
| 105 serviceConfig svcconfig.Config | 115 serviceConfig svcconfig.Config |
| 106 configCache config.ProcCache | 116 configCache config.ProcCache |
| 107 | 117 |
| 108 // serviceID is the cloud project ID, which is also this service's uniqu
e | 118 // serviceID is the cloud project ID, which is also this service's uniqu
e |
| 109 // ID. This can be specified by flag or, if on GCE, will automatically b
e | 119 // ID. This can be specified by flag or, if on GCE, will automatically b
e |
| 110 // probed from metadata. | 120 // probed from metadata. |
| 111 serviceID string | 121 serviceID string |
| 112 | 122 |
| 113 coord logdog.ServicesClient | 123 coord logdog.ServicesClient |
| 124 |
| 125 // authCache is a cache of instantiated Authenticator instances, keyed o
n |
| 126 // sorted NULL-delimited scope strings (see authenticatorForScopes). |
| 127 authCacheLock sync.RWMutex |
| 128 authCache map[string]*commonAuth.Authenticator |
| 114 } | 129 } |
| 115 | 130 |
| 116 // Run performs service-wide initialization and invokes the specified run | 131 // Run performs service-wide initialization and invokes the specified run |
| 117 // function. | 132 // function. |
| 118 func (s *Service) Run(c context.Context, f func(context.Context) error) { | 133 func (s *Service) Run(c context.Context, f func(context.Context) error) { |
| 119 c = gologger.StdConfig.Use(c) | 134 c = gologger.StdConfig.Use(c) |
| 120 | 135 |
| 121 // If a service name isn't specified, default to the base of the current | 136 // If a service name isn't specified, default to the base of the current |
| 122 // executable. | 137 // executable. |
| 123 if s.Name == "" { | 138 if s.Name == "" { |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 184 | 199 |
| 185 // Cancel our Context after we're done our run loop. | 200 // Cancel our Context after we're done our run loop. |
| 186 c, cancelFunc := context.WithCancel(c) | 201 c, cancelFunc := context.WithCancel(c) |
| 187 defer cancelFunc() | 202 defer cancelFunc() |
| 188 | 203 |
| 189 // Validate the runtime environment. | 204 // Validate the runtime environment. |
| 190 if s.serviceID == "" { | 205 if s.serviceID == "" { |
| 191 return errors.New("no service ID was configured (-service-id)") | 206 return errors.New("no service ID was configured (-service-id)") |
| 192 } | 207 } |
| 193 | 208 |
| 209 // Install our authentication service. |
| 210 c = s.withAuthService(c) |
| 211 |
| 194 // Install a cloud datastore client. This is non-fatal if it fails. | 212 // Install a cloud datastore client. This is non-fatal if it fails. |
| 195 dsClient, err := s.initDatastoreClient(c) | 213 dsClient, err := s.initDatastoreClient(c) |
| 196 if err == nil { | 214 if err == nil { |
| 197 defer dsClient.Close() | 215 defer dsClient.Close() |
| 198 | 216 |
| 199 ccfg := cloud.Config{ | 217 ccfg := cloud.Config{ |
| 200 DS: dsClient, | 218 DS: dsClient, |
| 201 } | 219 } |
| 202 c = ccfg.Use(c) | 220 c = ccfg.Use(c) |
| 203 c = settings.Use(c, settings.New(gaesettings.Storage{})) | 221 c = settings.Use(c, settings.New(gaesettings.Storage{})) |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 269 s.loggingFlags.Level = log.Warning | 287 s.loggingFlags.Level = log.Warning |
| 270 s.loggingFlags.AddFlags(fs) | 288 s.loggingFlags.AddFlags(fs) |
| 271 | 289 |
| 272 // Initialize tsmon flags. | 290 // Initialize tsmon flags. |
| 273 s.tsMonFlags = tsmon.NewFlags() | 291 s.tsMonFlags = tsmon.NewFlags() |
| 274 s.tsMonFlags.Flush = tsmon.FlushAuto | 292 s.tsMonFlags.Flush = tsmon.FlushAuto |
| 275 s.tsMonFlags.Target.TargetType = target.TaskType | 293 s.tsMonFlags.Target.TargetType = target.TaskType |
| 276 s.tsMonFlags.Target.TaskJobName = s.Name | 294 s.tsMonFlags.Target.TaskJobName = s.Name |
| 277 s.tsMonFlags.Register(fs) | 295 s.tsMonFlags.Register(fs) |
| 278 | 296 |
| 279 » s.authFlags.Register(fs, auth.Options{}) | 297 » s.authFlags.Register(fs, commonAuth.Options{}) |
| 280 | 298 |
| 281 fs.StringVar(&s.serviceID, "service-id", "", | 299 fs.StringVar(&s.serviceID, "service-id", "", |
| 282 "Specify the service ID that this instance is supporting. If emp
ty, the service ID "+ | 300 "Specify the service ID that this instance is supporting. If emp
ty, the service ID "+ |
| 283 "will attempt to be resolved by probing the local enviro
nment. This probably will match the "+ | 301 "will attempt to be resolved by probing the local enviro
nment. This probably will match the "+ |
| 284 "App ID of the Coordinator.") | 302 "App ID of the Coordinator.") |
| 285 fs.StringVar(&s.coordinatorHost, "coordinator", "", | 303 fs.StringVar(&s.coordinatorHost, "coordinator", "", |
| 286 "The Coordinator service's [host][:port].") | 304 "The Coordinator service's [host][:port].") |
| 287 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, | 305 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, |
| 288 "Connect to Coordinator over HTTP (instead of HTTPS).") | 306 "Connect to Coordinator over HTTP (instead of HTTPS).") |
| 289 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", | |
| 290 "If supplied, the path of a JSON credential file to load and use
for storage operations.") | |
| 291 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", | 307 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", |
| 292 "If supplied, enable CPU profiling and write the profile here.") | 308 "If supplied, enable CPU profiling and write the profile here.") |
| 293 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", | 309 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", |
| 294 "If supplied, enable CPU profiling and write the profile here.") | 310 "If supplied, enable CPU profiling and write the profile here.") |
| 295 fs.Var(&s.killCheckInterval, "config-kill-interval", | 311 fs.Var(&s.killCheckInterval, "config-kill-interval", |
| 296 "If non-zero, poll for configuration changes and kill the applic
ation if one is detected.") | 312 "If non-zero, poll for configuration changes and kill the applic
ation if one is detected.") |
| 297 fs.StringVar(&s.configFilePath, "config-file-path", "", | 313 fs.StringVar(&s.configFilePath, "config-file-path", "", |
| 298 "If set, load configuration from a local filesystem rooted here.
") | 314 "If set, load configuration from a local filesystem rooted here.
") |
| 299 } | 315 } |
| 300 | 316 |
| (...skipping 12 matching lines...) Expand all Loading... |
| 313 // project ID. | 329 // project ID. |
| 314 if s.serviceID == "" { | 330 if s.serviceID == "" { |
| 315 var err error | 331 var err error |
| 316 if s.serviceID, err = metadata.ProjectID(); err != nil { | 332 if s.serviceID, err = metadata.ProjectID(); err != nil { |
| 317 log.WithError(err).Warningf(c, "Failed to probe GCE proj
ect ID.") | 333 log.WithError(err).Warningf(c, "Failed to probe GCE proj
ect ID.") |
| 318 } | 334 } |
| 319 } | 335 } |
| 320 } | 336 } |
| 321 | 337 |
| 322 func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, err
or) { | 338 func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, err
or) { |
| 323 // Initialize Storage authentication. | |
| 324 tokenSource, err := s.TokenSource(c, func(o *auth.Options) { | |
| 325 o.Scopes = []string{datastore.ScopeDatastore} | |
| 326 }) | |
| 327 if err != nil { | |
| 328 log.WithError(err).Errorf(c, "Failed to create datastore TokenSo
urce.") | |
| 329 return nil, err | |
| 330 } | |
| 331 | |
| 332 return datastore.NewClient(c, s.serviceID, | 339 return datastore.NewClient(c, s.serviceID, |
| 333 » » option.WithTokenSource(tokenSource)) | 340 » » option.WithUserAgent(s.getUserAgent()), |
| 341 » » option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, datast
ore.ScopeDatastore))) |
| 334 } | 342 } |
| 335 | 343 |
| 336 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { | 344 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { |
| 337 if s.coordinatorHost == "" { | 345 if s.coordinatorHost == "" { |
| 338 log.Errorf(c, "Missing Coordinator URL (-coordinator).") | 346 log.Errorf(c, "Missing Coordinator URL (-coordinator).") |
| 339 return nil, ErrInvalidConfig | 347 return nil, ErrInvalidConfig |
| 340 } | 348 } |
| 341 | 349 |
| 342 » httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { | 350 » transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serve
rAuth.WithScopes(CoordinatorScopes...)) |
| 343 » » o.Scopes = CoordinatorScopes | |
| 344 » }) | |
| 345 if err != nil { | 351 if err != nil { |
| 346 » » log.WithError(err).Errorf(c, "Failed to create authenticated cli
ent.") | 352 » » log.Errorf(c, "Failed to create authenticated transport for Coor
dinator client.") |
| 347 return nil, err | 353 return nil, err |
| 348 } | 354 } |
| 349 | 355 |
| 350 prpcClient := prpc.Client{ | 356 prpcClient := prpc.Client{ |
| 351 » » C: httpClient, | 357 » » C: &http.Client{ |
| 358 » » » Transport: transport, |
| 359 » » }, |
| 352 Host: s.coordinatorHost, | 360 Host: s.coordinatorHost, |
| 353 Options: prpc.DefaultOptions(), | 361 Options: prpc.DefaultOptions(), |
| 354 } | 362 } |
| 355 prpcClient.Options.UserAgent = fmt.Sprintf("%s/%s", s.Name, prpc.Default
UserAgent) | |
| 356 if s.coordinatorInsecure { | 363 if s.coordinatorInsecure { |
| 357 prpcClient.Options.Insecure = true | 364 prpcClient.Options.Insecure = true |
| 358 } | 365 } |
| 359 sc := logdog.NewServicesPRPCClient(&prpcClient) | 366 sc := logdog.NewServicesPRPCClient(&prpcClient) |
| 360 | 367 |
| 361 // Wrap the resulting client in a retry harness. | 368 // Wrap the resulting client in a retry harness. |
| 362 return retryServicesClient.New(sc, nil), nil | 369 return retryServicesClient.New(sc, nil), nil |
| 363 } | 370 } |
| 364 | 371 |
| 365 func (s *Service) initConfig(c *context.Context) error { | 372 func (s *Service) initConfig(c *context.Context) error { |
| (...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 506 log.Errorf(c, "Missing storage configuration.") | 513 log.Errorf(c, "Missing storage configuration.") |
| 507 return nil, ErrInvalidConfig | 514 return nil, ErrInvalidConfig |
| 508 } | 515 } |
| 509 | 516 |
| 510 btcfg := cfg.GetStorage().GetBigtable() | 517 btcfg := cfg.GetStorage().GetBigtable() |
| 511 if btcfg == nil { | 518 if btcfg == nil { |
| 512 log.Errorf(c, "Missing BigTable storage configuration") | 519 log.Errorf(c, "Missing BigTable storage configuration") |
| 513 return nil, ErrInvalidConfig | 520 return nil, ErrInvalidConfig |
| 514 } | 521 } |
| 515 | 522 |
| 516 » // Initialize Storage authentication. | 523 » // Initialize RPC credentials. |
| 517 » tokenSource, err := s.TokenSource(c, func(o *auth.Options) { | |
| 518 » » o.Scopes = bigtable.StorageScopes | |
| 519 » » if s.storageCredentialJSONPath != "" { | |
| 520 » » » o.ServiceAccountJSONPath = s.storageCredentialJSONPath | |
| 521 » » } | |
| 522 » }) | |
| 523 » if err != nil { | |
| 524 » » log.WithError(err).Errorf(c, "Failed to create BigTable TokenSou
rce.") | |
| 525 » » return nil, err | |
| 526 » } | |
| 527 | |
| 528 bt, err := bigtable.New(c, bigtable.Options{ | 524 bt, err := bigtable.New(c, bigtable.Options{ |
| 529 Project: btcfg.Project, | 525 Project: btcfg.Project, |
| 530 Instance: btcfg.Instance, | 526 Instance: btcfg.Instance, |
| 531 LogTable: btcfg.LogTableName, | 527 LogTable: btcfg.LogTableName, |
| 532 ClientOptions: []option.ClientOption{ | 528 ClientOptions: []option.ClientOption{ |
| 533 » » » option.WithTokenSource(tokenSource), | 529 » » » option.WithUserAgent(s.getUserAgent()), |
| 530 » » » option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c
, bigtable.StorageScopes...)), |
| 534 }, | 531 }, |
| 535 }) | 532 }) |
| 536 if err != nil { | 533 if err != nil { |
| 537 return nil, err | 534 return nil, err |
| 538 } | 535 } |
| 539 return bt, nil | 536 return bt, nil |
| 540 } | 537 } |
| 541 | 538 |
| 542 // GSClient returns an authenticated Google Storage client instance. | 539 // GSClient returns an authenticated Google Storage client instance. |
| 543 func (s *Service) GSClient(c context.Context) (gs.Client, error) { | 540 func (s *Service) GSClient(c context.Context) (gs.Client, error) { |
| 544 » rt, err := s.AuthenticatedTransport(c, func(o *auth.Options) { | 541 » // Get an Authenticator bound to the token scopes that we need for |
| 545 » » o.Scopes = gs.ReadWriteScopes | 542 » // authenticated Cloud Storage access. |
| 546 » }) | 543 » transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serve
rAuth.WithScopes(gs.ReadWriteScopes...)) |
| 547 if err != nil { | 544 if err != nil { |
| 548 » » log.WithError(err).Errorf(c, "Failed to create authenticated GS
transport.") | 545 » » log.WithError(err).Errorf(c, "Failed to create authenticated tra
nsport for Google Storage client.") |
| 549 return nil, err | 546 return nil, err |
| 550 } | 547 } |
| 551 | 548 |
| 552 » client, err := gs.NewProdClient(c, rt) | 549 » client, err := gs.NewProdClient(c, transport) |
| 553 if err != nil { | 550 if err != nil { |
| 554 log.WithError(err).Errorf(c, "Failed to create Google Storage cl
ient.") | 551 log.WithError(err).Errorf(c, "Failed to create Google Storage cl
ient.") |
| 555 return nil, err | 552 return nil, err |
| 556 } | 553 } |
| 557 return client, nil | 554 return client, nil |
| 558 } | 555 } |
| 559 | 556 |
| 560 // Authenticator returns an Authenticator instance. The Authenticator is | 557 // PubSubSubscriberClient returns a Pub/Sub client instance that is |
| 561 // configured from a base set of Authenticator Options. | 558 // authenticated with Pub/Sub subscriber scopes. |
| 562 // | 559 func (s *Service) PubSubSubscriberClient(c context.Context, projectID string) (*
pubsub.Client, error) { |
| 563 // An optional permutation function can be provided to modify those Options | 560 » return pubsub.NewClient(c, projectID, |
| 564 // before the Authenticator is created. | 561 » » option.WithUserAgent(s.getUserAgent()), |
| 565 func (s *Service) Authenticator(c context.Context, f func(o *auth.Options)) (*au
th.Authenticator, error) { | 562 » » option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, gcps.S
ubscriberScopes...))) |
| 563 } |
| 564 |
| 565 func (s *Service) unauthenticatedTransport() http.RoundTripper { |
| 566 » smt := serviceModifyingTransport{ |
| 567 » » userAgent: s.getUserAgent(), |
| 568 » } |
| 569 » return smt.roundTripper(nil) |
| 570 } |
| 571 |
| 572 func (s *Service) getUserAgent() string { return s.Name + " / " + s.serviceID } |
| 573 |
| 574 // withAuthService configures service-wide authentication and installs it into |
| 575 // the supplied Context. |
| 576 func (s *Service) withAuthService(c context.Context) context.Context { |
| 577 » return serverAuth.SetConfig(c, serverAuth.Config{ |
| 578 » » DBProvider: nil, // We don't need to store an auth DB. |
| 579 » » Signer: nil, // We don't need to sign anything. |
| 580 » » AccessTokenProvider: func(ic context.Context, scopes []string) (
commonAuth.Token, error) { |
| 581 » » » // Create a new Authenticator for the supplied scopes. |
| 582 » » » // |
| 583 » » » // Pass our outer Context, since we don't want the cache
d Authenticator |
| 584 » » » // instance to be permanently bound to the inner Context
. |
| 585 » » » a, err := s.authenticatorForScopes(c, scopes) |
| 586 » » » if err != nil { |
| 587 » » » » return commonAuth.Token{}, err |
| 588 » » » } |
| 589 » » » return a.GetAccessToken(minAuthTokenLifetime) |
| 590 » » }, |
| 591 » » AnonymousTransport: func(ic context.Context) http.RoundTripper { |
| 592 » » » return s.unauthenticatedTransport() |
| 593 » » }, |
| 594 » » GlobalCache: serverAuth.ProcGlobalCache(authGlobalCacheSize), |
| 595 » }) |
| 596 } |
| 597 |
| 598 func (s *Service) authenticatorForScopes(c context.Context, scopes []string) (*c
ommonAuth.Authenticator, error) { |
| 599 » sort.Strings(scopes) |
| 600 » key := strings.Join(scopes, "\x00") |
| 601 |
| 602 » // First, check holding read lock. |
| 603 » s.authCacheLock.RLock() |
| 604 » a := s.authCache[key] |
| 605 » s.authCacheLock.RUnlock() |
| 606 |
| 607 » if a != nil { |
| 608 » » return a, nil |
| 609 » } |
| 610 |
| 611 » // No authenticator yet, check again with write lock. |
| 612 » s.authCacheLock.Lock() |
| 613 » defer s.authCacheLock.Unlock() |
| 614 |
| 615 » if a = s.authCache[key]; a != nil { |
| 616 » » // One was created in between locking! |
| 617 » » return a, nil |
| 618 » } |
| 619 |
| 620 » // Create a new Authenticator. |
| 566 authOpts, err := s.authFlags.Options() | 621 authOpts, err := s.authFlags.Options() |
| 567 if err != nil { | 622 if err != nil { |
| 568 return nil, ErrInvalidConfig | 623 return nil, ErrInvalidConfig |
| 569 } | 624 } |
| 570 » if f != nil { | 625 » authOpts.Scopes = append([]string(nil), scopes...) |
| 571 » » f(&authOpts) | 626 » authOpts.Transport = s.unauthenticatedTransport() |
| 627 |
| 628 » a = commonAuth.NewAuthenticator(c, commonAuth.SilentLogin, authOpts) |
| 629 » if s.authCache == nil { |
| 630 » » s.authCache = make(map[string]*commonAuth.Authenticator) |
| 572 } | 631 } |
| 573 » return auth.NewAuthenticator(c, auth.SilentLogin, authOpts), nil | 632 » s.authCache[key] = a |
| 633 » return a, nil |
| 574 } | 634 } |
| 575 | |
| 576 // AuthenticatedTransport returns an authenticated http.RoundTripper transport. | |
| 577 // The transport is configured from a base set of Authenticator Options. | |
| 578 // | |
| 579 // An optional permutation function can be provided to modify those Options | |
| 580 // before the Authenticator is created. | |
| 581 func (s *Service) AuthenticatedTransport(c context.Context, f func(o *auth.Optio
ns)) (http.RoundTripper, error) { | |
| 582 a, err := s.Authenticator(c, f) | |
| 583 if err != nil { | |
| 584 return nil, err | |
| 585 } | |
| 586 return a.Transport() | |
| 587 } | |
| 588 | |
| 589 // AuthenticatedClient returns an authenticated http.Client. The Client is | |
| 590 // configured from a base set of Authenticator Options. | |
| 591 // | |
| 592 // An optional permutation function can be provided to modify those Options | |
| 593 // before the Authenticator is created. | |
| 594 func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options)
) (*http.Client, error) { | |
| 595 a, err := s.Authenticator(c, f) | |
| 596 if err != nil { | |
| 597 return nil, err | |
| 598 } | |
| 599 return a.Client() | |
| 600 } | |
| 601 | |
| 602 // TokenSource returns oauth2.TokenSource configured from a base set of | |
| 603 // Authenticator Options. | |
| 604 // | |
| 605 // An optional permutation function can be provided to modify those Options | |
| 606 // before the Authenticator is created. | |
| 607 func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth
2.TokenSource, error) { | |
| 608 a, err := s.Authenticator(c, f) | |
| 609 if err != nil { | |
| 610 return nil, err | |
| 611 } | |
| 612 return a.TokenSource() | |
| 613 } | |
| OLD | NEW |