| Index: client/internal/logdog/butler/output/gcps/gcpsOutput.go |
| diff --git a/client/internal/logdog/butler/output/gcps/gcpsOutput.go b/client/internal/logdog/butler/output/gcps/gcpsOutput.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a7c8b41309d22d530516f70d3a459480c2bda7b1 |
| --- /dev/null |
| +++ b/client/internal/logdog/butler/output/gcps/gcpsOutput.go |
| @@ -0,0 +1,210 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package gcps |
| + |
| +import ( |
| + "bytes" |
| + "errors" |
| + "fmt" |
| + "sync" |
| + "time" |
| + |
| + "github.com/luci/luci-go/client/internal/logdog/butler/output" |
| + "github.com/luci/luci-go/client/logdog/butlerproto" |
| + "github.com/luci/luci-go/common/gcloud/gcps" |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/recordio" |
| + "github.com/luci/luci-go/common/retry" |
| + "golang.org/x/net/context" |
| + "google.golang.org/cloud/pubsub" |
|
estaab
2015/11/19 23:57:24
I think some encapsulation is being broken here …
dnj
2015/11/20 01:55:12
"gcps" package is never used outside of the pubsub …
estaab
2015/11/20 04:37:28
Besides clean abstraction being a general good practice …
dnj (Google)
2015/11/20 04:45:48
Yeah, the Pub/Sub abstraction is in place so we can test …
estaab
2015/11/20 05:15:48
Ok, that's fine. I think it's a good habit but it …
|
| +) |
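|
On the testability point in the thread above: the gcps.PubSub interface is what lets a test substitute a fake. A minimal sketch of such a double, with the Publish signature inferred from its single call site in publishMessages below (the real gcps.PubSub interface may declare more methods):

type fakePubSub struct {
	published []*pubsub.Message
}

// Publish records the messages and fabricates one ID per message.
func (f *fakePubSub) Publish(t gcps.Topic, msgs ...*pubsub.Message) ([]string, error) {
	f.published = append(f.published, msgs...)
	ids := make([]string, len(msgs))
	for i := range ids {
		ids[i] = fmt.Sprintf("fake-id-%d", i)
	}
	return ids, nil
}
|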
| + |
| +// Config is a configuration structure for GCPS output. |
| +type Config struct { |
| + // PubSub is the Pub/Sub instance to use. |
| + PubSub gcps.PubSub |
| + |
| + // Topic is the name of the Cloud Pub/Sub topic to publish to. |
| + Topic gcps.Topic |
| + |
| + // Compress, if true, enables zlib compression. |
| + Compress bool |
| +} |
| + |
| +// Validate validates the Output configuration. |
| +func (c *Config) Validate() error { |
| + if c.PubSub == nil { |
| + return errors.New("gcps: no pub/sub instance configured") |
| + } |
| + if err := c.Topic.Validate(); err != nil { |
| + return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err) |
| + } |
| + return nil |
| +} |
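|
A quick sketch of wiring this up, assuming gcps.Topic is a string-like type (it has the Validate method used above), ps is some gcps.PubSub implementation, and the topic name is hypothetical:

cfg := Config{
	PubSub:   ps,                   // any gcps.PubSub implementation
	Topic:    gcps.Topic("logdog"), // hypothetical topic name
	Compress: true,
}
if err := cfg.Validate(); err != nil {
	return err
}
out := New(ctx, cfg)
defer out.Close()
|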
| + |
| +// gcpsBuffer is a reusable buffer that assembles the serialized frames for |
| +// a single published Pub/Sub message. |
| +type gcpsBuffer struct { |
| + bytes.Buffer // Output buffer for published message data. |
| + |
| + frameWriter recordio.Writer |
| + protoWriter *butlerproto.Writer |
| +} |
| + |
| +// gcpsOutput is a Butler Output that sends messages to Google Cloud Pub/Sub |
| +// as (optionally compressed) protocol buffer blobs. |
| +type gcpsOutput struct { |
| + *Config |
| + |
| + ctx context.Context // Retained context object. |
| + bufferPool sync.Pool // Pool of reusable gcpsBuffer instances. |
| + |
| + statsMu sync.Mutex |
| + stats output.StatsBase |
| +} |
| + |
| +// New instantiates a new GCPS output. |
| +func New(ctx context.Context, c Config) output.Output { |
| + o := gcpsOutput{ |
| + Config: &c, |
| + ctx: ctx, |
| + } |
| + o.bufferPool.New = func() interface{} { return &gcpsBuffer{} } |
| + o.ctx = log.SetField(o.ctx, "output", &o) |
| + return &o |
| +} |
| + |
| +func (o *gcpsOutput) String() string { |
| + return fmt.Sprintf("gcps(%s)", o.Topic) |
| +} |
| + |
| +func (o *gcpsOutput) SendBundle(bundle *protocol.ButlerLogBundle) error { |
| + st := output.StatsBase{} |
| + defer o.mergeStats(&st) |
| + |
| + b := o.bufferPool.Get().(*gcpsBuffer) |
| + defer o.bufferPool.Put(b) |
| + |
| + message, err := o.buildMessage(b, bundle) |
| + if err != nil { |
| + log.Errorf(log.SetError(o.ctx, err), "Failed to build PubSub Message from bundle.") |
|
estaab
2015/11/19 23:57:24
Do the rules about lowercase first letter and no punctuation apply here?
dnj
2015/11/20 01:55:12
Nope, this isn't a Go error, it's a logging error message.
|
| + st.F.DiscardedMessages++ |
| + st.F.Errors++ |
| + return err |
| + } |
| + if len(message.Data) > gcps.MaxPublishSize { |
| + log.Fields{ |
| + "messageSize": len(message.Data), |
| + "maxPubSubSize": gcps.MaxPublishSize, |
| + }.Errorf(o.ctx, "Constructed message exceeds Pub/Sub maximum size.") |
| + st.F.DiscardedMessages++ |
| + st.F.Errors++ |
| + return errors.New("gcps: bundle contents violate Pub/Sub size limit") |
| + } |
| + if err := o.publishMessages([]*pubsub.Message{message}); err != nil { |
| + st.F.DiscardedMessages++ |
| + st.F.Errors++ |
| + return err |
| + } |
| + |
| + st.F.SentBytes += len(message.Data) |
| + st.F.SentMessages++ |
| + return nil |
| +} |
| + |
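| +// MaxSize returns the largest bundle this Output will accept: half of the |
| +// Pub/Sub publish limit, presumably to leave headroom for metadata framing |
| +// and other protocol overhead. |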
| +func (*gcpsOutput) MaxSize() int { |
| + return gcps.MaxPublishSize / 2 |
| +} |
| + |
| +func (o *gcpsOutput) Stats() output.Stats { |
| + o.statsMu.Lock() |
| + defer o.statsMu.Unlock() |
| + |
| + statsCopy := o.stats |
| + return &statsCopy |
| +} |
| + |
| +func (o *gcpsOutput) Close() { |
| + // Nothing to do. |
| +} |
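|
Continuing the sketch from Validate above: once constructed, the output is driven through SendBundle and observed through Stats. The empty bundle is only a placeholder; real callers populate it with log entries:

bundle := &protocol.ButlerLogBundle{} // placeholder; real bundles carry entries
if err := out.SendBundle(bundle); err != nil {
	// Transient publish errors are retried internally, so an error here
	// means a build failure, an oversized message, or a permanent failure.
	return err
}
st := out.Stats()
_ = st // a snapshot copy; accessors live in the output package
|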
| + |
| +// buildMessage constructs a Pub/Sub Message out of LogDog frames. |
| +// |
| +// The first frame will be a ButlerMetadata message describing the second |
| +// frame. The second frame will be a ButlerLogBundle containing the bundle |
| +// data. |
| +func (o *gcpsOutput) buildMessage(buf *gcpsBuffer, bundle *protocol.ButlerLogBundle) (*pubsub.Message, error) { |
| + if buf.protoWriter == nil { |
| + buf.protoWriter = &butlerproto.Writer{ |
| + Compress: o.Compress, |
| + CompressThreshold: butlerproto.DefaultCompressThreshold, |
| + } |
| + } |
| + |
| + // Clear our buffer and (re)initialize our frame writer. |
| + buf.Reset() |
| + if buf.frameWriter == nil { |
| + buf.frameWriter = recordio.NewWriter(buf) |
| + } else { |
| + buf.frameWriter.Reset(buf) |
| + } |
| + |
| + if err := buf.protoWriter.WriteWith(buf.frameWriter, bundle); err != nil { |
| + return nil, err |
| + } |
| + |
| + return &pubsub.Message{ |
| + Data: buf.Bytes(), |
| + }, nil |
| +} |
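|
For readers unfamiliar with the wire format: the buffer ends up holding two length-prefixed recordio frames, ButlerMetadata then ButlerLogBundle. A self-contained illustration of varint length-prefixed framing in that spirit, using only the standard library (this is an illustration, not the luci-go recordio implementation):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeFrame prefixes data with its uvarint-encoded length.
func writeFrame(buf *bytes.Buffer, data []byte) {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(data)))
	buf.Write(hdr[:n])
	buf.Write(data)
}

// readFrame reads one length-prefixed frame back out of the buffer.
func readFrame(buf *bytes.Buffer) ([]byte, error) {
	size, err := binary.ReadUvarint(buf)
	if err != nil {
		return nil, err
	}
	frame := make([]byte, size)
	if _, err := buf.Read(frame); err != nil {
		return nil, err
	}
	return frame, nil
}

func main() {
	var b bytes.Buffer
	writeFrame(&b, []byte("metadata")) // stands in for the ButlerMetadata frame
	writeFrame(&b, []byte("bundle"))   // stands in for the ButlerLogBundle frame
	for {
		f, err := readFrame(&b)
		if err != nil {
			break
		}
		fmt.Printf("frame: %q\n", f)
	}
}
|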
| + |
| +// publishMessages handles an individual publish request. It retries |
| +// transient errors indefinitely until the publish succeeds; non-transient |
| +// errors abort immediately. |
| +func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error { |
| + var messageIDs []string |
| + count := 0 |
| + err := retry.Retry(o.ctx, retry.TransientOnly(o.publishRetryIterator()), func() error { |
| + ids, err := o.PubSub.Publish(o.Topic, messages...) |
| + if err != nil { |
| + return err |
| + } |
| + messageIDs = ids |
| + return nil |
| + }, func(err error, d time.Duration) { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "count": count, |
| + "delay": d, |
| + }.Warningf(o.ctx, "Transient publish error; retrying.") |
| + count++ |
| + }) |
| + if err != nil { |
| + log.Errorf(log.SetError(o.ctx, err), "Failed to send PubSub message.") |
| + return err |
| + } |
| + |
| + log.Debugf(log.SetField(o.ctx, "messageIds", messageIDs), "Published messages.") |
| + return nil |
| +} |
| + |
| +func (o *gcpsOutput) mergeStats(s output.Stats) { |
| + o.statsMu.Lock() |
| + defer o.statsMu.Unlock() |
| + |
| + o.stats.Merge(s) |
| +} |
| + |
| +// publishRetryIterator returns a retry.Iterator configured for publish |
| +// requests. |
| +// |
| +// Note that this iterator has no retry upper bound. It will continue retrying |
| +// indefinitely until success. |
| +func (*gcpsOutput) publishRetryIterator() retry.Iterator { |
| + return &retry.ExponentialBackoff{ |
| + Limited: retry.Limited{ |
| + Delay: 500 * time.Millisecond, |
| + }, |
| + Multiplier: 3.0, |
| + MaxDelay: 15 * time.Second, |
| + } |
| +} |
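|
For reference, assuming ExponentialBackoff multiplies the delay by Multiplier after each attempt and clamps it at MaxDelay, the wait sequence is 500ms, 1.5s, 4.5s, 13.5s, then 15s for every attempt after that. A tiny sketch that prints the same schedule:

delay := 500 * time.Millisecond
for attempt := 1; attempt <= 6; attempt++ {
	fmt.Printf("attempt %d: wait %s\n", attempt, delay)
	delay = time.Duration(float64(delay) * 3.0)
	if maxDelay := 15 * time.Second; delay > maxDelay {
		delay = maxDelay
	}
}
// Prints: 500ms, 1.5s, 4.5s, 13.5s, 15s, 15s
|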