Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(183)

Unified Diff: client/internal/logdog/butler/bundler/stream.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/stream.go
diff --git a/client/internal/logdog/butler/bundler/stream.go b/client/internal/logdog/butler/bundler/stream.go
new file mode 100644
index 0000000000000000000000000000000000000000..fe9b9363e0578f899fcdfe1144b2ac87f1b6cf9d
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/stream.go
@@ -0,0 +1,329 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/luci/luci-go/common/logdog/protocol"
+)
+
+var (
+ // dataBufferSize is the size (in bytes) of the Data objects that a Stream
+ // will lease.
+ dataBufferSize = 4096
+)
+
+// Stream is an individual Bundler Stream. Data is added to the Stream as a
+// series of ordered binary chunks.
+//
+// A Stream is not goroutine-safe.
+type Stream interface {
+ // LeaseData allocates and returns a Data block that stream data can be
+ // loaded into. The caller should Release() the Data, or transfer ownership to
+ // something that will (e.g., Append()).
+ LeaseData() Data
+
+ // Append adds a sequential chunk of data to the Stream. Append may block if
+ // the data isn't ready to be consumed.
+ //
+ // Append takes possession of the supplied Data, and will Release it when
+ // finished.
+ Append(Data) error
+
+ // Close closes the Stream, flushing any remaining data.
+ Close()
+}
+
+// streamConfig is the set of static configuration parameters for the stream.
+type streamConfig struct {
+ // name is the name of this stream.
+ name string
+
+ // parser is the stream parser to use.
+ parser parser
+
+ // maximumBufferedBytes is the maximum number of bytes that this stream will
+ // retain in its parser before blocking subsequent Append attempts.
+ maximumBufferedBytes int64
+ // maximumBufferDuration is the maximum amount of time that a block of data
+ // can be comfortably buffered in the stream.
+ maximumBufferDuration time.Duration
+
+ // template is the minimally-populated Butler log bundle entry.
+ template protocol.ButlerLogBundle_Entry
+
+ // onAppend, if not nil, is invoked when an attempt to append data to the
+ // stream occurs. If true is passed, the data was successfully appended. If
+ // false was passed, the data could not be appended immediately and the stream
+ // will block pending data consumption.
+ //
+ // The stream's append lock will be held when this method is called.
+ onAppend func(bool)
+}
+
+// streamImpl is a Stream implementation that is bound to a Bundler.
+type streamImpl struct {
+ c *streamConfig
+
+ // drained is true if the stream is finished emitting data.
+ //
+ // It is an atomic value, with zero indicating not drained and non-zero
+ // indicating drained. It should be accessed via isDrained, and set with
+ // setDrained.
+ drained int32
+
+ // parserLock is a Mutex protecting the stream's parser instance and its
+ // underlying chunk.Buffer. Any access to either of these fields must be done
+ // while holding this lock.
+ parserLock sync.Mutex
+
+ // dataConsumedSignalC is a channel that can be used to signal when data has
+ // been consumed. It is set via signalDataConsumed.
+ dataConsumedSignalC chan struct{}
+ // appendErrValue is an atomically-set error. It will be returned by Append if
+ // an error occurs during stream processing.
+ appendErrValue atomic.Value
+
+ // stateLock protects stream state against concurrent access.
+ stateLock sync.Mutex
+ // closed, if non-zero, indicates that we have been closed and our stream has
+ // finished reading.
+ //
+ // stateLock must be held when accessing this field.
+ closed bool
+ // lastLogEntry is a pointer to the last LogEntry that was exported.
+ //
+ // stateLock must be held when accessing this field.
+ lastLogEntry *protocol.LogEntry
+
+ // testAppendWaitCallback, if not nil, is called before Append blocks.
+ // This callback is used for testing coordination.
+ testAppendWaitCallback func()
+}
+
+func newStream(c streamConfig) *streamImpl {
+ return &streamImpl{
+ c: &c,
+
+ dataConsumedSignalC: make(chan struct{}, 1),
+ }
+}
+
+func (s *streamImpl) LeaseData() Data {
+ return globalDataPoolRegistry.getPool(dataBufferSize).getData()
+}
+
+func (s *streamImpl) Append(d Data) error {
+ defer func() {
+ if d != nil {
+ d.Release()
+ }
+ }()
+
+ // Block/loop until we've successfully appended the data.
+ for {
+ dLen := int64(len(d.Bytes()))
+ if err := s.appendError(); err != nil || dLen == 0 {
+ return err
+ }
+
+ s.withParserLock(func() {
+ if s.c.parser.bufferedBytes() == 0 ||
+ s.c.parser.bufferedBytes()+dLen <= s.c.maximumBufferedBytes {
+ s.c.parser.appendData(d)
+ d = nil
+ }
+ })
+
+ // The data was appended; we're done.
+ if s.c.onAppend != nil {
+ s.c.onAppend(d == nil)
+ }
+ if d == nil {
+ break
+ }
+
+ // Not ready to append; wait for a data event and re-evaluate.
+ <-s.dataConsumedSignalC
+ }
+
+ return nil
+}
+
+// Signals our Append loop that data has been consumed.
+func (s *streamImpl) signalDataConsumed() {
+ select {
+ case s.dataConsumedSignalC <- struct{}{}:
+ break
+
+ default:
+ break
+ }
+}
+
+func (s *streamImpl) Close() {
+ s.stateLock.Lock()
+ defer s.stateLock.Unlock()
+
+ s.closed = true
+ s.maybeSetDrainedLocked()
+}
+
+func (s *streamImpl) name() string {
+ return s.c.name
+}
+
+// isDrained returns true if this stream is finished emitting data.
+//
+// This can happen if either:
+// - The stream is closed and has no more buffered data, or
+// - The strema has encountered a fatal error during processing.
+func (s *streamImpl) isDrained() bool {
+ return atomic.LoadInt32(&s.drained) != 0
+}
+
+// setDrained marks this stream as drained.
+func (s *streamImpl) setDrained() {
+ atomic.StoreInt32(&s.drained, 1)
+}
+
+// maybeSetDrainedLocked evaluates our buffer stream status. If the stream is
+// closed and our buffer is empty, it will set the drained state to true.
+//
+// The stream's stateLock must be held when calling this method.
+//
+// The resulting drained state will be returned.
+func (s *streamImpl) maybeSetDrainedLocked() bool {
+ if s.isDrained() {
+ return true
+ }
+
+ // Not drained ... should we be?
+ if s.closed {
+ bufSize := int64(0)
+ s.withParserLock(func() {
+ bufSize = s.c.parser.bufferedBytes()
+ })
+ if bufSize == 0 {
+ s.setDrained()
+ return true
+ }
+ }
+
+ return false
+}
+
+// expireTime returns the Time when the oldest chunk in the stream will expire.
+//
+// This is calculated ask:
+// oldest.Timestamp + stream.maximumBufferDuration
+// If there is no buffered data, oldest will return nil.
+func (s *streamImpl) expireTime() (t time.Time, has bool) {
+ s.withParserLock(func() {
+ t, has = s.c.parser.firstChunkTime()
+ })
+
+ if has {
+ t = t.Add(s.c.maximumBufferDuration)
+ }
+ return
+}
+
+// nextBundleEntry generates bundles for this stream. The total bundle data size
+// must not exceed the supplied size.
+//
+// If no bundle entry could be generated given the constraints, nil will be
+// returned.
+//
+// It is possible for some entries to be returned alongside an error.
+func (s *streamImpl) nextBundleEntry(bb *builder, aggressive bool) bool {
+ s.stateLock.Lock()
+ defer s.stateLock.Unlock()
+
+ // If we're not drained, try and get the next bundle.
+ modified := false
+ if !s.maybeSetDrainedLocked() {
+ err := error(nil)
+ modified, err = s.nextBundleEntryLocked(bb, aggressive)
+ if err != nil {
+ s.setAppendError(err)
+ s.setDrained()
+ }
+
+ if modified {
+ s.signalDataConsumed()
+ }
+ }
+
+ // If we're drained, populate our terminal state.
+ if s.maybeSetDrainedLocked() && s.lastLogEntry != nil {
+ bb.setStreamTerminal(&s.c.template, s.lastLogEntry.StreamIndex)
+ }
+
+ return modified
+}
+
+func (s *streamImpl) nextBundleEntryLocked(bb *builder, aggressive bool) (bool, error) {
+ c := constraints{
+ allowSplit: aggressive,
+ closed: s.closed,
+ }
+
+ // Extract as many entries as possible from the stream. As we extract, adjust
+ // our byte size.
+ //
+ // If we're closed, this will continue to consume until finished. If an error
+ // occurs, shut down data collection.
+ modified := false
+ ierr := error(nil)
+
+ for c.limit = bb.remaining(); c.limit > 0; c.limit = bb.remaining() {
+ emittedLog := false
+ s.withParserLock(func() {
+ le, err := s.c.parser.nextEntry(&c)
+ if err != nil {
+ ierr = err
+ return
+ }
+
+ if le == nil {
+ return
+ }
+
+ emittedLog = true
+ modified = true
+ s.lastLogEntry = le
+ bb.add(&s.c.template, le)
+ })
+
+ if !emittedLog {
+ break
+ }
+ }
+
+ return modified, ierr
+}
+
+func (s *streamImpl) withParserLock(f func()) {
+ s.parserLock.Lock()
+ defer s.parserLock.Unlock()
+
+ f()
+}
+
+func (s *streamImpl) appendError() error {
+ if err := s.appendErrValue.Load(); err != nil {
+ return err.(error)
+ }
+ return nil
+}
+
+func (s *streamImpl) setAppendError(err error) {
+ s.appendErrValue.Store(err)
+ s.signalDataConsumed()
+}
« no previous file with comments | « client/internal/logdog/butler/bundler/sizer_test.go ('k') | client/internal/logdog/butler/bundler/stream_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698