Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(281)

Side by Side Diff: common/cloudlogging/buffer.go

Issue 2937693003: Make luci-go compile again after deps.lock roll. (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | common/cloudlogging/buffer_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package cloudlogging
6
7 import (
8 "time"
9
10 "github.com/luci/luci-go/common/retry"
11 "github.com/luci/luci-go/common/sync/parallel"
12 "golang.org/x/net/context"
13 )
14
const (
	// DefaultBatchSize is the number of log messages that will be sent in a
	// single cloud logging publish request.
	DefaultBatchSize = 1000

	// DefaultParallelPushLimit is the maximum number of goroutines that may
	// simultaneously push cloud logging data.
	DefaultParallelPushLimit = 8
)
24
25 // Buffer is a goroutine-safe intermediate buffer that implements Client. Any
26 // logs that are written to the Buffer are batched together before being sent
27 // to Cloud Logging.
// Buffer is a goroutine-safe intermediate buffer that implements Client. Any
// logs that are written to the Buffer are batched together before being sent
// to Cloud Logging.
type Buffer interface {
	Client

	// StopAndFlush flushes the Buffer, blocking until all buffered data has been
	// transmitted. After closing, log writes will be discarded.
	StopAndFlush()

	// Abort stops any current attempts to send messages. It is goroutine-safe and
	// can be called multiple times.
	//
	// If StopAndFlush is blocking on cloud logging send/retry, calling Abort will
	// quickly terminate the attempts, causing StopAndFlush to complete.
	Abort()
}
42
// BufferOptions specifies configuration parameters for an instantiated Buffer
// instance.
type BufferOptions struct {
	// BatchSize is the number of messages to batch together when uploading to
	// Cloud Logging endpoint. If zero, DefaultBatchSize will be used.
	BatchSize int

	// ParallelPushLimit is the maximum number of PushEntries calls that will be
	// executed at the same time. If zero, DefaultParallelPushLimit will be used.
	ParallelPushLimit int

	// Retry is a generator. If not nil, it will be used to produce a retry
	// Iterator that will be used to retry the PushEntries call to the wrapped
	// Client.
	Retry func() retry.Iterator
}
59
// bufferImpl is the default implementation of the Buffer interface.
type bufferImpl struct {
	*BufferOptions

	ctx        context.Context    // The context to use for operations.
	cancelFunc context.CancelFunc // The context's cancel function.

	client       Client // The Cloud Logging Client to push through.
	commonLabels map[string]string

	// logC is the intake channel: PushEntries sends onto it, and the
	// process goroutine drains it in batches.
	logC chan *Entry
	// finishedC is closed by process when it exits; StopAndFlush blocks
	// on it to wait for the final flush.
	finishedC chan struct{}

	// testCB, if not nil, holds testing hooks (see testCallbacks).
	testCB *testCallbacks
}
75
// (Testing) a set of callbacks that can be installed by testing for
// fine-grained control.
type testCallbacks struct {
	// bufferRound is called when the buffer's inner loop has completed a buffer
	// round.
	bufferRound func([]*Entry)
	// receivedLogEntry, if not nil, is called for each LogEntry that is received
	// by the buffer's inner loop the moment that it is received.
	receivedLogEntry func(*Entry) // (Testing) If not nil, callback when a log Entry is received.
}
86
87 // NewBuffer instantiates and starts a new cloud logging Buffer.
88 // implementation.
89 func NewBuffer(ctx context.Context, o BufferOptions, c Client) Buffer {
90 if o.BatchSize <= 0 {
91 o.BatchSize = DefaultBatchSize
92 }
93
94 if o.ParallelPushLimit <= 0 {
95 o.ParallelPushLimit = DefaultParallelPushLimit
96 }
97
98 ctx, cancelFunc := context.WithCancel(ctx)
99 b := &bufferImpl{
100 BufferOptions: &o,
101
102 ctx: ctx,
103 cancelFunc: cancelFunc,
104 client: c,
105
106 // Use a >1 multiple of BatchSize so casual logging doesn't imme diately
107 // block pending buffer flush.
108 logC: make(chan *Entry, o.BatchSize*4),
109 finishedC: make(chan struct{}),
110 }
111
112 go b.process()
113 return b
114 }
115
116 func (b *bufferImpl) PushEntries(e []*Entry) error {
117 for _, entry := range e {
118 b.logC <- entry
119 }
120 return nil
121 }
122
// StopAndFlush implements Buffer. It closes the intake channel, which causes
// the process goroutine to drain and push all buffered entries, then blocks
// until that goroutine signals completion by closing finishedC.
func (b *bufferImpl) StopAndFlush() {
	close(b.logC)
	<-b.finishedC
}
127
// Abort implements Buffer. It cancels the buffer's context, which causes any
// in-flight retry.Retry loop in publishLogs to terminate early. Repeated
// calls are safe (context cancel functions are idempotent).
func (b *bufferImpl) Abort() {
	b.cancelFunc()
}
131
132 // process is run in a separate goroutine to pull log entries and publish them
133 // to cloud logging.
134 func (b *bufferImpl) process() {
135 defer close(b.finishedC)
136
137 // Create a push semaphore channel; fill with push tokens.
138 pushSemC := make(parallel.Semaphore, b.ParallelPushLimit)
139
140 entries := make([]*Entry, b.BatchSize)
141 for e := range b.logC {
142 // Pull up to our entry capacity.
143 entries[0] = e
144 count := 1
145 b.receivedLogEntry(e)
146
147 // Buffer other logs that are also available in the channel.
148 for count < len(entries) {
149 moreE := (*Entry)(nil)
150 select {
151 case moreE = <-b.logC:
152 b.receivedLogEntry(moreE)
153 break
154 default:
155 break
156 }
157 if moreE == nil {
158 break
159 }
160 entries[count] = moreE
161 count++
162 }
163
164 // Clone entries so we can dispatch to goroutine.
165 entryCopy := make([]*Entry, count)
166 copy(entryCopy, entries[:count])
167
168 // (Testing) ACK any log entries that were received, for synchro nization.
169 b.bufferRound(entryCopy)
170
171 // Acquire a push channel semaphore token.
172 pushSemC.Lock()
173 go func() {
174 defer pushSemC.Unlock()
175 b.publishLogs(entryCopy)
176 }()
177 }
178
179 // Acquire all of our push channel semaphore tokens (block until gorouti nes
180 // are done).
181 pushSemC.TakeAll()
182 }
183
184 // publishLogs writes a slice of log Entry to the wrapped Client. The underlying
185 // PushEntries call will be retried.
186 func (b *bufferImpl) publishLogs(entries []*Entry) {
187 // If we are aborted, Retry will detect this and abort.
188 err := retry.Retry(b.ctx, b.newRetryIterator, func() error {
189 return b.client.PushEntries(entries)
190 }, func(err error, delay time.Duration) {
191 b.writeError("cloudlogging: Failed to push entries, retrying in %v: %v", delay, err)
192 })
193 if err != nil {
194 b.writeError("cloudlogging: Failed to push entries: %s", err)
195 }
196 }
197
198 func (b *bufferImpl) bufferRound(entries []*Entry) {
199 if b.testCB != nil && b.testCB.bufferRound != nil {
200 b.testCB.bufferRound(entries)
201 }
202 }
203
204 func (b *bufferImpl) receivedLogEntry(e *Entry) {
205 if b.testCB != nil && b.testCB.receivedLogEntry != nil {
206 b.testCB.receivedLogEntry(e)
207 }
208 }
209
210 // newRetryIterator generates a new retry iterator. If configured, the iterator
211 // will be generated by the Retry method; otherwise, a nil retry iterator (no
212 // retries) will be returned.
213 func (b *bufferImpl) newRetryIterator() retry.Iterator {
214 if b.Retry == nil {
215 return nil
216 }
217 return b.Retry()
218 }
219
220 func (b *bufferImpl) writeError(f string, args ...interface{}) {
221 if c, ok := b.client.(*clientImpl); ok {
222 c.writeError(f, args...)
223 }
224 }
OLDNEW
« no previous file with comments | « no previous file | common/cloudlogging/buffer_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698