Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Side by Side Diff: logdog/appengine/coordinator/service.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "net/http" 8 "net/http"
9 "sync" 9 "sync"
10 "sync/atomic" 10 "sync/atomic"
11 "time"
11 12
12 "github.com/luci/luci-go/appengine/gaemiddleware" 13 "github.com/luci/luci-go/appengine/gaemiddleware"
14 "github.com/luci/luci-go/common/clock"
13 luciConfig "github.com/luci/luci-go/common/config" 15 luciConfig "github.com/luci/luci-go/common/config"
14 "github.com/luci/luci-go/common/errors" 16 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/gcloud/gs" 17 "github.com/luci/luci-go/common/gcloud/gs"
16 "github.com/luci/luci-go/common/gcloud/pubsub" 18 "github.com/luci/luci-go/common/gcloud/pubsub"
17 log "github.com/luci/luci-go/common/logging" 19 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/logdog/api/config/svcconfig" 20 "github.com/luci/luci-go/logdog/api/config/svcconfig"
19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" 21 "github.com/luci/luci-go/logdog/appengine/coordinator/config"
20 "github.com/luci/luci-go/logdog/common/storage" 22 "github.com/luci/luci-go/logdog/common/storage"
23 "github.com/luci/luci-go/logdog/common/storage/archive"
21 "github.com/luci/luci-go/logdog/common/storage/bigtable" 24 "github.com/luci/luci-go/logdog/common/storage/bigtable"
22 "github.com/luci/luci-go/logdog/common/storage/caching" 25 "github.com/luci/luci-go/logdog/common/storage/caching"
23 "github.com/luci/luci-go/server/auth" 26 "github.com/luci/luci-go/server/auth"
24 "github.com/luci/luci-go/server/router" 27 "github.com/luci/luci-go/server/router"
25 28
29 "github.com/luci/gae/service/info"
30
26 gcps "cloud.google.com/go/pubsub" 31 gcps "cloud.google.com/go/pubsub"
32 gcst "cloud.google.com/go/storage"
27 "golang.org/x/net/context" 33 "golang.org/x/net/context"
28 "google.golang.org/api/option" 34 "google.golang.org/api/option"
29 "google.golang.org/grpc" 35 "google.golang.org/grpc"
30 "google.golang.org/grpc/metadata" 36 "google.golang.org/grpc/metadata"
31 ) 37 )
32 38
39 // ErrSigningNotSupported is a sentinel error returned by Storage.SignURL if
40 // signing is not supported.
41 var ErrSigningNotSupported = errors.New("signing URLs is not supported")
42
43 // maxSignedURLLifetime is the maximum allowed signed URL lifetime.
Vadim Sh. 2016/11/30 21:03:52 mention this limit in *.proto doc too
dnj 2016/12/01 17:39:31 I don't want to put constraints in the proto doc,
Vadim Sh. 2016/12/01 19:32:12 1. The doc doesn't mention it anymore. 2. "<=" is
44 const maxSignedURLLifetime = 1 * time.Hour
45
33 // Services is a set of support services used by Coordinator. 46 // Services is a set of support services used by Coordinator.
34 // 47 //
 35 // Each Services instance is valid for a single request, but can be re-used 48 // Each Services instance is valid for a single request, but can be re-used
36 // throughout that request. This is advised, as the Services instance may 49 // throughout that request. This is advised, as the Services instance may
37 // optionally cache values. 50 // optionally cache values.
38 // 51 //
39 // Services methods are goroutine-safe. 52 // Services methods are goroutine-safe.
40 // 53 //
41 // By default, a production set of services will be used. However, this can be 54 // By default, a production set of services will be used. However, this can be
42 // overridden for testing to mock the service layer. 55 // overridden for testing to mock the service layer.
43 type Services interface { 56 type Services interface {
44 // Config returns the current instance and application configuration 57 // Config returns the current instance and application configuration
45 // instances. 58 // instances.
46 // 59 //
 47 // The production instance will cache the results for the duration of the 60 // The production instance will cache the results for the duration of the
48 // request. 61 // request.
49 Config(context.Context) (*config.Config, error) 62 Config(context.Context) (*config.Config, error)
50 63
51 // ProjectConfig returns the project configuration for the named project . 64 // ProjectConfig returns the project configuration for the named project .
52 // 65 //
 53 // The production instance will cache the results for the duration of the 66 // The production instance will cache the results for the duration of the
54 // request. 67 // request.
55 // 68 //
56 // Returns the same error codes as config.ProjectConfig. 69 // Returns the same error codes as config.ProjectConfig.
57 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje ctConfig, error) 70 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje ctConfig, error)
58 71
 59 » // Storage returns an intermediate storage instance for use by this service. 72 » // Storage returns a Storage instance for the supplied log stream.
