Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(156)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler_impl.go

Issue 1276923003: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from review. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sort"
9
10 "github.com/luci/luci-go/common/logdog/protocol"
11 "github.com/luci/luci-go/common/logdog/protocol/protoutil"
12 )
13
14 // bundlerStream is an aggregate buffered log stream state. It consists of
15 // a ButlerLogBundle_Entry and its aggregate logs.
16 type bundlerStream protocol.ButlerLogBundle_Entry
17
18 // mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the
19 // stream entry template.
20 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
21 if e.GetTerminal() && !s.entry().GetTerminal() {
22 s.Terminal = e.Terminal
23 s.TerminalIndex = e.TerminalIndex
24 }
25 }
26
27 func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry {
28 return (*protocol.ButlerLogBundle_Entry)(s)
29 }
30
31 // bundlerImpl is an implementation of the Bundler interface.
32 type bundlerImpl struct {
33 *Config
34
35 sizer Sizer
36 entries map[string]*bundlerStream
37 count int
38
39 omitMap map[int]int64
40 round int64
41 }
42
43 // New instantiates a new Bundler instance.
44 func New(c Config) Bundler {
45 // The template bundle may not have entries; clear our copy if it does.
46 c.TemplateBundle.Entries = nil
47
48 b := &bundlerImpl{
49 Config: &c,
50 omitMap: map[int]int64{},
51 round: 1, // Start at "1" because "0" is default value.
52 }
53 b.reset()
54 return b
55 }
56
57 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
58 logs := e.GetLogs()
59
60 // We first test if a given ButlerLogBundle_Entry is worth
61 // exporting. An entry is worth exporting if the infromation that it con tains
62 // provides new information about the log stream or its state.
63 //
64 // We consider a log stream worth exporting if it satisfies AT LEAST one of
65 // the following conditions:
66 // - It has log data len(e.Logs) > 0
67 // - It is terminal: e.GetTerminal() == true
68 if !(len(logs) > 0 || e.GetTerminal()) {
69 return
70 }
71
72 // Add this log to our reserve.
73 path := string(protoutil.DescriptorPath(e.GetDesc()))
74
75 if cur := b.entries[path]; cur != nil {
76 // Augment the existing stream with the new logs and state.
77 cur.mergeEntry(e)
78 cur.Logs = append(cur.Logs, logs...)
79 } else {
80 // This is a new stream. Register the entry as the template for this stream.
81 b.sizer.Append(e, nil)
82 b.entries[path] = (*bundlerStream)(e)
83 }
84
85 for _, le := range logs {
86 b.sizer.Append(e, le)
87 }
88 b.count += len(logs)
89 return
90 }
91
92 func (b *bundlerImpl) reset() {
93 b.sizer = b.newSizer(&b.TemplateBundle)
94 b.count = 0
95 b.entries = map[string]*bundlerStream{}
96 }
97
98 func (b *bundlerImpl) Empty() bool {
99 return len(b.entries) == 0
100 }
101
102 func (b *bundlerImpl) Size() int64 {
103 return b.sizer.Size()
104 }
105
106 func (b *bundlerImpl) GetBundles() []*protocol.ButlerLogBundle {
107 return b.getBundlesImpl(b.Threshold)
108 }
109
110 func (b *bundlerImpl) getBundlesImpl(threshold int64) []*protocol.ButlerLogBundl e {
111 bundles := []*protocol.ButlerLogBundle(nil)
112
113 for {
114 bundle := b.getBundle(threshold)
115 if bundle == nil {
116 break
117 }
118 bundles = append(bundles, bundle)
119 }
120
121 // If we still have bundle entries, it is likely because no entries can
122 // fit within the threshold. This is a configuration error, and our only
123 // viable response is to clear them out (drop them).
124 b.reset()
125
126 return bundles
127 }
128
129 func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle {
130 // Short circuit if we don't have any entries.
131 if len(b.entries) == 0 {
132 return nil
133 }
134
135 // Clone our template bundle, as we intend to modify it by adding entrie s.
136 bundle := b.TemplateBundle
137
138 // For determinism, add buffered entries in order of path.
139 keys := make([]string, 0, len(b.entries))
140 for k := range b.entries {
141 keys = append(keys, string(k))
142 }
143 if b.Deterministic {
144 sort.Strings(keys)
145 }
146
147 sizer := b.newSizer(&bundle)
148 overThreshold := func() bool {
149 return threshold != 0 && threshold < sizer.Size()
150 }
151
152 // For efficiency, we declare a single omit map. This map is keyed off o f log
153 // index. Each time we enter the bundler loop, we increment our "round"
154 // variable. For any given loop, an entry is omitted if its omitMap
155 // index equals the current round.
156 for _, k := range keys {
157 // Get the current round.
158 round := b.round
159 b.round++
160
161 // We assume that each entry in "entries" is worth exporting (se e
162 // comment in Append), else it would not have been added to the entries
163 // map by Append.
164 e := b.entries[k]
165
166 // If this entry has no logs, it is a terminal entry. Try and ex port it as
167 // a standalone bundle entry.
168 if len(e.Logs) == 0 {
169 sizer.Append(e.entry(), nil)
170 if overThreshold() {
171 // We don't have enough room to export a bundle entry. Our bundle is
172 // full for this round.
173 break
174 }
175
176 // Our bundle is empty. This means that none of the entr y's logs fit in
177 // an empty bundle! Discard the full entry (data loss).
178 bundle.Entries = append(bundle.Entries, e.entry())
179 delete(b.entries, k)
180 continue
181 }
182
183 // Our entry has logs. Export any that fit into our bundle witho ut violating
184 // the threshold.
185 omitted := 0
186 for i, le := range e.Logs {
187 sizer.Append(e.entry(), le)
188 if overThreshold() {
189 sizer.Undo()
190 b.omitMap[i] = round
191 omitted++
192 }
193 }
194
195 // If all log entries available were successfully exported, we a re finished
196 // with this entry. We will take an optimized path and hand its pointer
197 // directly to the export bundle.
198 if omitted == 0 {
199 bundle.Entries = append(bundle.Entries, e.entry())
200 delete(b.entries, k)
201 continue
202 }
203
204 // If none of the entry's logs were exported, we're done with it for this
205 // round.
206 if omitted == len(e.Logs) {
207 // If our bundle is empty, this means that NONE of the e ntry's logs fit in
208 // an empty bundle! Since none will fit individually, we will discard the
209 // full entry (data loss).
210 if len(bundle.GetEntries()) == 0 {
211 if b.DropCallback != nil {
212 b.DropCallback((*protocol.ButlerLogBundl e_Entry)(e))
213 }
214 delete(b.entries, k)
215 }
216 continue
217 }
218
219 // We are exporting some of the entry's logs. We will do this by exporting
220 // a clone of the entry containing just the exported logs.
221 //
222 // We will reuse the entry's Logs array in order to avoid unnece ssary
223 // allocations. In order to do this, we will split the array int o two
224 // slices. The first slice will contain the exported elements, a nd the
225 // second will contain the remainder.
226 //
227 // (It should be noted that there is a potential issue if Append () were to
228 // be called in between getBundle calls. This is firstly not an issue, since
229 // getBundle in practice will empty "entries". However, even if that weren't
230 // the case, since the retained omitted logs are at the end of t he array,
231 // calling append on that slice would properly extend it.)
232 divider := len(e.Logs)
233 for i := range e.Logs {
234 if i >= divider {
235 // Everything to the right of divider is omitted , so we're done.
236 break
237 }
238
239 // Scan forwards until we find an omit spot.
240 if b.omitMap[i] != round {
241 continue
242 }
243
244 // Scan backwards from the end of our log list to find a non-omitted
245 // entry.
246 for j := divider - 1; j > i; j-- {
247 if b.omitMap[j] != round {
248 // We're omitting "i", and not omitting "j". Swap!
249 e.Logs[i], e.Logs[j] = e.Logs[j], e.Logs [i]
250 divider = j
251 omitted -= 1
252 break
253 }
254 }
255 }
256 divider -= omitted
257
258 // Since our entry has more logs, we must retain it. We will exp ort a clone
259 // of the entry.
260 ec := *e.entry()
261 ec.Logs, e.Logs = e.Logs[:divider], e.Logs[divider:]
262 bundle.Entries = append(bundle.Entries, &ec)
263 }
264
265 if len(bundle.Entries) == 0 {
266 return nil
267 }
268 return &bundle
269 }
270
271 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
272 nb := b.NewSizer
273 if nb == nil {
274 nb = NewFastSizer
275 }
276 return nb(bundle)
277 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/bundler/bundler.go ('k') | client/internal/logdog/butler/bundler/bundler_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698