Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(195)

Unified Diff: client/cmd/logdog_butler/output_pubsub.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/cmd/logdog_butler/main.go ('k') | client/internal/logdog/butler/output/pubsub/pubsubOutput.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/cmd/logdog_butler/output_pubsub.go
diff --git a/client/cmd/logdog_butler/output_pubsub.go b/client/cmd/logdog_butler/output_pubsub.go
index 5b2b765c83a36eca6f99a2d17135a13e9fe3ad2c..1c613317c4a0aa6498b6d6489776abb4db12ed87 100644
--- a/client/cmd/logdog_butler/output_pubsub.go
+++ b/client/cmd/logdog_butler/output_pubsub.go
@@ -9,10 +9,14 @@ import (
"time"
"github.com/luci/luci-go/client/internal/logdog/butler/output"
- "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
+ out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
"github.com/luci/luci-go/common/flag/multiflag"
ps "github.com/luci/luci-go/common/gcloud/pubsub"
log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry"
+ "golang.org/x/net/context"
+ "google.golang.org/cloud"
+ "google.golang.org/cloud/pubsub"
)
func init() {
@@ -55,22 +59,22 @@ func (f *pubsubOutputFactory) configOutput(a *application) (output.Output, error
ctx := log.SetFields(a.ncCtx, log.Fields{
"topic": f.topic,
})
- client, err := a.authenticatedClient(ctx)
+ ts, err := a.tokenSource(ctx)
if err != nil {
- return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub context: %s", err)
+ return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub token source: %s", err)
}
- psConn := &ps.Retry{
- Connection: ps.NewConnection(client),
- Callback: func(err error, d time.Duration) {
- log.Fields{
- log.ErrorKey: err,
- "delay": d,
- }.Warningf(ctx, "Transient error during Pub/Sub operation; retrying...")
- },
+
+ // Split topic into Pub/Sub project and name.
+ project, name := f.topic.Split()
+
+ psClient, err := pubsub.NewClient(ctx, project, cloud.WithTokenSource(ts))
+ if err != nil {
+ return nil, fmt.Errorf("pubsub: failed to get Pub/Sub client: %s", err)
}
+ psTopic := psClient.Topic(name)
// Assert that our Topic exists.
- exists, err := psConn.TopicExists(ctx, f.topic)
+ exists, err := retryTopicExists(ctx, psTopic)
if err != nil {
log.WithError(err).Errorf(ctx, "Failed to check for topic.")
return nil, err
@@ -82,10 +86,23 @@ func (f *pubsubOutputFactory) configOutput(a *application) (output.Output, error
return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topic)
}
- return pubsub.New(ctx, pubsub.Config{
- Publisher: psConn,
- Topic: f.topic,
- Compress: !f.noCompress,
- Track: f.track,
+ return out.New(ctx, out.Config{
+ Topic: psTopic,
+ Compress: !f.noCompress,
+ Track: f.track,
}), nil
}
+
+func retryTopicExists(ctx context.Context, t *pubsub.TopicHandle) (bool, error) {
+ var exists bool
+ err := retry.Retry(ctx, retry.Default, func() (err error) {
+ exists, err = t.Exists(ctx)
+ return
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ }.Errorf(ctx, "Failed to check if topic exists; retrying...")
+ })
+ return exists, err
+}
« no previous file with comments | « client/cmd/logdog_butler/main.go ('k') | client/internal/logdog/butler/output/pubsub/pubsubOutput.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698