 Chromium Code Reviews
 Chromium Code Reviews Issue 1412063008:
  logdog: Add bundler library.  (Closed) 
  Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
    
  
    Issue 1412063008:
  logdog: Add bundler library.  (Closed) 
  Base URL: https://github.com/luci/luci-go@logdog-review-streamserver| Index: client/internal/logdog/butler/bundler/bundler.go | 
| diff --git a/client/internal/logdog/butler/bundler/bundler.go b/client/internal/logdog/butler/bundler/bundler.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..b166b97acbbb2cbb17279318b093eae375b9428f | 
| --- /dev/null | 
| +++ b/client/internal/logdog/butler/bundler/bundler.go | 
| @@ -0,0 +1,487 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package bundler | 
| + | 
| +import ( | 
| + "container/heap" | 
| + "fmt" | 
| + "sync" | 
| + "time" | 
| + | 
| + "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | 
| + "github.com/luci/luci-go/common/clock" | 
| + "github.com/luci/luci-go/common/logdog/protocol" | 
| + "github.com/luci/luci-go/common/logdog/types" | 
| + "github.com/luci/luci-go/common/proto/google" | 
| +) | 
| + | 
| +// Config is the Bundler configuration. | 
| +type Config struct { | 
| + // Clock is the clock instance that will be used for Bundler and stream | 
| + // timing. | 
| + Clock clock.Clock | 
| + | 
| + // Source is the bundle source string to use. This can be empty if there is no | 
| + // source information to include. | 
| + Source string | 
| + | 
| + // MaxBufferedBytes is the maximum number of bytes to buffer in memory per | 
| + // stream. | 
| + MaxBufferedBytes int64 | 
| + | 
| + // MaxBundleSize is the maximum bundle size in bytes that may be generated. | 
| + // | 
| + // If this value is zero, no size constraint will be applied to generated | 
| + // bundles. | 
| + MaxBundleSize int | 
| + | 
| + // MaxBufferDelay is the maximum amount of time we're willing to buffer | 
| + // bundled data. Other factors can cause the bundle to be sent before this, | 
| + // but it is an upper bound. | 
| + MaxBufferDelay time.Duration | 
| +} | 
| + | 
| +type bundlerStream interface { | 
| + isDrained() bool | 
| + name() string | 
| + expireTime() (time.Time, bool) | 
| + nextBundleEntry(*builder, bool) bool | 
| +} | 
| + | 
| +// Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for | 
| +// stream registration and bundle consumption. | 
| +type Bundler interface { | 
| + Register(streamproto.Properties) (Stream, error) | 
| + | 
| + // Next causes the Bundler to pack a new ButlerLogBundle message for | 
| + // transmission. | 
| + // | 
| + // The maximum marshalled byte size of the bundle's protobuf is supplied; the | 
| + // generated bundle will not exceed this size. | 
| + Next() *protocol.ButlerLogBundle | 
| + | 
| + // CloseAndFlush flushes the Bundler, blocking until all registered Streams | 
| + // have closed and all buffered data has been output. | 
| + CloseAndFlush() | 
| +} | 
| + | 
| +type bundlerImpl struct { | 
| + *Config | 
| + | 
| + // finishedC is closed when makeBundles goroutine has terminated. | 
| + finishedC chan struct{} | 
| + bundleC chan *protocol.ButlerLogBundle | 
| + | 
| + // streamsLock is a lock around the `streams` map and its contents. | 
| + streamsLock sync.Mutex | 
| + // streamsCond is a Cond bound to streamsLock, used to signal Next() when new | 
| + // data is available. | 
| + streamsCond *timeoutCond | 
| + // streams is the set of currently-registered Streams. | 
| + streams map[string]bundlerStream | 
| + // flushing is true if we're blocking on CloseAndFlush(). | 
| + flushing bool | 
| + | 
| + // prefixCounter is a global counter for Prefix-wide streams. | 
| + prefixCounter counter | 
| +} | 
| + | 
| +// New instantiates a new Bundler instance. | 
| +func New(c Config) Bundler { | 
| + b := bundlerImpl{ | 
| + Config: &c, | 
| + finishedC: make(chan struct{}), | 
| + bundleC: make(chan *protocol.ButlerLogBundle), | 
| + streams: map[string]bundlerStream{}, | 
| + } | 
| + b.streamsCond = newTimeoutCond(b.getClock(), &b.streamsLock) | 
| + | 
| + go b.makeBundles() | 
| + return &b | 
| +} | 
| + | 
| +func (b *bundlerImpl) Register(p streamproto.Properties) (Stream, error) { | 
| + b.streamsLock.Lock() | 
| + defer b.streamsLock.Unlock() | 
| + | 
| + if s := b.streams[p.Name]; s != nil { | 
| + return nil, fmt.Errorf("a Stream is already registered for %q", p.Name) | 
| + } | 
| + | 
| + // Our Properties must validate. | 
| + if err := p.Validate(); err != nil { | 
| + return nil, err | 
| + } | 
| 
iannucci
2015/11/13 07:16:38
can we do this before the lock? actually... can't
 
dnj
2015/11/14 00:30:36
Yep, Done.
 | 
| + | 
| + // Construct a parser for this stream. | 
| + c := streamConfig{ | 
| + name: p.Name, | 
| + template: protocol.ButlerLogBundle_Entry{ | 
| + Desc: &p.LogStreamDescriptor, | 
| + }, | 
| + maximumBufferDuration: b.MaxBufferDelay, | 
| + maximumBufferedBytes: b.MaxBufferedBytes, | 
| + onAppend: func(appended bool) { | 
| + if appended { | 
| + b.signalStreamUpdate() | 
| + } | 
| + }, | 
| + } | 
| + | 
| + err := error(nil) | 
| + c.parser, err = newParser(&p, &b.prefixCounter) | 
| + if err != nil { | 
| + return nil, fmt.Errorf("failed to create stream parser: %s", err) | 
| + } | 
| + | 
| + // Generate a secret for this Stream instance. | 
| + c.template.Secret, err = types.NewStreamSecret() | 
| + if err != nil { | 
| + return nil, fmt.Errorf("failed to generate stream secret: %s", err) | 
| + } | 
| + | 
| + // Create a new stream. This will kick off its processing goroutine, which | 
| + // will not stop until it is closed. | 
| + s := newStream(c) | 
| + b.registerStreamLocked(s) | 
| + return s, nil | 
| +} | 
| + | 
| +func (b *bundlerImpl) CloseAndFlush() { | 
| + // Mark that we're flushing. This will cause us to perform more aggressive | 
| + // bundling in Next(). | 
| + b.startFlushing() | 
| + <-b.finishedC | 
| +} | 
| + | 
| +func (b *bundlerImpl) Next() *protocol.ButlerLogBundle { | 
| + return <-b.bundleC | 
| +} | 
| + | 
| +func (b *bundlerImpl) startFlushing() { | 
| + b.streamsLock.Lock() | 
| + defer b.streamsLock.Unlock() | 
| + | 
| + if !b.flushing { | 
| + b.flushing = true | 
| + b.signalStreamUpdateLocked() | 
| + } | 
| +} | 
| + | 
| +// makeBundles is run in its own goroutine. It runs continuously, responding | 
| +// to Stream constraints and availability and sending ButlerLogBundles through | 
| +// bundleC when available. | 
| +// | 
| +// makeBundles will terminate when closeC is closed and all streams are drained. | 
| +func (b *bundlerImpl) makeBundles() { | 
| + defer close(b.finishedC) | 
| + defer close(b.bundleC) | 
| + | 
| + b.streamsLock.Lock() | 
| + defer b.streamsLock.Unlock() | 
| + | 
| + bb := (*builder)(nil) | 
| + defer func() { | 
| + if bb != nil && bb.hasContent() { | 
| + b.bundleC <- bb.bundle() | 
| + } | 
| + }() | 
| + | 
| + for { | 
| + bb = &builder{ | 
| + template: protocol.ButlerLogBundle{ | 
| + Source: b.Source, | 
| + Timestamp: google.NewTimestamp(b.getClock().Now()), | 
| + }, | 
| + size: b.MaxBundleSize, | 
| + } | 
| + oldestContentTime := time.Time{} | 
| + | 
| + for { | 
| + state := b.getStreamStateLocked() | 
| + | 
| + // Attempt to create more bundles. | 
| + sendNow := b.bundleRoundLocked(bb, state) | 
| + | 
| + // Prune any drained streams. | 
| + state.forEachStream(func(s bundlerStream) bool { | 
| + if s.isDrained() { | 
| + state.removeStream(s.name()) | 
| + b.unregisterStreamLocked(s) | 
| + } | 
| + | 
| + return true | 
| + }) | 
| + | 
| + if b.flushing && len(state.streams) == 0 { | 
| + // We're flushing, and there are no more streams, so we're completely | 
| + // finished. | 
| + // | 
| + // If we have any content in our builder, it will be exported via defer. | 
| + return | 
| + } | 
| + | 
| + // If we have content, consider emitting this bundle. | 
| + if bb.hasContent() { | 
| + if b.MaxBufferDelay == 0 || sendNow || bb.ready() { | 
| + break | 
| + } | 
| + } | 
| + | 
| + // Mark the first time this round where we actually saw data. | 
| + if oldestContentTime.IsZero() && bb.hasContent() { | 
| + oldestContentTime = state.now | 
| + } | 
| + | 
| + // We will yield our stream lock and sleep, waiting for either: | 
| + // 1) The earliest expiration time. | 
| + // 2) A streams channel signal. | 
| + // | 
| + // We use a Cond here because we want Streams to be able to be added | 
| + // while we're waiting for stream data. | 
| + nextExpire, has := state.nextExpire() | 
| + | 
| + // If we have an oldest content time, that also means that we have | 
| + // content. Factor this constraint in. | 
| + if !oldestContentTime.IsZero() { | 
| + roundExpire := oldestContentTime.Add(b.MaxBufferDelay) | 
| + if !roundExpire.After(state.now) { | 
| + break | 
| + } | 
| + | 
| + if !has || roundExpire.Before(nextExpire) { | 
| + nextExpire = roundExpire | 
| + has = true | 
| + } | 
| + } | 
| + | 
| + if has { | 
| + b.streamsCond.waitTimeout(nextExpire.Sub(state.now)) | 
| + } else { | 
| + // No data, no expire constraints. Wait indefinitely for something | 
| + // to change. | 
| + b.streamsCond.Wait() | 
| + } | 
| + } | 
| + | 
| + // If our bundler has contents, send them. | 
| + if bb.hasContent() { | 
| + b.bundleC <- bb.bundle() | 
| + bb = nil | 
| + } | 
| + } | 
| +} | 
| + | 
| +// Performs a round of bundling with a stream state. polling stream state and adding | 
| 
iannucci
2015/11/13 07:16:38
this seems incomplete?
 
