Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package pubsub | 5 package pubsub |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "sync" | 11 "sync" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/luci/luci-go/client/internal/logdog/butler/output" | 14 "github.com/luci/luci-go/client/internal/logdog/butler/output" |
| 15 "github.com/luci/luci-go/common/gcloud/gcps" | 15 "github.com/luci/luci-go/common/gcloud/gcps" |
| 16 "github.com/luci/luci-go/common/logdog/butlerproto" | 16 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 17 "github.com/luci/luci-go/common/logdog/protocol" | 17 "github.com/luci/luci-go/common/logdog/protocol" |
| 18 log "github.com/luci/luci-go/common/logging" | 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/recordio" | 19 "github.com/luci/luci-go/common/recordio" |
| 20 "github.com/luci/luci-go/common/retry" | 20 "github.com/luci/luci-go/common/retry" |
| 21 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 22 "google.golang.org/cloud/pubsub" | 22 "google.golang.org/cloud/pubsub" |
| 23 ) | 23 ) |
| 24 | 24 |
| 25 // Publisher is an interface for something that publishes Pub/Sub messages. | |
|
dnj (Google)
2016/01/21 04:36:24
Make a specific interface instead of requiring one
| |
| 26 // | |
| 27 // gcps.PubSub implements this interface. | |
| 28 type Publisher interface { | |
| 29 // Publish mirrors the gcps.PubSub Publish method. | |
| 30 Publish(context.Context, gcps.Topic, ...*pubsub.Message) ([]string, erro r) | |
| 31 } | |
| 32 | |
| 33 var _ Publisher = gcps.PubSub(nil) | |
| 34 | |
| 25 // Config is a configuration structure for GCPS output. | 35 // Config is a configuration structure for GCPS output. |
| 26 type Config struct { | 36 type Config struct { |
| 27 » // Pubsub is the Pub/Sub instance to use. | 37 » // Publisher is the Pub/Sub instance to use. |
| 28 » PubSub gcps.PubSub | 38 » Publisher Publisher |
| 29 | 39 |
| 30 // Topic is the name of the Cloud Pub/Sub topic to publish to. | 40 // Topic is the name of the Cloud Pub/Sub topic to publish to. |
| 31 Topic gcps.Topic | 41 Topic gcps.Topic |
| 32 | 42 |
| 33 // Compress, if true, enables zlib compression. | 43 // Compress, if true, enables zlib compression. |
| 34 Compress bool | 44 Compress bool |
| 35 } | 45 } |
| 36 | 46 |
| 37 // Validate validates the Output configuration. | 47 // Validate validates the Output configuration. |
| 38 func (c *Config) Validate() error { | 48 func (c *Config) Validate() error { |
| 39 » if c.PubSub == nil { | 49 » if c.Publisher == nil { |
| 40 return errors.New("gcps: no pub/sub instance configured") | 50 return errors.New("gcps: no pub/sub instance configured") |
| 41 } | 51 } |
| 42 if err := c.Topic.Validate(); err != nil { | 52 if err := c.Topic.Validate(); err != nil { |
| 43 return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err) | 53 return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err) |
| 44 } | 54 } |
| 45 return nil | 55 return nil |
| 46 } | 56 } |
| 47 | 57 |
| 48 // gcpsBuffer | 58 // gcpsBuffer |
| 49 type gcpsBuffer struct { | 59 type gcpsBuffer struct { |
| (...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 158 Data: buf.Bytes(), | 168 Data: buf.Bytes(), |
| 159 }, nil | 169 }, nil |
| 160 } | 170 } |
| 161 | 171 |
| 162 // publishMessages handles an individual publish request. It will indefinitely | 172 // publishMessages handles an individual publish request. It will indefinitely |
| 163 // retry transient errors until the publish succeeds. | 173 // retry transient errors until the publish succeeds. |
| 164 func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error { | 174 func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error { |
| 165 var messageIDs []string | 175 var messageIDs []string |
| 166 count := 0 | 176 count := 0 |
| 167 err := retry.Retry(o, retry.TransientOnly(retry.Default()), func() error { | 177 err := retry.Retry(o, retry.TransientOnly(retry.Default()), func() error { |
| 168 » » ids, err := o.PubSub.Publish(o.Topic, messages...) | 178 » » ids, err := o.Publisher.Publish(o, o.Topic, messages...) |
| 169 if err != nil { | 179 if err != nil { |
| 170 return err | 180 return err |
| 171 } | 181 } |
| 172 messageIDs = ids | 182 messageIDs = ids |
| 173 return nil | 183 return nil |
| 174 }, func(err error, d time.Duration) { | 184 }, func(err error, d time.Duration) { |
| 175 log.Fields{ | 185 log.Fields{ |
| 176 log.ErrorKey: err, | 186 log.ErrorKey: err, |
| 177 "count": count, | 187 "count": count, |
| 178 "delay": d, | 188 "delay": d, |
| 179 }.Warningf(o, "Transient publish error; retrying.") | 189 }.Warningf(o, "Transient publish error; retrying.") |
| 180 count++ | 190 count++ |
| 181 }) | 191 }) |
| 182 if err != nil { | 192 if err != nil { |
| 183 log.Errorf(log.SetError(o, err), "Failed to send PubSub message. ") | 193 log.Errorf(log.SetError(o, err), "Failed to send PubSub message. ") |
| 184 return err | 194 return err |
| 185 } | 195 } |
| 186 | 196 |
| 187 log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published message s.") | 197 log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published message s.") |
| 188 return nil | 198 return nil |
| 189 } | 199 } |
| 190 | 200 |
| 191 func (o *gcpsOutput) mergeStats(s output.Stats) { | 201 func (o *gcpsOutput) mergeStats(s output.Stats) { |
| 192 o.statsMu.Lock() | 202 o.statsMu.Lock() |
| 193 defer o.statsMu.Unlock() | 203 defer o.statsMu.Unlock() |
| 194 | 204 |
| 195 o.stats.Merge(s) | 205 o.stats.Merge(s) |
| 196 } | 206 } |
| OLD | NEW |