Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(820)

Unified Diff: logdog/appengine/coordinator/service.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/service.go
diff --git a/logdog/appengine/coordinator/service.go b/logdog/appengine/coordinator/service.go
index 4d844f1130939ac1f218e40ebaa2806eaa5dc7f2..62b9508ff237444e3fdc9935f6628066e4d263e2 100644
--- a/logdog/appengine/coordinator/service.go
+++ b/logdog/appengine/coordinator/service.go
@@ -35,13 +35,14 @@ import (
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/router"
- gcps "cloud.google.com/go/pubsub"
+ vkit "cloud.google.com/go/pubsub/apiv1"
gcst "cloud.google.com/go/storage"
- "golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/appengine"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
+
+ "golang.org/x/net/context"
)
const (
@@ -125,13 +126,10 @@ type prodServicesInst struct {
// from this service.
archivalIndex int32
- // pubSubClients is a map of Pub/Sub client singletons generated during this
- // request. Each client is associated with its project, and will be
- // initialized the first time it is requested by getPubSubClient.
+ // pubSubClient is a Pub/Sub client generated during this request.
//
- // All clients will be closed on "close".
- pubSubClients map[string]*gcps.Client
-
+ // It will be closed on "close".
+ pubSubClient *vkit.PublisherClient
// signer is the signer instance to use.
signer gaesigner.Signer
}
@@ -140,16 +138,13 @@ func (s *prodServicesInst) close(c context.Context) {
s.Lock()
defer s.Unlock()
- for proj, client := range s.pubSubClients {
+ if client := s.pubSubClient; client != nil {
if err := client.Close(); err != nil {
- log.Fields{
- log.ErrorKey: err,
- "project": proj,
- }.Errorf(c, "Failed to close Pub/Sub client singleton.")
+ log.WithError(err).Errorf(c, "Failed to close Pub/Sub client singleton.")
}
+ s.pubSubClient = nil
}
}
-
func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
s.Lock()
defer s.Unlock()
@@ -363,58 +358,49 @@ func (s *prodServicesInst) ArchivalPublisher(c context.Context) (ArchivalPublish
}.Errorf(c, "Failed to validate archival topic.")
return nil, errors.New("invalid archival topic")
}
- project, topic := fullTopic.Split()
- // Get a Pub/Sub client (maybe re-use).
- psClient, err := s.getPubSubClient(project)
+ psClient, err := s.getPubSubClient()
if err != nil {
- log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.")
return nil, err
}
// Create a Topic, and configure it to not bundle messages.
- psTopic := psClient.Topic(topic)
- pubsub.DisableTopicBundling(psTopic)
-
return &pubsubArchivalPublisher{
- client: psClient,
- topic: psTopic,
+ publisher: &pubsub.UnbufferedPublisher{
+ Topic: fullTopic,
+ Client: psClient,
+ },
publishIndexFunc: s.nextArchiveIndex,
}, nil
}
-func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) {
- // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
- // we don't want any caller RPC to be forwarded to the backend service.
- c := metadata.NewContext(s.aeCtx, nil)
-
+func (s *prodServicesInst) getPubSubClient() (*vkit.PublisherClient, error) {
s.Lock()
defer s.Unlock()
- client := s.pubSubClients[proj]
- if client != nil {
- return client, nil
+ if s.pubSubClient != nil {
+ return s.pubSubClient, nil
}
- // Create an authenticated Pub/Sub client.
+ // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
+ // we don't want any caller RPC to be forwarded to the backend service.
+ c := metadata.NewContext(s.aeCtx, nil)
+
+ // Create an authenticated unbuffered Pub/Sub Publisher.
creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pubsub.PublisherScopes...))
if err != nil {
log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentials.")
return nil, errors.New("failed to create Pub/Sub credentials")
}
- psClient, err := gcps.NewClient(c, proj,
+ psClient, err := vkit.NewPublisherClient(c,
option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
if err != nil {
log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
return nil, errors.New("failed to create Pub/Sub client")
}
- // Retain this client for the duration of our request.
- if s.pubSubClients == nil {
- s.pubSubClients = make(map[string]*gcps.Client)
- }
- s.pubSubClients[proj] = psClient
+ s.pubSubClient = psClient
return psClient, nil
}

Powered by Google App Engine
This is Rietveld 408576698