| Index: client/internal/logdog/butler/bundler/bundler_impl.go
|
| diff --git a/client/internal/logdog/butler/bundler/bundler_impl.go b/client/internal/logdog/butler/bundler/bundler_impl.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6f9cf1c20d384e8653124109a39d532f0bf46df8
|
| --- /dev/null
|
| +++ b/client/internal/logdog/butler/bundler/bundler_impl.go
|
| @@ -0,0 +1,277 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package bundler
|
| +
|
| +import (
|
| + "sort"
|
| +
|
| + "github.com/luci/luci-go/common/logdog/protocol"
|
| + "github.com/luci/luci-go/common/logdog/protocol/protoutil"
|
| +)
|
| +
|
| +// bundlerStream is an aggregate buffered log stream state. It consists of
|
| +// a ButlerLogBundle_Entry and its aggregate logs.
|
| +type bundlerStream protocol.ButlerLogBundle_Entry
|
| +
|
| +// mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the
|
| +// stream entry template.
|
| +func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
|
| + if e.GetTerminal() && !s.entry().GetTerminal() {
|
| + s.Terminal = e.Terminal
|
| + s.TerminalIndex = e.TerminalIndex
|
| + }
|
| +}
|
| +
|
| +func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry {
|
| + return (*protocol.ButlerLogBundle_Entry)(s)
|
| +}
|
| +
|
| +// bundlerImpl is an implementation of the Bundler interface.
|
| +type bundlerImpl struct {
|
| + *Config
|
| +
|
| + sizer Sizer
|
| + entries map[string]*bundlerStream
|
| + count int
|
| +
|
| + omitMap map[int]int64
|
| + round int64
|
| +}
|
| +
|
| +// New instantiates a new Bundler instance.
|
| +func New(c Config) Bundler {
|
| + // The template bundle may not have entries; clear our copy if it does.
|
| + c.TemplateBundle.Entries = nil
|
| +
|
| + b := &bundlerImpl{
|
| + Config: &c,
|
| + omitMap: map[int]int64{},
|
| + round: 1, // Start at "1" because "0" is default value.
|
| + }
|
| + b.reset()
|
| + return b
|
| +}
|
| +
|
| +func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
|
| + logs := e.GetLogs()
|
| +
|
| + // We first test if a given ButlerLogBundle_Entry is worth
|
| + // exporting. An entry is worth exporting if the infromation that it contains
|
| + // provides new information about the log stream or its state.
|
| + //
|
| + // We consider a log stream worth exporting if it satisfies AT LEAST one of
|
| + // the following conditions:
|
| + // - It has log data len(e.Logs) > 0
|
| + // - It is terminal: e.GetTerminal() == true
|
| + if !(len(logs) > 0 || e.GetTerminal()) {
|
| + return
|
| + }
|
| +
|
| + // Add this log to our reserve.
|
| + path := string(protoutil.DescriptorPath(e.GetDesc()))
|
| +
|
| + if cur := b.entries[path]; cur != nil {
|
| + // Augment the existing stream with the new logs and state.
|
| + cur.mergeEntry(e)
|
| + cur.Logs = append(cur.Logs, logs...)
|
| + } else {
|
| + // This is a new stream. Register the entry as the template for this stream.
|
| + b.sizer.Append(e, nil)
|
| + b.entries[path] = (*bundlerStream)(e)
|
| + }
|
| +
|
| + for _, le := range logs {
|
| + b.sizer.Append(e, le)
|
| + }
|
| + b.count += len(logs)
|
| + return
|
| +}
|
| +
|
| +func (b *bundlerImpl) reset() {
|
| + b.sizer = b.newSizer(&b.TemplateBundle)
|
| + b.count = 0
|
| + b.entries = map[string]*bundlerStream{}
|
| +}
|
| +
|
| +func (b *bundlerImpl) Empty() bool {
|
| + return len(b.entries) == 0
|
| +}
|
| +
|
| +func (b *bundlerImpl) Size() int64 {
|
| + return b.sizer.Size()
|
| +}
|
| +
|
| +func (b *bundlerImpl) GetBundles() []*protocol.ButlerLogBundle {
|
| + return b.getBundlesImpl(b.Threshold)
|
| +}
|
| +
|
| +func (b *bundlerImpl) getBundlesImpl(threshold int64) []*protocol.ButlerLogBundle {
|
| + bundles := []*protocol.ButlerLogBundle(nil)
|
| +
|
| + for {
|
| + bundle := b.getBundle(threshold)
|
| + if bundle == nil {
|
| + break
|
| + }
|
| + bundles = append(bundles, bundle)
|
| + }
|
| +
|
| + // If we still have bundle entries, it is likely because no entries can
|
| + // fit within the threshold. This is a configuration error, and our only
|
| + // viable response is to clear them out (drop them).
|
| + b.reset()
|
| +
|
| + return bundles
|
| +}
|
| +
|
| +func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle {
|
| + // Short circuit if we don't have any entries.
|
| + if len(b.entries) == 0 {
|
| + return nil
|
| + }
|
| +
|
| + // Clone our template bundle, as we intend to modify it by adding entries.
|
| + bundle := b.TemplateBundle
|
| +
|
| + // For determinism, add buffered entries in order of path.
|
| + keys := make([]string, 0, len(b.entries))
|
| + for k := range b.entries {
|
| + keys = append(keys, string(k))
|
| + }
|
| + if b.Deterministic {
|
| + sort.Strings(keys)
|
| + }
|
| +
|
| + sizer := b.newSizer(&bundle)
|
| + overThreshold := func() bool {
|
| + return threshold != 0 && threshold < sizer.Size()
|
| + }
|
| +
|
| + // For efficiency, we declare a single omit map. This map is keyed off of log
|
| + // index. Each time we enter the bundler loop, we increment our "round"
|
| + // variable. For any given loop, an entry is omitted if its omitMap
|
| + // index equals the current round.
|
| + for _, k := range keys {
|
| + // Get the current round.
|
| + round := b.round
|
| + b.round++
|
| +
|
| + // We assume that each entry in "entries" is worth exporting (see
|
| + // comment in Append), else it would not have been added to the entries
|
| + // map by Append.
|
| + e := b.entries[k]
|
| +
|
| + // If this entry has no logs, it is a terminal entry. Try and export it as
|
| + // a standalone bundle entry.
|
| + if len(e.Logs) == 0 {
|
| + sizer.Append(e.entry(), nil)
|
| + if overThreshold() {
|
| + // We don't have enough room to export a bundle entry. Our bundle is
|
| + // full for this round.
|
| + break
|
| + }
|
| +
|
| + // Our bundle is empty. This means that none of the entry's logs fit in
|
| + // an empty bundle! Discard the full entry (data loss).
|
| + bundle.Entries = append(bundle.Entries, e.entry())
|
| + delete(b.entries, k)
|
| + continue
|
| + }
|
| +
|
| + // Our entry has logs. Export any that fit into our bundle without violating
|
| + // the threshold.
|
| + omitted := 0
|
| + for i, le := range e.Logs {
|
| + sizer.Append(e.entry(), le)
|
| + if overThreshold() {
|
| + sizer.Undo()
|
| + b.omitMap[i] = round
|
| + omitted++
|
| + }
|
| + }
|
| +
|
| + // If all log entries available were successfully exported, we are finished
|
| + // with this entry. We will take an optimized path and hand its pointer
|
| + // directly to the export bundle.
|
| + if omitted == 0 {
|
| + bundle.Entries = append(bundle.Entries, e.entry())
|
| + delete(b.entries, k)
|
| + continue
|
| + }
|
| +
|
| + // If none of the entry's logs were exported, we're done with it for this
|
| + // round.
|
| + if omitted == len(e.Logs) {
|
| + // If our bundle is empty, this means that NONE of the entry's logs fit in
|
| + // an empty bundle! Since none will fit individually, we will discard the
|
| + // full entry (data loss).
|
| + if len(bundle.GetEntries()) == 0 {
|
| + if b.DropCallback != nil {
|
| + b.DropCallback((*protocol.ButlerLogBundle_Entry)(e))
|
| + }
|
| + delete(b.entries, k)
|
| + }
|
| + continue
|
| + }
|
| +
|
| + // We are exporting some of the entry's logs. We will do this by exporting
|
| + // a clone of the entry containing just the exported logs.
|
| + //
|
| + // We will reuse the entry's Logs array in order to avoid unnecessary
|
| + // allocations. In order to do this, we will split the array into two
|
| + // slices. The first slice will contain the exported elements, and the
|
| + // second will contain the remainder.
|
| + //
|
| + // (It should be noted that there is a potential issue if Append() were to
|
| + // be called in between getBundle calls. This is firstly not an issue, since
|
| + // getBundle in practice will empty "entries". However, even if that weren't
|
| + // the case, since the retained omitted logs are at the end of the array,
|
| + // calling append on that slice would properly extend it.)
|
| + divider := len(e.Logs)
|
| + for i := range e.Logs {
|
| + if i >= divider {
|
| + // Everything to the right of divider is omitted, so we're done.
|
| + break
|
| + }
|
| +
|
| + // Scan forwards until we find an omit spot.
|
| + if b.omitMap[i] != round {
|
| + continue
|
| + }
|
| +
|
| + // Scan backwards from the end of our log list to find a non-omitted
|
| + // entry.
|
| + for j := divider - 1; j > i; j-- {
|
| + if b.omitMap[j] != round {
|
| + // We're omitting "i", and not omitting "j". Swap!
|
| + e.Logs[i], e.Logs[j] = e.Logs[j], e.Logs[i]
|
| + divider = j
|
| + omitted -= 1
|
| + break
|
| + }
|
| + }
|
| + }
|
| + divider -= omitted
|
| +
|
| + // Since our entry has more logs, we must retain it. We will export a clone
|
| + // of the entry.
|
| + ec := *e.entry()
|
| + ec.Logs, e.Logs = e.Logs[:divider], e.Logs[divider:]
|
| + bundle.Entries = append(bundle.Entries, &ec)
|
| + }
|
| +
|
| + if len(bundle.Entries) == 0 {
|
| + return nil
|
| + }
|
| + return &bundle
|
| +}
|
| +
|
| +func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
|
| + nb := b.NewSizer
|
| + if nb == nil {
|
| + nb = NewFastSizer
|
| + }
|
| + return nb(bundle)
|
| +}
|
|
|