| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package cloudlogging | |
| 6 | |
| 7 import ( | |
| 8 "time" | |
| 9 | |
| 10 "github.com/luci/luci-go/common/retry" | |
| 11 "github.com/luci/luci-go/common/sync/parallel" | |
| 12 "golang.org/x/net/context" | |
| 13 ) | |
| 14 | |
| 15 const ( | |
| 16 // DefaultBatchSize is the number of log messages that will be sent in a | |
| 17 // single cloud logging publish request. | |
| 18 DefaultBatchSize = 1000 | |
| 19 | |
| 20 // DefaultParallelPushLimit is the maximum number of goroutines that may | |
| 21 // simultaneously push cloud logging data. | |
| 22 DefaultParallelPushLimit = 8 | |
| 23 ) | |
| 24 | |
| 25 // Buffer is a goroutine-safe intermediate buffer that implements Client. Any | |
| 26 // logs that are written to the Buffer are batched together before being sent | |
| 27 // to Cloud Logging. | |
| 28 type Buffer interface { | |
| 29 Client | |
| 30 | |
| 31 // StopAndFlush flushes the Buffer, blocking until all buffered data has
been | |
| 32 // transmitted. After closing, log writes will be discarded. | |
| 33 StopAndFlush() | |
| 34 | |
| 35 // Abort stops any current attempts to send messages. It is goroutine-sa
fe and | |
| 36 // can be called multiple times. | |
| 37 // | |
| 38 // If StopAndFlush is blocking on cloud logging send/retry, calling Abor
t will | |
| 39 // quickly terminate the attempts, causing StopAndFlush to complete. | |
| 40 Abort() | |
| 41 } | |
| 42 | |
| 43 // BufferOptions specifies configuration parameters for an instantiated Buffer | |
| 44 // instance. | |
| 45 type BufferOptions struct { | |
| 46 // BatchSize is the number of messages to batch together when uploading
to | |
| 47 // Cloud Logging endpoint. If zero, DefaultBatchSize will be used. | |
| 48 BatchSize int | |
| 49 | |
| 50 // ParallelPushLimit is the maximum number of PushEntries calls that wil
l be | |
| 51 // executed at the same time. If zero, DefaultParallelPushLimit will be
used. | |
| 52 ParallelPushLimit int | |
| 53 | |
| 54 // Retry is a generator. If not nil, it will be used to produce a retry | |
| 55 // Iterator that will be used to retry the PushEntries call to the wrapp
ed | |
| 56 // Client. | |
| 57 Retry func() retry.Iterator | |
| 58 } | |
| 59 | |
| 60 // bufferImpl is the default implementation of the Buffer interface. | |
| 61 type bufferImpl struct { | |
| 62 *BufferOptions | |
| 63 | |
| 64 ctx context.Context // The context to use for operations. | |
| 65 cancelFunc context.CancelFunc // The context's cancel function. | |
| 66 | |
| 67 client Client // The Cloud Logging Client to push through. | |
| 68 commonLabels map[string]string | |
| 69 | |
| 70 logC chan *Entry | |
| 71 finishedC chan struct{} | |
| 72 | |
| 73 testCB *testCallbacks | |
| 74 } | |
| 75 | |
| 76 // (Testing) a set of callbacks that can be installed by testing for | |
| 77 // fine-grained control. | |
| 78 type testCallbacks struct { | |
| 79 // bufferRound is called when the buffer's inner loop has completed a bu
ffer | |
| 80 // round. | |
| 81 bufferRound func([]*Entry) | |
| 82 // receivedLogEntry, if not nil, is called for each LogEntry that is rec
eived | |
| 83 // by the buffer's inner loop the moment that it is received. | |
| 84 receivedLogEntry func(*Entry) // (Testing) If not nil, callback when a l
og Entry is received. | |
| 85 } | |
| 86 | |
| 87 // NewBuffer instantiates and starts a new cloud logging Buffer. | |
| 88 // implementation. | |
| 89 func NewBuffer(ctx context.Context, o BufferOptions, c Client) Buffer { | |
| 90 if o.BatchSize <= 0 { | |
| 91 o.BatchSize = DefaultBatchSize | |
| 92 } | |
| 93 | |
| 94 if o.ParallelPushLimit <= 0 { | |
| 95 o.ParallelPushLimit = DefaultParallelPushLimit | |
| 96 } | |
| 97 | |
| 98 ctx, cancelFunc := context.WithCancel(ctx) | |
| 99 b := &bufferImpl{ | |
| 100 BufferOptions: &o, | |
| 101 | |
| 102 ctx: ctx, | |
| 103 cancelFunc: cancelFunc, | |
| 104 client: c, | |
| 105 | |
| 106 // Use a >1 multiple of BatchSize so casual logging doesn't imme
diately | |
| 107 // block pending buffer flush. | |
| 108 logC: make(chan *Entry, o.BatchSize*4), | |
| 109 finishedC: make(chan struct{}), | |
| 110 } | |
| 111 | |
| 112 go b.process() | |
| 113 return b | |
| 114 } | |
| 115 | |
| 116 func (b *bufferImpl) PushEntries(e []*Entry) error { | |
| 117 for _, entry := range e { | |
| 118 b.logC <- entry | |
| 119 } | |
| 120 return nil | |
| 121 } | |
| 122 | |
| 123 func (b *bufferImpl) StopAndFlush() { | |
| 124 close(b.logC) | |
| 125 <-b.finishedC | |
| 126 } | |
| 127 | |
| 128 func (b *bufferImpl) Abort() { | |
| 129 b.cancelFunc() | |
| 130 } | |
| 131 | |
| 132 // process is run in a separate goroutine to pull log entries and publish them | |
| 133 // to cloud logging. | |
| 134 func (b *bufferImpl) process() { | |
| 135 defer close(b.finishedC) | |
| 136 | |
| 137 // Create a push semaphore channel; fill with push tokens. | |
| 138 pushSemC := make(parallel.Semaphore, b.ParallelPushLimit) | |
| 139 | |
| 140 entries := make([]*Entry, b.BatchSize) | |
| 141 for e := range b.logC { | |
| 142 // Pull up to our entry capacity. | |
| 143 entries[0] = e | |
| 144 count := 1 | |
| 145 b.receivedLogEntry(e) | |
| 146 | |
| 147 // Buffer other logs that are also available in the channel. | |
| 148 for count < len(entries) { | |
| 149 moreE := (*Entry)(nil) | |
| 150 select { | |
| 151 case moreE = <-b.logC: | |
| 152 b.receivedLogEntry(moreE) | |
| 153 break | |
| 154 default: | |
| 155 break | |
| 156 } | |
| 157 if moreE == nil { | |
| 158 break | |
| 159 } | |
| 160 entries[count] = moreE | |
| 161 count++ | |
| 162 } | |
| 163 | |
| 164 // Clone entries so we can dispatch to goroutine. | |
| 165 entryCopy := make([]*Entry, count) | |
| 166 copy(entryCopy, entries[:count]) | |
| 167 | |
| 168 // (Testing) ACK any log entries that were received, for synchro
nization. | |
| 169 b.bufferRound(entryCopy) | |
| 170 | |
| 171 // Acquire a push channel semaphore token. | |
| 172 pushSemC.Lock() | |
| 173 go func() { | |
| 174 defer pushSemC.Unlock() | |
| 175 b.publishLogs(entryCopy) | |
| 176 }() | |
| 177 } | |
| 178 | |
| 179 // Acquire all of our push channel semaphore tokens (block until gorouti
nes | |
| 180 // are done). | |
| 181 pushSemC.TakeAll() | |
| 182 } | |
| 183 | |
| 184 // publishLogs writes a slice of log Entry to the wrapped Client. The underlying | |
| 185 // PushEntries call will be retried. | |
| 186 func (b *bufferImpl) publishLogs(entries []*Entry) { | |
| 187 // If we are aborted, Retry will detect this and abort. | |
| 188 err := retry.Retry(b.ctx, b.newRetryIterator, func() error { | |
| 189 return b.client.PushEntries(entries) | |
| 190 }, func(err error, delay time.Duration) { | |
| 191 b.writeError("cloudlogging: Failed to push entries, retrying in
%v: %v", delay, err) | |
| 192 }) | |
| 193 if err != nil { | |
| 194 b.writeError("cloudlogging: Failed to push entries: %s", err) | |
| 195 } | |
| 196 } | |
| 197 | |
| 198 func (b *bufferImpl) bufferRound(entries []*Entry) { | |
| 199 if b.testCB != nil && b.testCB.bufferRound != nil { | |
| 200 b.testCB.bufferRound(entries) | |
| 201 } | |
| 202 } | |
| 203 | |
| 204 func (b *bufferImpl) receivedLogEntry(e *Entry) { | |
| 205 if b.testCB != nil && b.testCB.receivedLogEntry != nil { | |
| 206 b.testCB.receivedLogEntry(e) | |
| 207 } | |
| 208 } | |
| 209 | |
| 210 // newRetryIterator generates a new retry iterator. If configured, the iterator | |
| 211 // will be generated by the Retry method; otherwise, a nil retry iterator (no | |
| 212 // retries) will be returned. | |
| 213 func (b *bufferImpl) newRetryIterator() retry.Iterator { | |
| 214 if b.Retry == nil { | |
| 215 return nil | |
| 216 } | |
| 217 return b.Retry() | |
| 218 } | |
| 219 | |
| 220 func (b *bufferImpl) writeError(f string, args ...interface{}) { | |
| 221 if c, ok := b.client.(*clientImpl); ok { | |
| 222 c.writeError(f, args...) | |
| 223 } | |
| 224 } | |
| OLD | NEW |