Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(113)

Unified Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/output/pubsub/pubsubOutput.go
diff --git a/client/internal/logdog/butler/output/pubsub/pubsubOutput.go b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
index a2a05c23422fbd6cb74fe00781a8bd73075e6f0c..9b9c691a021d6b1ba0ebfd88207ea8f52f1f7145 100644
--- a/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
+++ b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
@@ -9,33 +9,36 @@ import (
"errors"
"fmt"
"sync"
+ "time"
"github.com/luci/luci-go/client/internal/logdog/butler/output"
- "github.com/luci/luci-go/common/gcloud/pubsub"
+ gcps "github.com/luci/luci-go/common/gcloud/pubsub"
"github.com/luci/luci-go/common/logdog/butlerproto"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/logdog/logpb"
"github.com/luci/luci-go/common/recordio"
+ "github.com/luci/luci-go/common/retry"
"golang.org/x/net/context"
+ "google.golang.org/cloud/pubsub"
)
-// Publisher is an interface for something that publishes Pub/Sub messages.
+// Topic is an interface for a Pub/Sub topic.
//
-// pubsub.Connection implements this interface.
-type Publisher interface {
+// pubsub.TopicHandle implements Topic.
+type Topic interface {
+ // Name returns the name of the topic.
+ Name() string
+
// Publish mirrors the pubsub.Connection Publish method.
- Publish(context.Context, pubsub.Topic, ...*pubsub.Message) ([]string, error)
+ Publish(context.Context, ...*pubsub.Message) ([]string, error)
}
-var _ Publisher = pubsub.Connection(nil)
+var _ Topic = (*pubsub.TopicHandle)(nil)
// Config is a configuration structure for Pub/Sub output.
type Config struct {
- // Publisher is the Pub/Sub instance to use.
- Publisher Publisher
-
- // Topic is the name of the Cloud Pub/Sub topic to publish to.
- Topic pubsub.Topic
+ // Topic is the Pub/Sub topic to publish to.
+ Topic Topic
// Compress, if true, enables zlib compression.
Compress bool
@@ -45,17 +48,6 @@ type Config struct {
Track bool
}
-// Validate validates the Output configuration.
-func (c *Config) Validate() error {
- if c.Publisher == nil {
- return errors.New("pubsub: no pub/sub instance configured")
- }
- if err := c.Topic.Validate(); err != nil {
- return fmt.Errorf("pubsub: invalid Topic [%s]: %s", c.Topic, err)
- }
- return nil
-}
-
// buffer
type buffer struct {
bytes.Buffer // Output buffer for published message data.
@@ -94,7 +86,7 @@ func New(ctx context.Context, c Config) output.Output {
}
func (o *pubSubOutput) String() string {
- return fmt.Sprintf("pubsub(%s)", o.Topic)
+ return fmt.Sprintf("pubsub(%s)", o.Topic.Name())
}
func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
@@ -113,10 +105,10 @@ func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
st.F.Errors++
return err
}
- if len(message.Data) > pubsub.MaxPublishSize {
+ if len(message.Data) > gcps.MaxPublishSize {
log.Fields{
"messageSize": len(message.Data),
- "maxPubSubSize": pubsub.MaxPublishSize,
+ "maxPubSubSize": gcps.MaxPublishSize,
}.Errorf(o, "Constructed message exceeds Pub/Sub maximum size.")
return errors.New("pubsub: bundle contents violate Pub/Sub size limit")
}
@@ -136,7 +128,7 @@ func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
}
func (*pubSubOutput) MaxSize() int {
- return pubsub.MaxPublishSize / 2
+ return gcps.MaxPublishSize / 2
}
func (o *pubSubOutput) Stats() output.Stats {
@@ -191,16 +183,25 @@ func (o *pubSubOutput) buildMessage(buf *buffer, bundle *logpb.ButlerLogBundle)
// publishMessages handles an individual publish request. It will indefinitely
// retry transient errors until the publish succeeds.
func (o *pubSubOutput) publishMessages(messages []*pubsub.Message) error {
- messageIDs, err := o.Publisher.Publish(o, o.Topic, messages...)
- if err != nil {
- return err
- }
+ var messageIDs []string
+ err := retry.Retry(o, retry.TransientOnly(indefiniteRetry), func() (err error) {
+ messageIDs, err = o.Topic.Publish(o, messages...)
+ return
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ "count": len(messages),
+ }.Warningf(o, "TRANSIENT error publishing messages; retrying...")
+ })
if err != nil {
- log.Errorf(log.SetError(o, err), "Failed to send PubSub message.")
+ log.WithError(err).Errorf(o, "Failed to send PubSub message.")
return err
}
- log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published messages.")
+ log.Fields{
+ "messageIds": messageIDs,
+ }.Debugf(o, "Published messages.")
return nil
}
@@ -210,3 +211,14 @@ func (o *pubSubOutput) mergeStats(s output.Stats) {
o.stats.Merge(s)
}
+
+// indefiniteRetry is a retry.Iterator that will indefinitely retry errors with
+// a maximum backoff.
+func indefiniteRetry() retry.Iterator {
+ return &retry.ExponentialBackoff{
+ Limited: retry.Limited{
+ Retries: -1,
+ },
+ MaxDelay: 30 * time.Second,
+ }
+}
« no previous file with comments | « client/cmd/logdog_butler/output_pubsub.go ('k') | client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698