Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(170)

Unified Diff: client/internal/logdog/butler/bundler/parser.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Split off `chunk` library. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/parser.go
diff --git a/client/internal/logdog/butler/bundler/parser.go b/client/internal/logdog/butler/bundler/parser.go
new file mode 100644
index 0000000000000000000000000000000000000000..4f237de1e8abf4f0b01292af47b2fd2b6e3ef8e2
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/parser.go
@@ -0,0 +1,118 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
+ "github.com/luci/luci-go/common/chunkstream"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/logdog/types"
+ "github.com/luci/luci-go/common/proto/google"
+)
+
+// constraints is the set of Constraints to apply when generating a LogEntry.
+type constraints struct {
+ // limit is the maximum size, in bytes, of the serialized LogEntry protobuf
+ // that may be produced.
+ limit int
+
+ // truncate indicates that bundles should be generated to fill the specified
+ // space. The bundler may choose to forego bundling if the result is very
+ // suboptimal, but is encouraged to fill the space if it's reasonable.
+ truncate bool
+
+ // closed means that bundles should be aggressively generated with the
+ // expectation that no further data will be buffered. It is only relevant
+ // if truncate is also true.
+ closed bool
+}
+
+// parser is a stateful presence bound to a single log stream. A parser yields
+// LogEntry messages one at a time and shapes them based on constraints.
+//
+// parser instances are owned by a single Stream and are not goroutine-safe.
+type parser interface {
+ // appendData adds a data chunk to this parser's chunk.Buffer.
+ appendData(Data)
+
+ // nextEntry returns the next LogEntry in the stream.
+ //
+ // This method may return nil if there is insuffuicient data to produce a
+ // LogEntry given the
+ nextEntry(*constraints) (*protocol.LogEntry, error)
+
+ bufferedBytes() int64
+
+ firstChunkTime() (time.Time, bool)
+}
+
+func newParser(p *streamproto.Properties, c *counter) (parser, error) {
+ base := baseParser{
+ counter: c,
+ timeBase: p.Timestamp.Time(),
+ }
+
+ switch p.StreamType {
+ case protocol.LogStreamDescriptor_TEXT:
+ return &textParser{
+ baseParser: base,
+ }, nil
+
+ case protocol.LogStreamDescriptor_BINARY:
+ return &binaryParser{
+ baseParser: base,
+ }, nil
+
+ case protocol.LogStreamDescriptor_DATAGRAM:
+ return &datagramParser{
+ baseParser: base,
+ maxSize: int64(types.MaxDatagramSize),
+ }, nil
+
+ default:
+ return nil, fmt.Errorf("unknown stream type: %v", p.StreamType)
+ }
+}
+
+// baseParser is a common set of parser capabilities.
+type baseParser struct {
+ chunkstream.Buffer
+
+ counter *counter
+
+ timeBase time.Time
+ nextIndex int64
+}
+
+func (p *baseParser) baseLogEntry(ts time.Time) *protocol.LogEntry {
+ e := protocol.LogEntry{
+ TimeOffset: google.NewDuration(ts.Sub(p.timeBase)),
+ PrefixIndex: uint64(p.counter.next()),
+ StreamIndex: uint64(p.nextIndex),
+ }
+ p.nextIndex++
+ return &e
+}
+
+func (p *baseParser) appendData(d Data) {
+ p.Append(d)
+}
+
+func (p *baseParser) bufferedBytes() int64 {
+ return p.Len()
+}
+
+func (p *baseParser) firstChunkTime() (time.Time, bool) {
+ // Get the first data chunk in our Buffer.
+ chunk := p.FirstChunk()
+ if chunk == nil {
+ return time.Time{}, false
+ }
+
+ return chunk.(Data).Timestamp(), true
+}

Powered by Google App Engine
This is Rietveld 408576698