Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(183)

Unified Diff: logdog/server/service/service.go

Issue 2643363002: LogDog: Use server/auth for authentication. (Closed)
Patch Set: LogDog: Use server/auth for authentication. Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/server/service/rt.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/server/service/service.go
diff --git a/logdog/server/service/service.go b/logdog/server/service/service.go
index 711e94c4cbec18adef60334e36361816b6ffabdf..0b40311f5fa6724d3143f96e5cd7e2a63501252a 100644
--- a/logdog/server/service/service.go
+++ b/logdog/server/service/service.go
@@ -6,24 +6,27 @@ package service
import (
"flag"
- "fmt"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime/pprof"
+ "sort"
+ "strings"
+ "sync"
"sync/atomic"
"time"
"github.com/luci/luci-go/appengine/gaesettings"
"github.com/luci/luci-go/client/authcli"
- "github.com/luci/luci-go/common/auth"
+ commonAuth "github.com/luci/luci-go/common/auth"
"github.com/luci/luci-go/common/clock/clockflag"
"github.com/luci/luci-go/common/config/impl/filesystem"
"github.com/luci/luci-go/common/data/caching/proccache"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
+ gcps "github.com/luci/luci-go/common/gcloud/pubsub"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/logging/gologger"
"github.com/luci/luci-go/common/proto/google"
@@ -41,14 +44,15 @@ import (
"github.com/luci/luci-go/luci_config/server/cfgclient/backend/client"
"github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig"
"github.com/luci/luci-go/luci_config/server/cfgclient/textproto"
+ serverAuth "github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/settings"
"github.com/luci/gae/impl/cloud"
"cloud.google.com/go/compute/metadata"
"cloud.google.com/go/datastore"
+ "cloud.google.com/go/pubsub"
"golang.org/x/net/context"
- "golang.org/x/oauth2"
"google.golang.org/api/option"
)
@@ -60,7 +64,7 @@ var (
// CoordinatorScopes is the set of OAuth2 scopes to use for the Coordinator
// client.
CoordinatorScopes = []string{
- auth.OAuthScopeEmail,
+ commonAuth.OAuthScopeEmail,
}
)
@@ -72,6 +76,13 @@ const (
// minAuthTokenLifetime is the amount of time that an access token has before
// expiring.
minAuthTokenLifetime = 2 * time.Minute
+
+ // authCacheSize is the maximum number of elements to store in the auth
+ // global cache LRU.
+ //
+ // We don't expect to load too many different authentication tokens, so a
+ // relatively low number should be fine here.
+ authCacheSize = 128
)
// Service is a base class full of common LogDog service application parameters.
@@ -90,11 +101,10 @@ type Service struct {
authFlags authcli.Flags
tsMonFlags tsmon.Flags
- coordinatorHost string
- coordinatorInsecure bool
- storageCredentialJSONPath string
- cpuProfilePath string
- heapProfilePath string
+ coordinatorHost string
+ coordinatorInsecure bool
+ cpuProfilePath string
+ heapProfilePath string
// onGCE is true if we're on GCE. We probe this once during Run.
onGCE bool
@@ -122,6 +132,11 @@ type Service struct {
serviceID string
coord logdog.ServicesClient
+
+ // authCache is a cache of instantiated Authenticator instances, keyed on
+ // sorted NULL-delimited scope strings (see authenticatorForScopes).
+ authCacheLock sync.RWMutex
+ authCache map[string]*commonAuth.Authenticator
}
// Run performs service-wide initialization and invokes the specified run
@@ -202,6 +217,9 @@ func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
return errors.New("no service ID was configured (-service-id)")
}
+ // Install our authentication service.
+ c = s.withAuthService(c)
+
// Install a cloud datastore client. This is non-fatal if it fails.
dsClient, err := s.initDatastoreClient(c)
if err == nil {
@@ -287,7 +305,7 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) {
s.tsMonFlags.Target.TaskJobName = s.Name
s.tsMonFlags.Register(fs)
- s.authFlags.Register(fs, auth.Options{})
+ s.authFlags.Register(fs, commonAuth.Options{})
fs.StringVar(&s.serviceID, "service-id", "",
"Specify the service ID that this instance is supporting. If empty, the service ID "+
@@ -297,8 +315,6 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) {
"The Coordinator service's [host][:port].")
fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false,
"Connect to Coordinator over HTTP (instead of HTTPS).")
- fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path", "",
- "If supplied, the path of a JSON credential file to load and use for storage operations.")
fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "",
"If supplied, enable CPU profiling and write the profile here.")
fs.StringVar(&s.heapProfilePath, "heap-profile-path", "",
@@ -331,17 +347,9 @@ func (s *Service) probeGCEEnvironment(c context.Context) {
}
func (s *Service) initDatastoreClient(c context.Context) (*datastore.Client, error) {
- // Initialize Storage authentication.
- tokenSource, err := s.TokenSource(c, func(o *auth.Options) {
- o.Scopes = []string{datastore.ScopeDatastore}
- })
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create datastore TokenSource.")
- return nil, err
- }
-
return datastore.NewClient(c, s.serviceID,
- option.WithTokenSource(tokenSource))
+ option.WithUserAgent(s.getUserAgent()),
+ option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, datastore.ScopeDatastore)))
}
func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClient, error) {
@@ -350,20 +358,19 @@ func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
return nil, ErrInvalidConfig
}
- httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) {
- o.Scopes = CoordinatorScopes
- })
+ transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serverAuth.WithScopes(CoordinatorScopes...))
if err != nil {
- log.WithError(err).Errorf(c, "Failed to create authenticated client.")
+ log.Errorf(c, "Failed to create authenticated transport for Coordinator client.")
return nil, err
}
prpcClient := prpc.Client{
- C: httpClient,
+ C: &http.Client{
+ Transport: transport,
+ },
Host: s.coordinatorHost,
Options: prpc.DefaultOptions(),
}
- prpcClient.Options.UserAgent = fmt.Sprintf("%s/%s", s.Name, prpc.DefaultUserAgent)
if s.coordinatorInsecure {
prpcClient.Options.Insecure = true
}
@@ -535,24 +542,14 @@ func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
return nil, ErrInvalidConfig
}
- // Initialize Storage authentication.
- tokenSource, err := s.TokenSource(c, func(o *auth.Options) {
- o.Scopes = bigtable.StorageScopes
- if s.storageCredentialJSONPath != "" {
- o.ServiceAccountJSONPath = s.storageCredentialJSONPath
- }
- })
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create BigTable TokenSource.")
- return nil, err
- }
-
+ // Initialize RPC credentials.
bt, err := bigtable.New(c, bigtable.Options{
Project: btcfg.Project,
Instance: btcfg.Instance,
LogTable: btcfg.LogTableName,
ClientOptions: []option.ClientOption{
- option.WithTokenSource(tokenSource),
+ option.WithUserAgent(s.getUserAgent()),
+ option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, bigtable.StorageScopes...)),
},
})
if err != nil {
@@ -563,15 +560,15 @@ func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
// GSClient returns an authenticated Google Storage client instance.
func (s *Service) GSClient(c context.Context) (gs.Client, error) {
- rt, err := s.AuthenticatedTransport(c, func(o *auth.Options) {
- o.Scopes = gs.ReadWriteScopes
- })
+ // Get an Authenticator bound to the token scopes that we need for
+ // authenticated Cloud Storage access.
+ transport, err := serverAuth.GetRPCTransport(c, serverAuth.AsSelf, serverAuth.WithScopes(gs.ReadWriteScopes...))
if err != nil {
- log.WithError(err).Errorf(c, "Failed to create authenticated GS transport.")
+ log.WithError(err).Errorf(c, "Failed to create authenticated transport for Google Storage client.")
return nil, err
}
- client, err := gs.NewProdClient(c, rt)
+ client, err := gs.NewProdClient(c, transport)
if err != nil {
log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
return nil, err
@@ -579,57 +576,81 @@ func (s *Service) GSClient(c context.Context) (gs.Client, error) {
return client, nil
}
-// Authenticator returns an Authenticator instance. The Authenticator is
-// configured from a base set of Authenticator Options.
-//
-// An optional permutation function can be provided to modify those Options
-// before the Authenticator is created.
-func (s *Service) Authenticator(c context.Context, f func(o *auth.Options)) (*auth.Authenticator, error) {
- authOpts, err := s.authFlags.Options()
- if err != nil {
- return nil, ErrInvalidConfig
- }
- if f != nil {
- f(&authOpts)
- }
- return auth.NewAuthenticator(c, auth.SilentLogin, authOpts), nil
+// PubSubSubscriberClient returns a Pub/Sub client instance that is
+// authenticated with Pub/Sub subscriber scopes.
+func (s *Service) PubSubSubscriberClient(c context.Context, projectID string) (*pubsub.Client, error) {
+ return pubsub.NewClient(c, projectID,
+ option.WithUserAgent(s.getUserAgent()),
+ option.WithTokenSource(serverAuth.GetTokenSourceAsSelf(c, gcps.SubscriberScopes...)))
}
-// AuthenticatedTransport returns an authenticated http.RoundTripper transport.
-// The transport is configured from a base set of Authenticator Options.
-//
-// An optional permutation function can be provided to modify those Options
-// before the Authenticator is created.
-func (s *Service) AuthenticatedTransport(c context.Context, f func(o *auth.Options)) (http.RoundTripper, error) {
- a, err := s.Authenticator(c, f)
- if err != nil {
- return nil, err
+func (s *Service) unauthenticatedTransport() http.RoundTripper {
+ smt := serviceModifyingTransport{
+ userAgent: s.getUserAgent(),
}
- return a.Transport()
+ return smt.roundTripper(nil)
}
-// AuthenticatedClient returns an authenticated http.Client. The Client is
-// configured from a base set of Authenticator Options.
-//
-// An optional permutation function can be provided to modify those Options
-// before the Authenticator is created.
-func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options)) (*http.Client, error) {
- a, err := s.Authenticator(c, f)
- if err != nil {
- return nil, err
- }
- return a.Client()
+func (s *Service) getUserAgent() string { return s.Name + " / " + s.serviceID }
+
+// withAuthService configures service-wide authentication and installs it into
+// the supplied Context.
+func (s *Service) withAuthService(c context.Context) context.Context {
+ return serverAuth.SetConfig(c, serverAuth.Config{
+ DBProvider: nil, // We don't need to store an auth DB.
+ Signer: nil, // We don't need to sign anything.
+ AccessTokenProvider: func(ic context.Context, scopes []string) (commonAuth.Token, error) {
+ // Create a new Authenticator for the supplied scopes.
+ //
+ // Pass our outer Context, since we don't want the cached Authenticator
+ // instance to be permanently bound to the inner Context.
+ a, err := s.authenticatorForScopes(c, scopes)
+ if err != nil {
+ return commonAuth.Token{}, err
+ }
+ return a.GetAccessToken(minAuthTokenLifetime)
+ },
+ AnonymousTransport: func(ic context.Context) http.RoundTripper {
+ return s.unauthenticatedTransport()
+ },
+ Cache: serverAuth.MemoryCache(authCacheSize),
+ })
}
-// TokenSource returns oauth2.TokenSource configured from a base set of
-// Authenticator Options.
-//
-// An optional permutation function can be provided to modify those Options
-// before the Authenticator is created.
-func (s *Service) TokenSource(c context.Context, f func(o *auth.Options)) (oauth2.TokenSource, error) {
- a, err := s.Authenticator(c, f)
+func (s *Service) authenticatorForScopes(c context.Context, scopes []string) (*commonAuth.Authenticator, error) {
+ sort.Strings(scopes)
+ key := strings.Join(scopes, "\x00")
+
+ // First, check holding read lock.
+ s.authCacheLock.RLock()
+ a := s.authCache[key]
+ s.authCacheLock.RUnlock()
+
+ if a != nil {
+ return a, nil
+ }
+
+ // No authenticator yet, check again with write lock.
+ s.authCacheLock.Lock()
+ defer s.authCacheLock.Unlock()
+
+ if a = s.authCache[key]; a != nil {
+ // One was created in between locking!
+ return a, nil
+ }
+
+ // Create a new Authenticator.
+ authOpts, err := s.authFlags.Options()
if err != nil {
- return nil, err
+ return nil, ErrInvalidConfig
+ }
+ authOpts.Scopes = append([]string(nil), scopes...)
+ authOpts.Transport = s.unauthenticatedTransport()
+
+ a = commonAuth.NewAuthenticator(c, commonAuth.SilentLogin, authOpts)
+ if s.authCache == nil {
+ s.authCache = make(map[string]*commonAuth.Authenticator)
}
- return a.TokenSource()
+ s.authCache[key] = a
+ return a, nil
}
« no previous file with comments | « logdog/server/service/rt.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698