| Index: client/cmd/logdog_butler/output_pubsub.go
|
| diff --git a/client/cmd/logdog_butler/output_pubsub.go b/client/cmd/logdog_butler/output_pubsub.go
|
| index 5b2b765c83a36eca6f99a2d17135a13e9fe3ad2c..1c613317c4a0aa6498b6d6489776abb4db12ed87 100644
|
| --- a/client/cmd/logdog_butler/output_pubsub.go
|
| +++ b/client/cmd/logdog_butler/output_pubsub.go
|
| @@ -9,10 +9,14 @@ import (
|
| "time"
|
|
|
| "github.com/luci/luci-go/client/internal/logdog/butler/output"
|
| - "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
|
| + out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
|
| "github.com/luci/luci-go/common/flag/multiflag"
|
| ps "github.com/luci/luci-go/common/gcloud/pubsub"
|
| log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/retry"
|
| + "golang.org/x/net/context"
|
| + "google.golang.org/cloud"
|
| + "google.golang.org/cloud/pubsub"
|
| )
|
|
|
| func init() {
|
| @@ -55,22 +59,22 @@ func (f *pubsubOutputFactory) configOutput(a *application) (output.Output, error
|
| ctx := log.SetFields(a.ncCtx, log.Fields{
|
| "topic": f.topic,
|
| })
|
| - client, err := a.authenticatedClient(ctx)
|
| + ts, err := a.tokenSource(ctx)
|
| if err != nil {
|
| - return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub context: %s", err)
|
| + return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub token source: %s", err)
|
| }
|
| - psConn := &ps.Retry{
|
| - Connection: ps.NewConnection(client),
|
| - Callback: func(err error, d time.Duration) {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "delay": d,
|
| - }.Warningf(ctx, "Transient error during Pub/Sub operation; retrying...")
|
| - },
|
| +
|
| + // Split topic into Pub/Sub project and name.
|
| + project, name := f.topic.Split()
|
| +
|
| + psClient, err := pubsub.NewClient(ctx, project, cloud.WithTokenSource(ts))
|
| + if err != nil {
|
| + return nil, fmt.Errorf("pubsub: failed to get Pub/Sub client: %s", err)
|
| }
|
| + psTopic := psClient.Topic(name)
|
|
|
| // Assert that our Topic exists.
|
| - exists, err := psConn.TopicExists(ctx, f.topic)
|
| + exists, err := retryTopicExists(ctx, psTopic)
|
| if err != nil {
|
| log.WithError(err).Errorf(ctx, "Failed to check for topic.")
|
| return nil, err
|
| @@ -82,10 +86,23 @@ func (f *pubsubOutputFactory) configOutput(a *application) (output.Output, error
|
| return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topic)
|
| }
|
|
|
| - return pubsub.New(ctx, pubsub.Config{
|
| - Publisher: psConn,
|
| - Topic: f.topic,
|
| - Compress: !f.noCompress,
|
| - Track: f.track,
|
| + return out.New(ctx, out.Config{
|
| + Topic: psTopic,
|
| + Compress: !f.noCompress,
|
| + Track: f.track,
|
| }), nil
|
| }
|
| +
|
| +func retryTopicExists(ctx context.Context, t *pubsub.TopicHandle) (bool, error) {
|
| + var exists bool
|
| + err := retry.Retry(ctx, retry.Default, func() (err error) {
|
| + exists, err = t.Exists(ctx)
|
| + return
|
| + }, func(err error, d time.Duration) {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "delay": d,
|
| + }.Errorf(ctx, "Failed to check if topic exists; retrying...")
|
| + })
|
| + return exists, err
|
| +}
|
|
|