dnj
2015/11/14 00:30:36
Done.
 | 
| +// | 
| +// This method will block until a suitable bundle is available. Availability | 
| +// is subject both to time and data constraints: | 
| +// - If data has exceeded its buffer duration threshold, a Bundle will be cut | 
| +// immediately. | 
| 
iannucci
2015/11/13 07:16:38
are thresholds calculated absolutely, or from the
 
dnj
2015/11/14 00:30:36
They are relative to when the buffered data was in
 | 
| +// - If no data is set to expire, the Bundler may wait for more data to | 
| +// produce a more optimally-packed bundle. | 
| +// | 
| +// At a high level, Next operates as follows: | 
| +// 1) Freeze all stream state. | 
| +// 2) Scan streams for data that has exceeded its threshold; if data is found: | 
| +// - Aggressively pack expired data into a Bundle. | 
| +// - Optimally pack the remainder of the Bundle with any available data. | 
| +// - Return the Bundle. | 
| +// | 
| +// 3) Examine the remaining data sizes, waiting for either: | 
| +// - Enough stream data to fill our Bundle. | 
| +// - Our timeout, if the Bundler is not closed. | 
| +// 4) Pack a Bundle with the remaining data optimally, emphasizing streams | 
| +// with older data. | 
| +// | 
| +// Returns true if bundle some data was added that should be sent immediately. | 
| +func (b *bundlerImpl) bundleRoundLocked(bb *builder, state *streamState) bool { | 
| + sendNow := false | 
| + | 
| + // First pass: non-blocking data that has exceeded its storage threshold. | 
| + for bb.remaining() > 0 { | 
| + s := state.next() | 
| + if s == nil { | 
| + break | 
| + } | 
| + | 
| + if et, has := s.expireTime(); !has || et.After(state.now) { | 
| + // This stream (and all other streams, since we're sorted) expires in | 
| + // the future, so we're done with the first pass. | 
| + break | 
| + } | 
| + | 
| + // Pull bundles from this stream. | 
| + if modified := s.nextBundleEntry(bb, true); modified { | 
| + state.streamUpdated(s.name()) | 
| + | 
| + // We have at least one time-sensitive bundle, so send this round. | 
| + sendNow = true | 
| + } | 
| + } | 
| + | 
| + // Second pass: bundle any available data. | 
| + state.forEachStream(func(s bundlerStream) bool { | 
| + if bb.remaining() == 0 { | 
| + return false | 
| + } | 
| + | 
| + if modified := s.nextBundleEntry(bb, b.flushing); modified { | 
| + state.streamUpdated(s.name()) | 
| + } | 
| + return true | 
| + }) | 
| + | 
| + return sendNow | 
| +} | 
| + | 
| +func (b *bundlerImpl) getStreamStateLocked() *streamState { | 
| + // Lock and collect each stream. | 
| + state := &streamState{ | 
| + streams: make([]bundlerStream, 0, len(b.streams)), | 
| + now: b.getClock().Now(), | 
| + } | 
| + | 
| + for _, s := range b.streams { | 
| + state.streams = append(state.streams, s) | 
| + } | 
| + heap.Init(state) | 
| + | 
| + return state | 
| +} | 
| + | 
| +func (b *bundlerImpl) registerStreamLocked(s bundlerStream) { | 
| + b.streams[s.name()] = s | 
| + b.signalStreamUpdateLocked() | 
| +} | 
| + | 
| +func (b *bundlerImpl) unregisterStreamLocked(s bundlerStream) { | 
| + delete(b.streams, s.name()) | 
| +} | 
| + | 
| +func (b *bundlerImpl) signalStreamUpdate() { | 
| + b.streamsLock.Lock() | 
| + defer b.streamsLock.Unlock() | 
| + | 
| + b.signalStreamUpdateLocked() | 
| +} | 
| + | 
| +func (b *bundlerImpl) signalStreamUpdateLocked() { | 
| + b.streamsCond.Broadcast() | 
| +} | 
| + | 
| +// nextPrefixIndex is a goroutine-safe method that returns the next prefix index | 
| +// for the given Bundler. | 
| +func (b *bundlerImpl) nextPrefixIndex() uint64 { | 
| + return uint64(b.prefixCounter.next()) | 
| +} | 
| + | 
| +func (b *bundlerImpl) getClock() clock.Clock { | 
| + c := b.Clock | 
| + if c != nil { | 
| + return c | 
| + } | 
| + return clock.GetSystemClock() | 
| +} | 
| + | 
| +// streamState is a snapshot of the current stream registration. All operations | 
| +// performed on the state require streamLock to be held. | 
| +// | 
| +// streamState implements heap.Interface for its streams array. Streams without | 
| +// data times (nil) are considered to be greater than those with times. | 
| +type streamState struct { | 
| + streams []bundlerStream | 
| + now time.Time | 
| +} | 
| + | 
| +var _ heap.Interface = (*streamState)(nil) | 
| + | 
| +func (s *streamState) next() bundlerStream { | 
| + if len(s.streams) == 0 { | 
| + return nil | 
| + } | 
| + return s.streams[0] | 
| +} | 
| + | 
| +func (s *streamState) nextExpire() (time.Time, bool) { | 
| + if next := s.next(); next != nil { | 
| + if ts, ok := next.expireTime(); ok { | 
| + return ts, true | 
| + } | 
| + } | 
| + return time.Time{}, false | 
| +} | 
| + | 
| +func (s *streamState) streamUpdated(name string) { | 
| + if si, idx := s.streamIndex(name); si != nil { | 
| + heap.Fix(s, idx) | 
| + } | 
| +} | 
| + | 
| +func (s *streamState) forEachStream(f func(bundlerStream) bool) { | 
| + // Clone our streams, since the callback may mutate their order. | 
| + streams := make([]bundlerStream, len(s.streams)) | 
| + for i, s := range s.streams { | 
| + streams[i] = s | 
| + } | 
| + | 
| + for _, s := range streams { | 
| + if !f(s) { | 
| + break | 
| + } | 
| + } | 
| +} | 
| + | 
| +// removeStream removes a stream from the stream state. | 
| +func (s *streamState) removeStream(name string) bundlerStream { | 
| + if si, idx := s.streamIndex(name); si != nil { | 
| + heap.Remove(s, idx) | 
| + return si | 
| + } | 
| + return nil | 
| +} | 
| + | 
| +func (s *streamState) streamIndex(name string) (bundlerStream, int) { | 
| + for i, si := range s.streams { | 
| + if si.name() == name { | 
| + return si, i | 
| + } | 
| + } | 
| + return nil, -1 | 
| +} | 
| + | 
| +func (s *streamState) Len() int { | 
| + return len(s.streams) | 
| +} | 
| + | 
| +func (s *streamState) Less(i, j int) bool { | 
| + si, sj := s.streams[i], s.streams[j] | 
| + | 
| + if it, ok := si.expireTime(); ok { | 
| + if jt, ok := sj.expireTime(); ok { | 
| + return it.Before(jt) | 
| + } | 
| + | 
| + // i has data, but j does not, so i < j. | 
| + return true | 
| + } | 
| + | 
| + // i has no data, so i us greater than all other streams. | 
| + return false | 
| +} | 
| + | 
| +func (s *streamState) Swap(i, j int) { | 
| + s.streams[i], s.streams[j] = s.streams[j], s.streams[i] | 
| +} | 
| + | 
| +func (s *streamState) Push(x interface{}) { | 
| + s.streams = append(s.streams, x.(bundlerStream)) | 
| +} | 
| + | 
| +func (s *streamState) Pop() interface{} { | 
| + last := s.streams[len(s.streams)-1] | 
| + s.streams = s.streams[:len(s.streams)-1] | 
| + return last | 
| +} |