Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(352)

Side by Side Diff: client/internal/logdog/butler/bundler/stream.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sync"
9 "sync/atomic"
10 "time"
11
12 "github.com/luci/luci-go/common/logdog/protocol"
13 )
14
15 var (
16 // dataBufferSize is the size (in bytes) of the Data objects that a Stre am
17 // will lease.
18 dataBufferSize = 4096
19 )
20
21 // Stream is an individual Bundler Stream. Data is added to the Stream as a
22 // series of ordered binary chunks.
23 //
24 // A Stream is not goroutine-safe.
25 type Stream interface {
26 // LeaseData allocates and returns a Data block that stream data can be
27 // loaded into. The caller should Release() the Data, or transfer owners hip to
28 // something that will (e.g., Append()).
29 LeaseData() Data
30
31 // Append adds a sequential chunk of data to the Stream. Append may bloc k if
32 // the data isn't ready to be consumed.
33 //
34 // Append takes possession of the supplied Data, and will Release it whe n
35 // finished.
36 Append(Data) error
37
38 // Close closes the Stream, flushing any remaining data.
39 Close()
40 }
41
42 // streamConfig is the set of static configuration parameters for the stream.
43 type streamConfig struct {
44 // name is the name of this stream.
45 name string
46
47 // parser is the stream parser to use.
48 parser parser
49
50 // maximumBufferedBytes is the maximum number of bytes that this stream will
51 // retain in its parser before blocking subsequent Append attempts.
52 maximumBufferedBytes int64
53 // maximumBufferDuration is the maximum amount of time that a block of d ata
54 // can be comfortably buffered in the stream.
55 maximumBufferDuration time.Duration
56
57 // template is the minimally-populated Butler log bundle entry.
58 template protocol.ButlerLogBundle_Entry
59
60 // onAppend, if not nil, is invoked when an attempt to append data to th e
61 // stream occurs. If true is passed, the data was successfully appended. If
62 // false was passed, the data could not be appended immediately and the stream
63 // will block pending data consumption.
64 //
65 // The stream's append lock will be held when this method is called.
66 onAppend func(bool)
67 }
68
69 // streamImpl is a Stream implementation that is bound to a Bundler.
70 type streamImpl struct {
71 c *streamConfig
72
73 // drained is true if the stream is finished emitting data.
74 //
75 // It is an atomic value, with zero indicating not drained and non-zero
76 // indicating drained. It should be accessed via isDrained, and set with
77 // setDrained.
78 drained int32
79
80 // parserLock is a Mutex protecting the stream's parser instance and its
81 // underlying chunk.Buffer. Any access to either of these fields must be done
82 // while holding this lock.
83 parserLock sync.Mutex
84
85 // dataConsumedSignalC is a channel that can be used to signal when data has
86 // been consumed. It is set via signalDataConsumed.
87 dataConsumedSignalC chan struct{}
88 // appendErrValue is an atomically-set error. It will be returned by App end if
89 // an error occurs during stream processing.
90 appendErrValue atomic.Value
91
92 // stateLock protects stream state against concurrent access.
93 stateLock sync.Mutex
94 // closed, if non-zero, indicates that we have been closed and our strea m has
95 // finished reading.
96 //
97 // stateLock must be held when accessing this field.
98 closed bool
99 // lastLogEntry is a pointer to the last LogEntry that was exported.
100 //
101 // stateLock must be held when accessing this field.
102 lastLogEntry *protocol.LogEntry
103
104 // testAppendWaitCallback, if not nil, is called before Append blocks.
105 // This callback is used for testing coordination.
106 testAppendWaitCallback func()
107 }
108
109 func newStream(c streamConfig) *streamImpl {
110 return &streamImpl{
111 c: &c,
112
113 dataConsumedSignalC: make(chan struct{}, 1),
114 }
115 }
116
117 func (s *streamImpl) LeaseData() Data {
118 return globalDataPoolRegistry.getPool(dataBufferSize).getData()
119 }
120
121 func (s *streamImpl) Append(d Data) error {
122 defer func() {
123 if d != nil {
124 d.Release()
125 }
126 }()
127
128 // Block/loop until we've successfully appended the data.
129 for {
130 dLen := int64(len(d.Bytes()))
131 if err := s.appendError(); err != nil || dLen == 0 {
132 return err
133 }
134
135 s.withParserLock(func() {
136 if s.c.parser.bufferedBytes() == 0 ||
137 s.c.parser.bufferedBytes()+dLen <= s.c.maximumBu fferedBytes {
138 s.c.parser.appendData(d)
139 d = nil
140 }
141 })
142
143 // The data was appended; we're done.
144 if s.c.onAppend != nil {
145 s.c.onAppend(d == nil)
146 }
147 if d == nil {
148 break
149 }
150
151 // Not ready to append; wait for a data event and re-evaluate.
152 <-s.dataConsumedSignalC
153 }
154
155 return nil
156 }
157
158 // Signals our Append loop that data has been consumed.
159 func (s *streamImpl) signalDataConsumed() {
160 select {
161 case s.dataConsumedSignalC <- struct{}{}:
162 break
163
164 default:
165 break
166 }
167 }
168
169 func (s *streamImpl) Close() {
170 s.stateLock.Lock()
171 defer s.stateLock.Unlock()
172
173 s.closed = true
174 s.maybeSetDrainedLocked()
175 }
176
177 func (s *streamImpl) name() string {
178 return s.c.name
179 }
180
181 // isDrained returns true if this stream is finished emitting data.
182 //
183 // This can happen if either:
184 // - The stream is closed and has no more buffered data, or
185 // - The strema has encountered a fatal error during processing.
186 func (s *streamImpl) isDrained() bool {
187 return atomic.LoadInt32(&s.drained) != 0
188 }
189
190 // setDrained marks this stream as drained.
191 func (s *streamImpl) setDrained() {
192 atomic.StoreInt32(&s.drained, 1)
193 }
194
195 // maybeSetDrainedLocked evaluates our buffer stream status. If the stream is
196 // closed and our buffer is empty, it will set the drained state to true.
197 //
198 // The stream's stateLock must be held when calling this method.
199 //
200 // The resulting drained state will be returned.
201 func (s *streamImpl) maybeSetDrainedLocked() bool {
202 if s.isDrained() {
203 return true
204 }
205
206 // Not drained ... should we be?
207 if s.closed {
208 bufSize := int64(0)
209 s.withParserLock(func() {
210 bufSize = s.c.parser.bufferedBytes()
211 })
212 if bufSize == 0 {
213 s.setDrained()
214 return true
215 }
216 }
217
218 return false
219 }
220
221 // expireTime returns the Time when the oldest chunk in the stream will expire.
222 //
223 // This is calculated ask:
224 // oldest.Timestamp + stream.maximumBufferDuration
225 // If there is no buffered data, oldest will return nil.
226 func (s *streamImpl) expireTime() (t time.Time, has bool) {
227 s.withParserLock(func() {
228 t, has = s.c.parser.firstChunkTime()
229 })
230
231 if has {
232 t = t.Add(s.c.maximumBufferDuration)
233 }
234 return
235 }
236
237 // nextBundleEntry generates bundles for this stream. The total bundle data size
238 // must not exceed the supplied size.
239 //
240 // If no bundle entry could be generated given the constraints, nil will be
241 // returned.
242 //
243 // It is possible for some entries to be returned alongside an error.
244 func (s *streamImpl) nextBundleEntry(bb *builder, aggressive bool) bool {
245 s.stateLock.Lock()
246 defer s.stateLock.Unlock()
247
248 // If we're not drained, try and get the next bundle.
249 modified := false
250 if !s.maybeSetDrainedLocked() {
251 err := error(nil)
252 modified, err = s.nextBundleEntryLocked(bb, aggressive)
253 if err != nil {
254 s.setAppendError(err)
255 s.setDrained()
256 }
257
258 if modified {
259 s.signalDataConsumed()
260 }
261 }
262
263 // If we're drained, populate our terminal state.
264 if s.maybeSetDrainedLocked() && s.lastLogEntry != nil {
265 bb.setStreamTerminal(&s.c.template, s.lastLogEntry.StreamIndex)
266 }
267
268 return modified
269 }
270
271 func (s *streamImpl) nextBundleEntryLocked(bb *builder, aggressive bool) (bool, error) {
272 c := constraints{
273 allowSplit: aggressive,
274 closed: s.closed,
275 }
276
277 // Extract as many entries as possible from the stream. As we extract, a djust
278 // our byte size.
279 //
280 // If we're closed, this will continue to consume until finished. If an error
281 // occurs, shut down data collection.
282 modified := false
283 ierr := error(nil)
284
285 for c.limit = bb.remaining(); c.limit > 0; c.limit = bb.remaining() {
286 emittedLog := false
287 s.withParserLock(func() {
288 le, err := s.c.parser.nextEntry(&c)
289 if err != nil {
290 ierr = err
291 return
292 }
293
294 if le == nil {
295 return
296 }
297
298 emittedLog = true
299 modified = true
300 s.lastLogEntry = le
301 bb.add(&s.c.template, le)
302 })
303
304 if !emittedLog {
305 break
306 }
307 }
308
309 return modified, ierr
310 }
311
312 func (s *streamImpl) withParserLock(f func()) {
313 s.parserLock.Lock()
314 defer s.parserLock.Unlock()
315
316 f()
317 }
318
319 func (s *streamImpl) appendError() error {
320 if err := s.appendErrValue.Load(); err != nil {
321 return err.(error)
322 }
323 return nil
324 }
325
326 func (s *streamImpl) setAppendError(err error) {
327 s.appendErrValue.Store(err)
328 s.signalDataConsumed()
329 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/bundler/sizer_test.go ('k') | client/internal/logdog/butler/bundler/stream_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698