| Index: client/internal/logdog/butler/bundler/builder.go
|
| diff --git a/client/internal/logdog/butler/bundler/builder.go b/client/internal/logdog/butler/bundler/builder.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d80f766f393070da45ced53875cf069accffd895
|
| --- /dev/null
|
| +++ b/client/internal/logdog/butler/bundler/builder.go
|
| @@ -0,0 +1,116 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package bundler
|
| +
|
| +import (
|
| + "fmt"
|
| + "sort"
|
| +
|
| + "github.com/luci/luci-go/common/logdog/protocol"
|
| +)
|
| +
|
| +// builder incrementally constructs ButlerLogBundle entries.
|
| +type builder struct {
|
| + // template is the base bundle template.
|
| + template protocol.ButlerLogBundle
|
| + // size is the maximum permitted bundle size.
|
| + size int
|
| +
|
| + // templateCachedSize is the cached size of the ButlerLogBundle template.
|
| + templateCachedSize int
|
| + // entries is the set of ButlerLogBundle_Entry stream state.
|
| + beMap map[string]*protocol.ButlerLogBundle_Entry
|
| + // beSizes tracks the size of a ButlerLogBundle_Entry.
|
| + beSizes map[string]int
|
| +}
|
| +
|
| +func (b *builder) remaining() int {
|
| + return b.size - b.bundleSize()
|
| +}
|
| +
|
| +func (b *builder) ready() bool {
|
| + // Have we reached our desired size?
|
| + return b.hasContent() && (b.bundleSize() >= b.size)
|
| +}
|
| +
|
| +func (b *builder) bundleSize() int {
|
| + if b.templateCachedSize == 0 {
|
| + b.templateCachedSize = protoSize(&b.template)
|
| + }
|
| +
|
| + size := b.templateCachedSize
|
| + for _, sz := range b.beSizes {
|
| + size += sizeOfBundleEntryTag + varintLength(uint64(sz)) + sz
|
| + }
|
| +
|
| + return size
|
| +}
|
| +
|
| +func (b *builder) hasContent() bool {
|
| + return len(b.beMap) > 0
|
| +}
|
| +
|
| +func (b *builder) add(template *protocol.ButlerLogBundle_Entry, le *protocol.LogEntry) {
|
| + be := b.getCreateBundleEntry(template)
|
| +
|
| + be.Logs = append(be.Logs, le)
|
| + psize := protoSize(le)
|
| +
|
| + // Pay the cost of the additional LogEntry.
|
| + b.beSizes[template.Desc.Name] += sizeOfLogEntryTag + varintLength(uint64(psize)) + psize
|
| +}
|
| +
|
| +func (b *builder) setStreamTerminal(template *protocol.ButlerLogBundle_Entry, tidx uint64) {
|
| + be := b.getCreateBundleEntry(template)
|
| + if be.Terminal {
|
| + if be.TerminalIndex != tidx {
|
| + panic(fmt.Errorf("attempt to change terminal index %d => %d", be.TerminalIndex, tidx))
|
| + }
|
| + return
|
| + }
|
| +
|
| + be.Terminal = true
|
| + be.TerminalIndex = tidx
|
| +
|
| + // Pay the cost of the additional terminal fields.
|
| + b.beSizes[template.Desc.Name] += (sizeOfTerminalTag + sizeOfBoolTrue +
|
| + sizeOfTerminalIndexTag + varintLength(be.TerminalIndex))
|
| +}
|
| +
|
| +func (b *builder) bundle() *protocol.ButlerLogBundle {
|
| + bundle := b.template
|
| +
|
| + names := make([]string, 0, len(b.beMap))
|
| + for k := range b.beMap {
|
| + names = append(names, k)
|
| + }
|
| + sort.Strings(names)
|
| +
|
| + bundle.Entries = make([]*protocol.ButlerLogBundle_Entry, len(names))
|
| + for idx, name := range names {
|
| + bundle.Entries[idx] = b.beMap[name]
|
| + }
|
| +
|
| + return &bundle
|
| +}
|
| +
|
| +func (b *builder) getCreateBundleEntry(template *protocol.ButlerLogBundle_Entry) *protocol.ButlerLogBundle_Entry {
|
| + if be := b.beMap[template.Desc.Name]; be != nil {
|
| + return be
|
| + }
|
| +
|
| + // Initialize our maps (first time only).
|
| + if b.beMap == nil {
|
| + b.beMap = map[string]*protocol.ButlerLogBundle_Entry{}
|
| + b.beSizes = map[string]int{}
|
| + }
|
| +
|
| + templateCopy := *template
|
| + b.beMap[template.Desc.Name] = &templateCopy
|
| +
|
| + // Pay the up-front cost of the template.
|
| + b.beSizes[template.Desc.Name] += protoSize(&templateCopy)
|
| + return &templateCopy
|
| +}
|
|
|