| Index: logdog/appengine/coordinator/service.go
|
| diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go
|
| index 4d844f1130939ac1f218e40ebaa2806eaa5dc7f2..62b9508ff237444e3fdc9935f6628066e4d263e2 100644
|
| --- a/logdog/appengine/coordinator/service.go
|
| +++ b/logdog/appengine/coordinator/service.go
|
| @@ -35,13 +35,14 @@ import (
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/router"
|
|
|
| - gcps "cloud.google.com/go/pubsub"
|
| + vkit "cloud.google.com/go/pubsub/apiv1"
|
| gcst "cloud.google.com/go/storage"
|
| - "golang.org/x/net/context"
|
| "google.golang.org/api/option"
|
| "google.golang.org/appengine"
|
| "google.golang.org/grpc"
|
| "google.golang.org/grpc/metadata"
|
| +
|
| + "golang.org/x/net/context"
|
| )
|
|
|
| const (
|
| @@ -125,13 +126,10 @@ type prodServicesInst struct {
|
| // from this service.
|
| archivalIndex int32
|
|
|
| - // pubSubClients is a map of Pub/Sub client singletons generated during this
|
| - // request. Each client is associated with its project, and will be
|
| - // initialized the first time it is requested by getPubSubClient.
|
| + // pubSubClient is a Pub/Sub client generated during this request.
|
| //
|
| - // All clients will be closed on "close".
|
| - pubSubClients map[string]*gcps.Client
|
| -
|
| + // It will be closed on "close".
|
| + pubSubClient *vkit.PublisherClient
|
| // signer is the signer instance to use.
|
| signer gaesigner.Signer
|
| }
|
| @@ -140,16 +138,13 @@ func (s *prodServicesInst) close(c context.Context) {
|
| s.Lock()
|
| defer s.Unlock()
|
|
|
| - for proj, client := range s.pubSubClients {
|
| + if client := s.pubSubClient; client != nil {
|
| if err := client.Close(); err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "project": proj,
|
| - }.Errorf(c, "Failed to close Pub/Sub client singleton.")
|
| + log.WithError(err).Errorf(c, "Failed to close Pub/Sub client singleton.")
|
| }
|
| + s.pubSubClient = nil
|
| }
|
| }
|
| -
|
| func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
|
| s.Lock()
|
| defer s.Unlock()
|
| @@ -363,58 +358,49 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish
|
| }.Errorf(c, "Failed to validate archival topic.")
|
| return nil, errors.New("invalid archival topic")
|
| }
|
| - project, topic := fullTopic.Split()
|
|
|
| - // Get a Pub/Sub client (maybe re-use).
|
| - psClient, err := s.getPubSubClient(project)
|
| + psClient, err := s.getPubSubClient()
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.")
|
| return nil, err
|
| }
|
|
|
| // Create a Topic, and configure it to not bundle messages.
|
| - psTopic := psClient.Topic(topic)
|
| - pubsub.DisableTopicBundling(psTopic)
|
| -
|
| return &pubsubArchivalPublisher{
|
| - client: psClient,
|
| - topic: psTopic,
|
| + publisher: &pubsub.UnbufferedPublisher{
|
| + Topic: fullTopic,
|
| + Client: psClient,
|
| + },
|
| publishIndexFunc: s.nextArchiveIndex,
|
| }, nil
|
| }
|
|
|
| -func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) {
|
| - // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
|
| - // we don't want any caller RPC to be forwarded to the backend service.
|
| - c := metadata.NewContext(s.aeCtx, nil)
|
| -
|
| +func (s *prodServicesInst) getPubSubClient() (*vkit.PublisherClient, error) {
|
| s.Lock()
|
| defer s.Unlock()
|
|
|
| - client := s.pubSubClients[proj]
|
| - if client != nil {
|
| - return client, nil
|
| + if s.pubSubClient != nil {
|
| + return s.pubSubClient, nil
|
| }
|
|
|
| - // Create an authenticated Pub/Sub client.
|
| + // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
|
| + // we don't want any caller RPC to be forwarded to the backend service.
|
| + c := metadata.NewContext(s.aeCtx, nil)
|
| +
|
| + // Create an authenticated unbuffered Pub/Sub Publisher.
|
| creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pubsub.PublisherScopes...))
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentials.")
|
| return nil, errors.New("failed to create Pub/Sub credentials")
|
| }
|
|
|
| - psClient, err := gcps.NewClient(c, proj,
|
| + psClient, err := vkit.NewPublisherClient(c,
|
| option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
|
| return nil, errors.New("failed to create Pub/Sub client")
|
| }
|
|
|
| - // Retain this client for the duration of our request.
|
| - if s.pubSubClients == nil {
|
| - s.pubSubClients = make(map[string]*gcps.Client)
|
| - }
|
| - s.pubSubClients[proj] = psClient
|
| + s.pubSubClient = psClient
|
| return psClient, nil
|
| }
|
|
|
|
|