Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(758)

Unified Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput.go

Issue 1211053004: LogDog: Add Butler Output package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Relocate butlerproto to common, document. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/output/pubsub/pubsubOutput.go
diff --git a/client/internal/logdog/butler/output/pubsub/pubsubOutput.go b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
new file mode 100644
index 0000000000000000000000000000000000000000..24762c741028a4a69835d21841f1b3f08a20dc44
--- /dev/null
+++ b/client/internal/logdog/butler/output/pubsub/pubsubOutput.go
@@ -0,0 +1,210 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package pubsub
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/luci/luci-go/client/internal/logdog/butler/output"
+ "github.com/luci/luci-go/common/gcloud/gcps"
+ "github.com/luci/luci-go/common/logdog/butlerproto"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/recordio"
+ "github.com/luci/luci-go/common/retry"
+ "golang.org/x/net/context"
+ "google.golang.org/cloud/pubsub"
+)
+
+// Config is a configuration structure for GCPS output.
+type Config struct {
+ // Pubsub is the Pub/Sub instance to use.
+ PubSub gcps.PubSub
+
+ // Topic is the name of the Cloud Pub/Sub topic to publish to.
+ Topic gcps.Topic
+
+ // Compress, if true, enables zlib compression.
+ Compress bool
+}
+
+// Validate validates the Output configuration.
+func (c *Config) Validate() error {
+ if c.PubSub == nil {
+ return errors.New("gcps: no pub/sub instance configured")
+ }
+ if err := c.Topic.Validate(); err != nil {
+ return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err)
+ }
+ return nil
+}
+
+// gcpsBuffer
+type gcpsBuffer struct {
+ bytes.Buffer // Output buffer for published message data.
+
+ frameWriter recordio.Writer
+ protoWriter *butlerproto.Writer
+}
+
+// Butler Output that sends messages into Google Cloud PubSub as compressed
+// protocol buffer blobs.
+type gcpsOutput struct {
+ *Config
+
+ ctx context.Context // Retained context object.
+ bufferPool sync.Pool // Pool of reusable gcpsBuffer instances.
+
+ statsMu sync.Mutex
+ stats output.StatsBase
+}
+
+// New instantiates a new GCPS output.
+func New(ctx context.Context, c Config) output.Output {
+ o := gcpsOutput{
+ Config: &c,
+ ctx: ctx,
+ }
+ o.bufferPool.New = func() interface{} { return &gcpsBuffer{} }
+ o.ctx = log.SetField(o.ctx, "output", &o)
+ return &o
+}
+
+func (o *gcpsOutput) String() string {
+ return fmt.Sprintf("gcps(%s)", o.Topic)
+}
+
+func (o *gcpsOutput) SendBundle(bundle *protocol.ButlerLogBundle) error {
+ st := output.StatsBase{}
+ defer o.mergeStats(&st)
+
+ b := o.bufferPool.Get().(*gcpsBuffer)
+ defer o.bufferPool.Put(b)
+
+ message, err := o.buildMessage(b, bundle)
+ if err != nil {
+ log.Errorf(log.SetError(o.ctx, err), "Failed to build PubSub Message from bundle.")
+ st.F.DiscardedMessages++
+ st.F.Errors++
+ return err
+ }
+ if len(message.Data) > gcps.MaxPublishSize {
+ log.Fields{
+ "messageSize": len(message.Data),
+ "maxPubSubSize": gcps.MaxPublishSize,
+ }.Errorf(o.ctx, "Constructed message exceeds Pub/Sub maximum size.")
+ return errors.New("gcps: bundle contents violate Pub/Sub size limit")
+ }
+ if err := o.publishMessages([]*pubsub.Message{message}); err != nil {
+ st.F.DiscardedMessages++
+ st.F.Errors++
+ return err
+ }
+
+ st.F.SentBytes += len(message.Data)
+ st.F.SentMessages++
+ return nil
+}
+
+func (*gcpsOutput) MaxSize() int {
+ return gcps.MaxPublishSize / 2
+}
+
+func (o *gcpsOutput) Stats() output.Stats {
+ o.statsMu.Lock()
+ defer o.statsMu.Unlock()
+
+ statsCopy := o.stats
+ return &statsCopy
+}
+
+func (o *gcpsOutput) Close() {
+ // Nothing to do.
+}
+
+// buildMessage constructs a Pub/Sub Message out of LogDog frames.
+//
+// The first frame will be a ButlerMetadata message describing the second
+// frame. The second frame will be a ButlerLogBundle containing the bundle
+// data.
+func (o *gcpsOutput) buildMessage(buf *gcpsBuffer, bundle *protocol.ButlerLogBundle) (*pubsub.Message, error) {
+ if buf.protoWriter == nil {
+ buf.protoWriter = &butlerproto.Writer{
+ Compress: o.Compress,
+ CompressThreshold: butlerproto.DefaultCompressThreshold,
+ }
+ }
+
+ // Clear our buffer and (re)initialize our frame writer.
+ buf.Reset()
+ if buf.frameWriter == nil {
+ buf.frameWriter = recordio.NewWriter(buf)
+ } else {
+ buf.frameWriter.Reset(buf)
+ }
+
+ if err := buf.protoWriter.WriteWith(buf.frameWriter, bundle); err != nil {
+ return nil, err
+ }
+
+ return &pubsub.Message{
+ Data: buf.Bytes(),
+ }, nil
+}
+
+// publishMessages handles an individual publish request. It will indefinitely
+// retry transient errors until the publish succeeds.
+func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error {
+ var messageIDs []string
+ count := 0
+ err := retry.Retry(o.ctx, retry.TransientOnly(o.publishRetryIterator()), func() error {
+ ids, err := o.PubSub.Publish(o.Topic, messages...)
+ if err != nil {
+ return err
+ }
+ messageIDs = ids
+ return nil
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "count": count,
+ "delay": d,
+ }.Warningf(o.ctx, "Transient publish error; retrying.")
+ count++
+ })
+ if err != nil {
+ log.Errorf(log.SetError(o.ctx, err), "Failed to send PubSub message.")
+ return err
+ }
+
+ log.Debugf(log.SetField(o.ctx, "messageIds", messageIDs), "Published messages.")
+ return nil
+}
+
+func (o *gcpsOutput) mergeStats(s output.Stats) {
+ o.statsMu.Lock()
+ defer o.statsMu.Unlock()
+
+ o.stats.Merge(s)
+}
+
+// publishRetryIterator returns a retry.Iterator configured for publish
+// requests.
+//
+// Note that this iterator has no retry upper bound. It will continue retrying
+// indefinitely until success.
+func (*gcpsOutput) publishRetryIterator() retry.Iterator {
+ return &retry.ExponentialBackoff{
+ Limited: retry.Limited{
+ Delay: 500 * time.Millisecond,
+ },
+ Multiplier: 3.0,
+ MaxDelay: 15 * time.Second,
+ }
+}
« no previous file with comments | « client/internal/logdog/butler/output/pubsub/doc.go ('k') | client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698