| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "sort" |
| 9 |
| 10 "github.com/luci/luci-go/common/logdog/protocol" |
| 11 "github.com/luci/luci-go/common/logdog/protocol/protoutil" |
| 12 ) |
| 13 |
| 14 // bundlerStream is an aggregate buffered log stream state. It consists of |
| 15 // a ButlerLogBundle_Entry and its aggregate logs. |
| 16 type bundlerStream protocol.ButlerLogBundle_Entry |
| 17 |
| 18 // mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the |
| 19 // stream entry template. |
| 20 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) { |
| 21 if e.GetTerminal() && !s.entry().GetTerminal() { |
| 22 s.Terminal = e.Terminal |
| 23 s.TerminalIndex = e.TerminalIndex |
| 24 } |
| 25 } |
| 26 |
| 27 func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry { |
| 28 return (*protocol.ButlerLogBundle_Entry)(s) |
| 29 } |
| 30 |
| 31 // bundlerImpl is an implementation of the Bundler interface. |
| 32 type bundlerImpl struct { |
| 33 *Config |
| 34 |
| 35 sizer Sizer |
| 36 entries map[string]*bundlerStream |
| 37 count int |
| 38 |
| 39 omitMap map[int]int64 |
| 40 round int64 |
| 41 } |
| 42 |
| 43 // New instantiates a new Bundler instance. |
| 44 func New(c Config) Bundler { |
| 45 // The template bundle may not have entries; clear our copy if it does. |
| 46 c.TemplateBundle.Entries = nil |
| 47 |
| 48 b := &bundlerImpl{ |
| 49 Config: &c, |
| 50 omitMap: map[int]int64{}, |
| 51 round: 1, // Start at "1" because "0" is default value. |
| 52 } |
| 53 b.reset() |
| 54 return b |
| 55 } |
| 56 |
| 57 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) { |
| 58 logs := e.GetLogs() |
| 59 |
| 60 // We first test if a given ButlerLogBundle_Entry is worth |
| 61 // exporting. An entry is worth exporting if the infromation that it con
tains |
| 62 // provides new information about the log stream or its state. |
| 63 // |
| 64 // We consider a log stream worth exporting if it satisfies AT LEAST one
of |
| 65 // the following conditions: |
| 66 // - It has log data len(e.Logs) > 0 |
| 67 // - It is terminal: e.GetTerminal() == true |
| 68 if !(len(logs) > 0 || e.GetTerminal()) { |
| 69 return |
| 70 } |
| 71 |
| 72 // Add this log to our reserve. |
| 73 path := string(protoutil.DescriptorPath(e.GetDesc())) |
| 74 |
| 75 if cur := b.entries[path]; cur != nil { |
| 76 // Augment the existing stream with the new logs and state. |
| 77 cur.mergeEntry(e) |
| 78 cur.Logs = append(cur.Logs, logs...) |
| 79 } else { |
| 80 // This is a new stream. Register the entry as the template for
this stream. |
| 81 b.sizer.Append(e, nil) |
| 82 b.entries[path] = (*bundlerStream)(e) |
| 83 } |
| 84 |
| 85 for _, le := range logs { |
| 86 b.sizer.Append(e, le) |
| 87 } |
| 88 b.count += len(logs) |
| 89 return |
| 90 } |
| 91 |
| 92 func (b *bundlerImpl) reset() { |
| 93 b.sizer = b.newSizer(&b.TemplateBundle) |
| 94 b.count = 0 |
| 95 b.entries = map[string]*bundlerStream{} |
| 96 } |
| 97 |
| 98 func (b *bundlerImpl) Empty() bool { |
| 99 return len(b.entries) == 0 |
| 100 } |
| 101 |
| 102 func (b *bundlerImpl) Size() int64 { |
| 103 return b.sizer.Size() |
| 104 } |
| 105 |
| 106 func (b *bundlerImpl) GetBundles() []*protocol.ButlerLogBundle { |
| 107 return b.getBundlesImpl(b.Threshold) |
| 108 } |
| 109 |
| 110 func (b *bundlerImpl) getBundlesImpl(threshold int64) []*protocol.ButlerLogBundl
e { |
| 111 bundles := []*protocol.ButlerLogBundle(nil) |
| 112 |
| 113 for { |
| 114 bundle := b.getBundle(threshold) |
| 115 if bundle == nil { |
| 116 break |
| 117 } |
| 118 bundles = append(bundles, bundle) |
| 119 } |
| 120 |
| 121 // If we still have bundle entries, it is likely because no entries can |
| 122 // fit within the threshold. This is a configuration error, and our only |
| 123 // viable response is to clear them out (drop them). |
| 124 b.reset() |
| 125 |
| 126 return bundles |
| 127 } |
| 128 |
| 129 func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle { |
| 130 // Short circuit if we don't have any entries. |
| 131 if len(b.entries) == 0 { |
| 132 return nil |
| 133 } |
| 134 |
| 135 // Clone our template bundle, as we intend to modify it by adding entrie
s. |
| 136 bundle := b.TemplateBundle |
| 137 |
| 138 // For determinism, add buffered entries in order of path. |
| 139 keys := make([]string, 0, len(b.entries)) |
| 140 for k := range b.entries { |
| 141 keys = append(keys, string(k)) |
| 142 } |
| 143 if b.Deterministic { |
| 144 sort.Strings(keys) |
| 145 } |
| 146 |
| 147 sizer := b.newSizer(&bundle) |
| 148 overThreshold := func() bool { |
| 149 return threshold != 0 && threshold < sizer.Size() |
| 150 } |
| 151 |
| 152 // For efficiency, we declare a single omit map. This map is keyed off o
f log |
| 153 // index. Each time we enter the bundler loop, we increment our "round" |
| 154 // variable. For any given loop, an entry is omitted if its omitMap |
| 155 // index equals the current round. |
| 156 for _, k := range keys { |
| 157 // Get the current round. |
| 158 round := b.round |
| 159 b.round++ |
| 160 |
| 161 // We assume that each entry in "entries" is worth exporting (se
e |
| 162 // comment in Append), else it would not have been added to the
entries |
| 163 // map by Append. |
| 164 e := b.entries[k] |
| 165 |
| 166 // If this entry has no logs, it is a terminal entry. Try and ex
port it as |
| 167 // a standalone bundle entry. |
| 168 if len(e.Logs) == 0 { |
| 169 sizer.Append(e.entry(), nil) |
| 170 if overThreshold() { |
| 171 // We don't have enough room to export a bundle
entry. Our bundle is |
| 172 // full for this round. |
| 173 break |
| 174 } |
| 175 |
| 176 // Our bundle is empty. This means that none of the entr
y's logs fit in |
| 177 // an empty bundle! Discard the full entry (data loss). |
| 178 bundle.Entries = append(bundle.Entries, e.entry()) |
| 179 delete(b.entries, k) |
| 180 continue |
| 181 } |
| 182 |
| 183 // Our entry has logs. Export any that fit into our bundle witho
ut violating |
| 184 // the threshold. |
| 185 omitted := 0 |
| 186 for i, le := range e.Logs { |
| 187 sizer.Append(e.entry(), le) |
| 188 if overThreshold() { |
| 189 sizer.Undo() |
| 190 b.omitMap[i] = round |
| 191 omitted++ |
| 192 } |
| 193 } |
| 194 |
| 195 // If all log entries available were successfully exported, we a
re finished |
| 196 // with this entry. We will take an optimized path and hand its
pointer |
| 197 // directly to the export bundle. |
| 198 if omitted == 0 { |
| 199 bundle.Entries = append(bundle.Entries, e.entry()) |
| 200 delete(b.entries, k) |
| 201 continue |
| 202 } |
| 203 |
| 204 // If none of the entry's logs were exported, we're done with it
for this |
| 205 // round. |
| 206 if omitted == len(e.Logs) { |
| 207 // If our bundle is empty, this means that NONE of the e
ntry's logs fit in |
| 208 // an empty bundle! Since none will fit individually, we
will discard the |
| 209 // full entry (data loss). |
| 210 if len(bundle.GetEntries()) == 0 { |
| 211 if b.DropCallback != nil { |
| 212 b.DropCallback((*protocol.ButlerLogBundl
e_Entry)(e)) |
| 213 } |
| 214 delete(b.entries, k) |
| 215 } |
| 216 continue |
| 217 } |
| 218 |
| 219 // We are exporting some of the entry's logs. We will do this by
exporting |
| 220 // a clone of the entry containing just the exported logs. |
| 221 // |
| 222 // We will reuse the entry's Logs array in order to avoid unnece
ssary |
| 223 // allocations. In order to do this, we will split the array int
o two |
| 224 // slices. The first slice will contain the exported elements, a
nd the |
| 225 // second will contain the remainder. |
| 226 // |
| 227 // (It should be noted that there is a potential issue if Append
() were to |
| 228 // be called in between getBundle calls. This is firstly not an
issue, since |
| 229 // getBundle in practice will empty "entries". However, even if
that weren't |
| 230 // the case, since the retained omitted logs are at the end of t
he array, |
| 231 // calling append on that slice would properly extend it.) |
| 232 divider := len(e.Logs) |
| 233 for i := range e.Logs { |
| 234 if i >= divider { |
| 235 // Everything to the right of divider is omitted
, so we're done. |
| 236 break |
| 237 } |
| 238 |
| 239 // Scan forwards until we find an omit spot. |
| 240 if b.omitMap[i] != round { |
| 241 continue |
| 242 } |
| 243 |
| 244 // Scan backwards from the end of our log list to find a
non-omitted |
| 245 // entry. |
| 246 for j := divider - 1; j > i; j-- { |
| 247 if b.omitMap[j] != round { |
| 248 // We're omitting "i", and not omitting
"j". Swap! |
| 249 e.Logs[i], e.Logs[j] = e.Logs[j], e.Logs
[i] |
| 250 divider = j |
| 251 omitted -= 1 |
| 252 break |
| 253 } |
| 254 } |
| 255 } |
| 256 divider -= omitted |
| 257 |
| 258 // Since our entry has more logs, we must retain it. We will exp
ort a clone |
| 259 // of the entry. |
| 260 ec := *e.entry() |
| 261 ec.Logs, e.Logs = e.Logs[:divider], e.Logs[divider:] |
| 262 bundle.Entries = append(bundle.Entries, &ec) |
| 263 } |
| 264 |
| 265 if len(bundle.Entries) == 0 { |
| 266 return nil |
| 267 } |
| 268 return &bundle |
| 269 } |
| 270 |
| 271 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer { |
| 272 nb := b.NewSizer |
| 273 if nb == nil { |
| 274 nb = NewFastSizer |
| 275 } |
| 276 return nb(bundle) |
| 277 } |
| OLD | NEW |