| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package pubsub | 5 package pubsub |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "sync" | 11 "sync" |
| 12 "time" |
| 12 | 13 |
| 13 "github.com/luci/luci-go/client/internal/logdog/butler/output" | 14 "github.com/luci/luci-go/client/internal/logdog/butler/output" |
| 14 » "github.com/luci/luci-go/common/gcloud/pubsub" | 15 » gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 "github.com/luci/luci-go/common/logdog/butlerproto" | 16 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 16 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 "github.com/luci/luci-go/common/recordio" | 19 "github.com/luci/luci-go/common/recordio" |
| 20 "github.com/luci/luci-go/common/retry" |
| 19 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 22 "google.golang.org/cloud/pubsub" |
| 20 ) | 23 ) |
| 21 | 24 |
| 22 // Publisher is an interface for something that publishes Pub/Sub messages. | 25 // Topic is an interface for a Pub/Sub topic. |
| 23 // | 26 // |
| 24 // pubsub.Connection implements this interface. | 27 // pubsub.TopicHandle implements Topic. |
| 25 type Publisher interface { | 28 type Topic interface { |
| 29 » // Name returns the name of the topic. |
| 30 » Name() string |
| 31 |
| 26 // Publish mirrors the pubsub.Connection Publish method. | 32 // Publish mirrors the pubsub.Connection Publish method. |
| 27 » Publish(context.Context, pubsub.Topic, ...*pubsub.Message) ([]string, er
ror) | 33 » Publish(context.Context, ...*pubsub.Message) ([]string, error) |
| 28 } | 34 } |
| 29 | 35 |
| 30 var _ Publisher = pubsub.Connection(nil) | 36 var _ Topic = (*pubsub.TopicHandle)(nil) |
| 31 | 37 |
| 32 // Config is a configuration structure for Pub/Sub output. | 38 // Config is a configuration structure for Pub/Sub output. |
| 33 type Config struct { | 39 type Config struct { |
| 34 » // Publisher is the Pub/Sub instance to use. | 40 » // Topic is the Pub/Sub topic to publish to. |
| 35 » Publisher Publisher | 41 » Topic Topic |
| 36 | |
| 37 » // Topic is the name of the Cloud Pub/Sub topic to publish to. | |
| 38 » Topic pubsub.Topic | |
| 39 | 42 |
| 40 // Compress, if true, enables zlib compression. | 43 // Compress, if true, enables zlib compression. |
| 41 Compress bool | 44 Compress bool |
| 42 | 45 |
| 43 // Track, if true, tracks all log entries that have been successfully | 46 // Track, if true, tracks all log entries that have been successfully |
| 44 // submitted. | 47 // submitted. |
| 45 Track bool | 48 Track bool |
| 46 } | 49 } |
| 47 | 50 |
| 48 // Validate validates the Output configuration. | |
| 49 func (c *Config) Validate() error { | |
| 50 if c.Publisher == nil { | |
| 51 return errors.New("pubsub: no pub/sub instance configured") | |
| 52 } | |
| 53 if err := c.Topic.Validate(); err != nil { | |
| 54 return fmt.Errorf("pubsub: invalid Topic [%s]: %s", c.Topic, err
) | |
| 55 } | |
| 56 return nil | |
| 57 } | |
| 58 | |
| 59 // buffer | 51 // buffer |
| 60 type buffer struct { | 52 type buffer struct { |
| 61 bytes.Buffer // Output buffer for published message data. | 53 bytes.Buffer // Output buffer for published message data. |
| 62 | 54 |
| 63 frameWriter recordio.Writer | 55 frameWriter recordio.Writer |
| 64 protoWriter *butlerproto.Writer | 56 protoWriter *butlerproto.Writer |
| 65 } | 57 } |
| 66 | 58 |
| 67 // Butler Output that sends messages into Google Cloud PubSub as compressed | 59 // Butler Output that sends messages into Google Cloud PubSub as compressed |
| 68 // protocol buffer blobs. | 60 // protocol buffer blobs. |
| (...skipping 18 matching lines...) Expand all Loading... |
| 87 | 79 |
| 88 if c.Track { | 80 if c.Track { |
| 89 o.et = &output.EntryTracker{} | 81 o.et = &output.EntryTracker{} |
| 90 } | 82 } |
| 91 | 83 |
| 92 o.Context = log.SetField(ctx, "pubsub", &o) | 84 o.Context = log.SetField(ctx, "pubsub", &o) |
| 93 return &o | 85 return &o |
| 94 } | 86 } |
| 95 | 87 |
| 96 func (o *pubSubOutput) String() string { | 88 func (o *pubSubOutput) String() string { |
| 97 » return fmt.Sprintf("pubsub(%s)", o.Topic) | 89 » return fmt.Sprintf("pubsub(%s)", o.Topic.Name()) |
| 98 } | 90 } |
| 99 | 91 |
| 100 func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error { | 92 func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error { |
| 101 st := output.StatsBase{} | 93 st := output.StatsBase{} |
| 102 defer o.mergeStats(&st) | 94 defer o.mergeStats(&st) |
| 103 | 95 |
| 104 b := o.bufferPool.Get().(*buffer) | 96 b := o.bufferPool.Get().(*buffer) |
| 105 defer o.bufferPool.Put(b) | 97 defer o.bufferPool.Put(b) |
| 106 | 98 |
| 107 message, err := o.buildMessage(b, bundle) | 99 message, err := o.buildMessage(b, bundle) |
| 108 if err != nil { | 100 if err != nil { |
| 109 log.Fields{ | 101 log.Fields{ |
| 110 log.ErrorKey: err, | 102 log.ErrorKey: err, |
| 111 }.Errorf(o, "Failed to build PubSub Message from bundle.") | 103 }.Errorf(o, "Failed to build PubSub Message from bundle.") |
| 112 st.F.DiscardedMessages++ | 104 st.F.DiscardedMessages++ |
| 113 st.F.Errors++ | 105 st.F.Errors++ |
| 114 return err | 106 return err |
| 115 } | 107 } |
| 116 » if len(message.Data) > pubsub.MaxPublishSize { | 108 » if len(message.Data) > gcps.MaxPublishSize { |
| 117 log.Fields{ | 109 log.Fields{ |
| 118 "messageSize": len(message.Data), | 110 "messageSize": len(message.Data), |
| 119 » » » "maxPubSubSize": pubsub.MaxPublishSize, | 111 » » » "maxPubSubSize": gcps.MaxPublishSize, |
| 120 }.Errorf(o, "Constructed message exceeds Pub/Sub maximum size.") | 112 }.Errorf(o, "Constructed message exceeds Pub/Sub maximum size.") |
| 121 return errors.New("pubsub: bundle contents violate Pub/Sub size
limit") | 113 return errors.New("pubsub: bundle contents violate Pub/Sub size
limit") |
| 122 } | 114 } |
| 123 if err := o.publishMessages([]*pubsub.Message{message}); err != nil { | 115 if err := o.publishMessages([]*pubsub.Message{message}); err != nil { |
| 124 st.F.DiscardedMessages++ | 116 st.F.DiscardedMessages++ |
| 125 st.F.Errors++ | 117 st.F.Errors++ |
| 126 return err | 118 return err |
| 127 } | 119 } |
| 128 | 120 |
| 129 if o.et != nil { | 121 if o.et != nil { |
| 130 o.et.Track(bundle) | 122 o.et.Track(bundle) |
| 131 } | 123 } |
| 132 | 124 |
| 133 st.F.SentBytes += len(message.Data) | 125 st.F.SentBytes += len(message.Data) |
| 134 st.F.SentMessages++ | 126 st.F.SentMessages++ |
| 135 return nil | 127 return nil |
| 136 } | 128 } |
| 137 | 129 |
| 138 func (*pubSubOutput) MaxSize() int { | 130 func (*pubSubOutput) MaxSize() int { |
| 139 » return pubsub.MaxPublishSize / 2 | 131 » return gcps.MaxPublishSize / 2 |
| 140 } | 132 } |
| 141 | 133 |
| 142 func (o *pubSubOutput) Stats() output.Stats { | 134 func (o *pubSubOutput) Stats() output.Stats { |
| 143 o.statsMu.Lock() | 135 o.statsMu.Lock() |
| 144 defer o.statsMu.Unlock() | 136 defer o.statsMu.Unlock() |
| 145 | 137 |
| 146 statsCopy := o.stats | 138 statsCopy := o.stats |
| 147 return &statsCopy | 139 return &statsCopy |
| 148 } | 140 } |
| 149 | 141 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 184 } | 176 } |
| 185 | 177 |
| 186 return &pubsub.Message{ | 178 return &pubsub.Message{ |
| 187 Data: buf.Bytes(), | 179 Data: buf.Bytes(), |
| 188 }, nil | 180 }, nil |
| 189 } | 181 } |
| 190 | 182 |
| 191 // publishMessages handles an individual publish request. It will indefinitely | 183 // publishMessages handles an individual publish request. It will indefinitely |
| 192 // retry transient errors until the publish succeeds. | 184 // retry transient errors until the publish succeeds. |
| 193 func (o *pubSubOutput) publishMessages(messages []*pubsub.Message) error { | 185 func (o *pubSubOutput) publishMessages(messages []*pubsub.Message) error { |
| 194 » messageIDs, err := o.Publisher.Publish(o, o.Topic, messages...) | 186 » var messageIDs []string |
| 187 » err := retry.Retry(o, retry.TransientOnly(indefiniteRetry), func() (err
error) { |
| 188 » » messageIDs, err = o.Topic.Publish(o, messages...) |
| 189 » » return |
| 190 » }, func(err error, d time.Duration) { |
| 191 » » log.Fields{ |
| 192 » » » log.ErrorKey: err, |
| 193 » » » "delay": d, |
| 194 » » » "count": len(messages), |
| 195 » » }.Warningf(o, "TRANSIENT error publishing messages; retrying..."
) |
| 196 » }) |
| 195 if err != nil { | 197 if err != nil { |
| 196 » » return err | 198 » » log.WithError(err).Errorf(o, "Failed to send PubSub message.") |
| 197 » } | |
| 198 » if err != nil { | |
| 199 » » log.Errorf(log.SetError(o, err), "Failed to send PubSub message.
") | |
| 200 return err | 199 return err |
| 201 } | 200 } |
| 202 | 201 |
| 203 » log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published message
s.") | 202 » log.Fields{ |
| 203 » » "messageIds": messageIDs, |
| 204 » }.Debugf(o, "Published messages.") |
| 204 return nil | 205 return nil |
| 205 } | 206 } |
| 206 | 207 |
| 207 func (o *pubSubOutput) mergeStats(s output.Stats) { | 208 func (o *pubSubOutput) mergeStats(s output.Stats) { |
| 208 o.statsMu.Lock() | 209 o.statsMu.Lock() |
| 209 defer o.statsMu.Unlock() | 210 defer o.statsMu.Unlock() |
| 210 | 211 |
| 211 o.stats.Merge(s) | 212 o.stats.Merge(s) |
| 212 } | 213 } |
| 214 |
| 215 // indefiniteRetry is a retry.Iterator that will indefinitely retry errors with |
| 216 // a maximum backoff. |
| 217 func indefiniteRetry() retry.Iterator { |
| 218 return &retry.ExponentialBackoff{ |
| 219 Limited: retry.Limited{ |
| 220 Retries: -1, |
| 221 }, |
| 222 MaxDelay: 30 * time.Second, |
| 223 } |
| 224 } |
| OLD | NEW |