Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Side by Side Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput.go

Issue 1211053004: LogDog: Add Butler Output package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Relocate butlerproto to common, document. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package pubsub
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "sync"
12 "time"
13
14 "github.com/luci/luci-go/client/internal/logdog/butler/output"
15 "github.com/luci/luci-go/common/gcloud/gcps"
16 "github.com/luci/luci-go/common/logdog/butlerproto"
17 "github.com/luci/luci-go/common/logdog/protocol"
18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/recordio"
20 "github.com/luci/luci-go/common/retry"
21 "golang.org/x/net/context"
22 "google.golang.org/cloud/pubsub"
23 )
24
25 // Config is a configuration structure for GCPS output.
26 type Config struct {
27 // Pubsub is the Pub/Sub instance to use.
28 PubSub gcps.PubSub
29
30 // Topic is the name of the Cloud Pub/Sub topic to publish to.
31 Topic gcps.Topic
32
33 // Compress, if true, enables zlib compression.
34 Compress bool
35 }
36
37 // Validate validates the Output configuration.
38 func (c *Config) Validate() error {
39 if c.PubSub == nil {
40 return errors.New("gcps: no pub/sub instance configured")
41 }
42 if err := c.Topic.Validate(); err != nil {
43 return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err)
44 }
45 return nil
46 }
47
48 // gcpsBuffer
49 type gcpsBuffer struct {
50 bytes.Buffer // Output buffer for published message data.
51
52 frameWriter recordio.Writer
53 protoWriter *butlerproto.Writer
54 }
55
56 // Butler Output that sends messages into Google Cloud PubSub as compressed
57 // protocol buffer blobs.
58 type gcpsOutput struct {
59 *Config
60
61 ctx context.Context // Retained context object.
62 bufferPool sync.Pool // Pool of reusable gcpsBuffer instances.
63
64 statsMu sync.Mutex
65 stats output.StatsBase
66 }
67
68 // New instantiates a new GCPS output.
69 func New(ctx context.Context, c Config) output.Output {
70 o := gcpsOutput{
71 Config: &c,
72 ctx: ctx,
73 }
74 o.bufferPool.New = func() interface{} { return &gcpsBuffer{} }
75 o.ctx = log.SetField(o.ctx, "output", &o)
76 return &o
77 }
78
79 func (o *gcpsOutput) String() string {
80 return fmt.Sprintf("gcps(%s)", o.Topic)
81 }
82
83 func (o *gcpsOutput) SendBundle(bundle *protocol.ButlerLogBundle) error {
84 st := output.StatsBase{}
85 defer o.mergeStats(&st)
86
87 b := o.bufferPool.Get().(*gcpsBuffer)
88 defer o.bufferPool.Put(b)
89
90 message, err := o.buildMessage(b, bundle)
91 if err != nil {
92 log.Errorf(log.SetError(o.ctx, err), "Failed to build PubSub Mes sage from bundle.")
93 st.F.DiscardedMessages++
94 st.F.Errors++
95 return err
96 }
97 if len(message.Data) > gcps.MaxPublishSize {
98 log.Fields{
99 "messageSize": len(message.Data),
100 "maxPubSubSize": gcps.MaxPublishSize,
101 }.Errorf(o.ctx, "Constructed message exceeds Pub/Sub maximum siz e.")
102 return errors.New("gcps: bundle contents violate Pub/Sub size li mit")
103 }
104 if err := o.publishMessages([]*pubsub.Message{message}); err != nil {
105 st.F.DiscardedMessages++
106 st.F.Errors++
107 return err
108 }
109
110 st.F.SentBytes += len(message.Data)
111 st.F.SentMessages++
112 return nil
113 }
114
115 func (*gcpsOutput) MaxSize() int {
116 return gcps.MaxPublishSize / 2
117 }
118
119 func (o *gcpsOutput) Stats() output.Stats {
120 o.statsMu.Lock()
121 defer o.statsMu.Unlock()
122
123 statsCopy := o.stats
124 return &statsCopy
125 }
126
127 func (o *gcpsOutput) Close() {
128 // Nothing to do.
129 }
130
131 // buildMessage constructs a Pub/Sub Message out of LogDog frames.
132 //
133 // The first frame will be a ButlerMetadata message describing the second
134 // frame. The second frame will be a ButlerLogBundle containing the bundle
135 // data.
136 func (o *gcpsOutput) buildMessage(buf *gcpsBuffer, bundle *protocol.ButlerLogBun dle) (*pubsub.Message, error) {
137 if buf.protoWriter == nil {
138 buf.protoWriter = &butlerproto.Writer{
139 Compress: o.Compress,
140 CompressThreshold: butlerproto.DefaultCompressThreshold,
141 }
142 }
143
144 // Clear our buffer and (re)initialize our frame writer.
145 buf.Reset()
146 if buf.frameWriter == nil {
147 buf.frameWriter = recordio.NewWriter(buf)
148 } else {
149 buf.frameWriter.Reset(buf)
150 }
151
152 if err := buf.protoWriter.WriteWith(buf.frameWriter, bundle); err != nil {
153 return nil, err
154 }
155
156 return &pubsub.Message{
157 Data: buf.Bytes(),
158 }, nil
159 }
160
161 // publishMessages handles an individual publish request. It will indefinitely
162 // retry transient errors until the publish succeeds.
163 func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error {
164 var messageIDs []string
165 count := 0
166 err := retry.Retry(o.ctx, retry.TransientOnly(o.publishRetryIterator()), func() error {
167 ids, err := o.PubSub.Publish(o.Topic, messages...)
168 if err != nil {
169 return err
170 }
171 messageIDs = ids
172 return nil
173 }, func(err error, d time.Duration) {
174 log.Fields{
175 log.ErrorKey: err,
176 "count": count,
177 "delay": d,
178 }.Warningf(o.ctx, "Transient publish error; retrying.")
179 count++
180 })
181 if err != nil {
182 log.Errorf(log.SetError(o.ctx, err), "Failed to send PubSub mess age.")
183 return err
184 }
185
186 log.Debugf(log.SetField(o.ctx, "messageIds", messageIDs), "Published mes sages.")
187 return nil
188 }
189
190 func (o *gcpsOutput) mergeStats(s output.Stats) {
191 o.statsMu.Lock()
192 defer o.statsMu.Unlock()
193
194 o.stats.Merge(s)
195 }
196
197 // publishRetryIterator returns a retry.Iterator configured for publish
198 // requests.
199 //
200 // Note that this iterator has no retry upper bound. It will continue retrying
201 // indefinitely until success.
202 func (*gcpsOutput) publishRetryIterator() retry.Iterator {
203 return &retry.ExponentialBackoff{
204 Limited: retry.Limited{
205 Delay: 500 * time.Millisecond,
206 },
207 Multiplier: 3.0,
208 MaxDelay: 15 * time.Second,
209 }
210 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/output/pubsub/doc.go ('k') | client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698