Chromium Code Reviews| Index: client/cmd/logdog_butler/output_pubsub.go |
| diff --git a/client/cmd/logdog_butler/output_pubsub.go b/client/cmd/logdog_butler/output_pubsub.go |
| index 7f4f15a7552428bf3f73bbec72825df0c7824b7e..f1507d5cb44fffb882d4a91429da5236bb7a93fe 100644 |
| --- a/client/cmd/logdog_butler/output_pubsub.go |
| +++ b/client/cmd/logdog_butler/output_pubsub.go |
| @@ -13,8 +13,6 @@ import ( |
| "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub" |
| "github.com/luci/luci-go/common/gcloud/gcps" |
| log "github.com/luci/luci-go/common/logging" |
| - "github.com/luci/luci-go/common/retry" |
| - "golang.org/x/net/context" |
| ) |
| func init() { |
| @@ -59,47 +57,36 @@ func (f *pubsubOutputFactory) configOutput(a *butlerApplication) (output.Output, |
| "topic": f.topic, |
| "project": f.project, |
| }) |
| - ctx, err := a.authenticatedContext(ctx, f.project) |
| + client, err := a.authenticatedClient(ctx) |
| if err != nil { |
| return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub context: %s", err) |
| } |
| - ps := gcps.New(ctx) |
| + ps := &gcps.Retry{ |
|
dnj (Google)
2016/01/21 04:36:24
Use new gcps.Retry and Pub/Sub instantiation.
|
| + PS: gcps.New(client, f.project), |
| + C: func(err error, d time.Duration) { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "delay": d, |
| + }.Warningf(ctx, "Transient error during Pub/Sub operation; retrying...") |
| + }, |
| + } |
| // Assert that our Topic exists. |
| - if err := f.assertTopicExists(ctx, ps); err != nil { |
| - log.WithError(err).Errorf(ctx, "Topic does not exist.") |
| + exists, err := ps.TopicExists(ctx, f.topic) |
| + if err != nil { |
| + log.WithError(err).Errorf(ctx, "Failed to check for topic.") |
| return nil, err |
| } |
| + if !exists { |
| + log.Fields{ |
| + "topic": f.topic, |
| + }.Errorf(ctx, "Pub/Sub Topic does not exist.") |
| + return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topic) |
| + } |
| return pubsub.New(ctx, pubsub.Config{ |
| - PubSub: ps, |
| - Topic: f.topic, |
| - Compress: !f.noCompress, |
| + Publisher: ps, |
| + Topic: f.topic, |
| + Compress: !f.noCompress, |
| }), nil |
| } |
| - |
| -func (f *pubsubOutputFactory) assertTopicExists(ctx context.Context, ps gcps.PubSub) error { |
| - log.Infof(ctx, "Checking that Pub/Sub topic exists.") |
| - |
| - exists := false |
| - err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() error { |
| - e, err := ps.TopicExists(f.topic) |
| - if err != nil { |
| - return err |
| - } |
| - exists = e |
| - return nil |
| - }, func(err error, d time.Duration) { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "delay": d, |
| - }.Warningf(ctx, "Transient error during topic check; retrying.") |
| - }) |
| - if err != nil { |
| - return fmt.Errorf("pubsub: failed to check for topic: %s", err) |
| - } |
| - if !exists { |
| - return fmt.Errorf("pubsub: topic [%s] does not exist", f.topic) |
| - } |
| - return nil |
| -} |