OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package gcps | |
6 | |
7 import ( | |
8 "bytes" | |
9 "errors" | |
10 "fmt" | |
11 "sync" | |
12 "time" | |
13 | |
14 "github.com/luci/luci-go/client/internal/logdog/butler/output" | |
15 "github.com/luci/luci-go/client/logdog/butlerproto" | |
16 "github.com/luci/luci-go/common/gcloud/gcps" | |
17 "github.com/luci/luci-go/common/logdog/protocol" | |
18 log "github.com/luci/luci-go/common/logging" | |
19 "github.com/luci/luci-go/common/recordio" | |
20 "github.com/luci/luci-go/common/retry" | |
21 "golang.org/x/net/context" | |
22 "google.golang.org/cloud/pubsub" | |
estaab
2015/11/19 23:57:24
I think some encapsulation is being broken here -
dnj
2015/11/20 01:55:12
"gcps" package is never used outside of the pubsub
estaab
2015/11/20 04:37:28
Besides clean abstraction being a general good pra
dnj (Google)
2015/11/20 04:45:48
Yeah, Pub/Sub abstraction is in place so we can te
estaab
2015/11/20 05:15:48
Ok, that's fine. I think it's a good habit but it
| |
23 ) | |
24 | |
25 // Config is a configuration structure for GCPS output. | |
26 type Config struct { | |
27 // Pubsub is the Pub/Sub instance to use. | |
28 PubSub gcps.PubSub | |
29 | |
30 // Topic is the name of the Cloud Pub/Sub topic to publish to. | |
31 Topic gcps.Topic | |
32 | |
33 // Compress, if true, enables zlib compression. | |
34 Compress bool | |
35 } | |
36 | |
37 // Validate validates the Output configuration. | |
38 func (c *Config) Validate() error { | |
39 if c.PubSub == nil { | |
40 return errors.New("gcps: no pub/sub instance configured") | |
41 } | |
42 if err := c.Topic.Validate(); err != nil { | |
43 return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err) | |
44 } | |
45 return nil | |
46 } | |
47 | |
48 // gcpsBuffer | |
49 type gcpsBuffer struct { | |
50 bytes.Buffer // Output buffer for published message data. | |
51 | |
52 frameWriter recordio.Writer | |
53 protoWriter *butlerproto.Writer | |
54 } | |
55 | |
56 // Butler Output that sends messages into Google Cloud PubSub as compressed | |
57 // protocol buffer blobs. | |
58 type gcpsOutput struct { | |
59 *Config | |
60 | |
61 ctx context.Context // Retained context object. | |
62 bufferPool sync.Pool // Pool of reusable gcpsBuffer instances. | |
63 | |
64 statsMu sync.Mutex | |
65 stats output.StatsBase | |
66 } | |
67 | |
68 // New instantiates a new GCPS output. | |
69 func New(ctx context.Context, c Config) output.Output { | |
70 o := gcpsOutput{ | |
71 Config: &c, | |
72 ctx: ctx, | |
73 } | |
74 o.bufferPool.New = func() interface{} { return &gcpsBuffer{} } | |
75 o.ctx = log.SetField(o.ctx, "output", &o) | |
76 return &o | |
77 } | |
78 | |
79 func (o *gcpsOutput) String() string { | |
80 return fmt.Sprintf("gcps(%s)", o.Topic) | |
81 } | |
82 | |
83 func (o *gcpsOutput) SendBundle(bundle *protocol.ButlerLogBundle) error { | |
84 st := output.StatsBase{} | |
85 defer o.mergeStats(&st) | |
86 | |
87 b := o.bufferPool.Get().(*gcpsBuffer) | |
88 defer o.bufferPool.Put(b) | |
89 | |
90 message, err := o.buildMessage(b, bundle) | |
91 if err != nil { | |
92 log.Errorf(log.SetError(o.ctx, err), "Failed to build PubSub Mes sage from bundle.") | |
estaab
2015/11/19 23:57:24
Do the rules about lowercase first letter and no p
dnj
2015/11/20 01:55:12
Nope, this isn't a Go error, it's a logging error
| |
93 st.F.DiscardedMessages++ | |
94 st.F.Errors++ | |
95 return err | |
96 } | |
97 if len(message.Data) > gcps.MaxPublishSize { | |
98 log.Fields{ | |
99 "messageSize": len(message.Data), | |
100 "maxPubSubSize": gcps.MaxPublishSize, | |
101 }.Errorf(o.ctx, "Constructed message exceeds Pub/Sub maximum siz e.") | |
102 return errors.New("gcps: bundle contents violate Pub/Sub size li mit") | |
103 } | |
104 if err := o.publishMessages([]*pubsub.Message{message}); err != nil { | |
105 st.F.DiscardedMessages++ | |
106 st.F.Errors++ | |
107 return err | |
108 } | |
109 | |
110 st.F.SentBytes += len(message.Data) | |
111 st.F.SentMessages++ | |
112 return nil | |
113 } | |
114 | |
115 func (*gcpsOutput) MaxSize() int { | |
116 return gcps.MaxPublishSize / 2 | |
117 } | |
118 | |
119 func (o *gcpsOutput) Stats() output.Stats { | |
120 o.statsMu.Lock() | |
121 defer o.statsMu.Unlock() | |
122 | |
123 statsCopy := o.stats | |
124 return &statsCopy | |
125 } | |
126 | |
127 func (o *gcpsOutput) Close() { | |
128 // Nothing to do. | |
129 } | |
130 | |
131 // buildMessage constructs a Pub/Sub Message out of LogDog frames. | |
132 // | |
133 // The first frame will be a ButlerMetadata message describing the second | |
134 // frame. The second frame will be a ButlerLogBundle containing the bundle | |
135 // data. | |
136 func (o *gcpsOutput) buildMessage(buf *gcpsBuffer, bundle *protocol.ButlerLogBun dle) (*pubsub.Message, error) { | |
137 if buf.protoWriter == nil { | |
138 buf.protoWriter = &butlerproto.Writer{ | |
139 Compress: o.Compress, | |
140 CompressThreshold: butlerproto.DefaultCompressThreshold, | |
141 } | |
142 } | |
143 | |
144 // Clear our buffer and (re)initialize our frame writer. | |
145 buf.Reset() | |
146 if buf.frameWriter == nil { | |
147 buf.frameWriter = recordio.NewWriter(buf) | |
148 } else { | |
149 buf.frameWriter.Reset(buf) | |
150 } | |
151 | |
152 if err := buf.protoWriter.WriteWith(buf.frameWriter, bundle); err != nil { | |
153 return nil, err | |
154 } | |
155 | |
156 return &pubsub.Message{ | |
157 Data: buf.Bytes(), | |
158 }, nil | |
159 } | |
160 | |
161 // publishMessages handles an individual publish request. It will indefinitely | |
162 // retry transient errors until the publish succeeds. | |
163 func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error { | |
164 var messageIDs []string | |
165 count := 0 | |
166 err := retry.Retry(o.ctx, retry.TransientOnly(o.publishRetryIterator()), func() error { | |
167 ids, err := o.PubSub.Publish(o.Topic, messages...) | |
168 if err != nil { | |
169 return err | |
170 } | |
171 messageIDs = ids | |
172 return nil | |
173 }, func(err error, d time.Duration) { | |
174 log.Fields{ | |
175 log.ErrorKey: err, | |
176 "count": count, | |
177 "delay": d, | |
178 }.Warningf(o.ctx, "Transient publish error; retrying.") | |
179 count++ | |
180 }) | |
181 if err != nil { | |
182 log.Errorf(log.SetError(o.ctx, err), "Failed to send PubSub mess age.") | |
183 return err | |
184 } | |
185 | |
186 log.Debugf(log.SetField(o.ctx, "messageIds", messageIDs), "Published mes sages.") | |
187 return nil | |
188 } | |
189 | |
190 func (o *gcpsOutput) mergeStats(s output.Stats) { | |
191 o.statsMu.Lock() | |
192 defer o.statsMu.Unlock() | |
193 | |
194 o.stats.Merge(s) | |
195 } | |
196 | |
197 // publishRetryIterator returns a retry.Iterator configured for publish | |
198 // requests. | |
199 // | |
200 // Note that this iterator has no retry upper bound. It will continue retrying | |
201 // indefinitely until success. | |
202 func (*gcpsOutput) publishRetryIterator() retry.Iterator { | |
203 return &retry.ExponentialBackoff{ | |
204 Limited: retry.Limited{ | |
205 Delay: 500 * time.Millisecond, | |
206 }, | |
207 Multiplier: 3.0, | |
208 MaxDelay: 15 * time.Second, | |
209 } | |
210 } | |
OLD | NEW |