Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: logdog/appengine/coordinator/service.go

Issue 2584293002: Coordinator: AppEngine context for Pub/Sub. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "net/http"
8 "sync" 9 "sync"
9 "sync/atomic" 10 "sync/atomic"
10 "time" 11 "time"
11 12
12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" 13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner"
13 "github.com/luci/luci-go/common/clock" 14 "github.com/luci/luci-go/common/clock"
14 luciConfig "github.com/luci/luci-go/common/config" 15 luciConfig "github.com/luci/luci-go/common/config"
15 "github.com/luci/luci-go/common/errors" 16 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/gcloud/gs" 17 "github.com/luci/luci-go/common/gcloud/gs"
17 "github.com/luci/luci-go/common/gcloud/pubsub" 18 "github.com/luci/luci-go/common/gcloud/pubsub"
18 log "github.com/luci/luci-go/common/logging" 19 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/logdog/api/config/svcconfig" 20 "github.com/luci/luci-go/logdog/api/config/svcconfig"
20 "github.com/luci/luci-go/logdog/appengine/coordinator/config" 21 "github.com/luci/luci-go/logdog/appengine/coordinator/config"
21 "github.com/luci/luci-go/logdog/common/storage" 22 "github.com/luci/luci-go/logdog/common/storage"
22 "github.com/luci/luci-go/logdog/common/storage/archive" 23 "github.com/luci/luci-go/logdog/common/storage/archive"
23 "github.com/luci/luci-go/logdog/common/storage/bigtable" 24 "github.com/luci/luci-go/logdog/common/storage/bigtable"
24 "github.com/luci/luci-go/logdog/common/storage/caching" 25 "github.com/luci/luci-go/logdog/common/storage/caching"
25 "github.com/luci/luci-go/server/auth" 26 "github.com/luci/luci-go/server/auth"
26 "github.com/luci/luci-go/server/router" 27 "github.com/luci/luci-go/server/router"
27 28
28 gcps "cloud.google.com/go/pubsub" 29 gcps "cloud.google.com/go/pubsub"
29 gcst "cloud.google.com/go/storage" 30 gcst "cloud.google.com/go/storage"
30 "golang.org/x/net/context" 31 "golang.org/x/net/context"
31 "google.golang.org/api/option" 32 "google.golang.org/api/option"
33 "google.golang.org/appengine"
32 "google.golang.org/grpc" 34 "google.golang.org/grpc"
33 "google.golang.org/grpc/metadata" 35 "google.golang.org/grpc/metadata"
34 ) 36 )
35 37
36 const ( 38 const (
37 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. 39 // maxSignedURLLifetime is the maximum allowed signed URL lifetime.
38 maxSignedURLLifetime = 1 * time.Hour 40 maxSignedURLLifetime = 1 * time.Hour
39 41
40 // maxGSFetchSize is the maximum amount of data we can fetch from a sing le 42 // maxGSFetchSize is the maximum amount of data we can fetch from a sing le
41 // Google Storage RPC call. 43 // Google Storage RPC call.
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
81 StorageForStream(context.Context, *LogStreamState) (Storage, error) 83 StorageForStream(context.Context, *LogStreamState) (Storage, error)
82 84
83 // ArchivalPublisher returns an ArchivalPublisher instance. 85 // ArchivalPublisher returns an ArchivalPublisher instance.
84 ArchivalPublisher(context.Context) (ArchivalPublisher, error) 86 ArchivalPublisher(context.Context) (ArchivalPublisher, error)
85 } 87 }
86 88
87 // ProdCoordinatorService is Middleware used by Coordinator services. 89 // ProdCoordinatorService is Middleware used by Coordinator services.
88 // 90 //
89 // It installs a production Services instance into the Context. 91 // It installs a production Services instance into the Context.
90 func ProdCoordinatorService(c *router.Context, next router.Handler) { 92 func ProdCoordinatorService(c *router.Context, next router.Handler) {
91 » c.Context = WithServices(c.Context, &prodServicesInst{}) 93 » c.Context = WithServices(c.Context, &prodServicesInst{
94 » » req: c.Request,
95 » })
92 next(c) 96 next(c)
93 } 97 }
94 98
95 // prodServicesInst is a Service exposing production faciliites. A unique 99 // prodServicesInst is a Service exposing production faciliites. A unique
96 // instance is bound to each each request. 100 // instance is bound to each each request.
97 type prodServicesInst struct { 101 type prodServicesInst struct {
98 sync.Mutex 102 sync.Mutex
99 103
104 // req is the request that is being serviced. This is populated when the
105 // service is installed via ProdCoordinatorService.
106 req *http.Request
107
100 // gcfg is the cached global configuration. 108 // gcfg is the cached global configuration.
101 gcfg *config.Config 109 gcfg *config.Config
102 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig 110 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig
103 111
104 // archivalIndex is the atomically-manipulated archival index for the 112 // archivalIndex is the atomically-manipulated archival index for the
105 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces 113 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces
106 // from this service. 114 // from this service.
107 archivalIndex int32 115 archivalIndex int32
108 116
109 // signer is the signer instance to use. 117 // signer is the signer instance to use.
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after
324 return nil, errors.New("invalid archival topic") 332 return nil, errors.New("invalid archival topic")
325 } 333 }
326 project, topic := fullTopic.Split() 334 project, topic := fullTopic.Split()
327 335
328 // Create an authenticated Pub/Sub client. 336 // Create an authenticated Pub/Sub client.
329 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...)) 337 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...))
330 if err != nil { 338 if err != nil {
331 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.") 339 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.")
332 return nil, errors.New("failed to create Pub/Sub credentials") 340 return nil, errors.New("failed to create Pub/Sub credentials")
333 } 341 }
342
343 // Create a new AppEngine context.
344 aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req)
345
334 // Don't pass gRPC metadata to PubSub. 346 // Don't pass gRPC metadata to PubSub.
Vadim Sh. 2016/12/18 21:40:10 nit: move this comment to the list 343.
335 » psClient, err := gcps.NewClient( 347 » psClient, err := gcps.NewClient(aeCtx, project,
336 » » metadata.NewContext(c, nil), project,
337 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) 348 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
338 if err != nil { 349 if err != nil {
339 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 350 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
340 return nil, errors.New("failed to create Pub/Sub client") 351 return nil, errors.New("failed to create Pub/Sub client")
341 } 352 }
342 353
343 return &pubsubArchivalPublisher{ 354 return &pubsubArchivalPublisher{
344 client: psClient, 355 client: psClient,
345 topic: psClient.Topic(topic), 356 topic: psClient.Topic(topic),
346 publishIndexFunc: s.nextArchiveIndex, 357 publishIndexFunc: s.nextArchiveIndex,
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after
491 502
492 // Sign index URL. 503 // Sign index URL.
493 if req.Index { 504 if req.Index {
494 if resp.Index, err = doSign(si.index); err != nil { 505 if resp.Index, err = doSign(si.index); err != nil {
495 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err() 506 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err()
496 } 507 }
497 } 508 }
498 509
499 return &resp, nil 510 return &resp, nil
500 } 511 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698