Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(169)

Unified Diff: logdog/appengine/coordinator/service.go

Issue 2583033002: LogDog: Re-use Pub/Sub gRPC clients. (Closed)
Patch Set: close under lock Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/appengine/coordinator/archivalPublisher.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/appengine/coordinator/service.go
diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go
index 7b0385d12a9b14d9c2ea6bbc58cea42aa611897e..4a451e654eab64bb4be044ef7c373558988736f8 100644
--- a/logdog/appengine/coordinator/service.go
+++ b/logdog/appengine/coordinator/service.go
@@ -5,7 +5,6 @@
package coordinator
import (
- "net/http"
"sync"
"sync/atomic"
"time"
@@ -90,9 +89,12 @@ type Services interface {
//
// It installs a production Services instance into the Context.
func ProdCoordinatorService(c *router.Context, next router.Handler) {
- c.Context = WithServices(c.Context, &prodServicesInst{
- req: c.Request,
- })
+ services := prodServicesInst{
+ aeCtx: appengine.WithContext(c.Context, c.Request),
+ }
+ defer services.close(c.Context)
+
+ c.Context = WithServices(c.Context, &services)
next(c)
}
@@ -101,9 +103,8 @@ func ProdCoordinatorService(c *router.Context, next router.Handler) {
type prodServicesInst struct {
sync.Mutex
- // req is the request that is being serviced. This is populated when the
- // service is installed via ProdCoordinatorService.
- req *http.Request
+ // aeCtx is an AppEngine Context initialized for the current request.
+ aeCtx context.Context
// gcfg is the cached global configuration.
gcfg *config.Config
@@ -114,10 +115,31 @@ type prodServicesInst struct {
// from this service.
archivalIndex int32
+ // pubSubClients is a map of Pub/Sub client singletons generated during this
+ // request. Each client is associated with its project, and will be
+ // initialized the first time it is requested by getPubSubClient.
+ //
+ // All clients will be closed on "close".
+ pubSubClients map[string]*gcps.Client
+
// signer is the signer instance to use.
signer gaesigner.Signer
}
+func (s *prodServicesInst) close(c context.Context) {
+ s.Lock()
+ defer s.Unlock()
+
+ for proj, client := range s.pubSubClients {
+ if err := client.Close(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": proj,
+ }.Errorf(c, "Failed to close Pub/Sub client singleton.")
+ }
+ }
+}
+
func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
s.Lock()
defer s.Unlock()
@@ -333,6 +355,33 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish
}
project, topic := fullTopic.Split()
+ // Get a Pub/Sub client (maybe re-use).
+ psClient, err := s.getPubSubClient(project)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.")
+ return nil, err
+ }
+
+ return &pubsubArchivalPublisher{
+ client: psClient,
+ topic: psClient.Topic(topic),
+ publishIndexFunc: s.nextArchiveIndex,
+ }, nil
+}
+
+func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) {
+ // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
+ // we don't want any caller RPC to be forwarded to the backend service.
+ c := metadata.NewContext(s.aeCtx, nil)
+
+ s.Lock()
+ defer s.Unlock()
+
+ client := s.pubSubClients[proj]
+ if client != nil {
+ return client, nil
+ }
+
// Create an authenticated Pub/Sub client.
creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pubsub.PublisherScopes...))
if err != nil {
@@ -340,22 +389,19 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish
return nil, errors.New("failed to create Pub/Sub credentials")
}
- // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
- // we don't want any caller RPC to be forwarded to the backend service.
- aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req)
-
- psClient, err := gcps.NewClient(aeCtx, project,
+ psClient, err := gcps.NewClient(c, proj,
option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
if err != nil {
log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
return nil, errors.New("failed to create Pub/Sub client")
}
- return &pubsubArchivalPublisher{
- client: psClient,
- topic: psClient.Topic(topic),
- publishIndexFunc: s.nextArchiveIndex,
- }, nil
+ // Retain this client for the duration of our request.
+ if s.pubSubClients == nil {
+ s.pubSubClients = make(map[string]*gcps.Client)
+ }
+ s.pubSubClients[proj] = psClient
+ return psClient, nil
}
func (s *prodServicesInst) nextArchiveIndex() uint64 {
« no previous file with comments | « logdog/appengine/coordinator/archivalPublisher.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698