Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(136)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "container/heap"
9 "fmt"
10 "sync"
11 "time"
12
13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
14 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/logdog/protocol"
16 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/google"
18 )
19
20 // Config is the Bundler configuration.
21 type Config struct {
22 // Clock is the clock instance that will be used for Bundler and stream
23 // timing.
24 Clock clock.Clock
25
26 // Source is the bundle source string to use. This can be empty if there is no
27 // source information to include.
28 Source string
29
30 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p er
31 // stream.
32 MaxBufferedBytes int64
33
34 // MaxBundleSize is the maximum bundle size in bytes that may be generat ed.
35 //
36 // If this value is zero, no size constraint will be applied to generate d
37 // bundles.
38 MaxBundleSize int
39
40 // MaxBufferDelay is the maximum amount of time we're willing to buffer
41 // bundled data. Other factors can cause the bundle to be sent before th is,
42 // but it is an upper bound.
43 MaxBufferDelay time.Duration
44 }
45
46 type bundlerStream interface {
47 isDrained() bool
48 name() string
49 expireTime() (time.Time, bool)
50 nextBundleEntry(*builder, bool) bool
51 }
52
53 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for
54 // stream registration and bundle consumption.
55 type Bundler interface {
56 Register(streamproto.Properties) (Stream, error)
57
58 // Next causes the Bundler to pack a new ButlerLogBundle message for
59 // transmission.
60 //
61 // The maximum marshalled byte size of the bundle's protobuf is supplied ; the
62 // generated bundle will not exceed this size.
63 Next() *protocol.ButlerLogBundle
64
65 // CloseAndFlush flushes the Bundler, blocking until all registered Stre ams
66 // have closed and all buffered data has been output.
67 CloseAndFlush()
68 }
69
70 type bundlerImpl struct {
71 *Config
72
73 // finishedC is closed when makeBundles goroutine has terminated.
74 finishedC chan struct{}
75 bundleC chan *protocol.ButlerLogBundle
76
77 // streamsLock is a lock around the `streams` map and its contents.
78 streamsLock sync.Mutex
79 // streamsCond is a Cond bound to streamsLock, used to signal Next() whe n new
80 // data is available.
81 streamsCond *timeoutCond
82 // streams is the set of currently-registered Streams.
83 streams map[string]bundlerStream
84 // flushing is true if we're blocking on CloseAndFlush().
85 flushing bool
86
87 // prefixCounter is a global counter for Prefix-wide streams.
88 prefixCounter counter
89 }
90
91 // New instantiates a new Bundler instance.
92 func New(c Config) Bundler {
93 b := bundlerImpl{
94 Config: &c,
95 finishedC: make(chan struct{}),
96 bundleC: make(chan *protocol.ButlerLogBundle),
97 streams: map[string]bundlerStream{},
98 }
99 b.streamsCond = newTimeoutCond(b.getClock(), &b.streamsLock)
100
101 go b.makeBundles()
102 return &b
103 }
104
105 func (b *bundlerImpl) Register(p streamproto.Properties) (Stream, error) {
106 if s := b.streams[p.Name]; s != nil {
107 return nil, fmt.Errorf("a Stream is already registered for %q", p.Name)
108 }
109
110 // Our Properties must validate.
111 if err := p.Validate(); err != nil {
112 return nil, err
113 }
114
115 // Construct a parser for this stream.
116 c := streamConfig{
117 name: p.Name,
118 template: protocol.ButlerLogBundle_Entry{
119 Desc: &p.LogStreamDescriptor,
120 },
121 maximumBufferDuration: b.MaxBufferDelay,
122 maximumBufferedBytes: b.MaxBufferedBytes,
123 onAppend: func(appended bool) {
124 if appended {
125 b.signalStreamUpdate()
126 }
127 },
128 }
129
130 err := error(nil)
131 c.parser, err = newParser(&p, &b.prefixCounter)
132 if err != nil {
133 return nil, fmt.Errorf("failed to create stream parser: %s", err )
134 }
135
136 // Generate a secret for this Stream instance.
137 c.template.Secret, err = types.NewStreamSecret()
138 if err != nil {
139 return nil, fmt.Errorf("failed to generate stream secret: %s", e rr)
140 }
141
142 // Create a new stream. This will kick off its processing goroutine, whi ch
143 // will not stop until it is closed.
144 s := newStream(c)
145
146 // Register the stream.
147 b.streamsLock.Lock()
148 defer b.streamsLock.Unlock()
149
150 b.registerStreamLocked(s)
151 return s, nil
152 }
153
154 func (b *bundlerImpl) CloseAndFlush() {
155 // Mark that we're flushing. This will cause us to perform more aggressi ve
156 // bundling in Next().
157 b.startFlushing()
158 <-b.finishedC
159 }
160
161 func (b *bundlerImpl) Next() *protocol.ButlerLogBundle {
162 return <-b.bundleC
163 }
164
165 func (b *bundlerImpl) startFlushing() {
166 b.streamsLock.Lock()
167 defer b.streamsLock.Unlock()
168
169 if !b.flushing {
170 b.flushing = true
171 b.signalStreamUpdateLocked()
172 }
173 }
174
175 // makeBundles is run in its own goroutine. It runs continuously, responding
176 // to Stream constraints and availability and sending ButlerLogBundles through
177 // bundleC when available.
178 //
179 // makeBundles will terminate when closeC is closed and all streams are drained.
180 func (b *bundlerImpl) makeBundles() {
181 defer close(b.finishedC)
182 defer close(b.bundleC)
183
184 b.streamsLock.Lock()
185 defer b.streamsLock.Unlock()
186
187 bb := (*builder)(nil)
188 defer func() {
189 if bb != nil && bb.hasContent() {
190 b.bundleC <- bb.bundle()
191 }
192 }()
193
194 for {
195 bb = &builder{
196 size: b.MaxBundleSize,
197 template: protocol.ButlerLogBundle{
198 Source: b.Source,
199 Timestamp: google.NewTimestamp(b.getClock().Now( )),
200 },
201 }
202 oldestContentTime := time.Time{}
203
204 for {
205 state := b.getStreamStateLocked()
206
207 // Attempt to create more bundles.
208 sendNow := b.bundleRoundLocked(bb, state)
209
210 // Prune any drained streams.
211 state.forEachStream(func(s bundlerStream) bool {
212 if s.isDrained() {
213 state.removeStream(s.name())
214 b.unregisterStreamLocked(s)
215 }
216
217 return true
218 })
219
220 if b.flushing && len(state.streams) == 0 {
221 // We're flushing, and there are no more streams , so we're completely
222 // finished.
223 //
224 // If we have any content in our builder, it wil l be exported via defer.
225 return
226 }
227
228 // If we have content, consider emitting this bundle.
229 if bb.hasContent() {
230 if b.MaxBufferDelay == 0 || sendNow || bb.ready( ) {
231 break
232 }
233 }
234
235 // Mark the first time this round where we actually saw data.
236 if oldestContentTime.IsZero() && bb.hasContent() {
237 oldestContentTime = state.now
238 }
239
240 // We will yield our stream lock and sleep, waiting for either:
241 // 1) The earliest expiration time.
242 // 2) A streams channel signal.
243 //
244 // We use a Cond here because we want Streams to be able to be added
245 // while we're waiting for stream data.
246 nextExpire, has := state.nextExpire()
247
248 // If we have an oldest content time, that also means th at we have
249 // content. Factor this constraint in.
250 if !oldestContentTime.IsZero() {
251 roundExpire := oldestContentTime.Add(b.MaxBuffer Delay)
252 if !roundExpire.After(state.now) {
253 break
254 }
255
256 if !has || roundExpire.Before(nextExpire) {
257 nextExpire = roundExpire
258 has = true
259 }
260 }
261
262 if has {
263 b.streamsCond.waitTimeout(nextExpire.Sub(state.n ow))
264 } else {
265 // No data, no expire constraints. Wait indefini tely for something
266 // to change.
267 b.streamsCond.Wait()
268 }
269 }
270
271 // If our bundler has contents, send them.
272 if bb.hasContent() {
273 b.bundleC <- bb.bundle()
274 bb = nil
275 }
276 }
277 }
278
279 // Implements a single bundle building round. This incrementally adds data from
280 // the stream state to the supplied builder.
281 //
282 // This method will block until a suitable bundle is available. Availability
283 // is subject both to time and data constraints:
284 // - If buffered data, which is timestampped at ingest, has exceeded its
285 // buffer duration threshold, a Bundle will be cut immediately.
286 // - If no data is set to expire, the Bundler may wait for more data to
287 // produce a more optimally-packed bundle.
288 //
289 // At a high level, Next operates as follows:
290 // 1) Freeze all stream state.
291 // 2) Scan streams for data that has exceeded its threshold; if data is found:
292 // - Aggressively pack expired data into a Bundle.
293 // - Optimally pack the remainder of the Bundle with any available data.
294 // - Return the Bundle.
295 //
296 // 3) Examine the remaining data sizes, waiting for either:
297 // - Enough stream data to fill our Bundle.
298 // - Our timeout, if the Bundler is not closed.
299 // 4) Pack a Bundle with the remaining data optimally, emphasizing streams
300 // with older data.
301 //
302 // Returns true if bundle some data was added that should be sent immediately.
303 func (b *bundlerImpl) bundleRoundLocked(bb *builder, state *streamState) bool {
304 sendNow := false
305
306 // First pass: non-blocking data that has exceeded its storage threshold .
307 for bb.remaining() > 0 {
308 s := state.next()
309 if s == nil {
310 break
311 }
312
313 if et, has := s.expireTime(); !has || et.After(state.now) {
314 // This stream (and all other streams, since we're sorte d) expires in
315 // the future, so we're done with the first pass.
316 break
317 }
318
319 // Pull bundles from this stream.
320 if modified := s.nextBundleEntry(bb, true); modified {
321 state.streamUpdated(s.name())
322
323 // We have at least one time-sensitive bundle, so send t his round.
324 sendNow = true
325 }
326 }
327
328 // Second pass: bundle any available data.
329 state.forEachStream(func(s bundlerStream) bool {
330 if bb.remaining() == 0 {
331 return false
332 }
333
334 if modified := s.nextBundleEntry(bb, b.flushing); modified {
335 state.streamUpdated(s.name())
336 }
337 return true
338 })
339
340 return sendNow
341 }
342
343 func (b *bundlerImpl) getStreamStateLocked() *streamState {
344 // Lock and collect each stream.
345 state := &streamState{
346 streams: make([]bundlerStream, 0, len(b.streams)),
347 now: b.getClock().Now(),
348 }
349
350 for _, s := range b.streams {
351 state.streams = append(state.streams, s)
352 }
353 heap.Init(state)
354
355 return state
356 }
357
358 func (b *bundlerImpl) registerStreamLocked(s bundlerStream) {
359 b.streams[s.name()] = s
360 b.signalStreamUpdateLocked()
361 }
362
363 func (b *bundlerImpl) unregisterStreamLocked(s bundlerStream) {
364 delete(b.streams, s.name())
365 }
366
367 func (b *bundlerImpl) signalStreamUpdate() {
368 b.streamsLock.Lock()
369 defer b.streamsLock.Unlock()
370
371 b.signalStreamUpdateLocked()
372 }
373
374 func (b *bundlerImpl) signalStreamUpdateLocked() {
375 b.streamsCond.Broadcast()
376 }
377
378 // nextPrefixIndex is a goroutine-safe method that returns the next prefix index
379 // for the given Bundler.
380 func (b *bundlerImpl) nextPrefixIndex() uint64 {
381 return uint64(b.prefixCounter.next())
382 }
383
384 func (b *bundlerImpl) getClock() clock.Clock {
385 c := b.Clock
386 if c != nil {
387 return c
388 }
389 return clock.GetSystemClock()
390 }
391
392 // streamState is a snapshot of the current stream registration. All operations
393 // performed on the state require streamLock to be held.
394 //
395 // streamState implements heap.Interface for its streams array. Streams without
396 // data times (nil) are considered to be greater than those with times.
397 type streamState struct {
398 streams []bundlerStream
399 now time.Time
400 }
401
402 var _ heap.Interface = (*streamState)(nil)
403
404 func (s *streamState) next() bundlerStream {
405 if len(s.streams) == 0 {
406 return nil
407 }
408 return s.streams[0]
409 }
410
411 func (s *streamState) nextExpire() (time.Time, bool) {
412 if next := s.next(); next != nil {
413 if ts, ok := next.expireTime(); ok {
414 return ts, true
415 }
416 }
417 return time.Time{}, false
418 }
419
420 func (s *streamState) streamUpdated(name string) {
421 if si, idx := s.streamIndex(name); si != nil {
422 heap.Fix(s, idx)
423 }
424 }
425
426 func (s *streamState) forEachStream(f func(bundlerStream) bool) {
427 // Clone our streams, since the callback may mutate their order.
428 streams := make([]bundlerStream, len(s.streams))
429 for i, s := range s.streams {
430 streams[i] = s
431 }
432
433 for _, s := range streams {
434 if !f(s) {
435 break
436 }
437 }
438 }
439
440 // removeStream removes a stream from the stream state.
441 func (s *streamState) removeStream(name string) bundlerStream {
442 if si, idx := s.streamIndex(name); si != nil {
443 heap.Remove(s, idx)
444 return si
445 }
446 return nil
447 }
448
449 func (s *streamState) streamIndex(name string) (bundlerStream, int) {
450 for i, si := range s.streams {
451 if si.name() == name {
452 return si, i
453 }
454 }
455 return nil, -1
456 }
457
458 func (s *streamState) Len() int {
459 return len(s.streams)
460 }
461
462 func (s *streamState) Less(i, j int) bool {
463 si, sj := s.streams[i], s.streams[j]
464
465 if it, ok := si.expireTime(); ok {
466 if jt, ok := sj.expireTime(); ok {
467 return it.Before(jt)
468 }
469
470 // i has data, but j does not, so i < j.
471 return true
472 }
473
474 // i has no data, so i us greater than all other streams.
475 return false
476 }
477
478 func (s *streamState) Swap(i, j int) {
479 s.streams[i], s.streams[j] = s.streams[j], s.streams[i]
480 }
481
482 func (s *streamState) Push(x interface{}) {
483 s.streams = append(s.streams, x.(bundlerStream))
484 }
485
486 func (s *streamState) Pop() interface{} {
487 last := s.streams[len(s.streams)-1]
488 s.streams = s.streams[:len(s.streams)-1]
489 return last
490 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/bundler/builder_test.go ('k') | client/internal/logdog/butler/bundler/bundler_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698