Chromium Code Reviews| Index: logdog/appengine/coordinator/service.go |
| diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go |
| index 0b644d3c5e2f97a2aac4d51e8ee65719e90a986e..6f5e4e8813833a1cdfed27644f82bfa628483664 100644 |
| --- a/logdog/appengine/coordinator/service.go |
| +++ b/logdog/appengine/coordinator/service.go |
| @@ -8,8 +8,10 @@ import ( |
| "net/http" |
| "sync" |
| "sync/atomic" |
| + "time" |
| "github.com/luci/luci-go/appengine/gaemiddleware" |
| + "github.com/luci/luci-go/common/clock" |
| luciConfig "github.com/luci/luci-go/common/config" |
| "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/gcloud/gs" |
| @@ -18,18 +20,29 @@ import ( |
| "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| "github.com/luci/luci-go/logdog/common/storage" |
| + "github.com/luci/luci-go/logdog/common/storage/archive" |
| "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| "github.com/luci/luci-go/logdog/common/storage/caching" |
| "github.com/luci/luci-go/server/auth" |
| "github.com/luci/luci-go/server/router" |
| + "github.com/luci/gae/service/info" |
| + |
| gcps "cloud.google.com/go/pubsub" |
| + gcst "cloud.google.com/go/storage" |
| "golang.org/x/net/context" |
| "google.golang.org/api/option" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/metadata" |
| ) |
| +// ErrSigningNotSupported is a sentinel error returned by Storage.SignURL if |
| +// signing is not supported. |
| +var ErrSigningNotSupported = errors.New("signing URLs is not supported") |
| + |
| +// maxSignedURLLifetime is the maximum allowed signed URL lifetime. |
|
Vadim Sh.
2016/11/30 21:03:52
mention this limit in *.proto doc too
dnj
2016/12/01 17:39:31
I don't want to put constraints in the proto doc,
Vadim Sh.
2016/12/01 19:32:12
1. The doc doesn't mention it anymore.
2. "<=" is
|
| +const maxSignedURLLifetime = 1 * time.Hour |
| + |
| // Services is a set of support services used by Coordinator. |
| // |
| // Each Services instance is valid for a singel request, but can be re-used |
| @@ -56,20 +69,13 @@ type Services interface { |
| // Returns the same error codes as config.ProjectConfig. |
| ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.ProjectConfig, error) |
| - // Storage returns an intermediate storage instance for use by this service. |
| + // Storage returns a Storage instance for the supplied log stream. |
| // |
| // The caller must close the returned instance if successful. |
| - IntermediateStorage(context.Context) (storage.Storage, error) |
| - |
| - // GSClient instantiates a Google Storage client. |
| - GSClient(context.Context) (gs.Client, error) |
| + StorageForStream(context.Context, *LogStreamState) (Storage, error) |
| // ArchivalPublisher returns an ArchivalPublisher instance. |
| ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| - |
| - // StorageCache returns the storage cache instance to use, or nil for no |
| - // caching. |
| - StorageCache() caching.Cache |
| } |
| // ProdServices is middleware chain used by Coordinator services. |
| @@ -158,7 +164,21 @@ func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P |
| return s.getOrCreateCachedProjectConfig(project).resolve(c) |
| } |
| -func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Storage, error) { |
| +func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamState) (Storage, error) { |
| + if !lst.ArchivalState().Archived() { |
| + log.Debugf(c, "Log is not archived. Fetching from intermediate storage.") |
| + return s.newBigTableStorage(c) |
| + } |
| + |
| + log.Fields{ |
| + "indexURL": lst.ArchiveIndexURL, |
| + "streamURL": lst.ArchiveStreamURL, |
| + "archiveTime": lst.ArchivedTime, |
| + }.Debugf(c, "Log is archived. Fetching from archive storage.") |
| + return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.ArchiveStreamURL)) |
| +} |
| + |
| +func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error) { |
| cfg, err := s.Config(c) |
| if err != nil { |
| return nil, err |
| @@ -213,16 +233,52 @@ func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora |
| ClientOptions: []option.ClientOption{ |
| option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)), |
| }, |
| - Cache: s.StorageCache(), |
| + Cache: s.getStorageCache(), |
| }) |
| if err != nil { |
| log.WithError(err).Errorf(c, "Failed to create BigTable instance.") |
| return nil, err |
| } |
| - return st, nil |
| + |
| + return &bigTableStorage{ |
| + Storage: st, |
| + }, nil |
| +} |
| + |
| +func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs.Path) (Storage, error) { |
| + gs, err := s.newGSClient(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to create Google Storage client.") |
| + return nil, err |
| + } |
| + defer func() { |
| + if gs != nil { |
| + if err := gs.Close(); err != nil { |
| + log.WithError(err).Warningf(c, "Failed to close Google Storage client.") |
| + } |
| + } |
| + }() |
| + |
| + st, err := archive.New(c, archive.Options{ |
| + Index: index, |
| + Stream: stream, |
| + Client: gs, |
| + Cache: s.getStorageCache(), |
| + }) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.") |
| + return nil, err |
| + } |
| + |
| + gs = nil // Don't close in defer. |
|
Vadim Sh.
2016/11/30 21:03:52
nit: well.. there's only one exit point from this
dnj
2016/12/01 17:39:31
I suppose I gravitate towards defer to protect aga
|
| + return &googleStorage{ |
| + Storage: st, |
| + gs: gs, |
| + stream: stream, |
| + }, nil |
| } |
| -func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { |
| +func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) { |
| // Get an Authenticator bound to the token scopes that we need for |
| // authenticated Cloud Storage access. |
| transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(gs.ReadOnlyScopes...)) |
| @@ -284,4 +340,84 @@ func (s *prodServicesInst) nextArchiveIndex() uint64 { |
| var storageCacheSingleton StorageCache |
| -func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSingleton } |
| +func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCacheSingleton } |
| + |
| +// Storage is an interface to storage used by the Coordinator. |
| +type Storage interface { |
| + // Storage is the base Storage instance. |
| + storage.Storage |
| + |
| + // SignStreamURL attempts to sign the storage's stream's RecordIO archive |
| + // stream storage URL. |
| + // |
| + // If signing is not supported by this Storage instance, this will return |
| + // ErrNotSupported. |
| + SignStreamURL(context.Context, time.Duration) (string, time.Time, error) |
| +} |
| + |
| +// intermediateStorage is a Storage instance bound to BigTable. |
| +type bigTableStorage struct { |
| + // Storage is the base storage.Storage instance. |
| + storage.Storage |
| +} |
| + |
| +func (*bigTableStorage) SignStreamURL(c context.Context, lifetime time.Duration) (string, time.Time, error) { |
| + return "", time.Time{}, ErrSigningNotSupported |
| +} |
| + |
| +type googleStorage struct { |
| + // Storage is the base storage.Storage instance. |
| + storage.Storage |
| + |
| + // ctx is the Context that was bound at the time of of creation. |
| + ctx context.Context |
| + // gs is the backing Google Storage client. |
| + gs gs.Client |
| + // stream is the stream's Google Storage URL. |
| + stream gs.Path |
| + |
| + gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error) |
| +} |
| + |
| +func (gs *googleStorage) Close() { |
| + if err := gs.gs.Close(); err != nil { |
| + log.WithError(err).Warningf(gs.ctx, "Failed to close Google Storage client.") |
| + } |
| + gs.Storage.Close() |
| +} |
| + |
| +func (gs *googleStorage) SignStreamURL(c context.Context, lifetime time.Duration) (url string, expires time.Time, err error) { |
| + acct, err := info.ServiceAccount(c) |
|
Vadim Sh.
2016/11/30 21:09:09
oh, btw this is RPC to the backend too: https://gi
dnj
2016/12/01 17:39:30
Done.
|
| + if err != nil { |
| + err = errors.Annotate(err).InternalReason("failed to get service account name").Err() |
| + return |
| + } |
| + |
| + switch { |
| + case lifetime < 0: |
| + err = errors.Reason("invalid signed URL lifetime: %(lifetime)s").D("lifetime", lifetime).Err() |
| + return |
| + |
| + case lifetime > maxSignedURLLifetime: |
| + lifetime = maxSignedURLLifetime |
| + } |
| + |
| + // Get our signing options. |
| + expires = clock.Now(c).Add(lifetime) |
| + opts := gcst.SignedURLOptions{ |
| + GoogleAccessID: acct, |
| + SignBytes: func(b []byte) ([]byte, error) { |
| + _, signedBytes, err := info.SignBytes(c, b) |
| + return signedBytes, err |
| + }, |
| + Method: "GET", |
| + Expires: expires, |
| + } |
| + |
| + if url, err = gcst.SignedURL(gs.stream.Bucket(), gs.stream.Filename(), &opts); err != nil { |
|
Vadim Sh.
2016/11/30 21:03:52
consider caching the result in the future. SignByt
dnj
2016/12/01 17:39:31
This gets tricky, since the actual signed URLs hav
Vadim Sh.
2016/12/01 19:32:13
That's why I assumed >= in sign_entry_url_lifetime
|
| + err = errors.Annotate(err).InternalReason("failed to sign URL"). |
| + D("bucket", gs.stream.Bucket()).D("filename", gs.stream.Filename).Err() |
| + return |
| + } |
| + return |
| +} |