| Index: logdog/appengine/coordinator/service.go
|
| diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go
|
| index 7b0385d12a9b14d9c2ea6bbc58cea42aa611897e..4a451e654eab64bb4be044ef7c373558988736f8 100644
|
| --- a/logdog/appengine/coordinator/service.go
|
| +++ b/logdog/appengine/coordinator/service.go
|
| @@ -5,7 +5,6 @@
|
| package coordinator
|
|
|
| import (
|
| - "net/http"
|
| "sync"
|
| "sync/atomic"
|
| "time"
|
| @@ -90,9 +89,12 @@ type Services interface {
|
| //
|
| // It installs a production Services instance into the Context.
|
| func ProdCoordinatorService(c *router.Context, next router.Handler) {
|
| - c.Context = WithServices(c.Context, &prodServicesInst{
|
| - req: c.Request,
|
| - })
|
| + services := prodServicesInst{
|
| + aeCtx: appengine.WithContext(c.Context, c.Request),
|
| + }
|
| + defer services.close(c.Context)
|
| +
|
| + c.Context = WithServices(c.Context, &services)
|
| next(c)
|
| }
|
|
|
| @@ -101,9 +103,8 @@ func ProdCoordinatorService(c *router.Context, next router.Handler) {
|
| type prodServicesInst struct {
|
| sync.Mutex
|
|
|
| - // req is the request that is being serviced. This is populated when the
|
| - // service is installed via ProdCoordinatorService.
|
| - req *http.Request
|
| + // aeCtx is an AppEngine Context initialized for the current request.
|
| + aeCtx context.Context
|
|
|
| // gcfg is the cached global configuration.
|
| gcfg *config.Config
|
| @@ -114,10 +115,31 @@ type prodServicesInst struct {
|
| // from this service.
|
| archivalIndex int32
|
|
|
| + // pubSubClients is a map of Pub/Sub client singletons generated during this
|
| + // request. Each client is associated with its project, and will be
|
| + // initialized the first time it is requested by getPubSubClient.
|
| + //
|
| + // All clients will be closed on "close".
|
| + pubSubClients map[string]*gcps.Client
|
| +
|
| // signer is the signer instance to use.
|
| signer gaesigner.Signer
|
| }
|
|
|
| +func (s *prodServicesInst) close(c context.Context) {
|
| + s.Lock()
|
| + defer s.Unlock()
|
| +
|
| + for proj, client := range s.pubSubClients {
|
| + if err := client.Close(); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "project": proj,
|
| + }.Errorf(c, "Failed to close Pub/Sub client singleton.")
|
| + }
|
| + }
|
| +}
|
| +
|
| func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
|
| s.Lock()
|
| defer s.Unlock()
|
| @@ -333,6 +355,33 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish
|
| }
|
| project, topic := fullTopic.Split()
|
|
|
| + // Get a Pub/Sub client (maybe re-use).
|
| + psClient, err := s.getPubSubClient(project)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.")
|
| + return nil, err
|
| + }
|
| +
|
| + return &pubsubArchivalPublisher{
|
| + client: psClient,
|
| + topic: psClient.Topic(topic),
|
| + publishIndexFunc: s.nextArchiveIndex,
|
| + }, nil
|
| +}
|
| +
|
| +func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) {
|
| + // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
|
| + // we don't want any caller RPC to be forwarded to the backend service.
|
| + c := metadata.NewContext(s.aeCtx, nil)
|
| +
|
| + s.Lock()
|
| + defer s.Unlock()
|
| +
|
| + client := s.pubSubClients[proj]
|
| + if client != nil {
|
| + return client, nil
|
| + }
|
| +
|
| // Create an authenticated Pub/Sub client.
|
| creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pubsub.PublisherScopes...))
|
| if err != nil {
|
| @@ -340,22 +389,19 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish
|
| return nil, errors.New("failed to create Pub/Sub credentials")
|
| }
|
|
|
| - // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
|
| - // we don't want any caller RPC to be forwarded to the backend service.
|
| - aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req)
|
| -
|
| - psClient, err := gcps.NewClient(aeCtx, project,
|
| + psClient, err := gcps.NewClient(c, proj,
|
| option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
|
| return nil, errors.New("failed to create Pub/Sub client")
|
| }
|
|
|
| - return &pubsubArchivalPublisher{
|
| - client: psClient,
|
| - topic: psClient.Topic(topic),
|
| - publishIndexFunc: s.nextArchiveIndex,
|
| - }, nil
|
| + // Retain this client for the duration of our request.
|
| + if s.pubSubClients == nil {
|
| + s.pubSubClients = make(map[string]*gcps.Client)
|
| + }
|
| + s.pubSubClients[proj] = psClient
|
| + return psClient, nil
|
| }
|
|
|
| func (s *prodServicesInst) nextArchiveIndex() uint64 {
|
|
|