Chromium Code Reviews| Index: logdog/appengine/coordinator/service.go |
| diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go |
| index 7b0385d12a9b14d9c2ea6bbc58cea42aa611897e..4b21761c9ed1802aaa08702a7ebbd610433010ef 100644 |
| --- a/logdog/appengine/coordinator/service.go |
| +++ b/logdog/appengine/coordinator/service.go |
| @@ -5,7 +5,6 @@ |
| package coordinator |
| import ( |
| - "net/http" |
| "sync" |
| "sync/atomic" |
| "time" |
| @@ -90,9 +89,12 @@ type Services interface { |
| // |
| // It installs a production Services instance into the Context. |
| func ProdCoordinatorService(c *router.Context, next router.Handler) { |
| - c.Context = WithServices(c.Context, &prodServicesInst{ |
| - req: c.Request, |
| - }) |
| + services := prodServicesInst{ |
| + aeCtx: appengine.WithContext(c.Context, c.Request), |
| + } |
| + defer services.close(c.Context) |
| + |
| + c.Context = WithServices(c.Context, &services) |
| next(c) |
| } |
| @@ -101,9 +103,8 @@ func ProdCoordinatorService(c *router.Context, next router.Handler) { |
| type prodServicesInst struct { |
| sync.Mutex |
| - // req is the request that is being serviced. This is populated when the |
| - // service is installed via ProdCoordinatorService. |
| - req *http.Request |
| + // aeCtx is an AppEngine Context initialized for the current request. |
| + aeCtx context.Context |
| // gcfg is the cached global configuration. |
| gcfg *config.Config |
| @@ -114,10 +115,28 @@ type prodServicesInst struct { |
| // from this service. |
| archivalIndex int32 |
| + // pubSubClients is a map of Pub/Sub client singletons generated during this |
| + // request. Each client is associated with its project, and will be |
| + // initialized the first time it is requested by getPubSubClient. |
| + // |
| + // All clients will be closed on "close". |
| + pubSubClients map[string]*gcps.Client |
| + |
| // signer is the signer instance to use. |
| signer gaesigner.Signer |
| } |
| +func (s *prodServicesInst) close(c context.Context) { |
| + for proj, client := range s.pubSubClients { |
|
Vadim Sh.
2016/12/19 19:59:45
nit: do it under the mutex for consistency.
dnj
2016/12/19 21:43:55
Done.
|
| + if err := client.Close(); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "project": proj, |
| + }.Errorf(c, "Failed to close Pub/Sub client singleton.") |
| + } |
| + } |
| +} |
| + |
| func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { |
| s.Lock() |
| defer s.Unlock() |
| @@ -333,6 +352,33 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish |
| } |
| project, topic := fullTopic.Split() |
| + // Get a Pub/Sub client (maybe re-use). |
| + psClient, err := s.getPubSubClient(project) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.") |
| + return nil, err |
| + } |
| + |
| + return &pubsubArchivalPublisher{ |
| + client: psClient, |
| + topic: psClient.Topic(topic), |
| + publishIndexFunc: s.nextArchiveIndex, |
| + }, nil |
| +} |
| + |
| +func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) { |
| + // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since |
| + // we don't want any caller RPC to be forwarded to the backend service. |
| + c := metadata.NewContext(s.aeCtx, nil) |
| + |
| + s.Lock() |
| + defer s.Unlock() |
| + |
| + client := s.pubSubClients[proj] |
| + if client != nil { |
| + return client, nil |
| + } |
| + |
| // Create an authenticated Pub/Sub client. |
| creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pubsub.PublisherScopes...)) |
| if err != nil { |
| @@ -340,22 +386,19 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish |
| return nil, errors.New("failed to create Pub/Sub credentials") |
| } |
| - // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since |
| - // we don't want any caller RPC to be forwarded to the backend service. |
| - aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req) |
| - |
| - psClient, err := gcps.NewClient(aeCtx, project, |
| + psClient, err := gcps.NewClient(c, proj, |
| option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) |
| if err != nil { |
| log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| return nil, errors.New("failed to create Pub/Sub client") |
| } |
| - return &pubsubArchivalPublisher{ |
| - client: psClient, |
| - topic: psClient.Topic(topic), |
| - publishIndexFunc: s.nextArchiveIndex, |
| - }, nil |
| + // Retain this client for the duration of our request. |
| + if s.pubSubClients == nil { |
| + s.pubSubClients = make(map[string]*gcps.Client) |
| + } |
| + s.pubSubClients[proj] = psClient |
| + return psClient, nil |
| } |
| func (s *prodServicesInst) nextArchiveIndex() uint64 { |