Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Side by Side Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package pubsub 5 package pubsub
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "sync" 11 "sync"
12 "time" 12 "time"
13 13
14 "github.com/luci/luci-go/client/internal/logdog/butler/output" 14 "github.com/luci/luci-go/client/internal/logdog/butler/output"
15 "github.com/luci/luci-go/common/gcloud/gcps" 15 "github.com/luci/luci-go/common/gcloud/gcps"
16 "github.com/luci/luci-go/common/logdog/butlerproto" 16 "github.com/luci/luci-go/common/logdog/butlerproto"
17 "github.com/luci/luci-go/common/logdog/protocol" 17 "github.com/luci/luci-go/common/logdog/protocol"
18 log "github.com/luci/luci-go/common/logging" 18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/recordio" 19 "github.com/luci/luci-go/common/recordio"
20 "github.com/luci/luci-go/common/retry" 20 "github.com/luci/luci-go/common/retry"
21 "golang.org/x/net/context" 21 "golang.org/x/net/context"
22 "google.golang.org/cloud/pubsub" 22 "google.golang.org/cloud/pubsub"
23 ) 23 )
24 24
25 // Publisher is an interface for something that publishes Pub/Sub messages.
dnj (Google) 2016/01/21 04:36:24 Make a specific interface instead of requiring one
26 //
27 // gcps.PubSub implements this interface.
28 type Publisher interface {
29 // Publish mirrors the gcps.PubSub Publish method.
30 Publish(context.Context, gcps.Topic, ...*pubsub.Message) ([]string, erro r)
31 }
32
33 var _ Publisher = gcps.PubSub(nil)
34
25 // Config is a configuration structure for GCPS output. 35 // Config is a configuration structure for GCPS output.
26 type Config struct { 36 type Config struct {
27 » // Pubsub is the Pub/Sub instance to use. 37 » // Publisher is the Pub/Sub instance to use.
28 » PubSub gcps.PubSub 38 » Publisher Publisher
29 39
30 // Topic is the name of the Cloud Pub/Sub topic to publish to. 40 // Topic is the name of the Cloud Pub/Sub topic to publish to.
31 Topic gcps.Topic 41 Topic gcps.Topic
32 42
33 // Compress, if true, enables zlib compression. 43 // Compress, if true, enables zlib compression.
34 Compress bool 44 Compress bool
35 } 45 }
36 46
37 // Validate validates the Output configuration. 47 // Validate validates the Output configuration.
38 func (c *Config) Validate() error { 48 func (c *Config) Validate() error {
39 » if c.PubSub == nil { 49 » if c.Publisher == nil {
40 return errors.New("gcps: no pub/sub instance configured") 50 return errors.New("gcps: no pub/sub instance configured")
41 } 51 }
42 if err := c.Topic.Validate(); err != nil { 52 if err := c.Topic.Validate(); err != nil {
43 return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err) 53 return fmt.Errorf("gcps: invalid Topic [%s]: %s", c.Topic, err)
44 } 54 }
45 return nil 55 return nil
46 } 56 }
47 57
48 // gcpsBuffer 58 // gcpsBuffer
49 type gcpsBuffer struct { 59 type gcpsBuffer struct {
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
158 Data: buf.Bytes(), 168 Data: buf.Bytes(),
159 }, nil 169 }, nil
160 } 170 }
161 171
162 // publishMessages handles an individual publish request. It will indefinitely 172 // publishMessages handles an individual publish request. It will indefinitely
163 // retry transient errors until the publish succeeds. 173 // retry transient errors until the publish succeeds.
164 func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error { 174 func (o *gcpsOutput) publishMessages(messages []*pubsub.Message) error {
165 var messageIDs []string 175 var messageIDs []string
166 count := 0 176 count := 0
167 err := retry.Retry(o, retry.TransientOnly(retry.Default()), func() error { 177 err := retry.Retry(o, retry.TransientOnly(retry.Default()), func() error {
168 » » ids, err := o.PubSub.Publish(o.Topic, messages...) 178 » » ids, err := o.Publisher.Publish(o, o.Topic, messages...)
169 if err != nil { 179 if err != nil {
170 return err 180 return err
171 } 181 }
172 messageIDs = ids 182 messageIDs = ids
173 return nil 183 return nil
174 }, func(err error, d time.Duration) { 184 }, func(err error, d time.Duration) {
175 log.Fields{ 185 log.Fields{
176 log.ErrorKey: err, 186 log.ErrorKey: err,
177 "count": count, 187 "count": count,
178 "delay": d, 188 "delay": d,
179 }.Warningf(o, "Transient publish error; retrying.") 189 }.Warningf(o, "Transient publish error; retrying.")
180 count++ 190 count++
181 }) 191 })
182 if err != nil { 192 if err != nil {
183 log.Errorf(log.SetError(o, err), "Failed to send PubSub message. ") 193 log.Errorf(log.SetError(o, err), "Failed to send PubSub message. ")
184 return err 194 return err
185 } 195 }
186 196
187 log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published message s.") 197 log.Debugf(log.SetField(o, "messageIds", messageIDs), "Published message s.")
188 return nil 198 return nil
189 } 199 }
190 200
191 func (o *gcpsOutput) mergeStats(s output.Stats) { 201 func (o *gcpsOutput) mergeStats(s output.Stats) {
192 o.statsMu.Lock() 202 o.statsMu.Lock()
193 defer o.statsMu.Unlock() 203 defer o.statsMu.Unlock()
194 204
195 o.stats.Merge(s) 205 o.stats.Merge(s)
196 } 206 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698