Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(958)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler_impl.go

Issue 1276923003: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sort"
9
10 "github.com/luci/luci-go/common/logdog/protocol"
11 "github.com/luci/luci-go/common/logdog/protocol/protoutil"
tandrii(chromium) 2015/08/11 14:06:07 i think you have CL on which this one depends, but
dnj (Google) 2015/08/11 16:24:01 https://codereview.chromium.org/1272893004
12 "github.com/luci/luci-go/common/logdog/types"
13 )
14
15 type bundlerStream struct {
tandrii(chromium) 2015/08/11 14:06:07 this isn't used in this CL.
dnj (Google) 2015/08/11 16:24:01 Hm? It's used all over the place in this file.
tandrii(chromium) 2015/08/11 17:32:22 sorry, that was meant for a different line, and ev
16 entry *protocol.ButlerLogBundle_Entry
17 logs []*protocol.LogEntry
18 }
19
20 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
21 if e.GetTerminal() && !s.entry.GetTerminal() {
22 s.entry.Terminal = e.Terminal
23 s.entry.TerminalIndex = e.TerminalIndex
24 }
25 }
26
27 // bundlerImpl is an implementation of the Bundler interface.
28 type bundlerImpl struct {
29 *Config
30
31 sizer Sizer
32 bundle *protocol.ButlerLogBundle
33 entries map[types.StreamPath]*bundlerStream
34 count int
35 }
36
37 // New instantiates a new Bundler instance.
38 func New(c Config) Bundler {
tandrii(chromium) 2015/08/11 14:06:07 Why pass config as a copy if you take a ref to it
dnj (Google) 2015/08/11 16:24:01 No point. This function owns this instance of Conf
tandrii(chromium) 2015/08/11 17:32:22 sorry, my comment was stupid because I didn't quit
dnj 2015/08/11 18:14:27 In the end it would have the same effect. However,
tandrii(chromium) 2015/08/11 18:31:05 Acknowledged.
39 b := &bundlerImpl{
40 Config: &c,
41 bundle: &protocol.ButlerLogBundle{},
42 entries: make(map[types.StreamPath]*bundlerStream),
tandrii(chromium) 2015/08/11 14:06:07 nit: map[types.StreamPath]*bundlerStream{}
dnj (Google) 2015/08/11 16:24:01 *sigh* Any idea why we seem to be shying away from
tandrii(chromium) 2015/08/11 17:32:22 hm, don't you still need an instance of bundle? It
dnj 2015/08/11 18:14:27 "bundle" has been replaced with "Config.TemplateBu
tandrii(chromium) 2015/08/11 18:31:05 ah, not in PS2, though. In PS3 -> ok :)
dnj (Google) 2015/08/12 03:20:09 Well, not used in PS2, properly removed in PS3 :)
43 }
44 b.reset()
45 return b
46 }
47
48 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
49 // Take ownership of the entry's logs.
50 logs := e.GetLogs()
51 e.Logs = nil
52
53 // Ignore bundle entries that don't contribute to the log stream's
54 // advancement.
55 if !(len(logs) > 0 || e.GetTerminal()) {
56 return
57 }
58
59 // Add this log to our reserve.
60 path := protoutil.DescriptorPath(e.GetDesc())
61 cur, ok := b.entries[path]
62 if !ok {
63 // Add this new entry to the bundle.
64 b.sizer.AppendBundleEntry(e)
65 cur = &bundlerStream{
66 entry: e,
67 logs: make([]*protocol.LogEntry, 0, len(logs)),
68 }
69 b.entries[path] = cur
70 } else {
71 cur.mergeEntry(e)
72 }
73
74 for _, le := range logs {
75 b.sizer.AppendLogEntry(e, le)
76 }
77 cur.logs = append(cur.logs, logs...)
78 b.count += len(logs)
79 return
80 }
81
82 func (b *bundlerImpl) reset() {
83 b.sizer = b.newSizer(b.bundle)
84 b.count = 0
tandrii(chromium) 2015/08/11 14:06:07 shouldn't this also reset the entries?
dnj (Google) 2015/08/11 16:24:01 The idea was that "GetBundles" will consume all bu
tandrii(chromium) 2015/08/11 17:32:22 Acknowledged.
85 }
86
87 func (b *bundlerImpl) Empty() bool {
88 return len(b.entries) == 0
89 }
90
91 func (b *bundlerImpl) Size() int {
92 return b.sizer.Size()
93 }
94
95 func (b *bundlerImpl) GetBundles(threshold int) []*protocol.ButlerLogBundle {
96 b.reset()
97
98 bundles := []*protocol.ButlerLogBundle(nil)
99 for {
100 bundle := b.getNextBundle(threshold)
101 if bundle == nil {
102 return bundles
103 }
104 bundles = append(bundles, bundle)
105 }
106 }
107
108 func (b *bundlerImpl) getNextBundle(threshold int) *protocol.ButlerLogBundle {
109 // Populate the bundle with as many entries as possible.
110 bundle := *b.bundle
tandrii(chromium) 2015/08/11 14:06:07 i'd add comment here that you really want to copy
dnj (Google) 2015/08/11 16:24:01 Done.
111 bundle.Entries = append([]*protocol.ButlerLogBundle_Entry(nil), bundle.E ntries...)
tandrii(chromium) 2015/08/11 14:06:07 I don't understand why you copy bundle entries her
dnj (Google) 2015/08/11 16:24:01 Originally it was to hedge the case where the temp
112 sizer := b.newSizer(&bundle)
tandrii(chromium) 2015/08/11 14:06:07 nit: i'd move this line closer to overThreshold fu
dnj (Google) 2015/08/11 16:24:01 Done.
113
114 // For determinism, add buffered entries in order of path.
115 keys := make([]string, 0, len(b.entries))
116 for k := range b.entries {
117 keys = append(keys, string(k))
118 }
119 sort.Sort(sort.StringSlice(keys))
tandrii(chromium) 2015/08/11 14:06:07 just sort.Strings(keys) https://golang.org/pkg/sor
dnj (Google) 2015/08/11 16:24:01 Done.
120
121 overThreshold := func() bool {
122 return threshold != 0 && threshold < sizer.Size()
123 }
124
125 BuildLoop:
126 for _, k := range keys {
127 e := b.entries[types.StreamPath(k)]
128
129 // Can we add this entry without exceeding our size threshold?
tandrii(chromium) 2015/08/11 14:06:07 what if threshold is small enough that the number
dnj (Google) 2015/08/11 16:24:01 It's allowed to happen. In that case, we will just
130 sizer.AppendBundleEntry(e.entry)
131 if overThreshold() {
132 break BuildLoop
133 }
134 ec := *e.entry
135 ec.Logs = append([]*protocol.LogEntry(nil), ec.Logs...)
tandrii(chromium) 2015/08/11 14:06:07 same as line #111: why copy ec.Logs here if you es
dnj (Google) 2015/08/11 16:24:01 Don't anymore; mandatory "nil" assumption.
136 bundle.Entries = append(bundle.Entries, &ec)
137
138 for i, le := range e.logs {
139 sizer.AppendLogEntry(e.entry, le)
140
141 if overThreshold() {
142 e.logs = append(e.logs[:0], e.logs[i:]...)
tandrii(chromium) 2015/08/11 14:06:07 My test http://play.golang.org/p/g0387lE-R7 sort o
dnj (Google) 2015/08/11 16:24:01 Good thought! However, from here, it does say tha
tandrii(chromium) 2015/08/11 17:32:22 Déjà vu: I knew I saw it somewhere, but yet again
dnj 2015/08/11 18:14:27 No worries, it took me a bit of digging to find, s
143 break BuildLoop
144 }
145 ec.Logs = append(ec.Logs, le)
146 }
147
148 // This entry has been fully added.
149 delete(b.entries, types.StreamPath(k))
150 }
151
152 if len(bundle.Entries) == 0 {
153 return nil
154 }
155 return &bundle
156 }
157
158 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
159 nb := b.NewSizer
160 if nb == nil {
161 nb = NewFastSizer
162 }
163 return nb(bundle)
164 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698