60 // 73 //
61 // The caller must close the returned instance if successful. 74 // The caller must close the returned instance if successful.
62 » IntermediateStorage(context.Context) (storage.Storage, error) 75 » StorageForStream(context.Context, *LogStreamState) (Storage, error)
63
64 » // GSClient instantiates a Google Storage client.
65 » GSClient(context.Context) (gs.Client, error)
66 76
67 // ArchivalPublisher returns an ArchivalPublisher instance. 77 // ArchivalPublisher returns an ArchivalPublisher instance.
68 ArchivalPublisher(context.Context) (ArchivalPublisher, error) 78 ArchivalPublisher(context.Context) (ArchivalPublisher, error)
69
70 // StorageCache returns the storage cache instance to use, or nil for no
71 // caching.
72 StorageCache() caching.Cache
73 } 79 }
74 80
75 // ProdServices is middleware chain used by Coordinator services. 81 // ProdServices is middleware chain used by Coordinator services.
76 // 82 //
77 // It sets up basic GAE functionality as well as installs a production Services 83 // It sets up basic GAE functionality as well as installs a production Services
78 // instance. 84 // instance.
79 func ProdServices() router.MiddlewareChain { 85 func ProdServices() router.MiddlewareChain {
80 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout er.Handler) { 86 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout er.Handler) {
81 c.Context = WithServices(c.Context, &prodServicesInst{}) 87 c.Context = WithServices(c.Context, &prodServicesInst{})
82 next(c) 88 next(c)
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
151 } 157 }
152 s.projectConfigs[project] = cp 158 s.projectConfigs[project] = cp
153 } 159 }
154 return cp 160 return cp
155 } 161 }
156 162
157 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P rojectName) (*svcconfig.ProjectConfig, error) { 163 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P rojectName) (*svcconfig.ProjectConfig, error) {
158 return s.getOrCreateCachedProjectConfig(project).resolve(c) 164 return s.getOrCreateCachedProjectConfig(project).resolve(c)
159 } 165 }
160 166
161 func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora ge, error) { 167 func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamSta te) (Storage, error) {
168 » if !lst.ArchivalState().Archived() {
 169 » » log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
170 » » return s.newBigTableStorage(c)
171 » }
172
173 » log.Fields{
174 » » "indexURL": lst.ArchiveIndexURL,
175 » » "streamURL": lst.ArchiveStreamURL,
176 » » "archiveTime": lst.ArchivedTime,
177 » }.Debugf(c, "Log is archived. Fetching from archive storage.")
 178 » return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.ArchiveStreamURL))
179 }
180
181 func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error ) {
162 cfg, err := s.Config(c) 182 cfg, err := s.Config(c)
163 if err != nil { 183 if err != nil {
164 return nil, err 184 return nil, err
165 } 185 }
166 186
167 // Is BigTable configured? 187 // Is BigTable configured?
168 if cfg.Storage == nil { 188 if cfg.Storage == nil {
169 return nil, errors.New("no storage configuration") 189 return nil, errors.New("no storage configuration")
170 } 190 }
171 191
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
206 // calls. 226 // calls.
207 c = metadata.NewContext(c, nil) 227 c = metadata.NewContext(c, nil)
208 228
209 st, err := bigtable.New(c, bigtable.Options{ 229 st, err := bigtable.New(c, bigtable.Options{
210 Project: bt.Project, 230 Project: bt.Project,
211 Instance: bt.Instance, 231 Instance: bt.Instance,
212 LogTable: bt.LogTableName, 232 LogTable: bt.LogTableName,
213 ClientOptions: []option.ClientOption{ 233 ClientOptions: []option.ClientOption{
214 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre ds)), 234 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre ds)),
215 }, 235 },
216 » » Cache: s.StorageCache(), 236 » » Cache: s.getStorageCache(),
217 }) 237 })
218 if err != nil { 238 if err != nil {
219 log.WithError(err).Errorf(c, "Failed to create BigTable instance .") 239 log.WithError(err).Errorf(c, "Failed to create BigTable instance .")
220 return nil, err 240 return nil, err
221 } 241 }
222 » return st, nil 242
243 » return &bigTableStorage{
244 » » Storage: st,
245 » }, nil
223 } 246 }
224 247
225 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { 248 func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs. Path) (Storage, error) {
249 » gs, err := s.newGSClient(c)
250 » if err != nil {
 251 » » log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
252 » » return nil, err
253 » }
254 » defer func() {
255 » » if gs != nil {
256 » » » if err := gs.Close(); err != nil {
257 » » » » log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
258 » » » }
259 » » }
260 » }()
261
262 » st, err := archive.New(c, archive.Options{
263 » » Index: index,
264 » » Stream: stream,
265 » » Client: gs,
266 » » Cache: s.getStorageCache(),
267 » })
268 » if err != nil {
 269 » » log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
270 » » return nil, err
271 » }
272
273 » gs = nil // Don't close in defer.
Vadim Sh. 2016/11/30 21:03:52 nit: well.. there's only one exit point from this
dnj 2016/12/01 17:39:31 I suppose I gravitate towards defer to protect aga
274 » return &googleStorage{
275 » » Storage: st,
276 » » gs: gs,
277 » » stream: stream,
278 » }, nil
279 }
280
281 func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) {
226 // Get an Authenticator bound to the token scopes that we need for 282 // Get an Authenticator bound to the token scopes that we need for
227 // authenticated Cloud Storage access. 283 // authenticated Cloud Storage access.
228 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g s.ReadOnlyScopes...)) 284 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g s.ReadOnlyScopes...))
229 if err != nil { 285 if err != nil {
230 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.") 286 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.")
231 return nil, errors.New("failed to create Cloud Storage transport ") 287 return nil, errors.New("failed to create Cloud Storage transport ")
232 } 288 }
233 return gs.NewProdClient(c, transport) 289 return gs.NewProdClient(c, transport)
234 } 290 }
235 291
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
277 // more than MaxInt32 archival tasks. 333 // more than MaxInt32 archival tasks.
278 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 334 v := atomic.AddInt32(&s.archivalIndex, 1) - 1
279 if v < 0 { 335 if v < 0 {
280 panic("archival index has wrapped") 336 panic("archival index has wrapped")
281 } 337 }
282 return uint64(v) 338 return uint64(v)
283 } 339 }
284 340
285 var storageCacheSingleton StorageCache 341 var storageCacheSingleton StorageCache
286 342
287 func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSi ngleton } 343 func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCach eSingleton }
344
345 // Storage is an interface to storage used by the Coordinator.
346 type Storage interface {
347 » // Storage is the base Storage instance.
348 » storage.Storage
349
350 » // SignStreamURL attempts to sign the storage's stream's RecordIO archiv e
351 » // stream storage URL.
352 » //
 353 » // If signing is not supported by this Storage instance, this will return
 354 » // ErrSigningNotSupported.
355 » SignStreamURL(context.Context, time.Duration) (string, time.Time, error)
356 }
357
 358 // bigTableStorage is a Storage instance bound to BigTable.
