Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(887)

Side by Side Diff: logdog/appengine/coordinator/service.go

Issue 2583033002: LogDog: Re-use Pub/Sub gRPC clients. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/appengine/coordinator/archivalPublisher.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "net/http"
9 "sync" 8 "sync"
10 "sync/atomic" 9 "sync/atomic"
11 "time" 10 "time"
12 11
13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" 12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner"
14 "github.com/luci/luci-go/common/clock" 13 "github.com/luci/luci-go/common/clock"
15 luciConfig "github.com/luci/luci-go/common/config" 14 luciConfig "github.com/luci/luci-go/common/config"
16 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
17 "github.com/luci/luci-go/common/gcloud/gs" 16 "github.com/luci/luci-go/common/gcloud/gs"
18 "github.com/luci/luci-go/common/gcloud/pubsub" 17 "github.com/luci/luci-go/common/gcloud/pubsub"
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 StorageForStream(context.Context, *LogStreamState) (Storage, error) 82 StorageForStream(context.Context, *LogStreamState) (Storage, error)
84 83
85 // ArchivalPublisher returns an ArchivalPublisher instance. 84 // ArchivalPublisher returns an ArchivalPublisher instance.
86 ArchivalPublisher(context.Context) (ArchivalPublisher, error) 85 ArchivalPublisher(context.Context) (ArchivalPublisher, error)
87 } 86 }
88 87
89 // ProdCoordinatorService is Middleware used by Coordinator services. 88 // ProdCoordinatorService is Middleware used by Coordinator services.
90 // 89 //
91 // It installs a production Services instance into the Context. 90 // It installs a production Services instance into the Context.
92 func ProdCoordinatorService(c *router.Context, next router.Handler) { 91 func ProdCoordinatorService(c *router.Context, next router.Handler) {
93 » c.Context = WithServices(c.Context, &prodServicesInst{ 92 » services := prodServicesInst{
94 » » req: c.Request, 93 » » aeCtx: appengine.WithContext(c.Context, c.Request),
95 » }) 94 » }
95 » defer services.close(c.Context)
96
97 » c.Context = WithServices(c.Context, &services)
96 next(c) 98 next(c)
97 } 99 }
98 100
99 // prodServicesInst is a Service exposing production faciliites. A unique 101 // prodServicesInst is a Service exposing production faciliites. A unique
100 // instance is bound to each each request. 102 // instance is bound to each each request.
101 type prodServicesInst struct { 103 type prodServicesInst struct {
102 sync.Mutex 104 sync.Mutex
103 105
104 » // req is the request that is being serviced. This is populated when the 106 » // aeCtx is an AppEngine Context initialized for the current request.
105 » // service is installed via ProdCoordinatorService. 107 » aeCtx context.Context
106 » req *http.Request
107 108
108 // gcfg is the cached global configuration. 109 // gcfg is the cached global configuration.
109 gcfg *config.Config 110 gcfg *config.Config
110 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig 111 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig
111 112
112 // archivalIndex is the atomically-manipulated archival index for the 113 // archivalIndex is the atomically-manipulated archival index for the
113 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces 114 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces
114 // from this service. 115 // from this service.
115 archivalIndex int32 116 archivalIndex int32
116 117
118 // pubSubClients is a map of Pub/Sub client singletons generated during this
119 // request. Each client is associated with its project, and will be
120 // initialized the first time it is requested by getPubSubClient.
121 //
122 // All clients will be closed on "close".
123 pubSubClients map[string]*gcps.Client
124
117 // signer is the signer instance to use. 125 // signer is the signer instance to use.
118 signer gaesigner.Signer 126 signer gaesigner.Signer
119 } 127 }
120 128
129 func (s *prodServicesInst) close(c context.Context) {
130 for proj, client := range s.pubSubClients {
Vadim Sh. 2016/12/19 19:59:45 nit: do it under the mutex for consistency.
dnj 2016/12/19 21:43:55 Done.
131 if err := client.Close(); err != nil {
132 log.Fields{
133 log.ErrorKey: err,
134 "project": proj,
135 }.Errorf(c, "Failed to close Pub/Sub client singleton.")
136 }
137 }
138 }
139
121 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { 140 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
122 s.Lock() 141 s.Lock()
123 defer s.Unlock() 142 defer s.Unlock()
124 143
125 // Load/cache the global config. 144 // Load/cache the global config.
126 if s.gcfg == nil { 145 if s.gcfg == nil {
127 var err error 146 var err error
128 s.gcfg, err = config.Load(c) 147 s.gcfg, err = config.Load(c)
129 if err != nil { 148 if err != nil {
130 return nil, err 149 return nil, err
(...skipping 195 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) 345 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic)
327 if err := fullTopic.Validate(); err != nil { 346 if err := fullTopic.Validate(); err != nil {
328 log.Fields{ 347 log.Fields{
329 log.ErrorKey: err, 348 log.ErrorKey: err,
330 "topic": fullTopic, 349 "topic": fullTopic,
331 }.Errorf(c, "Failed to validate archival topic.") 350 }.Errorf(c, "Failed to validate archival topic.")
332 return nil, errors.New("invalid archival topic") 351 return nil, errors.New("invalid archival topic")
333 } 352 }
334 project, topic := fullTopic.Split() 353 project, topic := fullTopic.Split()
335 354
355 // Get a Pub/Sub client (maybe re-use).
356 psClient, err := s.getPubSubClient(project)
357 if err != nil {
358 log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.")
359 return nil, err
360 }
361
362 return &pubsubArchivalPublisher{
363 client: psClient,
364 topic: psClient.Topic(topic),
365 publishIndexFunc: s.nextArchiveIndex,
366 }, nil
367 }
368
369 func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) {
370 // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s ince
371 // we don't want any caller RPC to be forwarded to the backend service.
372 c := metadata.NewContext(s.aeCtx, nil)
373
374 s.Lock()
375 defer s.Unlock()
376
377 client := s.pubSubClients[proj]
378 if client != nil {
379 return client, nil
380 }
381
336 // Create an authenticated Pub/Sub client. 382 // Create an authenticated Pub/Sub client.
337 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...)) 383 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...))
338 if err != nil { 384 if err != nil {
339 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.") 385 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.")
340 return nil, errors.New("failed to create Pub/Sub credentials") 386 return nil, errors.New("failed to create Pub/Sub credentials")
341 } 387 }
342 388
343 » // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s ince 389 » psClient, err := gcps.NewClient(c, proj,
344 » // we don't want any caller RPC to be forwarded to the backend service.
345 » aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req)
346
347 » psClient, err := gcps.NewClient(aeCtx, project,
348 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) 390 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
349 if err != nil { 391 if err != nil {
350 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 392 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
351 return nil, errors.New("failed to create Pub/Sub client") 393 return nil, errors.New("failed to create Pub/Sub client")
352 } 394 }
353 395
354 » return &pubsubArchivalPublisher{ 396 » // Retain this client for the duration of our request.
355 » » client: psClient, 397 » if s.pubSubClients == nil {
356 » » topic: psClient.Topic(topic), 398 » » s.pubSubClients = make(map[string]*gcps.Client)
357 » » publishIndexFunc: s.nextArchiveIndex, 399 » }
358 » }, nil 400 » s.pubSubClients[proj] = psClient
401 » return psClient, nil
359 } 402 }
360 403
361 func (s *prodServicesInst) nextArchiveIndex() uint64 { 404 func (s *prodServicesInst) nextArchiveIndex() uint64 {
362 // We use a 32-bit value for this because it avoids atomic memory bounar y 405 // We use a 32-bit value for this because it avoids atomic memory bounar y
363 // issues. Furthermore, we constrain it to be positive, using a negative 406 // issues. Furthermore, we constrain it to be positive, using a negative
364 // value as a sentinel that the archival index has wrapped. 407 // value as a sentinel that the archival index has wrapped.
365 // 408 //
366 // This is reasonable, as it is very unlikely that a single request will issue 409 // This is reasonable, as it is very unlikely that a single request will issue
367 // more than MaxInt32 archival tasks. 410 // more than MaxInt32 archival tasks.
368 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 411 v := atomic.AddInt32(&s.archivalIndex, 1) - 1
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
502 545
503 // Sign index URL. 546 // Sign index URL.
504 if req.Index { 547 if req.Index {
505 if resp.Index, err = doSign(si.index); err != nil { 548 if resp.Index, err = doSign(si.index); err != nil {
506 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err() 549 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err()
507 } 550 }
508 } 551 }
509 552
510 return &resp, nil 553 return &resp, nil
511 } 554 }
OLDNEW
« no previous file with comments | « logdog/appengine/coordinator/archivalPublisher.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698