Chromium Code Reviews| Index: client/internal/logdog/butler/output/pubsub/pubsubOutput.go |
| diff --git a/client/internal/logdog/butler/output/pubsub/pubsubOutput.go b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go |
| index a2a05c23422fbd6cb74fe00781a8bd73075e6f0c..9b9c691a021d6b1ba0ebfd88207ea8f52f1f7145 100644 |
| --- a/client/internal/logdog/butler/output/pubsub/pubsubOutput.go |
| +++ b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go |
| @@ -9,33 +9,36 @@ import ( |
| "errors" |
| "fmt" |
| "sync" |
| + "time" |
| "github.com/luci/luci-go/client/internal/logdog/butler/output" |
| - "github.com/luci/luci-go/common/gcloud/pubsub" |
| + gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| "github.com/luci/luci-go/common/logdog/butlerproto" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/proto/logdog/logpb" |
| "github.com/luci/luci-go/common/recordio" |
| + "github.com/luci/luci-go/common/retry" |
| "golang.org/x/net/context" |
| + "google.golang.org/cloud/pubsub" |
| ) |
| -// Publisher is an interface for something that publishes Pub/Sub messages. |
| +// Topic is an interface for a Pub/Sub topic. |
| // |
| -// pubsub.Connection implements this interface. |
| -type Publisher interface { |
| +// pubsub.TopicHandle implements Topic. |
| +type Topic interface { |
| + // Name returns the name of the topic. |
| + Name() string |
| + |
| // Publish mirrors the pubsub.Connection Publish method. |
| - Publish(context.Context, pubsub.Topic, ...*pubsub.Message) ([]string, error) |
| + Publish(context.Context, ...*pubsub.Message) ([]string, error) |
| } |
| -var _ Publisher = pubsub.Connection(nil) |
| +var _ Topic = (*pubsub.TopicHandle)(nil) |
| // Config is a configuration structure for Pub/Sub output. |
| type Config struct { |
| - // Publisher is the Pub/Sub instance to use. |
| - Publisher Publisher |
| - |
| - // Topic is the name of the Cloud Pub/Sub topic to publish to. |
| - Topic pubsub.Topic |
| + // Topic is the Pub/Sub topic to publish to. |
| + Topic Topic |
| // Compress, if true, enables zlib compression. |
| Compress bool |
| @@ -45,17 +48,6 @@ type Config struct { |
| Track bool |
| } |
| -// Validate validates the Output configuration. |
| -func (c *Config) Validate() error { |
| - if c.Publisher == nil { |
| - return errors.New("pubsub: no pub/sub instance configured") |
| - } |
| - if err := c.Topic.Validate(); err != nil { |
| - return fmt.Errorf("pubsub: invalid Topic [%s]: %s", c.Topic, err) |
| - } |
| - return nil |
| -} |
| - |
| // buffer |
| type buffer struct { |
| bytes.Buffer // Output buffer for published message data. |
| @@ -94,7 +86,7 @@ func New(ctx context.Context, c Config) output.Output { |
| } |
| func (o *pubSubOutput) String() string { |
| - return fmt.Sprintf("pubsub(%s)", o.Topic) |
| + return fmt.Sprintf("pubsub(%s)", o.Topic.Name()) |
| } |
| func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error { |
| @@ -113,10 +105,10 @@ func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error { |
| st.F.Errors++ |
| return err |
| } |
| - if len(message.Data) > pubsub.MaxPublishSize { |
| + if len(message.Data) > gcps.MaxPublishSize { |
| log.Fields{ |
| "messageSize": len(message.Data), |
| - "maxPubSubSize": pubsub.MaxPublishSize, |
| + "maxPubSubSize": gcps.MaxPublishSize, |
| }.Errorf(o, "Constructed message exceeds Pub/Sub maximum size.") |
| return errors.New("pubsub: bundle contents violate Pub/Sub size limit") |
| } |
| @@ -136,7 +128,7 @@ func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error { |
| } |
| func (*pubSubOutput) MaxSize() int { |
| - return pubsub.MaxPublishSize / 2 |
| + return gcps.MaxPublishSize / 2 |
| } |
| func (o *pubSubOutput) Stats() output.Stats { |
| @@ -191,16 +183,25 @@ func (o *pubSubOutput) buildMessage(buf *buffer, bundle *logpb.ButlerLogBundle) |
| // publishMessages handles an individual publish request. It will indefinitely |
| // retry transient errors until the publish succeeds. |
| func (o *pubSubOutput) publishMessages(messages []*pubsub.Message) error { |
| - messageIDs, err := o.Publisher.Publish(o, o.Topic, messages...) |
| - if err != nil { |
| - return err |
| - } |
| + var messageIDs []string |
| + err := retry.Retry(o, retry.TransientOnly(indefiniteRetry), func() (err error) { |
| + messageIDs, err = o.Topic.Publish(o, messages...) |
|
Vadim Sh.
2016/03/29 19:02:37
note: this can in theory produce duplicates (if er
dnj
2016/03/29 19:27:20
That's a good point. They are (b/c they can also f
|
| + return |
| + }, func(err error, d time.Duration) { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "delay": d, |
| + "count": len(messages), |
| + }.Warningf(o, "TRANSIENT error publishing messages; retrying...") |
| + }) |
| if err != nil { |
| - log.Errorf(log.SetError(o, err), "Failed to send PubSub message.") |
| + log.WithError(err).Errorf(o, "Failed to send PubSub message.") |
| return err |
| } |
| - log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published messages.") |
| + log.Fields{ |
| + "messageIds": messageIDs, |
| + }.Debugf(o, "Published messages.") |
| return nil |
| } |
| @@ -210,3 +211,14 @@ func (o *pubSubOutput) mergeStats(s output.Stats) { |
| o.stats.Merge(s) |
| } |
| + |
| +// indefiniteRetry is a retry.Iterator that will indefinitely retry errors with |
| +// a maximum backoff. |
| +func indefiniteRetry() retry.Iterator { |
| + return &retry.ExponentialBackoff{ |
| + Limited: retry.Limited{ |
| + Retries: -1, |
| + }, |
| + MaxDelay: 30 * time.Second, |
| + } |
| +} |