Chromium Code Reviews| Index: client/internal/logdog/butler/bundler/bundler_impl.go |
| diff --git a/client/internal/logdog/butler/bundler/bundler_impl.go b/client/internal/logdog/butler/bundler/bundler_impl.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..890b90cb67f919b53aa66aed52f396b6e6b4dc29 |
| --- /dev/null |
| +++ b/client/internal/logdog/butler/bundler/bundler_impl.go |
| @@ -0,0 +1,182 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package bundler |
| + |
| +import ( |
| + "sort" |
| + |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| + "github.com/luci/luci-go/common/logdog/protocol/protoutil" |
| + "github.com/luci/luci-go/common/logdog/types" |
| +) |
| + |
| +type bundlerStream struct { |
| + entry *protocol.ButlerLogBundle_Entry |
| + logs []*protocol.LogEntry |
| +} |
| + |
| +func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) { |
| + if e.GetTerminal() && !s.entry.GetTerminal() { |
| + s.entry.Terminal = e.Terminal |
| + s.entry.TerminalIndex = e.TerminalIndex |
| + } |
| +} |
| + |
| +// bundlerImpl is an implementation of the Bundler interface. |
| +type bundlerImpl struct { |
| + *Config |
| + |
| + sizer Sizer |
| + bundle *protocol.ButlerLogBundle |
| + entries map[types.StreamPath]*bundlerStream |
| + count int |
| +} |
| + |
| +// New instantiates a new Bundler instance. |
| +func New(c Config) Bundler { |
| + // The template bundle may not have entries; clear our copy if it does. |
| + c.TemplateBundle.Entries = nil |
| + |
| + b := &bundlerImpl{ |
| + Config: &c, |
| + } |
| + b.reset() |
| + return b |
| +} |
| + |
| +func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) { |
| + logs := e.GetLogs() |
| + e.Logs = nil |
| + |
| + // Ignore bundle entries that don't contribute to the log stream's |
| + // advancement. |
| + if !(len(logs) > 0 || e.GetTerminal()) { |
| + return |
| + } |
| + |
| + // Add this log to our reserve. |
| + path := protoutil.DescriptorPath(e.GetDesc()) |
| + cur, ok := b.entries[path] |
| + if !ok { |
| + // Add this new entry to the bundle. |
| + b.sizer.AppendBundleEntry(e) |
| + |
| + cur = &bundlerStream{ |
| + entry: e, |
| + logs: make([]*protocol.LogEntry, 0, len(logs)), |
| + } |
| + b.entries[path] = cur |
| + } else { |
| + cur.mergeEntry(e) |
| + } |
| + |
| + for _, le := range logs { |
| + b.sizer.AppendLogEntry(e, le) |
| + } |
| + cur.logs = append(cur.logs, logs...) |
| + b.count += len(logs) |
| + return |
| +} |
| + |
| +func (b *bundlerImpl) reset() { |
| + b.sizer = b.newSizer(&b.TemplateBundle) |
| + b.count = 0 |
| + b.entries = map[types.StreamPath]*bundlerStream{} |
| +} |
| + |
| +func (b *bundlerImpl) Empty() bool { |
| + return len(b.entries) == 0 |
| +} |
| + |
| +func (b *bundlerImpl) Size() int64 { |
| + return b.sizer.Size() |
| +} |
| + |
| +func (b *bundlerImpl) GetBundles(threshold int64) []*protocol.ButlerLogBundle { |
| + bundles := []*protocol.ButlerLogBundle(nil) |
| + for { |
| + bundle := b.getBundle(threshold) |
| + if bundle == nil { |
| + break |
| + } |
| + bundles = append(bundles, bundle) |
| + } |
| + |
| + // If we still have bundle entries, it is likely because no entries fit with |
| + // the threshold. Clear them out. |
| + b.reset() |
| + |
| + return bundles |
| +} |
| + |
| +func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle { |
| + // Clone our template bundle, as we intend to modify it by adding entries. |
| + bundle := b.TemplateBundle |
| + |
| + // For determinism, add buffered entries in order of path. |
| + keys := make([]string, 0, len(b.entries)) |
| + for k := range b.entries { |
| + keys = append(keys, string(k)) |
| + } |
| + sort.Strings(keys) |
| + |
| + sizer := b.newSizer(&bundle) |
| + overThreshold := func() bool { |
| + return threshold != 0 && threshold < sizer.Size() |
| + } |
| + |
| + for _, k := range keys { |
| + e := b.entries[types.StreamPath(k)] |
| + |
| + // Can we add this entry without exceeding our size threshold? |
| + sizer.AppendBundleEntry(e.entry) |
| + if overThreshold() { |
| + break |
| + } |
| + |
| + // Count how many logs we can add without hitting our threshold. |
| + count := 0 |
| + for _, le := range e.logs { |
| + sizer.AppendLogEntry(e.entry, le) |
| + if overThreshold() { |
| + break |
| + } |
| + |
| + count++ |
| + } |
|
tandrii(chromium)
2015/08/11 17:32:22
I think you need to check for (count == 0). And al
dnj
2015/08/11 18:14:28
Ah I see what you're saying. Not correct, though.
tandrii(chromium)
2015/08/11 18:31:05
OK, i take back that early check is not necessary.
dnj (Google)
2015/08/12 03:20:09
(See latest message)
|
| + |
| + // If we can't add all of the logs, we will export a clone of the template |
| + // bundle entry with specifically the logs that fit. |
| + if count < len(e.logs) { |
| + ec := *e.entry |
| + ec.Logs = make([]*protocol.LogEntry, count) |
| + copy(ec.Logs, e.logs) |
| + bundle.Entries = append(bundle.Entries, &ec) |
| + |
| + // Left-shift our retained logs to consume the ones that we've exported. |
| + e.logs = append(e.logs[:0], e.logs[count:]...) |
| + break |
| + } |
| + |
| + // We've consumed all logs for this entry. Since we're deleting the |
| + // bundlerStream from the map, export its template and logs slice directly. |
| + e.entry.Logs = e.logs |
| + bundle.Entries = append(bundle.Entries, e.entry) |
| + delete(b.entries, types.StreamPath(k)) |
| + } |
| + |
| + if len(bundle.Entries) == 0 { |
| + return nil |
| + } |
| + return &bundle |
| +} |
| + |
| +func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer { |
| + nb := b.NewSizer |
| + if nb == nil { |
| + nb = NewFastSizer |
| + } |
| + return nb(bundle) |
| +} |