Chromium Code Reviews| Index: client/internal/logdog/butler/bundler/bundler_impl.go |
| diff --git a/client/internal/logdog/butler/bundler/bundler_impl.go b/client/internal/logdog/butler/bundler/bundler_impl.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..90f5c5477f02a2707d30e9682333634c9810279f |
| --- /dev/null |
| +++ b/client/internal/logdog/butler/bundler/bundler_impl.go |
| @@ -0,0 +1,164 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package bundler |
| + |
| +import ( |
| + "sort" |
| + |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| + "github.com/luci/luci-go/common/logdog/protocol/protoutil" |
|
tandrii(chromium)
2015/08/11 14:06:07
i think you have CL on which this one depends, but
dnj (Google)
2015/08/11 16:24:01
https://codereview.chromium.org/1272893004
|
| + "github.com/luci/luci-go/common/logdog/types" |
| +) |
| + |
| +type bundlerStream struct { |
|
tandrii(chromium)
2015/08/11 14:06:07
this isn't used in this CL.
dnj (Google)
2015/08/11 16:24:01
Hm? It's used all over the place in this file.
tandrii(chromium)
2015/08/11 17:32:22
sorry, that was meant for a different line, and ev
|
| + entry *protocol.ButlerLogBundle_Entry |
| + logs []*protocol.LogEntry |
| +} |
| + |
| +func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) { |
| + if e.GetTerminal() && !s.entry.GetTerminal() { |
| + s.entry.Terminal = e.Terminal |
| + s.entry.TerminalIndex = e.TerminalIndex |
| + } |
| +} |
| + |
| +// bundlerImpl is an implementation of the Bundler interface. |
| +type bundlerImpl struct { |
| + *Config |
| + |
| + sizer Sizer |
| + bundle *protocol.ButlerLogBundle |
| + entries map[types.StreamPath]*bundlerStream |
| + count int |
| +} |
| + |
| +// New instantiates a new Bundler instance. |
| +func New(c Config) Bundler { |
|
tandrii(chromium)
2015/08/11 14:06:07
Why pass config as a copy if you take a ref to it
dnj (Google)
2015/08/11 16:24:01
No point. This function owns this instance of Conf
tandrii(chromium)
2015/08/11 17:32:22
sorry, my comment was stupid because I didn't quit
dnj
2015/08/11 18:14:27
In the end it would have the same effect. However,
tandrii(chromium)
2015/08/11 18:31:05
Acknowledged.
|
| + b := &bundlerImpl{ |
| + Config: &c, |
| + bundle: &protocol.ButlerLogBundle{}, |
| + entries: make(map[types.StreamPath]*bundlerStream), |
|
tandrii(chromium)
2015/08/11 14:06:07
nit: map[types.StreamPath]*bundlerStream{}
dnj (Google)
2015/08/11 16:24:01
*sigh* Any idea why we seem to be shying away from
tandrii(chromium)
2015/08/11 17:32:22
hm, don't you still need an instance of bundle? It
dnj
2015/08/11 18:14:27
"bundle" has been replaced with "Config.TemplateBu
tandrii(chromium)
2015/08/11 18:31:05
ah, not in PS2, though. In PS3 -> ok :)
dnj (Google)
2015/08/12 03:20:09
Well, not used in PS2, properly removed in PS3 :)
|
| + } |
| + b.reset() |
| + return b |
| +} |
| + |
| +func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) { |
| + // Take ownership of the entry's logs. |
| + logs := e.GetLogs() |
| + e.Logs = nil |
| + |
| + // Ignore bundle entries that don't contribute to the log stream's |
| + // advancement. |
| + if !(len(logs) > 0 || e.GetTerminal()) { |
| + return |
| + } |
| + |
| + // Add this log to our reserve. |
| + path := protoutil.DescriptorPath(e.GetDesc()) |
| + cur, ok := b.entries[path] |
| + if !ok { |
| + // Add this new entry to the bundle. |
| + b.sizer.AppendBundleEntry(e) |
| + cur = &bundlerStream{ |
| + entry: e, |
| + logs: make([]*protocol.LogEntry, 0, len(logs)), |
| + } |
| + b.entries[path] = cur |
| + } else { |
| + cur.mergeEntry(e) |
| + } |
| + |
| + for _, le := range logs { |
| + b.sizer.AppendLogEntry(e, le) |
| + } |
| + cur.logs = append(cur.logs, logs...) |
| + b.count += len(logs) |
| + return |
| +} |
| + |
| +func (b *bundlerImpl) reset() { |
| + b.sizer = b.newSizer(b.bundle) |
| + b.count = 0 |
|
tandrii(chromium)
2015/08/11 14:06:07
shouldn't this also reset the entries?
dnj (Google)
2015/08/11 16:24:01
The idea was that "GetBundles" will consume all bu
tandrii(chromium)
2015/08/11 17:32:22
Acknowledged.
|
| +} |
| + |
| +func (b *bundlerImpl) Empty() bool { |
| + return len(b.entries) == 0 |
| +} |
| + |
| +func (b *bundlerImpl) Size() int { |
| + return b.sizer.Size() |
| +} |
| + |
| +func (b *bundlerImpl) GetBundles(threshold int) []*protocol.ButlerLogBundle { |
| + b.reset() |
| + |
| + bundles := []*protocol.ButlerLogBundle(nil) |
| + for { |
| + bundle := b.getNextBundle(threshold) |
| + if bundle == nil { |
| + return bundles |
| + } |
| + bundles = append(bundles, bundle) |
| + } |
| +} |
| + |
| +func (b *bundlerImpl) getNextBundle(threshold int) *protocol.ButlerLogBundle { |
| + // Populate the bundle with as many entries as possible. |
| + bundle := *b.bundle |
|
tandrii(chromium)
2015/08/11 14:06:07
i'd add comment here that you really want to copy
dnj (Google)
2015/08/11 16:24:01
Done.
|
| + bundle.Entries = append([]*protocol.ButlerLogBundle_Entry(nil), bundle.Entries...) |
|
tandrii(chromium)
2015/08/11 14:06:07
I don't understand why you copy bundle entries her
dnj (Google)
2015/08/11 16:24:01
Originally it was to hedge the case where the temp
|
| + sizer := b.newSizer(&bundle) |
|
tandrii(chromium)
2015/08/11 14:06:07
nit: i'd move this line closer to overThreshold fu
dnj (Google)
2015/08/11 16:24:01
Done.
|
| + |
| + // For determinism, add buffered entries in order of path. |
| + keys := make([]string, 0, len(b.entries)) |
| + for k := range b.entries { |
| + keys = append(keys, string(k)) |
| + } |
| + sort.Sort(sort.StringSlice(keys)) |
|
tandrii(chromium)
2015/08/11 14:06:07
just sort.Strings(keys)
https://golang.org/pkg/sor
dnj (Google)
2015/08/11 16:24:01
Done.
|
| + |
| + overThreshold := func() bool { |
| + return threshold != 0 && threshold < sizer.Size() |
| + } |
| + |
| +BuildLoop: |
| + for _, k := range keys { |
| + e := b.entries[types.StreamPath(k)] |
| + |
| + // Can we add this entry without exceeding our size threshold? |
|
tandrii(chromium)
2015/08/11 14:06:07
what if threshold is small enough that the number
dnj (Google)
2015/08/11 16:24:01
It's allowed to happen. In that case, we will just
|
| + sizer.AppendBundleEntry(e.entry) |
| + if overThreshold() { |
| + break BuildLoop |
| + } |
| + ec := *e.entry |
| + ec.Logs = append([]*protocol.LogEntry(nil), ec.Logs...) |
|
tandrii(chromium)
2015/08/11 14:06:07
same as line #111: why copy ec.Logs here if you es
dnj (Google)
2015/08/11 16:24:01
Don't anymore; mandatory "nil" assumption.
|
| + bundle.Entries = append(bundle.Entries, &ec) |
| + |
| + for i, le := range e.logs { |
| + sizer.AppendLogEntry(e.entry, le) |
| + |
| + if overThreshold() { |
| + e.logs = append(e.logs[:0], e.logs[i:]...) |
|
tandrii(chromium)
2015/08/11 14:06:07
My test http://play.golang.org/p/g0387lE-R7 sort o
dnj (Google)
2015/08/11 16:24:01
Good thought! However, from here, it does say tha
tandrii(chromium)
2015/08/11 17:32:22
Déjà vu: I knew I saw it somewhere, but yet again
dnj
2015/08/11 18:14:27
No worries, it took me a bit of digging to find, s
|
| + break BuildLoop |
| + } |
| + ec.Logs = append(ec.Logs, le) |
| + } |
| + |
| + // This entry has been fully added. |
| + delete(b.entries, types.StreamPath(k)) |
| + } |
| + |
| + if len(bundle.Entries) == 0 { |
| + return nil |
| + } |
| + return &bundle |
| +} |
| + |
| +func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer { |
| + nb := b.NewSizer |
| + if nb == nil { |
| + nb = NewFastSizer |
| + } |
| + return nb(bundle) |
| +} |