| Index: logdog/appengine/coordinator/service.go
|
| diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go
|
| index 0b644d3c5e2f97a2aac4d51e8ee65719e90a986e..b2a1baa867f1c1b1b9ac9ffa35c675bf58bd2469 100644
|
| --- a/logdog/appengine/coordinator/service.go
|
| +++ b/logdog/appengine/coordinator/service.go
|
| @@ -8,8 +8,11 @@ import (
|
| "net/http"
|
| "sync"
|
| "sync/atomic"
|
| + "time"
|
|
|
| + "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner"
|
| "github.com/luci/luci-go/appengine/gaemiddleware"
|
| + "github.com/luci/luci-go/common/clock"
|
| luciConfig "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| @@ -18,18 +21,23 @@ import (
|
| "github.com/luci/luci-go/logdog/api/config/svcconfig"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/config"
|
| "github.com/luci/luci-go/logdog/common/storage"
|
| + "github.com/luci/luci-go/logdog/common/storage/archive"
|
| "github.com/luci/luci-go/logdog/common/storage/bigtable"
|
| "github.com/luci/luci-go/logdog/common/storage/caching"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/router"
|
|
|
| gcps "cloud.google.com/go/pubsub"
|
| + gcst "cloud.google.com/go/storage"
|
| "golang.org/x/net/context"
|
| "google.golang.org/api/option"
|
| "google.golang.org/grpc"
|
| "google.golang.org/grpc/metadata"
|
| )
|
|
|
| +// maxSignedURLLifetime is the maximum allowed signed URL lifetime.
|
| +const maxSignedURLLifetime = 1 * time.Hour
|
| +
|
| // Services is a set of support services used by Coordinator.
|
| //
|
| // Each Services instance is valid for a singel request, but can be re-used
|
| @@ -56,20 +64,13 @@ type Services interface {
|
| // Returns the same error codes as config.ProjectConfig.
|
| ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.ProjectConfig, error)
|
|
|
| - // Storage returns an intermediate storage instance for use by this service.
|
| + // Storage returns a Storage instance for the supplied log stream.
|
| //
|
| // The caller must close the returned instance if successful.
|
| - IntermediateStorage(context.Context) (storage.Storage, error)
|
| -
|
| - // GSClient instantiates a Google Storage client.
|
| - GSClient(context.Context) (gs.Client, error)
|
| + StorageForStream(context.Context, *LogStreamState) (Storage, error)
|
|
|
| // ArchivalPublisher returns an ArchivalPublisher instance.
|
| ArchivalPublisher(context.Context) (ArchivalPublisher, error)
|
| -
|
| - // StorageCache returns the storage cache instance to use, or nil for no
|
| - // caching.
|
| - StorageCache() caching.Cache
|
| }
|
|
|
| // ProdServices is middleware chain used by Coordinator services.
|
| @@ -96,6 +97,9 @@ type prodServicesInst struct {
|
| // ArchivalPublisher. This is shared between all ArchivalPublisher instances
|
| // from this service.
|
| archivalIndex int32
|
| +
|
| + // signer is the signer instance to use.
|
| + signer gaesigner.Signer
|
| }
|
|
|
| func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
|
| @@ -158,7 +162,21 @@ func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P
|
| return s.getOrCreateCachedProjectConfig(project).resolve(c)
|
| }
|
|
|
| -func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Storage, error) {
|
| +func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamState) (Storage, error) {
|
| + if !lst.ArchivalState().Archived() {
|
| + log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
|
| + return s.newBigTableStorage(c)
|
| + }
|
| +
|
| + log.Fields{
|
| + "indexURL": lst.ArchiveIndexURL,
|
| + "streamURL": lst.ArchiveStreamURL,
|
| + "archiveTime": lst.ArchivedTime,
|
| + }.Debugf(c, "Log is archived. Fetching from archive storage.")
|
| + return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.ArchiveStreamURL))
|
| +}
|
| +
|
| +func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error) {
|
| cfg, err := s.Config(c)
|
| if err != nil {
|
| return nil, err
|
| @@ -213,16 +231,54 @@ func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora
|
| ClientOptions: []option.ClientOption{
|
| option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)),
|
| },
|
| - Cache: s.StorageCache(),
|
| + Cache: s.getStorageCache(),
|
| })
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create BigTable instance.")
|
| return nil, err
|
| }
|
| - return st, nil
|
| +
|
| + return &bigTableStorage{
|
| + Storage: st,
|
| + }, nil
|
| }
|
|
|
| -func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) {
|
| +func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs.Path) (Storage, error) {
|
| + gs, err := s.newGSClient(c)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
|
| + return nil, err
|
| + }
|
| + defer func() {
|
| + if gs != nil {
|
| + if err := gs.Close(); err != nil {
|
| + log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
|
| + }
|
| + }
|
| + }()
|
| +
|
| + st, err := archive.New(c, archive.Options{
|
| + Index: index,
|
| + Stream: stream,
|
| + Client: gs,
|
| + Cache: s.getStorageCache(),
|
| + })
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
|
| + return nil, err
|
| + }
|
| +
|
| + gs = nil // Don't close in defer.
|
| + return &googleStorage{
|
| + Storage: st,
|
| + svc: s,
|
| + gs: gs,
|
| + stream: stream,
|
| + index: index,
|
| + }, nil
|
| +}
|
| +
|
| +func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) {
|
| // Get an Authenticator bound to the token scopes that we need for
|
| // authenticated Cloud Storage access.
|
| transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(gs.ReadOnlyScopes...))
|
| @@ -284,4 +340,138 @@ func (s *prodServicesInst) nextArchiveIndex() uint64 {
|
|
|
| var storageCacheSingleton StorageCache
|
|
|
| -func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSingleton }
|
| +func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCacheSingleton }
|
| +
|
| +// Storage is an interface to storage used by the Coordinator.
|
| +type Storage interface {
|
| + // Storage is the base Storage instance.
|
| + storage.Storage
|
| +
|
| + // GetSignedURLs attempts to sign the storage's stream's RecordIO archive
|
| + // stream storage URL.
|
| + //
|
| + // If signing is not supported by this Storage instance, this will return
|
| + // a nil signing response and no error.
|
| + GetSignedURLs(context.Context, *URLSigningRequest) (*URLSigningResponse, error)
|
| +}
|
| +
|
| +// URLSigningRequest is the set of URL signing parameters passed to a
|
| +// Storage.GetSignedURLs call.
|
| +type URLSigningRequest struct {
|
| + // Expriation is the signed URL expiration time.
|
| + Lifetime time.Duration
|
| +
|
| + // Stream, if true, requests a signed log stream URL.
|
| + Stream bool
|
| + // Index, if true, requests a signed log stream index URL.
|
| + Index bool
|
| +}
|
| +
|
| +// HasWork returns true if this signing request actually has work that is
|
| +// requested.
|
| +func (r *URLSigningRequest) HasWork() bool {
|
| + return (r.Stream || r.Index) && (r.Lifetime > 0)
|
| +}
|
| +
|
| +// URLSigningResponse is the resulting signed URLs from a Storage.GetSignedURLs
|
| +// call.
|
| +type URLSigningResponse struct {
|
| + // Expriation is the signed URL expiration time.
|
| + Expiration time.Time
|
| +
|
| + // Stream is the signed URL for the log stream, if requested.
|
| + Stream string
|
| + // Index is the signed URL for the log stream index, if requested.
|
| + Index string
|
| +}
|
| +
|
| +// intermediateStorage is a Storage instance bound to BigTable.
|
| +type bigTableStorage struct {
|
| + // Storage is the base storage.Storage instance.
|
| + storage.Storage
|
| +}
|
| +
|
| +func (*bigTableStorage) GetSignedURLs(context.Context, *URLSigningRequest) (*URLSigningResponse, error) {
|
| + return nil, nil
|
| +}
|
| +
|
| +type googleStorage struct {
|
| + // Storage is the base storage.Storage instance.
|
| + storage.Storage
|
| + // svc is the services instance that created this.
|
| + svc *prodServicesInst
|
| +
|
| + // ctx is the Context that was bound at the time of of creation.
|
| + ctx context.Context
|
| + // gs is the backing Google Storage client.
|
| + gs gs.Client
|
| +
|
| + // stream is the stream's Google Storage URL.
|
| + stream gs.Path
|
| + // index is the index's Google Storage URL.
|
| + index gs.Path
|
| +
|
| + gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error)
|
| +}
|
| +
|
| +func (si *googleStorage) Close() {
|
| + if err := si.gs.Close(); err != nil {
|
| + log.WithError(err).Warningf(si.ctx, "Failed to close Google Storage client.")
|
| + }
|
| + si.Storage.Close()
|
| +}
|
| +
|
| +func (si *googleStorage) GetSignedURLs(c context.Context, req *URLSigningRequest) (*URLSigningResponse, error) {
|
| + info, err := si.svc.signer.ServiceInfo(c)
|
| + if err != nil {
|
| + return nil, errors.Annotate(err).InternalReason("failed to get service info").Err()
|
| + }
|
| +
|
| + lifetime := req.Lifetime
|
| + switch {
|
| + case lifetime < 0:
|
| + return nil, errors.Reason("invalid signed URL lifetime: %(lifetime)s").D("lifetime", lifetime).Err()
|
| +
|
| + case lifetime > maxSignedURLLifetime:
|
| + lifetime = maxSignedURLLifetime
|
| + }
|
| +
|
| + // Get our signing options.
|
| + resp := URLSigningResponse{
|
| + Expiration: clock.Now(c).Add(lifetime),
|
| + }
|
| + opts := gcst.SignedURLOptions{
|
| + GoogleAccessID: info.ServiceAccountName,
|
| + SignBytes: func(b []byte) ([]byte, error) {
|
| + _, signedBytes, err := si.svc.signer.SignBytes(c, b)
|
| + return signedBytes, err
|
| + },
|
| + Method: "GET",
|
| + Expires: resp.Expiration,
|
| + }
|
| +
|
| + doSign := func(path gs.Path) (string, error) {
|
| + url, err := gcst.SignedURL(path.Bucket(), path.Filename(), &opts)
|
| + if err != nil {
|
| + return "", errors.Annotate(err).InternalReason("failed to sign URL").
|
| + D("bucket", path.Bucket()).D("filename", path.Filename).Err()
|
| + }
|
| + return url, nil
|
| + }
|
| +
|
| + // Sign stream URL.
|
| + if req.Stream {
|
| + if resp.Stream, err = doSign(si.stream); err != nil {
|
| + return nil, errors.Annotate(err).InternalReason("failed to sign stream URL").Err()
|
| + }
|
| + }
|
| +
|
| + // Sign index URL.
|
| + if req.Index {
|
| + if resp.Index, err = doSign(si.index); err != nil {
|
| + return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err()
|
| + }
|
| + }
|
| +
|
| + return &resp, nil
|
| +}
|
|
|