Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(235)

Side by Side Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput.go

Issue 1975683002: LogDog: Implement prefix registration in Butler. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-collector
Patch Set: Rebarse Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package pubsub 5 package pubsub
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "sync" 11 "sync"
12 "time" 12 "time"
13 13
14 "github.com/luci/luci-go/client/internal/logdog/butler/output" 14 "github.com/luci/luci-go/client/internal/logdog/butler/output"
15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub"
16 "github.com/luci/luci-go/common/logdog/butlerproto" 16 "github.com/luci/luci-go/common/logdog/butlerproto"
17 "github.com/luci/luci-go/common/logdog/types"
17 log "github.com/luci/luci-go/common/logging" 18 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/proto/logdog/logpb" 19 "github.com/luci/luci-go/common/proto/logdog/logpb"
19 "github.com/luci/luci-go/common/recordio" 20 "github.com/luci/luci-go/common/recordio"
20 "github.com/luci/luci-go/common/retry" 21 "github.com/luci/luci-go/common/retry"
21 "golang.org/x/net/context" 22 "golang.org/x/net/context"
22 "google.golang.org/cloud/pubsub" 23 "google.golang.org/cloud/pubsub"
23 ) 24 )
24 25
25 // Topic is an interface for a Pub/Sub topic. 26 // Topic is an interface for a Pub/Sub topic.
26 // 27 //
27 // pubsub.Topic implements Topic. 28 // pubsub.Topic implements Topic.
28 type Topic interface { 29 type Topic interface {
29 // Name returns the name of the topic. 30 // Name returns the name of the topic.
30 Name() string 31 Name() string
31 32
32 // Publish mirrors the pubsub.Connection Publish method. 33 // Publish mirrors the pubsub.Connection Publish method.
33 Publish(context.Context, ...*pubsub.Message) ([]string, error) 34 Publish(context.Context, ...*pubsub.Message) ([]string, error)
34 } 35 }
35 36
36 var _ Topic = (*pubsub.Topic)(nil) 37 var _ Topic = (*pubsub.Topic)(nil)
37 38
38 // Config is a configuration structure for Pub/Sub output. 39 // Config is a configuration structure for Pub/Sub output.
39 type Config struct { 40 type Config struct {
40 // Topic is the Pub/Sub topic to publish to. 41 // Topic is the Pub/Sub topic to publish to.
41 Topic Topic 42 Topic Topic
42 43
44 // Secret, if not nil, is the prefix secret to attach to each outgoing b undle.
45 Secret types.PrefixSecret
46
43 // Compress, if true, enables zlib compression. 47 // Compress, if true, enables zlib compression.
44 Compress bool 48 Compress bool
45 49
46 // Track, if true, tracks all log entries that have been successfully 50 // Track, if true, tracks all log entries that have been successfully
47 // submitted. 51 // submitted.
48 Track bool 52 Track bool
49 } 53 }
50 54
51 // buffer 55 // buffer
52 type buffer struct { 56 type buffer struct {
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
89 return fmt.Sprintf("pubsub(%s)", o.Topic.Name()) 93 return fmt.Sprintf("pubsub(%s)", o.Topic.Name())
90 } 94 }
91 95
92 func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error { 96 func (o *pubSubOutput) SendBundle(bundle *logpb.ButlerLogBundle) error {
93 st := output.StatsBase{} 97 st := output.StatsBase{}
94 defer o.mergeStats(&st) 98 defer o.mergeStats(&st)
95 99
96 b := o.bufferPool.Get().(*buffer) 100 b := o.bufferPool.Get().(*buffer)
97 defer o.bufferPool.Put(b) 101 defer o.bufferPool.Put(b)
98 102
103 bundle.Secret = []byte(o.Secret)
99 message, err := o.buildMessage(b, bundle) 104 message, err := o.buildMessage(b, bundle)
100 if err != nil { 105 if err != nil {
101 log.Fields{ 106 log.Fields{
102 log.ErrorKey: err, 107 log.ErrorKey: err,
103 }.Errorf(o, "Failed to build PubSub Message from bundle.") 108 }.Errorf(o, "Failed to build PubSub Message from bundle.")
104 st.F.DiscardedMessages++ 109 st.F.DiscardedMessages++
105 st.F.Errors++ 110 st.F.Errors++
106 return err 111 return err
107 } 112 }
108 if len(message.Data) > gcps.MaxPublishSize { 113 if len(message.Data) > gcps.MaxPublishSize {
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
215 // indefiniteRetry is a retry.Iterator that will indefinitely retry errors with 220 // indefiniteRetry is a retry.Iterator that will indefinitely retry errors with
216 // a maximum backoff. 221 // a maximum backoff.
217 func indefiniteRetry() retry.Iterator { 222 func indefiniteRetry() retry.Iterator {
218 return &retry.ExponentialBackoff{ 223 return &retry.ExponentialBackoff{
219 Limited: retry.Limited{ 224 Limited: retry.Limited{
220 Retries: -1, 225 Retries: -1,
221 }, 226 },
222 MaxDelay: 30 * time.Second, 227 MaxDelay: 30 * time.Second,
223 } 228 }
224 } 229 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/butler_test.go ('k') | server/internal/logdog/collector/coordinator/cache.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698