359 type bigTableStorage struct {
360 » // Storage is the base storage.Storage instance.
361 » storage.Storage
362 }
363
364 func (*bigTableStorage) SignStreamURL(c context.Context, lifetime time.Duration) (string, time.Time, error) {
365 » return "", time.Time{}, ErrSigningNotSupported
366 }
367
368 type googleStorage struct {
369 » // Storage is the base storage.Storage instance.
370 » storage.Storage
371
 372 » // ctx is the Context that was bound at the time of creation.
373 » ctx context.Context
374 » // gs is the backing Google Storage client.
375 » gs gs.Client
376 » // stream is the stream's Google Storage URL.
377 » stream gs.Path
378
379 » gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error)
380 }
381
382 func (gs *googleStorage) Close() {
383 » if err := gs.gs.Close(); err != nil {
 384 » » log.WithError(err).Warningf(gs.ctx, "Failed to close Google Storage client.")
385 » }
386 » gs.Storage.Close()
387 }
388
389 func (gs *googleStorage) SignStreamURL(c context.Context, lifetime time.Duration ) (url string, expires time.Time, err error) {
390 » acct, err := info.ServiceAccount(c)
Vadim Sh. 2016/11/30 21:09:09 oh, btw this is RPC to the backend too: https://gi
dnj 2016/12/01 17:39:30 Done.
391 » if err != nil {
392 » » err = errors.Annotate(err).InternalReason("failed to get service account name").Err()
393 » » return
394 » }
395
396 » switch {
397 » case lifetime < 0:
 398 » » err = errors.Reason("invalid signed URL lifetime: %(lifetime)s").D("lifetime", lifetime).Err()
399 » » return
400
401 » case lifetime > maxSignedURLLifetime:
402 » » lifetime = maxSignedURLLifetime
403 » }
404
405 » // Get our signing options.
406 » expires = clock.Now(c).Add(lifetime)
407 » opts := gcst.SignedURLOptions{
408 » » GoogleAccessID: acct,
409 » » SignBytes: func(b []byte) ([]byte, error) {
410 » » » _, signedBytes, err := info.SignBytes(c, b)
411 » » » return signedBytes, err
412 » » },
413 » » Method: "GET",
414 » » Expires: expires,
415 » }
416
 417 » if url, err = gcst.SignedURL(gs.stream.Bucket(), gs.stream.Filename(), &opts); err != nil {
Vadim Sh. 2016/11/30 21:03:52 consider caching the result in the future. SignByt
dnj 2016/12/01 17:39:31 This gets tricky, since the actual signed URLs hav
Vadim Sh. 2016/12/01 19:32:13 That's why I assumed >= in sign_entry_url_lifetime
418 » » err = errors.Annotate(err).InternalReason("failed to sign URL").
 419 » » » D("bucket", gs.stream.Bucket()).D("filename", gs.stream.Filename()).Err()
420 » » return
421 » }
422 » return
423 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698