Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(611)

Side by Side Diff: logdog/appengine/coordinator/service.go

Issue 2583033002: LogDog: Re-use Pub/Sub gRPC clients. (Closed)
Patch Set: close under lock Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/appengine/coordinator/archivalPublisher.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "net/http"
9 "sync" 8 "sync"
10 "sync/atomic" 9 "sync/atomic"
11 "time" 10 "time"
12 11
13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" 12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner"
14 "github.com/luci/luci-go/common/clock" 13 "github.com/luci/luci-go/common/clock"
15 luciConfig "github.com/luci/luci-go/common/config" 14 luciConfig "github.com/luci/luci-go/common/config"
16 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
17 "github.com/luci/luci-go/common/gcloud/gs" 16 "github.com/luci/luci-go/common/gcloud/gs"
18 "github.com/luci/luci-go/common/gcloud/pubsub" 17 "github.com/luci/luci-go/common/gcloud/pubsub"
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 StorageForStream(context.Context, *LogStreamState) (Storage, error) 82 StorageForStream(context.Context, *LogStreamState) (Storage, error)
84 83
85 // ArchivalPublisher returns an ArchivalPublisher instance. 84 // ArchivalPublisher returns an ArchivalPublisher instance.
86 ArchivalPublisher(context.Context) (ArchivalPublisher, error) 85 ArchivalPublisher(context.Context) (ArchivalPublisher, error)
87 } 86 }
88 87
89 // ProdCoordinatorService is Middleware used by Coordinator services. 88 // ProdCoordinatorService is Middleware used by Coordinator services.
90 // 89 //
91 // It installs a production Services instance into the Context. 90 // It installs a production Services instance into the Context.
92 func ProdCoordinatorService(c *router.Context, next router.Handler) { 91 func ProdCoordinatorService(c *router.Context, next router.Handler) {
93 » c.Context = WithServices(c.Context, &prodServicesInst{ 92 » services := prodServicesInst{
94 » » req: c.Request, 93 » » aeCtx: appengine.WithContext(c.Context, c.Request),
95 » }) 94 » }
95 » defer services.close(c.Context)
96
97 » c.Context = WithServices(c.Context, &services)
96 next(c) 98 next(c)
97 } 99 }
98 100
99 // prodServicesInst is a Service exposing production faciliites. A unique 101 // prodServicesInst is a Service exposing production faciliites. A unique
100 // instance is bound to each each request. 102 // instance is bound to each each request.
101 type prodServicesInst struct { 103 type prodServicesInst struct {
102 sync.Mutex 104 sync.Mutex
103 105
104 » // req is the request that is being serviced. This is populated when the 106 » // aeCtx is an AppEngine Context initialized for the current request.
105 » // service is installed via ProdCoordinatorService. 107 » aeCtx context.Context
106 » req *http.Request
107 108
108 // gcfg is the cached global configuration. 109 // gcfg is the cached global configuration.
109 gcfg *config.Config 110 gcfg *config.Config
110 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig 111 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig
111 112
112 // archivalIndex is the atomically-manipulated archival index for the 113 // archivalIndex is the atomically-manipulated archival index for the
113 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces 114 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces
114 // from this service. 115 // from this service.
115 archivalIndex int32 116 archivalIndex int32
116 117
118 // pubSubClients is a map of Pub/Sub client singletons generated during this
119 // request. Each client is associated with its project, and will be
120 // initialized the first time it is requested by getPubSubClient.
121 //
122 // All clients will be closed on "close".
123 pubSubClients map[string]*gcps.Client
124
117 // signer is the signer instance to use. 125 // signer is the signer instance to use.
118 signer gaesigner.Signer 126 signer gaesigner.Signer
119 } 127 }
120 128
129 func (s *prodServicesInst) close(c context.Context) {
130 s.Lock()
131 defer s.Unlock()
132
133 for proj, client := range s.pubSubClients {
134 if err := client.Close(); err != nil {
135 log.Fields{
136 log.ErrorKey: err,
137 "project": proj,
138 }.Errorf(c, "Failed to close Pub/Sub client singleton.")
139 }
140 }
141 }
142
121 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { 143 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
122 s.Lock() 144 s.Lock()
123 defer s.Unlock() 145 defer s.Unlock()
124 146
125 // Load/cache the global config. 147 // Load/cache the global config.
126 if s.gcfg == nil { 148 if s.gcfg == nil {
127 var err error 149 var err error
128 s.gcfg, err = config.Load(c) 150 s.gcfg, err = config.Load(c)
129 if err != nil { 151 if err != nil {
130 return nil, err 152 return nil, err
(...skipping 195 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) 348 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic)
327 if err := fullTopic.Validate(); err != nil { 349 if err := fullTopic.Validate(); err != nil {
328 log.Fields{ 350 log.Fields{
329 log.ErrorKey: err, 351 log.ErrorKey: err,
330 "topic": fullTopic, 352 "topic": fullTopic,
331 }.Errorf(c, "Failed to validate archival topic.") 353 }.Errorf(c, "Failed to validate archival topic.")
332 return nil, errors.New("invalid archival topic") 354 return nil, errors.New("invalid archival topic")
333 } 355 }
334 project, topic := fullTopic.Split() 356 project, topic := fullTopic.Split()
335 357
358 // Get a Pub/Sub client (maybe re-use).
359 psClient, err := s.getPubSubClient(project)
360 if err != nil {
361 log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.")
362 return nil, err
363 }
364
365 return &pubsubArchivalPublisher{
366 client: psClient,
367 topic: psClient.Topic(topic),
368 publishIndexFunc: s.nextArchiveIndex,
369 }, nil
370 }
371
372 func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) {
373 // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s ince
374 // we don't want any caller RPC to be forwarded to the backend service.
375 c := metadata.NewContext(s.aeCtx, nil)
376
377 s.Lock()
378 defer s.Unlock()
379
380 client := s.pubSubClients[proj]
381 if client != nil {
382 return client, nil
383 }
384
336 // Create an authenticated Pub/Sub client. 385 // Create an authenticated Pub/Sub client.
337 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...)) 386 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...))
338 if err != nil { 387 if err != nil {
339 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.") 388 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.")
340 return nil, errors.New("failed to create Pub/Sub credentials") 389 return nil, errors.New("failed to create Pub/Sub credentials")
341 } 390 }
342 391
343 » // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s ince 392 » psClient, err := gcps.NewClient(c, proj,
344 » // we don't want any caller RPC to be forwarded to the backend service.
345 » aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req)
346
347 » psClient, err := gcps.NewClient(aeCtx, project,
348 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) 393 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
349 if err != nil { 394 if err != nil {
350 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 395 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
351 return nil, errors.New("failed to create Pub/Sub client") 396 return nil, errors.New("failed to create Pub/Sub client")
352 } 397 }
353 398
354 » return &pubsubArchivalPublisher{ 399 » // Retain this client for the duration of our request.
355 » » client: psClient, 400 » if s.pubSubClients == nil {
356 » » topic: psClient.Topic(topic), 401 » » s.pubSubClients = make(map[string]*gcps.Client)
357 » » publishIndexFunc: s.nextArchiveIndex, 402 » }
358 » }, nil 403 » s.pubSubClients[proj] = psClient
404 » return psClient, nil
359 } 405 }
360 406
361 func (s *prodServicesInst) nextArchiveIndex() uint64 { 407 func (s *prodServicesInst) nextArchiveIndex() uint64 {
362 // We use a 32-bit value for this because it avoids atomic memory bounar y 408 // We use a 32-bit value for this because it avoids atomic memory bounar y
363 // issues. Furthermore, we constrain it to be positive, using a negative 409 // issues. Furthermore, we constrain it to be positive, using a negative
364 // value as a sentinel that the archival index has wrapped. 410 // value as a sentinel that the archival index has wrapped.
365 // 411 //
366 // This is reasonable, as it is very unlikely that a single request will issue 412 // This is reasonable, as it is very unlikely that a single request will issue
367 // more than MaxInt32 archival tasks. 413 // more than MaxInt32 archival tasks.
368 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 414 v := atomic.AddInt32(&s.archivalIndex, 1) - 1
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
502 548
503 // Sign index URL. 549 // Sign index URL.
504 if req.Index { 550 if req.Index {
505 if resp.Index, err = doSign(si.index); err != nil { 551 if resp.Index, err = doSign(si.index); err != nil {
506 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err() 552 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err()
507 } 553 }
508 } 554 }
509 555
510 return &resp, nil 556 return &resp, nil
511 } 557 }
OLDNEW
« no previous file with comments | « logdog/appengine/coordinator/archivalPublisher.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698