| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" |
| 8 "sync" | 9 "sync" |
| 9 "sync/atomic" | 10 "sync/atomic" |
| 10 "time" | 11 "time" |
| 11 | 12 |
| 12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" | 13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" |
| 13 "github.com/luci/luci-go/common/clock" | 14 "github.com/luci/luci-go/common/clock" |
| 14 luciConfig "github.com/luci/luci-go/common/config" | 15 luciConfig "github.com/luci/luci-go/common/config" |
| 15 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/gcloud/gs" | 17 "github.com/luci/luci-go/common/gcloud/gs" |
| 17 "github.com/luci/luci-go/common/gcloud/pubsub" | 18 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 18 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 20 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 20 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 21 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 21 "github.com/luci/luci-go/logdog/common/storage" | 22 "github.com/luci/luci-go/logdog/common/storage" |
| 22 "github.com/luci/luci-go/logdog/common/storage/archive" | 23 "github.com/luci/luci-go/logdog/common/storage/archive" |
| 23 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 24 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 24 "github.com/luci/luci-go/logdog/common/storage/caching" | 25 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 25 "github.com/luci/luci-go/server/auth" | 26 "github.com/luci/luci-go/server/auth" |
| 26 "github.com/luci/luci-go/server/router" | 27 "github.com/luci/luci-go/server/router" |
| 27 | 28 |
| 28 gcps "cloud.google.com/go/pubsub" | 29 gcps "cloud.google.com/go/pubsub" |
| 29 gcst "cloud.google.com/go/storage" | 30 gcst "cloud.google.com/go/storage" |
| 30 "golang.org/x/net/context" | 31 "golang.org/x/net/context" |
| 31 "google.golang.org/api/option" | 32 "google.golang.org/api/option" |
| 33 "google.golang.org/appengine" |
| 32 "google.golang.org/grpc" | 34 "google.golang.org/grpc" |
| 33 "google.golang.org/grpc/metadata" | 35 "google.golang.org/grpc/metadata" |
| 34 ) | 36 ) |
| 35 | 37 |
| 36 const ( | 38 const ( |
| 37 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. | 39 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. |
| 38 maxSignedURLLifetime = 1 * time.Hour | 40 maxSignedURLLifetime = 1 * time.Hour |
| 39 | 41 |
| 40 // maxGSFetchSize is the maximum amount of data we can fetch from a sing
le | 42 // maxGSFetchSize is the maximum amount of data we can fetch from a sing
le |
| 41 // Google Storage RPC call. | 43 // Google Storage RPC call. |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 81 StorageForStream(context.Context, *LogStreamState) (Storage, error) | 83 StorageForStream(context.Context, *LogStreamState) (Storage, error) |
| 82 | 84 |
| 83 // ArchivalPublisher returns an ArchivalPublisher instance. | 85 // ArchivalPublisher returns an ArchivalPublisher instance. |
| 84 ArchivalPublisher(context.Context) (ArchivalPublisher, error) | 86 ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| 85 } | 87 } |
| 86 | 88 |
| 87 // ProdCoordinatorService is Middleware used by Coordinator services. | 89 // ProdCoordinatorService is Middleware used by Coordinator services. |
| 88 // | 90 // |
| 89 // It installs a production Services instance into the Context. | 91 // It installs a production Services instance into the Context. |
| 90 func ProdCoordinatorService(c *router.Context, next router.Handler) { | 92 func ProdCoordinatorService(c *router.Context, next router.Handler) { |
| 91 » c.Context = WithServices(c.Context, &prodServicesInst{}) | 93 » c.Context = WithServices(c.Context, &prodServicesInst{ |
| 94 » » req: c.Request, |
| 95 » }) |
| 92 next(c) | 96 next(c) |
| 93 } | 97 } |
| 94 | 98 |
| 95 // prodServicesInst is a Service exposing production faciliites. A unique | 99 // prodServicesInst is a Service exposing production faciliites. A unique |
| 96 // instance is bound to each each request. | 100 // instance is bound to each each request. |
| 97 type prodServicesInst struct { | 101 type prodServicesInst struct { |
| 98 sync.Mutex | 102 sync.Mutex |
| 99 | 103 |
| 104 // req is the request that is being serviced. This is populated when the |
| 105 // service is installed via ProdCoordinatorService. |
| 106 req *http.Request |
| 107 |
| 100 // gcfg is the cached global configuration. | 108 // gcfg is the cached global configuration. |
| 101 gcfg *config.Config | 109 gcfg *config.Config |
| 102 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig | 110 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig |
| 103 | 111 |
| 104 // archivalIndex is the atomically-manipulated archival index for the | 112 // archivalIndex is the atomically-manipulated archival index for the |
| 105 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces | 113 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces |
| 106 // from this service. | 114 // from this service. |
| 107 archivalIndex int32 | 115 archivalIndex int32 |
| 108 | 116 |
| 109 // signer is the signer instance to use. | 117 // signer is the signer instance to use. |
| (...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 324 return nil, errors.New("invalid archival topic") | 332 return nil, errors.New("invalid archival topic") |
| 325 } | 333 } |
| 326 project, topic := fullTopic.Split() | 334 project, topic := fullTopic.Split() |
| 327 | 335 |
| 328 // Create an authenticated Pub/Sub client. | 336 // Create an authenticated Pub/Sub client. |
| 329 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub
sub.PublisherScopes...)) | 337 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub
sub.PublisherScopes...)) |
| 330 if err != nil { | 338 if err != nil { |
| 331 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia
ls.") | 339 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia
ls.") |
| 332 return nil, errors.New("failed to create Pub/Sub credentials") | 340 return nil, errors.New("failed to create Pub/Sub credentials") |
| 333 } | 341 } |
| 334 » // Don't pass gRPC metadata to PubSub. | 342 |
| 335 » psClient, err := gcps.NewClient( | 343 » // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s
ince |
| 336 » » metadata.NewContext(c, nil), project, | 344 » // we don't want any caller RPC to be forwarded to the backend service. |
| 345 » aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req) |
| 346 |
| 347 » psClient, err := gcps.NewClient(aeCtx, project, |
| 337 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) | 348 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) |
| 338 if err != nil { | 349 if err != nil { |
| 339 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 350 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 340 return nil, errors.New("failed to create Pub/Sub client") | 351 return nil, errors.New("failed to create Pub/Sub client") |
| 341 } | 352 } |
| 342 | 353 |
| 343 return &pubsubArchivalPublisher{ | 354 return &pubsubArchivalPublisher{ |
| 344 client: psClient, | 355 client: psClient, |
| 345 topic: psClient.Topic(topic), | 356 topic: psClient.Topic(topic), |
| 346 publishIndexFunc: s.nextArchiveIndex, | 357 publishIndexFunc: s.nextArchiveIndex, |
| (...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 491 | 502 |
| 492 // Sign index URL. | 503 // Sign index URL. |
| 493 if req.Index { | 504 if req.Index { |
| 494 if resp.Index, err = doSign(si.index); err != nil { | 505 if resp.Index, err = doSign(si.index); err != nil { |
| 495 return nil, errors.Annotate(err).InternalReason("failed
to sign index URL").Err() | 506 return nil, errors.Annotate(err).InternalReason("failed
to sign index URL").Err() |
| 496 } | 507 } |
| 497 } | 508 } |
| 498 | 509 |
| 499 return &resp, nil | 510 return &resp, nil |
| 500 } | 511 } |
| OLD | NEW |