Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(460)

Side by Side Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package pubsub 5 package pubsub
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "sync" 11 "sync"
12 "time"
12 13
13 "github.com/luci/luci-go/client/internal/logdog/butler/output" 14 "github.com/luci/luci-go/client/internal/logdog/butler/output"
14 » "github.com/luci/luci-go/common/gcloud/pubsub" 15 » gcps "github.com/luci/luci-go/common/gcloud/pubsub"
15 "github.com/luci/luci-go/common/logdog/butlerproto" 16 "github.com/luci/luci-go/common/logdog/butlerproto"
16 log "github.com/luci/luci-go/common/logging" 17 log "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/common/proto/logdog/logpb" 18 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "github.com/luci/luci-go/common/recordio" 19 "github.com/luci/luci-go/common/recordio"
20 "github.com/luci/luci-go/common/retry"
19 "golang.org/x/net/context" 21 "golang.org/x/net/context"
22 "google.golang.org/cloud/pubsub"
20 ) 23 )
21 24
22 // Publisher is an interface for something that publishes Pub/Sub messages. 25 // Topic is an interface for a Pub/Sub topic.
23 // 26 //
24 // pubsub.Connection implements this interface. 27 // pubsub.TopicHandle implements Topic.
25 type Publisher interface { 28 type Topic interface {
29 » // Name returns the name of the topic.
30 » Name() string
31
26 // Publish mirrors the pubsub.Connection Publish method. 32 // Publish mirrors the pubsub.Connection Publish method.
27 » Publish(context.Context, pubsub.Topic, ...*pubsub.Message) ([]string, er ror) 33 » Publish(context.Context, ...*pubsub.Message) ([]string, error)
28 } 34 }
29 35
30 var _ Publisher = pubsub.Connection(nil) 36 var _ Topic = (*pubsub.TopicHandle)(nil)
31 37
32 // Config is a configuration structure for Pub/Sub output. 38 // Config is a configuration structure for Pub/Sub output.
33 type Config struct { 39 type Config struct {
34 » // Publisher is the Pub/Sub instance to use. 40 » // Topic is the Pub/Sub topic to publish to.
35 » Publisher Publisher 41 » Topic Topic
36
37 » // Topic is the name of the Cloud Pub/Sub topic to publish to.
38 » Topic pubsub.Topic
39 42
40 // Compress, if true, enables zlib compression. 43 // Compress, if true, enables zlib compression.
41 Compress bool 44 Compress bool
42 45
43 // Track, if true, tracks all log entries that have been successfully 46 // Track, if true, tracks all log entries that have been successfully
44 // submitted. 47 // submitted.
45 Track bool 48 Track bool
46 } 49 }
47 50
48 // Validate validates the Output configuration.
49 func (c *Config) Validate() error {
50 if c.Publisher == nil {
51 return errors.New("pubsub: no pub/sub instance configured")
52 }
53 if err := c.Topic.Validate(); err != nil {
54 return fmt.Errorf("pubsub: invalid Topic [%s]: %s", c.Topic, err )
55 }
56 return nil
57 }
58
59 // buffer 51 // buffer
60 type buffer struct { 52 type buffer struct {
61 bytes.Buffer // Output buffer for published message data. 53 bytes.Buffer // Output buffer for published message data.
62 54
63 frameWriter recordio.Writer 55 frameWriter recordio.Writer
64 protoWriter *butlerproto.Writer 56 protoWriter *butlerproto.Writer
65 } 57 }
66 58
67 // Butler Output that sends messages into Google Cloud PubSub as compressed 59 // Butler Output that sends messages into Google Cloud PubSub as compressed
68 // protocol buffer blobs. 60 // protocol buffer blobs.
(...skipping 18 matching lines...) Expand all
87 79
88 if c.Track { 80 if c.Track {
89 o.et = &output.EntryTracker{} 81 o.et = &output.EntryTracker{}
90 } 82 }
91 83
92 o.Context = log.SetField(ctx, "pubsub", &o) 84 o.Context = log.SetField(ctx, "pubsub", &o)
93 return &o 85 return &o
94 } 86 }
95 87
96 func (o *pubSubOutput) String() string { 88 func (o *pubSubOutput) String() string {
97 » return fmt.Sprintf("pubsub(%s)", o.Topic) 89 » return fmt.Sprintf("pubsub(%s)", o.Topic.Name())
98 } 90 }
99 91
100 func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error { 92 func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
101 st := output.StatsBase{} 93 st := output.StatsBase{}
102 defer o.mergeStats(&st) 94 defer o.mergeStats(&st)
103 95
104 b := o.bufferPool.Get().(*buffer) 96 b := o.bufferPool.Get().(*buffer)
105 defer o.bufferPool.Put(b) 97 defer o.bufferPool.Put(b)
106 98
107 message, err := o.buildMessage(b, bundle) 99 message, err := o.buildMessage(b, bundle)
108 if err != nil { 100 if err != nil {
109 log.Fields{ 101 log.Fields{
110 log.ErrorKey: err, 102 log.ErrorKey: err,
111 }.Errorf(o, "Failed to build PubSub Message from bundle.") 103 }.Errorf(o, "Failed to build PubSub Message from bundle.")
112 st.F.DiscardedMessages++ 104 st.F.DiscardedMessages++
113 st.F.Errors++ 105 st.F.Errors++
114 return err 106 return err
115 } 107 }
116 » if len(message.Data) > pubsub.MaxPublishSize { 108 » if len(message.Data) > gcps.MaxPublishSize {
117 log.Fields{ 109 log.Fields{
118 "messageSize": len(message.Data), 110 "messageSize": len(message.Data),
119 » » » "maxPubSubSize": pubsub.MaxPublishSize, 111 » » » "maxPubSubSize": gcps.MaxPublishSize,
120 }.Errorf(o, "Constructed message exceeds Pub/Sub maximum size.") 112 }.Errorf(o, "Constructed message exceeds Pub/Sub maximum size.")
121 return errors.New("pubsub: bundle contents violate Pub/Sub size limit") 113 return errors.New("pubsub: bundle contents violate Pub/Sub size limit")
122 } 114 }
123 if err := o.publishMessages([]*pubsub.Message{message}); err != nil { 115 if err := o.publishMessages([]*pubsub.Message{message}); err != nil {
124 st.F.DiscardedMessages++ 116 st.F.DiscardedMessages++
125 st.F.Errors++ 117 st.F.Errors++
126 return err 118 return err
127 } 119 }
128 120
129 if o.et != nil { 121 if o.et != nil {
130 o.et.Track(bundle) 122 o.et.Track(bundle)
131 } 123 }
132 124
133 st.F.SentBytes += len(message.Data) 125 st.F.SentBytes += len(message.Data)
134 st.F.SentMessages++ 126 st.F.SentMessages++
135 return nil 127 return nil
136 } 128 }
137 129
138 func (*pubSubOutput) MaxSize() int { 130 func (*pubSubOutput) MaxSize() int {
139 » return pubsub.MaxPublishSize / 2 131 » return gcps.MaxPublishSize / 2
140 } 132 }
141 133
142 func (o *pubSubOutput) Stats() output.Stats { 134 func (o *pubSubOutput) Stats() output.Stats {
143 o.statsMu.Lock() 135 o.statsMu.Lock()
144 defer o.statsMu.Unlock() 136 defer o.statsMu.Unlock()
145 137
146 statsCopy := o.stats 138 statsCopy := o.stats
147 return &statsCopy 139 return &statsCopy
148 } 140 }
149 141
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 } 176 }
185 177
186 return &pubsub.Message{ 178 return &pubsub.Message{
187 Data: buf.Bytes(), 179 Data: buf.Bytes(),
188 }, nil 180 }, nil
189 } 181 }
190 182
191 // publishMessages handles an individual publish request. It will indefinitely 183 // publishMessages handles an individual publish request. It will indefinitely
192 // retry transient errors until the publish succeeds. 184 // retry transient errors until the publish succeeds.
193 func (o *pubSubOutput) publishMessages(messages []*pubsub.Message) error { 185 func (o *pubSubOutput) publishMessages(messages []*pubsub.Message) error {
194 » messageIDs, err := o.Publisher.Publish(o, o.Topic, messages...) 186 » var messageIDs []string
187 » err := retry.Retry(o, retry.TransientOnly(indefiniteRetry), func() (err error) {
188 » » messageIDs, err = o.Topic.Publish(o, messages...)
189 » » return
190 » }, func(err error, d time.Duration) {
191 » » log.Fields{
192 » » » log.ErrorKey: err,
193 » » » "delay": d,
194 » » » "count": len(messages),
195 » » }.Warningf(o, "TRANSIENT error publishing messages; retrying..." )
196 » })
195 if err != nil { 197 if err != nil {
196 » » return err 198 » » log.WithError(err).Errorf(o, "Failed to send PubSub message.")
197 » }
198 » if err != nil {
199 » » log.Errorf(log.SetError(o, err), "Failed to send PubSub message. ")
200 return err 199 return err
201 } 200 }
202 201
203 » log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published message s.") 202 » log.Fields{
203 » » "messageIds": messageIDs,
204 » }.Debugf(o, "Published messages.")
204 return nil 205 return nil
205 } 206 }
206 207
207 func (o *pubSubOutput) mergeStats(s output.Stats) { 208 func (o *pubSubOutput) mergeStats(s output.Stats) {
208 o.statsMu.Lock() 209 o.statsMu.Lock()
209 defer o.statsMu.Unlock() 210 defer o.statsMu.Unlock()
210 211
211 o.stats.Merge(s) 212 o.stats.Merge(s)
212 } 213 }
214
215 // indefiniteRetry is a retry.Iterator that will indefinitely retry errors with
216 // a maximum backoff.
217 func indefiniteRetry() retry.Iterator {
218 return &retry.ExponentialBackoff{
219 Limited: retry.Limited{
220 Retries: -1,
221 },
222 MaxDelay: 30 * time.Second,
223 }
224 }
OLDNEW
« no previous file with comments | « client/cmd/logdog_butler/output_pubsub.go ('k') | client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698