| Index: logdog/server/service/service.go
|
| diff --git a/logdog/server/service/service.go b/logdog/server/service/service.go
|
| index 711e94c4cbec18adef60334e36361816b6ffabdf..0b40311f5fa6724d3143f96e5cd7e2a63501252a 100644
|
| --- a/logdog/server/service/service.go
|
| +++ b/logdog/server/service/service.go
|
| @@ -6,24 +6,27 @@ package service
|
|
|
| import (
|
| "flag"
|
| - "fmt"
|
| "net/http"
|
| "net/url"
|
| "os"
|
| "os/signal"
|
| "path/filepath"
|
| "runtime/pprof"
|
| + "sort"
|
| + "strings"
|
| + "sync"
|
| "sync/atomic"
|
| "time"
|
|
|
| "github.com/luci/luci-go/appengine/gaesettings"
|
| "github.com/luci/luci-go/client/authcli"
|
| - "github.com/luci/luci-go/common/auth"
|
| + commonAuth "github.com/luci/luci-go/common/auth"
|
| "github.com/luci/luci-go/common/clock/clockflag"
|
| "github.com/luci/luci-go/common/config/impl/filesystem"
|
| "github.com/luci/luci-go/common/data/caching/proccache"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| + gcps "github.com/luci/luci-go/common/gcloud/pubsub"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/logging/gologger"
|
| "github.com/luci/luci-go/common/proto/google"
|
| @@ -41,14 +44,15 @@ import (
|
| "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client"
|
| "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig"
|
| "github.com/luci/luci-go/luci_config/server/cfgclient/textproto"
|
| + serverAuth "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/settings"
|
|
|
| "github.com/luci/gae/impl/cloud"
|
|
|
| "cloud.google.com/go/compute/metadata"
|
| "cloud.google.com/go/datastore"
|
| + "cloud.google.com/go/pubsub"
|
| "golang.org/x/net/context"
|
| - "golang.org/x/oauth2"
|
| "google.golang.org/api/option"
|
| )
|
|
|
| @@ -60,7 +64,7 @@ var (
|
| // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordinator
|
| // client.
|
| CoordinatorScopes = []string{
|
| - auth.OAuthScopeEmail,
|
| + commonAuth.OAuthScopeEmail,
|
| }
|
| )
|
|
|
| @@ -72,6 +76,13 @@ const (
|
| // minAuthTokenLifetime is the amount of time that an access token has before
|
| // expiring.
|
| minAuthTokenLifetime = 2 * time.Minute
|
| +
|
| + // authCacheSize is the maximum number of elements to store in the auth
|
| + // global cache LRU.
|
| + //
|
| + // We don't expect to load too many different authentication tokens, so a
|
| + // relatively low number should be fine here.
|
| + authCacheSize = 128
|
| )
|
|
|
| // Service is a base class full of common LogDog service application parameters.
|
| @@ -90,11 +101,10 @@ type Service struct {
|
| authFlags authcli.Flags
|
| tsMonFlags tsmon.Flags
|
|
|
| - coordinatorHost string
|
| - coordinatorInsecure bool
|
| - storageCredentialJSONPath string
|
| - cpuProfilePath string
|
| - heapProfilePath string
|
| + coordinatorHost string
|
| + coordinatorInsecure bool
|
| + cpuProfilePath string
|
| + heapProfilePath string
|
|
|
| // onGCE is true if we're on GCE. We probe this once during Run.
|
| onGCE bool
|
| @@ -122,6 +132,11 @@ type Service struct {
|
| serviceID string
|
|
|
| coord logdog.ServicesClient
|
| +
|
| + // authCache is a cache of instantiated Authenticator instances, keyed on
|
| + // sorted NULL-delimited scope strings (see authenticatorForScopes).
|
| + authCacheLock sync.RWMutex
|
| + authCache map[string]*commonAuth.Authenticator
|
| }
|
|
|
| // Run performs service-wide initialization and invokes the specified run
|
| @@ -202,6 +217,9 @@ func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
|
| return errors.New("no service ID was configured (-service-id)")
|
| }
|
|
|
| + // Install our authentication service.
|
| + c = s.withAuthService(c)
|
| +
|
| // Install a cloud datastore client. This is non-fatal if it fails.
|
| dsClient, err := s.initDatastoreClient(c)
|
| if err == nil {
|
| @@ -287,7 +305,7 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) {
|
| s.tsMonFlags.Target.TaskJobName = s.Name
|
| s.tsMonFlags.Register(fs)
|
|
|
| - s.authFlags.Register(fs, auth.Options{})
|
| + s.authFlags.Register(fs, commonAuth.Options{})
|
|
|
| fs.StringVar(&s.serviceID, "service-id", "",
|
| "Specify the service ID that this instance is supporting. If empty, the service ID "+
|
| @@ -297,8 +315,6 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) {
|
| "The Coordinator service's [host][:port].")
|
| fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false,
|
| "Connect to Coordinator over HTTP (instead of HTTPS).")
|
| - fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path", "",
|
| - "If supplied, the path of a JSON credential file to load and use for storage operations.")
|
| fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "",
|
| "If supplied, enable CPU profiling and write the profile here.")
|
| fs.StringVar(&s.heapProfilePath, "heap-profile-path", "",
|
| @@ -331,17 +347,9 @@ func (s *Service) probeGCEEnvironment(c context.Context) {
|
| }
|
|
|
| func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, error) {
|
| - // Initialize Storage authentication.
|
| - tokenSource, err := s.TokenSource(c, func(o *auth.Options) {
|
| - o.Scopes = []string{datastore.ScopeDatastore}
|
| - })
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create datastore TokenSource.")
|
| - return nil, err
|
| - }
|
| -
|
| return datastore.NewClient(c, s.serviceID,
|
| - option.WithTokenSource(tokenSource))
|
| + option.WithUserAgent(s.getUserAgent()),
|
| + option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, datastore.ScopeDatastore)))
|
| }
|
|
|
| func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClient, error) {
|
| @@ -350,20 +358,19 @@ func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
|
| return nil, ErrInvalidConfig
|
| }
|
|
|
| - httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) {
|
| - o.Scopes = CoordinatorScopes
|
| - })
|
| + transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serverAuth.WithScopes(CoordinatorScopes...))
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create authenticated client.")
|
| + log.Errorf(c, "Failed to create authenticated transport for Coordinator client.")
|
| return nil, err
|
| }
|
|
|
| prpcClient := prpc.Client{
|
| - C: httpClient,
|
| + C: &http.Client{
|
| + Transport: transport,
|
| + },
|
| Host: s.coordinatorHost,
|
| Options: prpc.DefaultOptions(),
|
| }
|
| - prpcClient.Options.UserAgent = fmt.Sprintf("%s/%s", s.Name, prpc.DefaultUserAgent)
|
| if s.coordinatorInsecure {
|
| prpcClient.Options.Insecure = true
|
| }
|
| @@ -535,24 +542,14 @@ func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
|
| return nil, ErrInvalidConfig
|
| }
|
|
|
| - // Initialize Storage authentication.
|
| - tokenSource, err := s.TokenSource(c, func(o *auth.Options) {
|
| - o.Scopes = bigtable.StorageScopes
|
| - if s.storageCredentialJSONPath != "" {
|
| - o.ServiceAccountJSONPath = s.storageCredentialJSONPath
|
| - }
|
| - })
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create BigTable TokenSource.")
|
| - return nil, err
|
| - }
|
| -
|
| + // Initialize RPC credentials.
|
| bt, err := bigtable.New(c, bigtable.Options{
|
| Project: btcfg.Project,
|
| Instance: btcfg.Instance,
|
| LogTable: btcfg.LogTableName,
|
| ClientOptions: []option.ClientOption{
|
| - option.WithTokenSource(tokenSource),
|
| + option.WithUserAgent(s.getUserAgent()),
|
| + option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, bigtable.StorageScopes...)),
|
| },
|
| })
|
| if err != nil {
|
| @@ -563,15 +560,15 @@ func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
|
|
|
| // GSClient returns an authenticated Google Storage client instance.
|
| func (s *Service) GSClient(c context.Context) (gs.Client, error) {
|
| - rt, err := s.AuthenticatedTransport(c, func(o *auth.Options) {
|
| - o.Scopes = gs.ReadWriteScopes
|
| - })
|
| + // Get an Authenticator bound to the token scopes that we need for
|
| + // authenticated Cloud Storage access.
|
| + transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serverAuth.WithScopes(gs.ReadWriteScopes...))
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create authenticated GS transport.")
|
| + log.WithError(err).Errorf(c, "Failed to create authenticated transport for Google Storage client.")
|
| return nil, err
|
| }
|
|
|
| - client, err := gs.NewProdClient(c, rt)
|
| + client, err := gs.NewProdClient(c, transport)
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
|
| return nil, err
|
| @@ -579,57 +576,81 @@ func (s *Service) GSClient(c context.Context) (gs.Client, error) {
|
| return client, nil
|
| }
|
|
|
| -// Authenticator returns an Authenticator instance. The Authenticator is
|
| -// configured from a base set of Authenticator Options.
|
| -//
|
| -// An optional permutation function can be provided to modify those Options
|
| -// before the Authenticator is created.
|
| -func (s *Service) Authenticator(c context.Context, f func(o *auth.Options)) (*auth.Authenticator, error) {
|
| - authOpts, err := s.authFlags.Options()
|
| - if err != nil {
|
| - return nil, ErrInvalidConfig
|
| - }
|
| - if f != nil {
|
| - f(&authOpts)
|
| - }
|
| - return auth.NewAuthenticator(c, auth.SilentLogin, authOpts), nil
|
| +// PubSubSubscriberClient returns a Pub/Sub client instance that is
|
| +// authenticated with Pub/Sub subscriber scopes.
|
| +func (s *Service) PubSubSubscriberClient(c context.Context, projectID string) (*pubsub.Client, error) {
|
| + return pubsub.NewClient(c, projectID,
|
| + option.WithUserAgent(s.getUserAgent()),
|
| + option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, gcps.SubscriberScopes...)))
|
| }
|
|
|
| -// AuthenticatedTransport returns an authenticated http.RoundTripper transport.
|
| -// The transport is configured from a base set of Authenticator Options.
|
| -//
|
| -// An optional permutation function can be provided to modify those Options
|
| -// before the Authenticator is created.
|
| -func (s *Service) AuthenticatedTransport(c context.Context, f func(o *auth.Options)) (http.RoundTripper, error) {
|
| - a, err := s.Authenticator(c, f)
|
| - if err != nil {
|
| - return nil, err
|
| +func (s *Service) unauthenticatedTransport() http.RoundTripper {
|
| + smt := serviceModifyingTransport{
|
| + userAgent: s.getUserAgent(),
|
| }
|
| - return a.Transport()
|
| + return smt.roundTripper(nil)
|
| }
|
|
|
| -// AuthenticatedClient returns an authenticated http.Client. The Client is
|
| -// configured from a base set of Authenticator Options.
|
| -//
|
| -// An optional permutation function can be provided to modify those Options
|
| -// before the Authenticator is created.
|
| -func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options)) (*http.Client, error) {
|
| - a, err := s.Authenticator(c, f)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - return a.Client()
|
| +func (s *Service) getUserAgent() string { return s.Name + " / " + s.serviceID }
|
| +
|
| +// withAuthService configures service-wide authentication and installs it into
|
| +// the supplied Context.
|
| +func (s *Service) withAuthService(c context.Context) context.Context {
|
| + return serverAuth.SetConfig(c, serverAuth.Config{
|
| + DBProvider: nil, // We don't need to store an auth DB.
|
| + Signer: nil, // We don't need to sign anything.
|
| + AccessTokenProvider: func(ic context.Context, scopes []string) (commonAuth.Token, error) {
|
| + // Create a new Authenticator for the supplied scopes.
|
| + //
|
| + // Pass our outer Context, since we don't want the cached Authenticator
|
| + // instance to be permanently bound to the inner Context.
|
| + a, err := s.authenticatorForScopes(c, scopes)
|
| + if err != nil {
|
| + return commonAuth.Token{}, err
|
| + }
|
| + return a.GetAccessToken(minAuthTokenLifetime)
|
| + },
|
| + AnonymousTransport: func(ic context.Context) http.RoundTripper {
|
| + return s.unauthenticatedTransport()
|
| + },
|
| + Cache: serverAuth.MemoryCache(authCacheSize),
|
| + })
|
| }
|
|
|
| -// TokenSource returns oauth2.TokenSource configured from a base set of
|
| -// Authenticator Options.
|
| -//
|
| -// An optional permutation function can be provided to modify those Options
|
| -// before the Authenticator is created.
|
| -func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth2.TokenSource, error) {
|
| - a, err := s.Authenticator(c, f)
|
| +func (s *Service) authenticatorForScopes(c context.Context, scopes []string) (*commonAuth.Authenticator, error) {
|
| + sort.Strings(scopes)
|
| + key := strings.Join(scopes, "\x00")
|
| +
|
| + // First, check holding read lock.
|
| + s.authCacheLock.RLock()
|
| + a := s.authCache[key]
|
| + s.authCacheLock.RUnlock()
|
| +
|
| + if a != nil {
|
| + return a, nil
|
| + }
|
| +
|
| + // No authenticator yet, check again with write lock.
|
| + s.authCacheLock.Lock()
|
| + defer s.authCacheLock.Unlock()
|
| +
|
| + if a = s.authCache[key]; a != nil {
|
| + // One was created in between locking!
|
| + return a, nil
|
| + }
|
| +
|
| + // Create a new Authenticator.
|
| + authOpts, err := s.authFlags.Options()
|
| if err != nil {
|
| - return nil, err
|
| + return nil, ErrInvalidConfig
|
| + }
|
| + authOpts.Scopes = append([]string(nil), scopes...)
|
| + authOpts.Transport = s.unauthenticatedTransport()
|
| +
|
| + a = commonAuth.NewAuthenticator(c, commonAuth.SilentLogin, authOpts)
|
| + if s.authCache == nil {
|
| + s.authCache = make(map[string]*commonAuth.Authenticator)
|
| }
|
| - return a.TokenSource()
|
| + s.authCache[key] = a
|
| + return a, nil
|
| }
|
|
|