| Index: client/internal/logdog/butler/bundler/parser.go | 
| diff --git a/client/internal/logdog/butler/bundler/parser.go b/client/internal/logdog/butler/bundler/parser.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..4f237de1e8abf4f0b01292af47b2fd2b6e3ef8e2 | 
| --- /dev/null | 
| +++ b/client/internal/logdog/butler/bundler/parser.go | 
| @@ -0,0 +1,118 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package bundler | 
| + | 
| +import ( | 
| +	"fmt" | 
| +	"time" | 
| + | 
| +	"github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | 
| +	"github.com/luci/luci-go/common/chunkstream" | 
| +	"github.com/luci/luci-go/common/logdog/protocol" | 
| +	"github.com/luci/luci-go/common/logdog/types" | 
| +	"github.com/luci/luci-go/common/proto/google" | 
| +) | 
| + | 
| +// constraints is the set of Constraints to apply when generating a LogEntry. | 
| +type constraints struct { | 
| +	// limit is the maximum size, in bytes, of the serialized LogEntry protobuf | 
| +	// that may be produced. | 
| +	limit int | 
| + | 
| +	// truncate indicates that bundles should be generated to fill the specified | 
| +	// space. The bundler may choose to forego bundling if the result is very | 
| +	// suboptimal, but is encouraged to fill the space if it's reasonable. | 
| +	truncate bool | 
| + | 
| +	// closed means that bundles should be aggressively generated with the | 
| +	// expectation that no further data will be buffered. It is only relevant | 
| +	// if truncate is also true. | 
| +	closed bool | 
| +} | 
| + | 
| +// parser is a stateful presence bound to a single log stream. A parser yields | 
| +// LogEntry messages one at a time and shapes them based on constraints. | 
| +// | 
| +// parser instances are owned by a single Stream and are not goroutine-safe. | 
| +type parser interface { | 
| +	// appendData adds a data chunk to this parser's chunk.Buffer. | 
| +	appendData(Data) | 
| + | 
| +	// nextEntry returns the next LogEntry in the stream. | 
| +	// | 
| +	// This method may return nil if there is insuffuicient data to produce a | 
| +	// LogEntry given the | 
| +	nextEntry(*constraints) (*protocol.LogEntry, error) | 
| + | 
| +	bufferedBytes() int64 | 
| + | 
| +	firstChunkTime() (time.Time, bool) | 
| +} | 
| + | 
| +func newParser(p *streamproto.Properties, c *counter) (parser, error) { | 
| +	base := baseParser{ | 
| +		counter:  c, | 
| +		timeBase: p.Timestamp.Time(), | 
| +	} | 
| + | 
| +	switch p.StreamType { | 
| +	case protocol.LogStreamDescriptor_TEXT: | 
| +		return &textParser{ | 
| +			baseParser: base, | 
| +		}, nil | 
| + | 
| +	case protocol.LogStreamDescriptor_BINARY: | 
| +		return &binaryParser{ | 
| +			baseParser: base, | 
| +		}, nil | 
| + | 
| +	case protocol.LogStreamDescriptor_DATAGRAM: | 
| +		return &datagramParser{ | 
| +			baseParser: base, | 
| +			maxSize:    int64(types.MaxDatagramSize), | 
| +		}, nil | 
| + | 
| +	default: | 
| +		return nil, fmt.Errorf("unknown stream type: %v", p.StreamType) | 
| +	} | 
| +} | 
| + | 
| +// baseParser is a common set of parser capabilities. | 
| +type baseParser struct { | 
| +	chunkstream.Buffer | 
| + | 
| +	counter *counter | 
| + | 
| +	timeBase  time.Time | 
| +	nextIndex int64 | 
| +} | 
| + | 
| +func (p *baseParser) baseLogEntry(ts time.Time) *protocol.LogEntry { | 
| +	e := protocol.LogEntry{ | 
| +		TimeOffset:  google.NewDuration(ts.Sub(p.timeBase)), | 
| +		PrefixIndex: uint64(p.counter.next()), | 
| +		StreamIndex: uint64(p.nextIndex), | 
| +	} | 
| +	p.nextIndex++ | 
| +	return &e | 
| +} | 
| + | 
| +func (p *baseParser) appendData(d Data) { | 
| +	p.Append(d) | 
| +} | 
| + | 
| +func (p *baseParser) bufferedBytes() int64 { | 
| +	return p.Len() | 
| +} | 
| + | 
| +func (p *baseParser) firstChunkTime() (time.Time, bool) { | 
| +	// Get the first data chunk in our Buffer. | 
| +	chunk := p.FirstChunk() | 
| +	if chunk == nil { | 
| +		return time.Time{}, false | 
| +	} | 
| + | 
| +	return chunk.(Data).Timestamp(), true | 
| +} | 
|  |