Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | 8 "net/http" |
| 9 "sync" | 9 "sync" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 "time" | |
| 11 | 12 |
| 12 "github.com/luci/luci-go/appengine/gaemiddleware" | 13 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 14 "github.com/luci/luci-go/common/clock" | |
| 13 luciConfig "github.com/luci/luci-go/common/config" | 15 luciConfig "github.com/luci/luci-go/common/config" |
| 14 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/gs" | 17 "github.com/luci/luci-go/common/gcloud/gs" |
| 16 "github.com/luci/luci-go/common/gcloud/pubsub" | 18 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 17 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 20 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 21 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 20 "github.com/luci/luci-go/logdog/common/storage" | 22 "github.com/luci/luci-go/logdog/common/storage" |
| 23 "github.com/luci/luci-go/logdog/common/storage/archive" | |
| 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 24 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 22 "github.com/luci/luci-go/logdog/common/storage/caching" | 25 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 23 "github.com/luci/luci-go/server/auth" | 26 "github.com/luci/luci-go/server/auth" |
| 24 "github.com/luci/luci-go/server/router" | 27 "github.com/luci/luci-go/server/router" |
| 25 | 28 |
| 29 "github.com/luci/gae/service/info" | |
| 30 | |
| 26 gcps "cloud.google.com/go/pubsub" | 31 gcps "cloud.google.com/go/pubsub" |
| 32 gcst "cloud.google.com/go/storage" | |
| 27 "golang.org/x/net/context" | 33 "golang.org/x/net/context" |
| 28 "google.golang.org/api/option" | 34 "google.golang.org/api/option" |
| 29 "google.golang.org/grpc" | 35 "google.golang.org/grpc" |
| 30 "google.golang.org/grpc/metadata" | 36 "google.golang.org/grpc/metadata" |
| 31 ) | 37 ) |
| 32 | 38 |
| 39 // ErrSigningNotSupported is a sentinel error returned by Storage.SignURL if | |
| 40 // signing is not supported. | |
| 41 var ErrSigningNotSupported = errors.New("signing URLs is not supported") | |
| 42 | |
| 43 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. | |
|
Vadim Sh.
2016/11/30 21:03:52
mention this limit in *.proto doc too
dnj
2016/12/01 17:39:31
I don't want to put constraints in the proto doc,
Vadim Sh.
2016/12/01 19:32:12
1. The doc doesn't mention it anymore.
2. "<=" is
| |
| 44 const maxSignedURLLifetime = 1 * time.Hour | |
| 45 | |
| 33 // Services is a set of support services used by Coordinator. | 46 // Services is a set of support services used by Coordinator. |
| 34 // | 47 // |
| 35 // Each Services instance is valid for a singel request, but can be re-used | 48 // Each Services instance is valid for a singel request, but can be re-used |
| 36 // throughout that request. This is advised, as the Services instance may | 49 // throughout that request. This is advised, as the Services instance may |
| 37 // optionally cache values. | 50 // optionally cache values. |
| 38 // | 51 // |
| 39 // Services methods are goroutine-safe. | 52 // Services methods are goroutine-safe. |
| 40 // | 53 // |
| 41 // By default, a production set of services will be used. However, this can be | 54 // By default, a production set of services will be used. However, this can be |
| 42 // overridden for testing to mock the service layer. | 55 // overridden for testing to mock the service layer. |
| 43 type Services interface { | 56 type Services interface { |
| 44 // Config returns the current instance and application configuration | 57 // Config returns the current instance and application configuration |
| 45 // instances. | 58 // instances. |
| 46 // | 59 // |
| 47 // The production instance will cache the results for the duration of th e | 60 // The production instance will cache the results for the duration of th e |
| 48 // request. | 61 // request. |
| 49 Config(context.Context) (*config.Config, error) | 62 Config(context.Context) (*config.Config, error) |
| 50 | 63 |
| 51 // ProjectConfig returns the project configuration for the named project . | 64 // ProjectConfig returns the project configuration for the named project . |
| 52 // | 65 // |
| 53 // The production instance will cache the results for the duration of th e | 66 // The production instance will cache the results for the duration of th e |
| 54 // request. | 67 // request. |
| 55 // | 68 // |
| 56 // Returns the same error codes as config.ProjectConfig. | 69 // Returns the same error codes as config.ProjectConfig. |
| 57 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje ctConfig, error) | 70 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje ctConfig, error) |
| 58 | 71 |
| 59 » // Storage returns an intermediate storage instance for use by this serv ice. | 72 » // Storage returns a Storage instance for the supplied log stream. |
| 60 // | 73 // |
| 61 // The caller must close the returned instance if successful. | 74 // The caller must close the returned instance if successful. |
| 62 » IntermediateStorage(context.Context) (storage.Storage, error) | 75 » StorageForStream(context.Context, *LogStreamState) (Storage, error) |
| 63 | |
| 64 » // GSClient instantiates a Google Storage client. | |
| 65 » GSClient(context.Context) (gs.Client, error) | |
| 66 | 76 |
| 67 // ArchivalPublisher returns an ArchivalPublisher instance. | 77 // ArchivalPublisher returns an ArchivalPublisher instance. |
| 68 ArchivalPublisher(context.Context) (ArchivalPublisher, error) | 78 ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| 69 | |
| 70 // StorageCache returns the storage cache instance to use, or nil for no | |
| 71 // caching. | |
| 72 StorageCache() caching.Cache | |
| 73 } | 79 } |
| 74 | 80 |
| 75 // ProdServices is middleware chain used by Coordinator services. | 81 // ProdServices is middleware chain used by Coordinator services. |
| 76 // | 82 // |
| 77 // It sets up basic GAE functionality as well as installs a production Services | 83 // It sets up basic GAE functionality as well as installs a production Services |
| 78 // instance. | 84 // instance. |
| 79 func ProdServices() router.MiddlewareChain { | 85 func ProdServices() router.MiddlewareChain { |
| 80 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout er.Handler) { | 86 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout er.Handler) { |
| 81 c.Context = WithServices(c.Context, &prodServicesInst{}) | 87 c.Context = WithServices(c.Context, &prodServicesInst{}) |
| 82 next(c) | 88 next(c) |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 151 } | 157 } |
| 152 s.projectConfigs[project] = cp | 158 s.projectConfigs[project] = cp |
| 153 } | 159 } |
| 154 return cp | 160 return cp |
| 155 } | 161 } |
| 156 | 162 |
| 157 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P rojectName) (*svcconfig.ProjectConfig, error) { | 163 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P rojectName) (*svcconfig.ProjectConfig, error) { |
| 158 return s.getOrCreateCachedProjectConfig(project).resolve(c) | 164 return s.getOrCreateCachedProjectConfig(project).resolve(c) |
| 159 } | 165 } |
| 160 | 166 |
| 161 func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora ge, error) { | 167 func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamSta te) (Storage, error) { |
| 168 » if !lst.ArchivalState().Archived() { | |
| 169 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s torage.") | |
| 170 » » return s.newBigTableStorage(c) | |
| 171 » } | |
| 172 | |
| 173 » log.Fields{ | |
| 174 » » "indexURL": lst.ArchiveIndexURL, | |
| 175 » » "streamURL": lst.ArchiveStreamURL, | |
| 176 » » "archiveTime": lst.ArchivedTime, | |
| 177 » }.Debugf(c, "Log is archived. Fetching from archive storage.") | |
| 178 » return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.A rchiveStreamURL)) | |
| 179 } | |
| 180 | |
| 181 func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error ) { | |
| 162 cfg, err := s.Config(c) | 182 cfg, err := s.Config(c) |
| 163 if err != nil { | 183 if err != nil { |
| 164 return nil, err | 184 return nil, err |
| 165 } | 185 } |
| 166 | 186 |
| 167 // Is BigTable configured? | 187 // Is BigTable configured? |
| 168 if cfg.Storage == nil { | 188 if cfg.Storage == nil { |
| 169 return nil, errors.New("no storage configuration") | 189 return nil, errors.New("no storage configuration") |
| 170 } | 190 } |
| 171 | 191 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 206 // calls. | 226 // calls. |
| 207 c = metadata.NewContext(c, nil) | 227 c = metadata.NewContext(c, nil) |
| 208 | 228 |
| 209 st, err := bigtable.New(c, bigtable.Options{ | 229 st, err := bigtable.New(c, bigtable.Options{ |
| 210 Project: bt.Project, | 230 Project: bt.Project, |
| 211 Instance: bt.Instance, | 231 Instance: bt.Instance, |
| 212 LogTable: bt.LogTableName, | 232 LogTable: bt.LogTableName, |
| 213 ClientOptions: []option.ClientOption{ | 233 ClientOptions: []option.ClientOption{ |
| 214 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre ds)), | 234 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre ds)), |
| 215 }, | 235 }, |
| 216 » » Cache: s.StorageCache(), | 236 » » Cache: s.getStorageCache(), |
| 217 }) | 237 }) |
| 218 if err != nil { | 238 if err != nil { |
| 219 log.WithError(err).Errorf(c, "Failed to create BigTable instance .") | 239 log.WithError(err).Errorf(c, "Failed to create BigTable instance .") |
| 220 return nil, err | 240 return nil, err |
| 221 } | 241 } |
| 222 » return st, nil | 242 |
| 243 » return &bigTableStorage{ | |
| 244 » » Storage: st, | |
| 245 » }, nil | |
| 223 } | 246 } |
| 224 | 247 |
| 225 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { | 248 func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs. Path) (Storage, error) { |
| 249 » gs, err := s.newGSClient(c) | |
| 250 » if err != nil { | |
| 251 » » log.WithError(err).Errorf(c, "Failed to create Google Storage cl ient.") | |
| 252 » » return nil, err | |
| 253 » } | |
| 254 » defer func() { | |
| 255 » » if gs != nil { | |
| 256 » » » if err := gs.Close(); err != nil { | |
| 257 » » » » log.WithError(err).Warningf(c, "Failed to close Google Storage client.") | |
| 258 » » » } | |
| 259 » » } | |
| 260 » }() | |
| 261 | |
| 262 » st, err := archive.New(c, archive.Options{ | |
| 263 » » Index: index, | |
| 264 » » Stream: stream, | |
| 265 » » Client: gs, | |
| 266 » » Cache: s.getStorageCache(), | |
| 267 » }) | |
| 268 » if err != nil { | |
| 269 » » log.WithError(err).Errorf(c, "Failed to create Google Storage st orage instance.") | |
| 270 » » return nil, err | |
| 271 » } | |
| 272 | |
| 273 » gs = nil // Don't close in defer. | |
|
Vadim Sh.
2016/11/30 21:03:52
nit: well.. there's only one exit point from this
dnj
2016/12/01 17:39:31
I suppose I gravitate towards defer to protect aga
| |
| 274 » return &googleStorage{ | |
| 275 » » Storage: st, | |
| 276 » » gs: gs, | |
| 277 » » stream: stream, | |
| 278 » }, nil | |
| 279 } | |
| 280 | |
| 281 func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) { | |
| 226 // Get an Authenticator bound to the token scopes that we need for | 282 // Get an Authenticator bound to the token scopes that we need for |
| 227 // authenticated Cloud Storage access. | 283 // authenticated Cloud Storage access. |
| 228 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g s.ReadOnlyScopes...)) | 284 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g s.ReadOnlyScopes...)) |
| 229 if err != nil { | 285 if err != nil { |
| 230 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.") | 286 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.") |
| 231 return nil, errors.New("failed to create Cloud Storage transport ") | 287 return nil, errors.New("failed to create Cloud Storage transport ") |
| 232 } | 288 } |
| 233 return gs.NewProdClient(c, transport) | 289 return gs.NewProdClient(c, transport) |
| 234 } | 290 } |
| 235 | 291 |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 277 // more than MaxInt32 archival tasks. | 333 // more than MaxInt32 archival tasks. |
| 278 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 334 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
| 279 if v < 0 { | 335 if v < 0 { |
| 280 panic("archival index has wrapped") | 336 panic("archival index has wrapped") |
| 281 } | 337 } |
| 282 return uint64(v) | 338 return uint64(v) |
| 283 } | 339 } |
| 284 | 340 |
| 285 var storageCacheSingleton StorageCache | 341 var storageCacheSingleton StorageCache |
| 286 | 342 |
| 287 func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSi ngleton } | 343 func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCach eSingleton } |
| 344 | |
| 345 // Storage is an interface to storage used by the Coordinator. | |
| 346 type Storage interface { | |
| 347 » // Storage is the base Storage instance. | |
| 348 » storage.Storage | |
| 349 | |
| 350 » // SignStreamURL attempts to sign the storage's stream's RecordIO archiv e | |
| 351 » // stream storage URL. | |
| 352 » // | |
| 353 » // If signing is not supported by this Storage instance, this will retur n | |
| 354 » // ErrNotSupported. | |
| 355 » SignStreamURL(context.Context, time.Duration) (string, time.Time, error) | |
| 356 } | |
| 357 | |
| 358 // intermediateStorage is a Storage instance bound to BigTable. | |
| 359 type bigTableStorage struct { | |
| 360 » // Storage is the base storage.Storage instance. | |
| 361 » storage.Storage | |
| 362 } | |
| 363 | |
| 364 func (*bigTableStorage) SignStreamURL(c context.Context, lifetime time.Duration) (string, time.Time, error) { | |
| 365 » return "", time.Time{}, ErrSigningNotSupported | |
| 366 } | |
| 367 | |
| 368 type googleStorage struct { | |
| 369 » // Storage is the base storage.Storage instance. | |
| 370 » storage.Storage | |
| 371 | |
| 372 » // ctx is the Context that was bound at the time of of creation. | |
| 373 » ctx context.Context | |
| 374 » // gs is the backing Google Storage client. | |
| 375 » gs gs.Client | |
| 376 » // stream is the stream's Google Storage URL. | |
| 377 » stream gs.Path | |
| 378 | |
| 379 » gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error) | |
| 380 } | |
| 381 | |
| 382 func (gs *googleStorage) Close() { | |
| 383 » if err := gs.gs.Close(); err != nil { | |
| 384 » » log.WithError(err).Warningf(gs.ctx, "Failed to close Google Stor age client.") | |
| 385 » } | |
| 386 » gs.Storage.Close() | |
| 387 } | |
| 388 | |
| 389 func (gs *googleStorage) SignStreamURL(c context.Context, lifetime time.Duration ) (url string, expires time.Time, err error) { | |
| 390 » acct, err := info.ServiceAccount(c) | |
|
Vadim Sh.
2016/11/30 21:09:09
oh, btw this is RPC to the backend too: https://gi
dnj
2016/12/01 17:39:30
Done.
| |
| 391 » if err != nil { | |
| 392 » » err = errors.Annotate(err).InternalReason("failed to get service account name").Err() | |
| 393 » » return | |
| 394 » } | |
| 395 | |
| 396 » switch { | |
| 397 » case lifetime < 0: | |
| 398 » » err = errors.Reason("invalid signed URL lifetime: %(lifetime)s") .D("lifetime", lifetime).Err() | |
| 399 » » return | |
| 400 | |
| 401 » case lifetime > maxSignedURLLifetime: | |
| 402 » » lifetime = maxSignedURLLifetime | |
| 403 » } | |
| 404 | |
| 405 » // Get our signing options. | |
| 406 » expires = clock.Now(c).Add(lifetime) | |
| 407 » opts := gcst.SignedURLOptions{ | |
| 408 » » GoogleAccessID: acct, | |
| 409 » » SignBytes: func(b []byte) ([]byte, error) { | |
| 410 » » » _, signedBytes, err := info.SignBytes(c, b) | |
| 411 » » » return signedBytes, err | |
| 412 » » }, | |
| 413 » » Method: "GET", | |
| 414 » » Expires: expires, | |
| 415 » } | |
| 416 | |
| 417 » if url, err = gcst.SignedURL(gs.stream.Bucket(), gs.stream.Filename(), & opts); err != nil { | |
|
Vadim Sh.
2016/11/30 21:03:52
consider caching the result in the future. SignByt
dnj
2016/12/01 17:39:31
This gets tricky, since the actual signed URLs hav
Vadim Sh.
2016/12/01 19:32:13
That's why I assumed >= in sign_entry_url_lifetime
| |
| 418 » » err = errors.Annotate(err).InternalReason("failed to sign URL"). | |
| 419 » » » D("bucket", gs.stream.Bucket()).D("filename", gs.stream. Filename).Err() | |
| 420 » » return | |
| 421 » } | |
| 422 » return | |
| 423 } | |
| OLD | NEW |