| Index: client/internal/logdog/butler/output/pubsub/pubsubOutput.go
|
| diff --git a/client/internal/logdog/butler/output/pubsub/pubsubOutput.go b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
|
| index a2a05c23422fbd6cb74fe00781a8bd73075e6f0c..9b9c691a021d6b1ba0ebfd88207ea8f52f1f7145 100644
|
| --- a/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
|
| +++ b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
|
| @@ -9,33 +9,36 @@ import (
|
| "errors"
|
| "fmt"
|
| "sync"
|
| + "time"
|
|
|
| "github.com/luci/luci-go/client/internal/logdog/butler/output"
|
| - "github.com/luci/luci-go/common/gcloud/pubsub"
|
| + gcps "github.com/luci/luci-go/common/gcloud/pubsub"
|
| "github.com/luci/luci-go/common/logdog/butlerproto"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| "github.com/luci/luci-go/common/recordio"
|
| + "github.com/luci/luci-go/common/retry"
|
| "golang.org/x/net/context"
|
| + "google.golang.org/cloud/pubsub"
|
| )
|
|
|
| -// Publisher is an interface for something that publishes Pub/Sub messages.
|
| +// Topic is an interface for a Pub/Sub topic.
|
| //
|
| -// pubsub.Connection implements this interface.
|
| -type Publisher interface {
|
| +// pubsub.TopicHandle implements Topic.
|
| +type Topic interface {
|
| + // Name returns the name of the topic.
|
| + Name() string
|
| +
|
| // Publish mirrors the pubsub.Connection Publish method.
|
| - Publish(context.Context, pubsub.Topic, ...*pubsub.Message) ([]string, error)
|
| + Publish(context.Context, ...*pubsub.Message) ([]string, error)
|
| }
|
|
|
| -var _ Publisher = pubsub.Connection(nil)
|
| +var _ Topic = (*pubsub.TopicHandle)(nil)
|
|
|
| // Config is a configuration structure for Pub/Sub output.
|
| type Config struct {
|
| - // Publisher is the Pub/Sub instance to use.
|
| - Publisher Publisher
|
| -
|
| - // Topic is the name of the Cloud Pub/Sub topic to publish to.
|
| - Topic pubsub.Topic
|
| + // Topic is the Pub/Sub topic to publish to.
|
| + Topic Topic
|
|
|
| // Compress, if true, enables zlib compression.
|
| Compress bool
|
| @@ -45,17 +48,6 @@ type Config struct {
|
| Track bool
|
| }
|
|
|
| -// Validate validates the Output configuration.
|
| -func (c *Config) Validate() error {
|
| - if c.Publisher == nil {
|
| - return errors.New("pubsub: no pub/sub instance configured")
|
| - }
|
| - if err := c.Topic.Validate(); err != nil {
|
| - return fmt.Errorf("pubsub: invalid Topic [%s]: %s", c.Topic, err)
|
| - }
|
| - return nil
|
| -}
|
| -
|
| // buffer
|
| type buffer struct {
|
| bytes.Buffer // Output buffer for published message data.
|
| @@ -94,7 +86,7 @@ func New(ctx context.Context, c Config) output.Output {
|
| }
|
|
|
| func (o *pubSubOutput) String() string {
|
| - return fmt.Sprintf("pubsub(%s)", o.Topic)
|
| + return fmt.Sprintf("pubsub(%s)", o.Topic.Name())
|
| }
|
|
|
| func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
|
| @@ -113,10 +105,10 @@ func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
|
| st.F.Errors++
|
| return err
|
| }
|
| - if len(message.Data) > pubsub.MaxPublishSize {
|
| + if len(message.Data) > gcps.MaxPublishSize {
|
| log.Fields{
|
| "messageSize": len(message.Data),
|
| - "maxPubSubSize": pubsub.MaxPublishSize,
|
| + "maxPubSubSize": gcps.MaxPublishSize,
|
| }.Errorf(o, "Constructed message exceeds Pub/Sub maximum size.")
|
| return errors.New("pubsub: bundle contents violate Pub/Sub size limit")
|
| }
|
| @@ -136,7 +128,7 @@ func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
|
| }
|
|
|
| func (*pubSubOutput) MaxSize() int {
|
| - return pubsub.MaxPublishSize / 2
|
| + return gcps.MaxPublishSize / 2
|
| }
|
|
|
| func (o *pubSubOutput) Stats() output.Stats {
|
| @@ -191,16 +183,25 @@ func (o *pubSubOutput) buildMessage(buf *buffer, bundle *logpb.ButlerLogBundle)
|
| // publishMessages handles an individual publish request. It will indefinitely
|
| // retry transient errors until the publish succeeds.
|
| func (o *pubSubOutput) publishMessages(messages []*pubsub.Message) error {
|
| - messageIDs, err := o.Publisher.Publish(o, o.Topic, messages...)
|
| - if err != nil {
|
| - return err
|
| - }
|
| + var messageIDs []string
|
| + err := retry.Retry(o, retry.TransientOnly(indefiniteRetry), func() (err error) {
|
| + messageIDs, err = o.Topic.Publish(o, messages...)
|
| + return
|
| + }, func(err error, d time.Duration) {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "delay": d,
|
| + "count": len(messages),
|
| + }.Warningf(o, "TRANSIENT error publishing messages; retrying...")
|
| + })
|
| if err != nil {
|
| - log.Errorf(log.SetError(o, err), "Failed to send PubSub message.")
|
| + log.WithError(err).Errorf(o, "Failed to send PubSub message.")
|
| return err
|
| }
|
|
|
| - log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published messages.")
|
| + log.Fields{
|
| + "messageIds": messageIDs,
|
| + }.Debugf(o, "Published messages.")
|
| return nil
|
| }
|
|
|
| @@ -210,3 +211,14 @@ func (o *pubSubOutput) mergeStats(s output.Stats) {
|
|
|
| o.stats.Merge(s)
|
| }
|
| +
|
| +// indefiniteRetry is a retry.Iterator that will indefinitely retry errors with
|
| +// a maximum backoff.
|
| +func indefiniteRetry() retry.Iterator {
|
| + return &retry.ExponentialBackoff{
|
| + Limited: retry.Limited{
|
| + Retries: -1,
|
| + },
|
| + MaxDelay: 30 * time.Second,
|
| + }
|
| +}
|
|
|