Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Unified Diff: client/cmd/logdog_butler/output_pubsub.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/cmd/logdog_butler/output_pubsub.go
diff --git a/client/cmd/logdog_butler/output_pubsub.go b/client/cmd/logdog_butler/output_pubsub.go
index 7f4f15a7552428bf3f73bbec72825df0c7824b7e..f1507d5cb44fffb882d4a91429da5236bb7a93fe 100644
--- a/client/cmd/logdog_butler/output_pubsub.go
+++ b/client/cmd/logdog_butler/output_pubsub.go
@@ -13,8 +13,6 @@ import (
"github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
"github.com/luci/luci-go/common/gcloud/gcps"
log "github.com/luci/luci-go/common/logging"
- "github.com/luci/luci-go/common/retry"
- "golang.org/x/net/context"
)
func init() {
@@ -59,47 +57,36 @@ func (f *pubsubOutputFactory) configOutput(a *butlerApplication) (output.Output,
"topic": f.topic,
"project": f.project,
})
- ctx, err := a.authenticatedContext(ctx, f.project)
+ client, err := a.authenticatedClient(ctx)
if err != nil {
return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub context: %s", err)
}
- ps := gcps.New(ctx)
+ ps := &gcps.Retry{
dnj (Google) 2016/01/21 04:36:24 Use new gcps.Retry and Pub/Sub instantiation.
+ PS: gcps.New(client, f.project),
+ C: func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ }.Warningf(ctx, "Transient error during Pub/Sub operation; retrying...")
+ },
+ }
// Assert that our Topic exists.
- if err := f.assertTopicExists(ctx, ps); err != nil {
- log.WithError(err).Errorf(ctx, "Topic does not exist.")
+ exists, err := ps.TopicExists(ctx, f.topic)
+ if err != nil {
+ log.WithError(err).Errorf(ctx, "Failed to check for topic.")
return nil, err
}
+ if !exists {
+ log.Fields{
+ "topic": f.topic,
+ }.Errorf(ctx, "Pub/Sub Topic does not exist.")
+ return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topic)
+ }
return pubsub.New(ctx, pubsub.Config{
- PubSub: ps,
- Topic: f.topic,
- Compress: !f.noCompress,
+ Publisher: ps,
+ Topic: f.topic,
+ Compress: !f.noCompress,
}), nil
}
-
-func (f *pubsubOutputFactory) assertTopicExists(ctx context.Context, ps gcps.PubSub) error {
- log.Infof(ctx, "Checking that Pub/Sub topic exists.")
-
- exists := false
- err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() error {
- e, err := ps.TopicExists(f.topic)
- if err != nil {
- return err
- }
- exists = e
- return nil
- }, func(err error, d time.Duration) {
- log.Fields{
- log.ErrorKey: err,
- "delay": d,
- }.Warningf(ctx, "Transient error during topic check; retrying.")
- })
- if err != nil {
- return fmt.Errorf("pubsub: failed to check for topic: %s", err)
- }
- if !exists {
- return fmt.Errorf("pubsub: topic [%s] does not exist", f.topic)
- }
- return nil
-}

Powered by Google App Engine
This is Rietveld 408576698