| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | 8 "net/http" |
| 9 "sync" | 9 "sync" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 "time" |
| 11 | 12 |
| 13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" |
| 12 "github.com/luci/luci-go/appengine/gaemiddleware" | 14 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 15 "github.com/luci/luci-go/common/clock" |
| 13 luciConfig "github.com/luci/luci-go/common/config" | 16 luciConfig "github.com/luci/luci-go/common/config" |
| 14 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/gs" | 18 "github.com/luci/luci-go/common/gcloud/gs" |
| 16 "github.com/luci/luci-go/common/gcloud/pubsub" | 19 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 17 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 21 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 22 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 20 "github.com/luci/luci-go/logdog/common/storage" | 23 "github.com/luci/luci-go/logdog/common/storage" |
| 24 "github.com/luci/luci-go/logdog/common/storage/archive" |
| 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 25 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 22 "github.com/luci/luci-go/logdog/common/storage/caching" | 26 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 23 "github.com/luci/luci-go/server/auth" | 27 "github.com/luci/luci-go/server/auth" |
| 24 "github.com/luci/luci-go/server/router" | 28 "github.com/luci/luci-go/server/router" |
| 25 | 29 |
| 26 gcps "cloud.google.com/go/pubsub" | 30 gcps "cloud.google.com/go/pubsub" |
| 31 gcst "cloud.google.com/go/storage" |
| 27 "golang.org/x/net/context" | 32 "golang.org/x/net/context" |
| 28 "google.golang.org/api/option" | 33 "google.golang.org/api/option" |
| 29 "google.golang.org/grpc" | 34 "google.golang.org/grpc" |
| 30 "google.golang.org/grpc/metadata" | 35 "google.golang.org/grpc/metadata" |
| 31 ) | 36 ) |
| 32 | 37 |
| 38 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. |
| 39 const maxSignedURLLifetime = 1 * time.Hour |
| 40 |
| 33 // Services is a set of support services used by Coordinator. | 41 // Services is a set of support services used by Coordinator. |
| 34 // | 42 // |
| 35 // Each Services instance is valid for a singel request, but can be re-used | 43 // Each Services instance is valid for a singel request, but can be re-used |
| 36 // throughout that request. This is advised, as the Services instance may | 44 // throughout that request. This is advised, as the Services instance may |
| 37 // optionally cache values. | 45 // optionally cache values. |
| 38 // | 46 // |
| 39 // Services methods are goroutine-safe. | 47 // Services methods are goroutine-safe. |
| 40 // | 48 // |
| 41 // By default, a production set of services will be used. However, this can be | 49 // By default, a production set of services will be used. However, this can be |
| 42 // overridden for testing to mock the service layer. | 50 // overridden for testing to mock the service layer. |
| 43 type Services interface { | 51 type Services interface { |
| 44 // Config returns the current instance and application configuration | 52 // Config returns the current instance and application configuration |
| 45 // instances. | 53 // instances. |
| 46 // | 54 // |
| 47 // The production instance will cache the results for the duration of th
e | 55 // The production instance will cache the results for the duration of th
e |
| 48 // request. | 56 // request. |
| 49 Config(context.Context) (*config.Config, error) | 57 Config(context.Context) (*config.Config, error) |
| 50 | 58 |
| 51 // ProjectConfig returns the project configuration for the named project
. | 59 // ProjectConfig returns the project configuration for the named project
. |
| 52 // | 60 // |
| 53 // The production instance will cache the results for the duration of th
e | 61 // The production instance will cache the results for the duration of th
e |
| 54 // request. | 62 // request. |
| 55 // | 63 // |
| 56 // Returns the same error codes as config.ProjectConfig. | 64 // Returns the same error codes as config.ProjectConfig. |
| 57 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje
ctConfig, error) | 65 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje
ctConfig, error) |
| 58 | 66 |
| 59 » // Storage returns an intermediate storage instance for use by this serv
ice. | 67 » // Storage returns a Storage instance for the supplied log stream. |
| 60 // | 68 // |
| 61 // The caller must close the returned instance if successful. | 69 // The caller must close the returned instance if successful. |
| 62 » IntermediateStorage(context.Context) (storage.Storage, error) | 70 » StorageForStream(context.Context, *LogStreamState) (Storage, error) |
| 63 | |
| 64 » // GSClient instantiates a Google Storage client. | |
| 65 » GSClient(context.Context) (gs.Client, error) | |
| 66 | 71 |
| 67 // ArchivalPublisher returns an ArchivalPublisher instance. | 72 // ArchivalPublisher returns an ArchivalPublisher instance. |
| 68 ArchivalPublisher(context.Context) (ArchivalPublisher, error) | 73 ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| 69 | |
| 70 // StorageCache returns the storage cache instance to use, or nil for no | |
| 71 // caching. | |
| 72 StorageCache() caching.Cache | |
| 73 } | 74 } |
| 74 | 75 |
| 75 // ProdServices is middleware chain used by Coordinator services. | 76 // ProdServices is middleware chain used by Coordinator services. |
| 76 // | 77 // |
| 77 // It sets up basic GAE functionality as well as installs a production Services | 78 // It sets up basic GAE functionality as well as installs a production Services |
| 78 // instance. | 79 // instance. |
| 79 func ProdServices() router.MiddlewareChain { | 80 func ProdServices() router.MiddlewareChain { |
| 80 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout
er.Handler) { | 81 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout
er.Handler) { |
| 81 c.Context = WithServices(c.Context, &prodServicesInst{}) | 82 c.Context = WithServices(c.Context, &prodServicesInst{}) |
| 82 next(c) | 83 next(c) |
| 83 }) | 84 }) |
| 84 } | 85 } |
| 85 | 86 |
| 86 // prodServicesInst is a Service exposing production faciliites. A unique | 87 // prodServicesInst is a Service exposing production faciliites. A unique |
| 87 // instance is bound to each each request. | 88 // instance is bound to each each request. |
| 88 type prodServicesInst struct { | 89 type prodServicesInst struct { |
| 89 sync.Mutex | 90 sync.Mutex |
| 90 | 91 |
| 91 // gcfg is the cached global configuration. | 92 // gcfg is the cached global configuration. |
| 92 gcfg *config.Config | 93 gcfg *config.Config |
| 93 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig | 94 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig |
| 94 | 95 |
| 95 // archivalIndex is the atomically-manipulated archival index for the | 96 // archivalIndex is the atomically-manipulated archival index for the |
| 96 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces | 97 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces |
| 97 // from this service. | 98 // from this service. |
| 98 archivalIndex int32 | 99 archivalIndex int32 |
| 100 |
| 101 // signer is the signer instance to use. |
| 102 signer gaesigner.Signer |
| 99 } | 103 } |
| 100 | 104 |
| 101 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { | 105 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { |
| 102 s.Lock() | 106 s.Lock() |
| 103 defer s.Unlock() | 107 defer s.Unlock() |
| 104 | 108 |
| 105 // Load/cache the global config. | 109 // Load/cache the global config. |
| 106 if s.gcfg == nil { | 110 if s.gcfg == nil { |
| 107 var err error | 111 var err error |
| 108 s.gcfg, err = config.Load(c) | 112 s.gcfg, err = config.Load(c) |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 151 } | 155 } |
| 152 s.projectConfigs[project] = cp | 156 s.projectConfigs[project] = cp |
| 153 } | 157 } |
| 154 return cp | 158 return cp |
| 155 } | 159 } |
| 156 | 160 |
| 157 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P
rojectName) (*svcconfig.ProjectConfig, error) { | 161 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P
rojectName) (*svcconfig.ProjectConfig, error) { |
| 158 return s.getOrCreateCachedProjectConfig(project).resolve(c) | 162 return s.getOrCreateCachedProjectConfig(project).resolve(c) |
| 159 } | 163 } |
| 160 | 164 |
| 161 func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora
ge, error) { | 165 func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamSta
te) (Storage, error) { |
| 166 » if !lst.ArchivalState().Archived() { |
| 167 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") |
| 168 » » return s.newBigTableStorage(c) |
| 169 » } |
| 170 |
| 171 » log.Fields{ |
| 172 » » "indexURL": lst.ArchiveIndexURL, |
| 173 » » "streamURL": lst.ArchiveStreamURL, |
| 174 » » "archiveTime": lst.ArchivedTime, |
| 175 » }.Debugf(c, "Log is archived. Fetching from archive storage.") |
| 176 » return s.newGoogleStorage(c, gs.Path(lst.ArchiveIndexURL), gs.Path(lst.A
rchiveStreamURL)) |
| 177 } |
| 178 |
| 179 func (s *prodServicesInst) newBigTableStorage(c context.Context) (Storage, error
) { |
| 162 cfg, err := s.Config(c) | 180 cfg, err := s.Config(c) |
| 163 if err != nil { | 181 if err != nil { |
| 164 return nil, err | 182 return nil, err |
| 165 } | 183 } |
| 166 | 184 |
| 167 // Is BigTable configured? | 185 // Is BigTable configured? |
| 168 if cfg.Storage == nil { | 186 if cfg.Storage == nil { |
| 169 return nil, errors.New("no storage configuration") | 187 return nil, errors.New("no storage configuration") |
| 170 } | 188 } |
| 171 | 189 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 206 // calls. | 224 // calls. |
| 207 c = metadata.NewContext(c, nil) | 225 c = metadata.NewContext(c, nil) |
| 208 | 226 |
| 209 st, err := bigtable.New(c, bigtable.Options{ | 227 st, err := bigtable.New(c, bigtable.Options{ |
| 210 Project: bt.Project, | 228 Project: bt.Project, |
| 211 Instance: bt.Instance, | 229 Instance: bt.Instance, |
| 212 LogTable: bt.LogTableName, | 230 LogTable: bt.LogTableName, |
| 213 ClientOptions: []option.ClientOption{ | 231 ClientOptions: []option.ClientOption{ |
| 214 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre
ds)), | 232 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre
ds)), |
| 215 }, | 233 }, |
| 216 » » Cache: s.StorageCache(), | 234 » » Cache: s.getStorageCache(), |
| 217 }) | 235 }) |
| 218 if err != nil { | 236 if err != nil { |
| 219 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") | 237 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") |
| 220 return nil, err | 238 return nil, err |
| 221 } | 239 } |
| 222 » return st, nil | 240 |
| 241 » return &bigTableStorage{ |
| 242 » » Storage: st, |
| 243 » }, nil |
| 223 } | 244 } |
| 224 | 245 |
| 225 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { | 246 func (s *prodServicesInst) newGoogleStorage(c context.Context, index, stream gs.
Path) (Storage, error) { |
| 247 » gs, err := s.newGSClient(c) |
| 248 » if err != nil { |
| 249 » » log.WithError(err).Errorf(c, "Failed to create Google Storage cl
ient.") |
| 250 » » return nil, err |
| 251 » } |
| 252 » defer func() { |
| 253 » » if gs != nil { |
| 254 » » » if err := gs.Close(); err != nil { |
| 255 » » » » log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") |
| 256 » » » } |
| 257 » » } |
| 258 » }() |
| 259 |
| 260 » st, err := archive.New(c, archive.Options{ |
| 261 » » Index: index, |
| 262 » » Stream: stream, |
| 263 » » Client: gs, |
| 264 » » Cache: s.getStorageCache(), |
| 265 » }) |
| 266 » if err != nil { |
| 267 » » log.WithError(err).Errorf(c, "Failed to create Google Storage st
orage instance.") |
| 268 » » return nil, err |
| 269 » } |
| 270 |
| 271 » gs = nil // Don't close in defer. |
| 272 » return &googleStorage{ |
| 273 » » Storage: st, |
| 274 » » svc: s, |
| 275 » » gs: gs, |
| 276 » » stream: stream, |
| 277 » » index: index, |
| 278 » }, nil |
| 279 } |
| 280 |
| 281 func (s *prodServicesInst) newGSClient(c context.Context) (gs.Client, error) { |
| 226 // Get an Authenticator bound to the token scopes that we need for | 282 // Get an Authenticator bound to the token scopes that we need for |
| 227 // authenticated Cloud Storage access. | 283 // authenticated Cloud Storage access. |
| 228 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g
s.ReadOnlyScopes...)) | 284 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(g
s.ReadOnlyScopes...)) |
| 229 if err != nil { | 285 if err != nil { |
| 230 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra
nsport.") | 286 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra
nsport.") |
| 231 return nil, errors.New("failed to create Cloud Storage transport
") | 287 return nil, errors.New("failed to create Cloud Storage transport
") |
| 232 } | 288 } |
| 233 return gs.NewProdClient(c, transport) | 289 return gs.NewProdClient(c, transport) |
| 234 } | 290 } |
| 235 | 291 |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 277 // more than MaxInt32 archival tasks. | 333 // more than MaxInt32 archival tasks. |
| 278 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 334 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
| 279 if v < 0 { | 335 if v < 0 { |
| 280 panic("archival index has wrapped") | 336 panic("archival index has wrapped") |
| 281 } | 337 } |
| 282 return uint64(v) | 338 return uint64(v) |
| 283 } | 339 } |
| 284 | 340 |
| 285 var storageCacheSingleton StorageCache | 341 var storageCacheSingleton StorageCache |
| 286 | 342 |
| 287 func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSi
ngleton } | 343 func (s *prodServicesInst) getStorageCache() caching.Cache { return &storageCach
eSingleton } |
| 344 |
| 345 // Storage is an interface to storage used by the Coordinator. |
| 346 type Storage interface { |
| 347 » // Storage is the base Storage instance. |
| 348 » storage.Storage |
| 349 |
| 350 » // GetSignedURLs attempts to sign the storage's stream's RecordIO archiv
e |
| 351 » // stream storage URL. |
| 352 » // |
| 353 » // If signing is not supported by this Storage instance, this will retur
n |
| 354 » // a nil signing response and no error. |
| 355 » GetSignedURLs(context.Context, *URLSigningRequest) (*URLSigningResponse,
error) |
| 356 } |
| 357 |
| 358 // URLSigningRequest is the set of URL signing parameters passed to a |
| 359 // Storage.GetSignedURLs call. |
| 360 type URLSigningRequest struct { |
| 361 » // Expriation is the signed URL expiration time. |
| 362 » Lifetime time.Duration |
| 363 |
| 364 » // Stream, if true, requests a signed log stream URL. |
| 365 » Stream bool |
| 366 » // Index, if true, requests a signed log stream index URL. |
| 367 » Index bool |
| 368 } |
| 369 |
| 370 // HasWork returns true if this signing request actually has work that is |
| 371 // requested. |
| 372 func (r *URLSigningRequest) HasWork() bool { |
| 373 » return (r.Stream || r.Index) && (r.Lifetime > 0) |
| 374 } |
| 375 |
| 376 // URLSigningResponse is the resulting signed URLs from a Storage.GetSignedURLs |
| 377 // call. |
| 378 type URLSigningResponse struct { |
| 379 » // Expriation is the signed URL expiration time. |
| 380 » Expiration time.Time |
| 381 |
| 382 » // Stream is the signed URL for the log stream, if requested. |
| 383 » Stream string |
| 384 » // Index is the signed URL for the log stream index, if requested. |
| 385 » Index string |
| 386 } |
| 387 |
| 388 // intermediateStorage is a Storage instance bound to BigTable. |
| 389 type bigTableStorage struct { |
| 390 » // Storage is the base storage.Storage instance. |
| 391 » storage.Storage |
| 392 } |
| 393 |
| 394 func (*bigTableStorage) GetSignedURLs(context.Context, *URLSigningRequest) (*URL
SigningResponse, error) { |
| 395 » return nil, nil |
| 396 } |
| 397 |
| 398 type googleStorage struct { |
| 399 » // Storage is the base storage.Storage instance. |
| 400 » storage.Storage |
| 401 » // svc is the services instance that created this. |
| 402 » svc *prodServicesInst |
| 403 |
| 404 » // ctx is the Context that was bound at the time of of creation. |
| 405 » ctx context.Context |
| 406 » // gs is the backing Google Storage client. |
| 407 » gs gs.Client |
| 408 |
| 409 » // stream is the stream's Google Storage URL. |
| 410 » stream gs.Path |
| 411 » // index is the index's Google Storage URL. |
| 412 » index gs.Path |
| 413 |
| 414 » gsSigningOpts func(context.Context) (*gcst.SignedURLOptions, error) |
| 415 } |
| 416 |
| 417 func (si *googleStorage) Close() { |
| 418 » if err := si.gs.Close(); err != nil { |
| 419 » » log.WithError(err).Warningf(si.ctx, "Failed to close Google Stor
age client.") |
| 420 » } |
| 421 » si.Storage.Close() |
| 422 } |
| 423 |
| 424 func (si *googleStorage) GetSignedURLs(c context.Context, req *URLSigningRequest
) (*URLSigningResponse, error) { |
| 425 » info, err := si.svc.signer.ServiceInfo(c) |
| 426 » if err != nil { |
| 427 » » return nil, errors.Annotate(err).InternalReason("failed to get s
ervice info").Err() |
| 428 » } |
| 429 |
| 430 » lifetime := req.Lifetime |
| 431 » switch { |
| 432 » case lifetime < 0: |
| 433 » » return nil, errors.Reason("invalid signed URL lifetime: %(lifeti
me)s").D("lifetime", lifetime).Err() |
| 434 |
| 435 » case lifetime > maxSignedURLLifetime: |
| 436 » » lifetime = maxSignedURLLifetime |
| 437 » } |
| 438 |
| 439 » // Get our signing options. |
| 440 » resp := URLSigningResponse{ |
| 441 » » Expiration: clock.Now(c).Add(lifetime), |
| 442 » } |
| 443 » opts := gcst.SignedURLOptions{ |
| 444 » » GoogleAccessID: info.ServiceAccountName, |
| 445 » » SignBytes: func(b []byte) ([]byte, error) { |
| 446 » » » _, signedBytes, err := si.svc.signer.SignBytes(c, b) |
| 447 » » » return signedBytes, err |
| 448 » » }, |
| 449 » » Method: "GET", |
| 450 » » Expires: resp.Expiration, |
| 451 » } |
| 452 |
| 453 » doSign := func(path gs.Path) (string, error) { |
| 454 » » url, err := gcst.SignedURL(path.Bucket(), path.Filename(), &opts
) |
| 455 » » if err != nil { |
| 456 » » » return "", errors.Annotate(err).InternalReason("failed t
o sign URL"). |
| 457 » » » » D("bucket", path.Bucket()).D("filename", path.Fi
lename).Err() |
| 458 » » } |
| 459 » » return url, nil |
| 460 » } |
| 461 |
| 462 » // Sign stream URL. |
| 463 » if req.Stream { |
| 464 » » if resp.Stream, err = doSign(si.stream); err != nil { |
| 465 » » » return nil, errors.Annotate(err).InternalReason("failed
to sign stream URL").Err() |
| 466 » » } |
| 467 » } |
| 468 |
| 469 » // Sign index URL. |
| 470 » if req.Index { |
| 471 » » if resp.Index, err = doSign(si.index); err != nil { |
| 472 » » » return nil, errors.Annotate(err).InternalReason("failed
to sign index URL").Err() |
| 473 » » } |
| 474 » } |
| 475 |
| 476 » return &resp, nil |
| 477 } |
| OLD | NEW |