Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Unified Diff: client/internal/logdog/butler/bundler/bundler_impl.go

Issue 1276923003: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Addressed code review comments. Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/bundler_impl.go
diff --git a/client/internal/logdog/butler/bundler/bundler_impl.go b/client/internal/logdog/butler/bundler/bundler_impl.go
new file mode 100644
index 0000000000000000000000000000000000000000..890b90cb67f919b53aa66aed52f396b6e6b4dc29
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/bundler_impl.go
@@ -0,0 +1,182 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "sort"
+
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/logdog/protocol/protoutil"
+ "github.com/luci/luci-go/common/logdog/types"
+)
+
+type bundlerStream struct {
+ entry *protocol.ButlerLogBundle_Entry
+ logs []*protocol.LogEntry
+}
+
+func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
+ if e.GetTerminal() && !s.entry.GetTerminal() {
+ s.entry.Terminal = e.Terminal
+ s.entry.TerminalIndex = e.TerminalIndex
+ }
+}
+
+// bundlerImpl is an implementation of the Bundler interface.
+type bundlerImpl struct {
+ *Config
+
+ sizer Sizer
+ bundle *protocol.ButlerLogBundle
+ entries map[types.StreamPath]*bundlerStream
+ count int
+}
+
+// New instantiates a new Bundler instance.
+func New(c Config) Bundler {
+ // The template bundle may not have entries; clear our copy if it does.
+ c.TemplateBundle.Entries = nil
+
+ b := &bundlerImpl{
+ Config: &c,
+ }
+ b.reset()
+ return b
+}
+
+func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
+ logs := e.GetLogs()
+ e.Logs = nil
+
+ // Ignore bundle entries that don't contribute to the log stream's
+ // advancement.
+ if !(len(logs) > 0 || e.GetTerminal()) {
+ return
+ }
+
+ // Add this log to our reserve.
+ path := protoutil.DescriptorPath(e.GetDesc())
+ cur, ok := b.entries[path]
+ if !ok {
+ // Add this new entry to the bundle.
+ b.sizer.AppendBundleEntry(e)
+
+ cur = &bundlerStream{
+ entry: e,
+ logs: make([]*protocol.LogEntry, 0, len(logs)),
+ }
+ b.entries[path] = cur
+ } else {
+ cur.mergeEntry(e)
+ }
+
+ for _, le := range logs {
+ b.sizer.AppendLogEntry(e, le)
+ }
+ cur.logs = append(cur.logs, logs...)
+ b.count += len(logs)
+ return
+}
+
+func (b *bundlerImpl) reset() {
+ b.sizer = b.newSizer(&b.TemplateBundle)
+ b.count = 0
+ b.entries = map[types.StreamPath]*bundlerStream{}
+}
+
+func (b *bundlerImpl) Empty() bool {
+ return len(b.entries) == 0
+}
+
+func (b *bundlerImpl) Size() int64 {
+ return b.sizer.Size()
+}
+
+func (b *bundlerImpl) GetBundles(threshold int64) []*protocol.ButlerLogBundle {
+ bundles := []*protocol.ButlerLogBundle(nil)
+ for {
+ bundle := b.getBundle(threshold)
+ if bundle == nil {
+ break
+ }
+ bundles = append(bundles, bundle)
+ }
+
+ // If we still have bundle entries, it is likely because no entries fit with
+ // the threshold. Clear them out.
+ b.reset()
+
+ return bundles
+}
+
+func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle {
+ // Clone our template bundle, as we intend to modify it by adding entries.
+ bundle := b.TemplateBundle
+
+ // For determinism, add buffered entries in order of path.
+ keys := make([]string, 0, len(b.entries))
+ for k := range b.entries {
+ keys = append(keys, string(k))
+ }
+ sort.Strings(keys)
+
+ sizer := b.newSizer(&bundle)
+ overThreshold := func() bool {
+ return threshold != 0 && threshold < sizer.Size()
+ }
+
+ for _, k := range keys {
+ e := b.entries[types.StreamPath(k)]
+
+ // Can we add this entry without exceeding our size threshold?
+ sizer.AppendBundleEntry(e.entry)
+ if overThreshold() {
+ break
+ }
+
+ // Count how many logs we can add without hitting our threshold.
+ count := 0
+ for _, le := range e.logs {
+ sizer.AppendLogEntry(e.entry, le)
+ if overThreshold() {
+ break
+ }
+
+ count++
+ }
tandrii(chromium) 2015/08/11 17:32:22 I think you need to check for (count == 0). And al
dnj 2015/08/11 18:14:28 Ah I see what you're saying. Not correct, though.
tandrii(chromium) 2015/08/11 18:31:05 OK, i take back that early check is not necessary.
dnj (Google) 2015/08/12 03:20:09 (See latest message)
+
+ // If we can't add all of the logs, we will export a clone of the template
+ // bundle entry with specifically the logs that fit.
+ if count < len(e.logs) {
+ ec := *e.entry
+ ec.Logs = make([]*protocol.LogEntry, count)
+ copy(ec.Logs, e.logs)
+ bundle.Entries = append(bundle.Entries, &ec)
+
+ // Left-shift our retained logs to consume the ones that we've exported.
+ e.logs = append(e.logs[:0], e.logs[count:]...)
+ break
+ }
+
+ // We've consumed all logs for this entry. Since we're deleting the
+ // bundlerStream from the map, export its template and logs slice directly.
+ e.entry.Logs = e.logs
+ bundle.Entries = append(bundle.Entries, e.entry)
+ delete(b.entries, types.StreamPath(k))
+ }
+
+ if len(bundle.Entries) == 0 {
+ return nil
+ }
+ return &bundle
+}
+
+func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
+ nb := b.NewSizer
+ if nb == nil {
+ nb = NewFastSizer
+ }
+ return nb(bundle)
+}

Powered by Google App Engine
This is Rietveld 408576698