Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(630)

Unified Diff: logdog/appengine/coordinator/service.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/service.go
diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go
index 0b644d3c5e2f97a2aac4d51e8ee65719e90a986e..6f5e4e8813833a1cdfed27644f82bfa628483664 100644
--- a/logdog/appengine/coordinator/service.go
+++ b/logdog/appengine/coordinator/service.go
@@ -8,8 +8,10 @@ import (
"net/http"
"sync"
"sync/atomic"
+ "time"
"github.com/luci/luci-go/appengine/gaemiddleware"
+ "github.com/luci/luci-go/common/clock"
luciConfig "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
@@ -18,18 +20,29 @@ import (
"github.com/luci/luci-go/logdog/api/config/svcconfig"
"github.com/luci/luci-go/logdog/appengine/coordinator/config"
"github.com/luci/luci-go/logdog/common/storage"
+ "github.com/luci/luci-go/logdog/common/storage/archive"
"github.com/luci/luci-go/logdog/common/storage/bigtable"
"github.com/luci/luci-go/logdog/common/storage/caching"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/router"
+ "github.com/luci/gae/service/info"
+
gcps "cloud.google.com/go/pubsub"
+ gcst "cloud.google.com/go/storage"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
+// ErrSigningNotSupported is a sentinel error returned by Storage.SignURL if
+// signing is not supported.
+var ErrSigningNotSupported = errors.New("signing URLs is not supported")
+
+// maxSignedURLLifetime is the maximum allowed signed URL lifetime.
Vadim Sh. 2016/11/30 21:03:52 mention this limit in *.proto doc too
dnj 2016/12/01 17:39:31 I don't want to put constraints in the proto doc,
Vadim Sh. 2016/12/01 19:32:12 1. The doc doesn't mention it anymore. 2. "<=" is
+const maxSignedURLLifetime = 1 * time.Hour
+
// Services is a set of support services used by Coordinator.
//
// Each Services instance is valid for a singel request, but can be re-used
@@ -56,20 +69,13 @@ type Services interface {
// Returns the same error codes as config.ProjectConfig.
ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.ProjectConfig, error)
- // Storage returns an intermediate storage instance for use by this service.
+ // Storage returns a Storage instance for the supplied log stream.
//
// The caller must close the returned instance if successful.
- IntermediateStorage(context.Context) (storage.Storage, error)
-
- // GSClient instantiates a Google Storage client.
- GSClient(context.Context) (gs.Client, error)
+ StorageForStream(context.Context, *LogStreamState) (Storage, error)
// ArchivalPublisher returns an ArchivalPublisher instance.
ArchivalPublisher(context.Context) (ArchivalPublisher, error)
-
- // StorageCache returns the storage cache instance to use, or nil for no
- // caching.
- StorageCache() caching.Cache
}
// ProdServices is middleware chain used by Coordinator services.
@@ -158,7 +164,21 @@ func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P
return s.getOrCreateCachedProjectConfig(project).resolve(c)
}
-func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Storage, error) {
+func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamState) (Storage, error) {
+ if !lst.ArchivalState().Archived() {
+ log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
+ return s.newBigTableStorage(c)
+ }
+
+ log.Fields{
+ "indexURL": lst.ArchiveIndexURL,
+ "streamURL": lst.ArchiveStreamURL,
+ "archiveTime": lst.ArchivedTime,
+ }.Debugf(c, "Log is archived. Fetching from archive storage.")
+ return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.ArchiveStreamURL))
+}
+
+func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error) {
cfg, err := s.Config(c)
if err != nil {
return nil, err
@@ -213,16 +233,52 @@ func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora
ClientOptions: []option.ClientOption{
option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)),
},
- Cache: s.StorageCache(),
+ Cache: s.getStorageCache(),
})
if err != nil {
log.WithError(err).Errorf(c, "Failed to create BigTable instance.")
return nil, err
}
- return st, nil
+
+ return &bigTableStorage{
+ Storage: st,
+ }, nil
+}
+
+func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs.Path) (Storage, error) {
+ gs, err := s.newGSClient(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
+ return nil, err
+ }
+ defer func() {
+ if gs != nil {
+ if err := gs.Close(); err != nil {
+ log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
+ }
+ }
+ }()
+
+ st, err := archive.New(c, archive.Options{
+ Index: index,
+ Stream: stream,
+ Client: gs,
+ Cache: s.getStorageCache(),
+ })
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
+ return nil, err
+ }
+
+ gs = nil // Don't close in defer.
Vadim Sh. 2016/11/30 21:03:52 nit: well.. there's only one exit point from this
dnj 2016/12/01 17:39:31 I suppose I gravitate towards defer to protect aga
+ return &googleStorage{
+ Storage: st,
+ gs: gs,
+ stream: stream,
+ }, nil
}
-func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) {
+func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) {
// Get an Authenticator bound to the token scopes that we need for
// authenticated Cloud Storage access.
transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(gs.ReadOnlyScopes...))
@@ -284,4 +340,84 @@ func (s *prodServicesInst) nextArchiveIndex() uint64 {
var storageCacheSingleton StorageCache
-func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSingleton }
+func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCacheSingleton }
+
+// Storage is an interface to storage used by the Coordinator.
+type Storage interface {
+ // Storage is the base Storage instance.
+ storage.Storage
+
+ // SignStreamURL attempts to sign the storage's stream's RecordIO archive
+ // stream storage URL.
+ //
+ // If signing is not supported by this Storage instance, this will return
+ // ErrNotSupported.
+ SignStreamURL(context.Context, time.Duration) (string, time.Time, error)
+}
+
+// intermediateStorage is a Storage instance bound to BigTable.
+type bigTableStorage struct {
+ // Storage is the base storage.Storage instance.
+ storage.Storage
+}
+
+func (*bigTableStorage) SignStreamURL(c context.Context, lifetime time.Duration) (string, time.Time, error) {
+ return "", time.Time{}, ErrSigningNotSupported
+}
+
+type googleStorage struct {
+ // Storage is the base storage.Storage instance.
+ storage.Storage
+
+ // ctx is the Context that was bound at the time of of creation.
+ ctx context.Context
+ // gs is the backing Google Storage client.
+ gs gs.Client
+ // stream is the stream's Google Storage URL.
+ stream gs.Path
+
+ gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error)
+}
+
+func (gs *googleStorage) Close() {
+ if err := gs.gs.Close(); err != nil {
+ log.WithError(err).Warningf(gs.ctx, "Failed to close Google Storage client.")
+ }
+ gs.Storage.Close()
+}
+
+func (gs *googleStorage) SignStreamURL(c context.Context, lifetime time.Duration) (url string, expires time.Time, err error) {
+ acct, err := info.ServiceAccount(c)
Vadim Sh. 2016/11/30 21:09:09 oh, btw this is RPC to the backend too: https://gi
dnj 2016/12/01 17:39:30 Done.
+ if err != nil {
+ err = errors.Annotate(err).InternalReason("failed to get service account name").Err()
+ return
+ }
+
+ switch {
+ case lifetime < 0:
+ err = errors.Reason("invalid signed URL lifetime: %(lifetime)s").D("lifetime", lifetime).Err()
+ return
+
+ case lifetime > maxSignedURLLifetime:
+ lifetime = maxSignedURLLifetime
+ }
+
+ // Get our signing options.
+ expires = clock.Now(c).Add(lifetime)
+ opts := gcst.SignedURLOptions{
+ GoogleAccessID: acct,
+ SignBytes: func(b []byte) ([]byte, error) {
+ _, signedBytes, err := info.SignBytes(c, b)
+ return signedBytes, err
+ },
+ Method: "GET",
+ Expires: expires,
+ }
+
+ if url, err = gcst.SignedURL(gs.stream.Bucket(), gs.stream.Filename(), &opts); err != nil {
Vadim Sh. 2016/11/30 21:03:52 consider caching the result in the future. SignByt
dnj 2016/12/01 17:39:31 This gets tricky, since the actual signed URLs hav
Vadim Sh. 2016/12/01 19:32:13 That's why I assumed >= in sign_entry_url_lifetime
+ err = errors.Annotate(err).InternalReason("failed to sign URL").
+ D("bucket", gs.stream.Bucket()).D("filename", gs.stream.Filename).Err()
+ return
+ }
+ return
+}

Powered by Google App Engine
This is Rietveld 408576698