Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1308)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler_impl.go

Issue 1276923003: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Addressed code review comments. Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sort"
9
10 "github.com/luci/luci-go/common/logdog/protocol"
11 "github.com/luci/luci-go/common/logdog/protocol/protoutil"
12 "github.com/luci/luci-go/common/logdog/types"
13 )
14
15 type bundlerStream struct {
16 entry *protocol.ButlerLogBundle_Entry
17 logs []*protocol.LogEntry
18 }
19
20 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
21 if e.GetTerminal() && !s.entry.GetTerminal() {
22 s.entry.Terminal = e.Terminal
23 s.entry.TerminalIndex = e.TerminalIndex
24 }
25 }
26
27 // bundlerImpl is an implementation of the Bundler interface.
28 type bundlerImpl struct {
29 *Config
30
31 sizer Sizer
32 bundle *protocol.ButlerLogBundle
33 entries map[types.StreamPath]*bundlerStream
34 count int
35 }
36
37 // New instantiates a new Bundler instance.
38 func New(c Config) Bundler {
39 // The template bundle may not have entries; clear our copy if it does.
40 c.TemplateBundle.Entries = nil
41
42 b := &bundlerImpl{
43 Config: &c,
44 }
45 b.reset()
46 return b
47 }
48
49 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
50 logs := e.GetLogs()
51 e.Logs = nil
52
53 // Ignore bundle entries that don't contribute to the log stream's
54 // advancement.
55 if !(len(logs) > 0 || e.GetTerminal()) {
56 return
57 }
58
59 // Add this log to our reserve.
60 path := protoutil.DescriptorPath(e.GetDesc())
61 cur, ok := b.entries[path]
62 if !ok {
63 // Add this new entry to the bundle.
64 b.sizer.AppendBundleEntry(e)
65
66 cur = &bundlerStream{
67 entry: e,
68 logs: make([]*protocol.LogEntry, 0, len(logs)),
69 }
70 b.entries[path] = cur
71 } else {
72 cur.mergeEntry(e)
73 }
74
75 for _, le := range logs {
76 b.sizer.AppendLogEntry(e, le)
77 }
78 cur.logs = append(cur.logs, logs...)
79 b.count += len(logs)
80 return
81 }
82
83 func (b *bundlerImpl) reset() {
84 b.sizer = b.newSizer(&b.TemplateBundle)
85 b.count = 0
86 b.entries = map[types.StreamPath]*bundlerStream{}
87 }
88
89 func (b *bundlerImpl) Empty() bool {
90 return len(b.entries) == 0
91 }
92
93 func (b *bundlerImpl) Size() int64 {
94 return b.sizer.Size()
95 }
96
97 func (b *bundlerImpl) GetBundles(threshold int64) []*protocol.ButlerLogBundle {
98 bundles := []*protocol.ButlerLogBundle(nil)
99 for {
100 bundle := b.getBundle(threshold)
101 if bundle == nil {
102 break
103 }
104 bundles = append(bundles, bundle)
105 }
106
107 // If we still have bundle entries, it is likely because no entries fit with
108 // the threshold. Clear them out.
109 b.reset()
110
111 return bundles
112 }
113
114 func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle {
115 // Clone our template bundle, as we intend to modify it by adding entrie s.
116 bundle := b.TemplateBundle
117
118 // For determinism, add buffered entries in order of path.
119 keys := make([]string, 0, len(b.entries))
120 for k := range b.entries {
121 keys = append(keys, string(k))
122 }
123 sort.Strings(keys)
124
125 sizer := b.newSizer(&bundle)
126 overThreshold := func() bool {
127 return threshold != 0 && threshold < sizer.Size()
128 }
129
130 for _, k := range keys {
131 e := b.entries[types.StreamPath(k)]
132
133 // Can we add this entry without exceeding our size threshold?
134 sizer.AppendBundleEntry(e.entry)
135 if overThreshold() {
136 break
137 }
138
139 // Count how many logs we can add without hitting our threshold.
140 count := 0
141 for _, le := range e.logs {
142 sizer.AppendLogEntry(e.entry, le)
143 if overThreshold() {
144 break
145 }
146
147 count++
148 }
tandrii(chromium) 2015/08/11 17:32:22 I think you need to check for (count == 0). And al
dnj 2015/08/11 18:14:28 Ah I see what you're saying. Not correct, though.
tandrii(chromium) 2015/08/11 18:31:05 OK, i take back that early check is not necessary.
dnj (Google) 2015/08/12 03:20:09 (See latest message)
149
150 // If we can't add all of the logs, we will export a clone of th e template
151 // bundle entry with specifically the logs that fit.
152 if count < len(e.logs) {
153 ec := *e.entry
154 ec.Logs = make([]*protocol.LogEntry, count)
155 copy(ec.Logs, e.logs)
156 bundle.Entries = append(bundle.Entries, &ec)
157
158 // Left-shift our retained logs to consume the ones that we've exported.
159 e.logs = append(e.logs[:0], e.logs[count:]...)
160 break
161 }
162
163 // We've consumed all logs for this entry. Since we're deleting the
164 // bundlerStream from the map, export its template and logs slic e directly.
165 e.entry.Logs = e.logs
166 bundle.Entries = append(bundle.Entries, e.entry)
167 delete(b.entries, types.StreamPath(k))
168 }
169
170 if len(bundle.Entries) == 0 {
171 return nil
172 }
173 return &bundle
174 }
175
176 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
177 nb := b.NewSizer
178 if nb == nil {
179 nb = NewFastSizer
180 }
181 return nb(bundle)
182 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698