Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1201)

Side by Side Diff: logdog/appengine/coordinator/service.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Allow index signing, use gaesigner. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "net/http" 8 "net/http"
9 "sync" 9 "sync"
10 "sync/atomic" 10 "sync/atomic"
11 "time"
11 12
13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner"
12 "github.com/luci/luci-go/appengine/gaemiddleware" 14 "github.com/luci/luci-go/appengine/gaemiddleware"
15 "github.com/luci/luci-go/common/clock"
13 luciConfig "github.com/luci/luci-go/common/config" 16 luciConfig "github.com/luci/luci-go/common/config"
14 "github.com/luci/luci-go/common/errors" 17 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/gcloud/gs" 18 "github.com/luci/luci-go/common/gcloud/gs"
16 "github.com/luci/luci-go/common/gcloud/pubsub" 19 "github.com/luci/luci-go/common/gcloud/pubsub"
17 log "github.com/luci/luci-go/common/logging" 20 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/logdog/api/config/svcconfig" 21 "github.com/luci/luci-go/logdog/api/config/svcconfig"
19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" 22 "github.com/luci/luci-go/logdog/appengine/coordinator/config"
20 "github.com/luci/luci-go/logdog/common/storage" 23 "github.com/luci/luci-go/logdog/common/storage"
24 "github.com/luci/luci-go/logdog/common/storage/archive"
21 "github.com/luci/luci-go/logdog/common/storage/bigtable" 25 "github.com/luci/luci-go/logdog/common/storage/bigtable"
22 "github.com/luci/luci-go/logdog/common/storage/caching" 26 "github.com/luci/luci-go/logdog/common/storage/caching"
23 "github.com/luci/luci-go/server/auth" 27 "github.com/luci/luci-go/server/auth"
24 "github.com/luci/luci-go/server/router" 28 "github.com/luci/luci-go/server/router"
25 29
26 gcps "cloud.google.com/go/pubsub" 30 gcps "cloud.google.com/go/pubsub"
31 gcst "cloud.google.com/go/storage"
27 "golang.org/x/net/context" 32 "golang.org/x/net/context"
28 "google.golang.org/api/option" 33 "google.golang.org/api/option"
29 "google.golang.org/grpc" 34 "google.golang.org/grpc"
30 "google.golang.org/grpc/metadata" 35 "google.golang.org/grpc/metadata"
31 ) 36 )
32 37
38 // maxSignedURLLifetime is the maximum allowed signed URL lifetime.
39 const maxSignedURLLifetime = 1 * time.Hour
40
33 // Services is a set of support services used by Coordinator. 41 // Services is a set of support services used by Coordinator.
34 // 42 //
35 // Each Services instance is valid for a singel request, but can be re-used 43 // Each Services instance is valid for a singel request, but can be re-used
36 // throughout that request. This is advised, as the Services instance may 44 // throughout that request. This is advised, as the Services instance may
37 // optionally cache values. 45 // optionally cache values.
38 // 46 //
39 // Services methods are goroutine-safe. 47 // Services methods are goroutine-safe.
40 // 48 //
41 // By default, a production set of services will be used. However, this can be 49 // By default, a production set of services will be used. However, this can be
42 // overridden for testing to mock the service layer. 50 // overridden for testing to mock the service layer.
43 type Services interface { 51 type Services interface {
44 // Config returns the current instance and application configuration 52 // Config returns the current instance and application configuration
45 // instances. 53 // instances.
46 // 54 //
47 // The production instance will cache the results for the duration of th e 55 // The production instance will cache the results for the duration of th e
48 // request. 56 // request.
49 Config(context.Context) (*config.Config, error) 57 Config(context.Context) (*config.Config, error)
50 58
51 // ProjectConfig returns the project configuration for the named project . 59 // ProjectConfig returns the project configuration for the named project .
52 // 60 //
53 // The production instance will cache the results for the duration of th e 61 // The production instance will cache the results for the duration of th e
54 // request. 62 // request.
55 // 63 //
56 // Returns the same error codes as config.ProjectConfig. 64 // Returns the same error codes as config.ProjectConfig.
57 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje ctConfig, error) 65 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje ctConfig, error)
58 66
59 » // Storage returns an intermediate storage instance for use by this serv ice. 67 » // Storage returns a Storage instance for the supplied log stream.
60 // 68 //
61 // The caller must close the returned instance if successful. 69 // The caller must close the returned instance if successful.
62 » IntermediateStorage(context.Context) (storage.Storage, error) 70 » StorageForStream(context.Context, *LogStreamState) (Storage, error)
63
64 » // GSClient instantiates a Google Storage client.
65 » GSClient(context.Context) (gs.Client, error)
66 71
67 // ArchivalPublisher returns an ArchivalPublisher instance. 72 // ArchivalPublisher returns an ArchivalPublisher instance.
68 ArchivalPublisher(context.Context) (ArchivalPublisher, error) 73 ArchivalPublisher(context.Context) (ArchivalPublisher, error)
69
70 // StorageCache returns the storage cache instance to use, or nil for no
71 // caching.
72 StorageCache() caching.Cache
73 } 74 }
74 75
75 // ProdServices is middleware chain used by Coordinator services. 76 // ProdServices is middleware chain used by Coordinator services.
76 // 77 //
77 // It sets up basic GAE functionality as well as installs a production Services 78 // It sets up basic GAE functionality as well as installs a production Services
78 // instance. 79 // instance.
79 func ProdServices() router.MiddlewareChain { 80 func ProdServices() router.MiddlewareChain {
80 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout er.Handler) { 81 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout er.Handler) {
81 c.Context = WithServices(c.Context, &prodServicesInst{}) 82 c.Context = WithServices(c.Context, &prodServicesInst{})
82 next(c) 83 next(c)
83 }) 84 })
84 } 85 }
85 86
86 // prodServicesInst is a Service exposing production faciliites. A unique 87 // prodServicesInst is a Service exposing production faciliites. A unique
87 // instance is bound to each each request. 88 // instance is bound to each each request.
88 type prodServicesInst struct { 89 type prodServicesInst struct {
89 sync.Mutex 90 sync.Mutex
90 91
91 // gcfg is the cached global configuration. 92 // gcfg is the cached global configuration.
92 gcfg *config.Config 93 gcfg *config.Config
93 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig 94 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig
94 95
95 // archivalIndex is the atomically-manipulated archival index for the 96 // archivalIndex is the atomically-manipulated archival index for the
96 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces 97 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces
97 // from this service. 98 // from this service.
98 archivalIndex int32 99 archivalIndex int32
100
101 // signer is the signer instance to use.
102 signer gaesigner.Signer
99 } 103 }
100 104
101 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { 105 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
102 s.Lock() 106 s.Lock()
103 defer s.Unlock() 107 defer s.Unlock()
104 108
105 // Load/cache the global config. 109 // Load/cache the global config.
106 if s.gcfg == nil { 110 if s.gcfg == nil {
107 var err error 111 var err error
108 s.gcfg, err = config.Load(c) 112 s.gcfg, err = config.Load(c)
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
151 } 155 }
152 s.projectConfigs[project] = cp 156 s.projectConfigs[project] = cp
153 } 157 }
154 return cp 158 return cp
155 } 159 }
156 160
157 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P rojectName) (*svcconfig.ProjectConfig, error) { 161 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P rojectName) (*svcconfig.ProjectConfig, error) {
158 return s.getOrCreateCachedProjectConfig(project).resolve(c) 162 return s.getOrCreateCachedProjectConfig(project).resolve(c)
159 } 163 }
160 164
161 func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora ge, error) { 165 func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamSta te) (Storage, error) {
166 » if !lst.ArchivalState().Archived() {
167 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s torage.")
168 » » return s.newBigTableStorage(c)
169 » }
170
171 » log.Fields{
172 » » "indexURL": lst.ArchiveIndexURL,
173 » » "streamURL": lst.ArchiveStreamURL,
174 » » "archiveTime": lst.ArchivedTime,
175 » }.Debugf(c, "Log is archived. Fetching from archive storage.")
176 » return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.A rchiveStreamURL))
177 }
178
179 func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error ) {
162 cfg, err := s.Config(c) 180 cfg, err := s.Config(c)
163 if err != nil { 181 if err != nil {
164 return nil, err 182 return nil, err
165 } 183 }
166 184
167 // Is BigTable configured? 185 // Is BigTable configured?
168 if cfg.Storage == nil { 186 if cfg.Storage == nil {
169 return nil, errors.New("no storage configuration") 187 return nil, errors.New("no storage configuration")
170 } 188 }
171 189
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
206 // calls. 224 // calls.
207 c = metadata.NewContext(c, nil) 225 c = metadata.NewContext(c, nil)
208 226
209 st, err := bigtable.New(c, bigtable.Options{ 227 st, err := bigtable.New(c, bigtable.Options{
210 Project: bt.Project, 228 Project: bt.Project,
211 Instance: bt.Instance, 229 Instance: bt.Instance,
212 LogTable: bt.LogTableName, 230 LogTable: bt.LogTableName,
213 ClientOptions: []option.ClientOption{ 231 ClientOptions: []option.ClientOption{
214 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre ds)), 232 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre ds)),
215 }, 233 },
216 » » Cache: s.StorageCache(), 234 » » Cache: s.getStorageCache(),
217 }) 235 })
218 if err != nil { 236 if err != nil {
219 log.WithError(err).Errorf(c, "Failed to create BigTable instance .") 237 log.WithError(err).Errorf(c, "Failed to create BigTable instance .")
220 return nil, err 238 return nil, err
221 } 239 }
222 » return st, nil 240
241 » return &bigTableStorage{
242 » » Storage: st,
243 » }, nil
223 } 244 }
224 245
225 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { 246 func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs. Path) (Storage, error) {
247 » gs, err := s.newGSClient(c)
248 » if err != nil {
249 » » log.WithError(err).Errorf(c, "Failed to create Google Storage cl ient.")
250 » » return nil, err
251 » }
252 » defer func() {
253 » » if gs != nil {
254 » » » if err := gs.Close(); err != nil {
255 » » » » log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
256 » » » }
257 » » }
258 » }()
259
260 » st, err := archive.New(c, archive.Options{
261 » » Index: index,
262 » » Stream: stream,
263 » » Client: gs,
264 » » Cache: s.getStorageCache(),
265 » })
266 » if err != nil {
267 » » log.WithError(err).Errorf(c, "Failed to create Google Storage st orage instance.")
268 » » return nil, err
269 » }
270
271 » gs = nil // Don't close in defer.
272 » return &googleStorage{
273 » » Storage: st,
274 » » svc: s,
275 » » gs: gs,
276 » » stream: stream,
277 » » index: index,
278 » }, nil
279 }
280
281 func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) {
226 // Get an Authenticator bound to the token scopes that we need for 282 // Get an Authenticator bound to the token scopes that we need for
227 // authenticated Cloud Storage access. 283 // authenticated Cloud Storage access.
228 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g s.ReadOnlyScopes...)) 284 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g s.ReadOnlyScopes...))
229 if err != nil { 285 if err != nil {
230 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.") 286 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.")
231 return nil, errors.New("failed to create Cloud Storage transport ") 287 return nil, errors.New("failed to create Cloud Storage transport ")
232 } 288 }
233 return gs.NewProdClient(c, transport) 289 return gs.NewProdClient(c, transport)
234 } 290 }
235 291
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
277 // more than MaxInt32 archival tasks. 333 // more than MaxInt32 archival tasks.
278 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 334 v := atomic.AddInt32(&s.archivalIndex, 1) - 1
279 if v < 0 { 335 if v < 0 {
280 panic("archival index has wrapped") 336 panic("archival index has wrapped")
281 } 337 }
282 return uint64(v) 338 return uint64(v)
283 } 339 }
284 340
285 var storageCacheSingleton StorageCache 341 var storageCacheSingleton StorageCache
286 342
287 func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSi ngleton } 343 func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCach eSingleton }
344
345 // Storage is an interface to storage used by the Coordinator.
346 type Storage interface {
347 » // Storage is the base Storage instance.
348 » storage.Storage
349
350 » // GetSignedURLs attempts to sign the storage's stream's RecordIO archiv e
351 » // stream storage URL.
352 » //
353 » // If signing is not supported by this Storage instance, this will retur n
354 » // a nil signing response and no error.
355 » GetSignedURLs(context.Context, *URLSigningRequest) (*URLSigningResponse, error)
356 }
357
358 // URLSigningRequest is the set of URL signing parameters passed to a
359 // Storage.GetSignedURLs call.
360 type URLSigningRequest struct {
361 » // Expriation is the signed URL expiration time.
362 » Lifetime time.Duration
363
364 » // Stream, if true, requests a signed log stream URL.
365 » Stream bool
366 » // Index, if true, requests a signed log stream index URL.
367 » Index bool
368 }
369
370 // HasWork returns true if this signing request actually has work that is
371 // requested.
372 func (r *URLSigningRequest) HasWork() bool {
373 » return (r.Stream || r.Index) && (r.Lifetime > 0)
374 }
375
376 // URLSigningResponse is the resulting signed URLs from a Storage.GetSignedURLs
377 // call.
378 type URLSigningResponse struct {
379 » // Expriation is the signed URL expiration time.
380 » Expiration time.Time
381
382 » // Stream is the signed URL for the log stream, if requested.
383 » Stream string
384 » // Index is the signed URL for the log stream index, if requested.
385 » Index string
386 }
387
388 // intermediateStorage is a Storage instance bound to BigTable.
389 type bigTableStorage struct {
390 » // Storage is the base storage.Storage instance.
391 » storage.Storage
392 }
393
394 func (*bigTableStorage) GetSignedURLs(context.Context, *URLSigningRequest) (*URL SigningResponse, error) {
395 » return nil, nil
396 }
397
398 type googleStorage struct {
399 » // Storage is the base storage.Storage instance.
400 » storage.Storage
401 » // svc is the services instance that created this.
402 » svc *prodServicesInst
403
404 » // ctx is the Context that was bound at the time of of creation.
405 » ctx context.Context
406 » // gs is the backing Google Storage client.
407 » gs gs.Client
408
409 » // stream is the stream's Google Storage URL.
410 » stream gs.Path
411 » // index is the index's Google Storage URL.
412 » index gs.Path
413
414 » gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error)
415 }
416
417 func (si *googleStorage) Close() {
418 » if err := si.gs.Close(); err != nil {
419 » » log.WithError(err).Warningf(si.ctx, "Failed to close Google Stor age client.")
420 » }
421 » si.Storage.Close()
422 }
423
424 func (si *googleStorage) GetSignedURLs(c context.Context, req *URLSigningRequest ) (*URLSigningResponse, error) {
425 » info, err := si.svc.signer.ServiceInfo(c)
426 » if err != nil {
427 » » return nil, errors.Annotate(err).InternalReason("failed to get s ervice info").Err()
428 » }
429
430 » lifetime := req.Lifetime
431 » switch {
432 » case lifetime < 0:
433 » » return nil, errors.Reason("invalid signed URL lifetime: %(lifeti me)s").D("lifetime", lifetime).Err()
434
435 » case lifetime > maxSignedURLLifetime:
436 » » lifetime = maxSignedURLLifetime
437 » }
438
439 » // Get our signing options.
440 » resp := URLSigningResponse{
441 » » Expiration: clock.Now(c).Add(lifetime),
442 » }
443 » opts := gcst.SignedURLOptions{
444 » » GoogleAccessID: info.ServiceAccountName,
445 » » SignBytes: func(b []byte) ([]byte, error) {
446 » » » _, signedBytes, err := si.svc.signer.SignBytes(c, b)
447 » » » return signedBytes, err
448 » » },
449 » » Method: "GET",
450 » » Expires: resp.Expiration,
451 » }
452
453 » doSign := func(path gs.Path) (string, error) {
454 » » url, err := gcst.SignedURL(path.Bucket(), path.Filename(), &opts )
455 » » if err != nil {
456 » » » return "", errors.Annotate(err).InternalReason("failed t o sign URL").
457 » » » » D("bucket", path.Bucket()).D("filename", path.Fi lename).Err()
458 » » }
459 » » return url, nil
460 » }
461
462 » // Sign stream URL.
463 » if req.Stream {
464 » » if resp.Stream, err = doSign(si.stream); err != nil {
465 » » » return nil, errors.Annotate(err).InternalReason("failed to sign stream URL").Err()
466 » » }
467 » }
468
469 » // Sign index URL.
470 » if req.Index {
471 » » if resp.Index, err = doSign(si.index); err != nil {
472 » » » return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err()
473 » » }
474 » }
475
476 » return &resp, nil
477 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698