Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(146)

Unified Diff: logdog/appengine/coordinator/service.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Allow index signing, use gaesigner. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/service.go
diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go
index 0b644d3c5e2f97a2aac4d51e8ee65719e90a986e..b2a1baa867f1c1b1b9ac9ffa35c675bf58bd2469 100644
--- a/logdog/appengine/coordinator/service.go
+++ b/logdog/appengine/coordinator/service.go
@@ -8,8 +8,11 @@ import (
"net/http"
"sync"
"sync/atomic"
+ "time"
+ "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner"
"github.com/luci/luci-go/appengine/gaemiddleware"
+ "github.com/luci/luci-go/common/clock"
luciConfig "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
@@ -18,18 +21,23 @@ import (
"github.com/luci/luci-go/logdog/api/config/svcconfig"
"github.com/luci/luci-go/logdog/appengine/coordinator/config"
"github.com/luci/luci-go/logdog/common/storage"
+ "github.com/luci/luci-go/logdog/common/storage/archive"
"github.com/luci/luci-go/logdog/common/storage/bigtable"
"github.com/luci/luci-go/logdog/common/storage/caching"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/router"
gcps "cloud.google.com/go/pubsub"
+ gcst "cloud.google.com/go/storage"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
+// maxSignedURLLifetime is the maximum allowed signed URL lifetime.
+const maxSignedURLLifetime = 1 * time.Hour
+
// Services is a set of support services used by Coordinator.
//
// Each Services instance is valid for a singel request, but can be re-used
@@ -56,20 +64,13 @@ type Services interface {
// Returns the same error codes as config.ProjectConfig.
ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.ProjectConfig, error)
- // Storage returns an intermediate storage instance for use by this service.
+ // Storage returns a Storage instance for the supplied log stream.
//
// The caller must close the returned instance if successful.
- IntermediateStorage(context.Context) (storage.Storage, error)
-
- // GSClient instantiates a Google Storage client.
- GSClient(context.Context) (gs.Client, error)
+ StorageForStream(context.Context, *LogStreamState) (Storage, error)
// ArchivalPublisher returns an ArchivalPublisher instance.
ArchivalPublisher(context.Context) (ArchivalPublisher, error)
-
- // StorageCache returns the storage cache instance to use, or nil for no
- // caching.
- StorageCache() caching.Cache
}
// ProdServices is middleware chain used by Coordinator services.
@@ -96,6 +97,9 @@ type prodServicesInst struct {
// ArchivalPublisher. This is shared between all ArchivalPublisher instances
// from this service.
archivalIndex int32
+
+ // signer is the signer instance to use.
+ signer gaesigner.Signer
}
func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
@@ -158,7 +162,21 @@ func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P
return s.getOrCreateCachedProjectConfig(project).resolve(c)
}
-func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Storage, error) {
+func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamState) (Storage, error) {
+ if !lst.ArchivalState().Archived() {
+ log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
+ return s.newBigTableStorage(c)
+ }
+
+ log.Fields{
+ "indexURL": lst.ArchiveIndexURL,
+ "streamURL": lst.ArchiveStreamURL,
+ "archiveTime": lst.ArchivedTime,
+ }.Debugf(c, "Log is archived. Fetching from archive storage.")
+ return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.ArchiveStreamURL))
+}
+
+func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error) {
cfg, err := s.Config(c)
if err != nil {
return nil, err
@@ -213,16 +231,54 @@ func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora
ClientOptions: []option.ClientOption{
option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)),
},
- Cache: s.StorageCache(),
+ Cache: s.getStorageCache(),
})
if err != nil {
log.WithError(err).Errorf(c, "Failed to create BigTable instance.")
return nil, err
}
- return st, nil
+
+ return &bigTableStorage{
+ Storage: st,
+ }, nil
}
-func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) {
+func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs.Path) (Storage, error) {
+ gs, err := s.newGSClient(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
+ return nil, err
+ }
+ defer func() {
+ if gs != nil {
+ if err := gs.Close(); err != nil {
+ log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
+ }
+ }
+ }()
+
+ st, err := archive.New(c, archive.Options{
+ Index: index,
+ Stream: stream,
+ Client: gs,
+ Cache: s.getStorageCache(),
+ })
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
+ return nil, err
+ }
+
+ gs = nil // Don't close in defer.
+ return &googleStorage{
+ Storage: st,
+ svc: s,
+ gs: gs,
+ stream: stream,
+ index: index,
+ }, nil
+}
+
+func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) {
// Get an Authenticator bound to the token scopes that we need for
// authenticated Cloud Storage access.
transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(gs.ReadOnlyScopes...))
@@ -284,4 +340,138 @@ func (s *prodServicesInst) nextArchiveIndex() uint64 {
var storageCacheSingleton StorageCache
-func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSingleton }
+func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCacheSingleton }
+
+// Storage is an interface to storage used by the Coordinator.
+type Storage interface {
+ // Storage is the base Storage instance.
+ storage.Storage
+
+ // GetSignedURLs attempts to sign the storage's stream's RecordIO archive
+ // stream storage URL.
+ //
+ // If signing is not supported by this Storage instance, this will return
+ // a nil signing response and no error.
+ GetSignedURLs(context.Context, *URLSigningRequest) (*URLSigningResponse, error)
+}
+
+// URLSigningRequest is the set of URL signing parameters passed to a
+// Storage.GetSignedURLs call.
+type URLSigningRequest struct {
+ // Expriation is the signed URL expiration time.
+ Lifetime time.Duration
+
+ // Stream, if true, requests a signed log stream URL.
+ Stream bool
+ // Index, if true, requests a signed log stream index URL.
+ Index bool
+}
+
+// HasWork returns true if this signing request actually has work that is
+// requested.
+func (r *URLSigningRequest) HasWork() bool {
+ return (r.Stream || r.Index) && (r.Lifetime > 0)
+}
+
+// URLSigningResponse is the resulting signed URLs from a Storage.GetSignedURLs
+// call.
+type URLSigningResponse struct {
+ // Expriation is the signed URL expiration time.
+ Expiration time.Time
+
+ // Stream is the signed URL for the log stream, if requested.
+ Stream string
+ // Index is the signed URL for the log stream index, if requested.
+ Index string
+}
+
+// intermediateStorage is a Storage instance bound to BigTable.
+type bigTableStorage struct {
+ // Storage is the base storage.Storage instance.
+ storage.Storage
+}
+
+func (*bigTableStorage) GetSignedURLs(context.Context, *URLSigningRequest) (*URLSigningResponse, error) {
+ return nil, nil
+}
+
+type googleStorage struct {
+ // Storage is the base storage.Storage instance.
+ storage.Storage
+ // svc is the services instance that created this.
+ svc *prodServicesInst
+
+ // ctx is the Context that was bound at the time of of creation.
+ ctx context.Context
+ // gs is the backing Google Storage client.
+ gs gs.Client
+
+ // stream is the stream's Google Storage URL.
+ stream gs.Path
+ // index is the index's Google Storage URL.
+ index gs.Path
+
+ gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error)
+}
+
+func (si *googleStorage) Close() {
+ if err := si.gs.Close(); err != nil {
+ log.WithError(err).Warningf(si.ctx, "Failed to close Google Storage client.")
+ }
+ si.Storage.Close()
+}
+
+func (si *googleStorage) GetSignedURLs(c context.Context, req *URLSigningRequest) (*URLSigningResponse, error) {
+ info, err := si.svc.signer.ServiceInfo(c)
+ if err != nil {
+ return nil, errors.Annotate(err).InternalReason("failed to get service info").Err()
+ }
+
+ lifetime := req.Lifetime
+ switch {
+ case lifetime < 0:
+ return nil, errors.Reason("invalid signed URL lifetime: %(lifetime)s").D("lifetime", lifetime).Err()
+
+ case lifetime > maxSignedURLLifetime:
+ lifetime = maxSignedURLLifetime
+ }
+
+ // Get our signing options.
+ resp := URLSigningResponse{
+ Expiration: clock.Now(c).Add(lifetime),
+ }
+ opts := gcst.SignedURLOptions{
+ GoogleAccessID: info.ServiceAccountName,
+ SignBytes: func(b []byte) ([]byte, error) {
+ _, signedBytes, err := si.svc.signer.SignBytes(c, b)
+ return signedBytes, err
+ },
+ Method: "GET",
+ Expires: resp.Expiration,
+ }
+
+ doSign := func(path gs.Path) (string, error) {
+ url, err := gcst.SignedURL(path.Bucket(), path.Filename(), &opts)
+ if err != nil {
+ return "", errors.Annotate(err).InternalReason("failed to sign URL").
+ D("bucket", path.Bucket()).D("filename", path.Filename).Err()
+ }
+ return url, nil
+ }
+
+ // Sign stream URL.
+ if req.Stream {
+ if resp.Stream, err = doSign(si.stream); err != nil {
+ return nil, errors.Annotate(err).InternalReason("failed to sign stream URL").Err()
+ }
+ }
+
+ // Sign index URL.
+ if req.Index {
+ if resp.Index, err = doSign(si.index); err != nil {
+ return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err()
+ }
+ }
+
+ return &resp, nil
+}

Powered by Google App Engine
This is Rietveld 408576698