OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package fetcher | 5 package fetcher |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "io" | 9 "io" |
10 "time" | 10 "time" |
(...skipping 13 matching lines...) Expand all Loading... |
24 // DefaultBufferBytes is the default number of bytes to buffer. | 24 // DefaultBufferBytes is the default number of bytes to buffer. |
25 DefaultBufferBytes = int64(1024 * 1024) // 1MB | 25 DefaultBufferBytes = int64(1024 * 1024) // 1MB |
26 ) | 26 ) |
27 | 27 |
28 // LogRequest is a structure used by the Fetcher to request a range of logs | 28 // LogRequest is a structure used by the Fetcher to request a range of logs |
29 // from its Source. | 29 // from its Source. |
30 type LogRequest struct { | 30 type LogRequest struct { |
31 // Index is the starting log index to request. | 31 // Index is the starting log index to request. |
32 Index types.MessageIndex | 32 Index types.MessageIndex |
33 // Count, if >0, is the maximum number of log entries to request. | 33 // Count, if >0, is the maximum number of log entries to request. |
34 » Count int | 34 » Count int64 |
35 // Bytes, if >0, is the maximum number of log bytes to request. At least
one | 35 // Bytes, if >0, is the maximum number of log bytes to request. At least
one |
36 // log must be returned regardless of the byte limit. | 36 // log must be returned regardless of the byte limit. |
37 Bytes int64 | 37 Bytes int64 |
38 } | 38 } |
39 | 39 |
40 // Source is the source of log stream and log information. | 40 // Source is the source of log stream and log information. |
41 // | 41 // |
42 // The Source is resposible for handling retries, backoff, and transient errors. | 42 // The Source is resposible for handling retries, backoff, and transient errors. |
43 // An error from the Source will shut down the Fetcher. | 43 // An error from the Source will shut down the Fetcher. |
44 type Source interface { | 44 type Source interface { |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
153 } | 153 } |
154 if resp.log != nil { | 154 if resp.log != nil { |
155 le = resp.log | 155 le = resp.log |
156 } | 156 } |
157 } | 157 } |
158 return le, f.fetchErr | 158 return le, f.fetchErr |
159 } | 159 } |
160 | 160 |
161 type fetchRequest struct { | 161 type fetchRequest struct { |
162 index types.MessageIndex | 162 index types.MessageIndex |
163 » count int | 163 » count int64 |
164 bytes int64 | 164 bytes int64 |
165 respC chan *fetchResponse | 165 respC chan *fetchResponse |
166 } | 166 } |
167 | 167 |
168 type fetchResponse struct { | 168 type fetchResponse struct { |
169 logs []*logpb.LogEntry | 169 logs []*logpb.LogEntry |
170 tidx types.MessageIndex | 170 tidx types.MessageIndex |
171 err error | 171 err error |
172 } | 172 } |
173 | 173 |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
245 case fetchCount == 0, remaining < fetchCount: | 245 case fetchCount == 0, remaining < fetchCount: |
246 fetchCount = remaining | 246 fetchCount = remaining |
247 } | 247 } |
248 } | 248 } |
249 | 249 |
250 // Fetch logs if neither constraint has vetoed. | 250 // Fetch logs if neither constraint has vetoed. |
251 if fetchCount >= 0 && fetchBytes >= 0 { | 251 if fetchCount >= 0 && fetchBytes >= 0 { |
252 logFetchC = logFetchBaseC | 252 logFetchC = logFetchBaseC |
253 req := fetchRequest{ | 253 req := fetchRequest{ |
254 index: nextFetchIndex, | 254 index: nextFetchIndex, |
255 » » » » » count: int(fetchCount), | 255 » » » » » count: fetchCount, |
256 bytes: fetchBytes, | 256 bytes: fetchBytes, |
257 respC: logFetchC, | 257 respC: logFetchC, |
258 } | 258 } |
259 | 259 |
260 log.Fields{ | 260 log.Fields{ |
261 "index": req.index, | 261 "index": req.index, |
262 "count": req.count, | 262 "count": req.count, |
263 "bytes": req.bytes, | 263 "bytes": req.bytes, |
264 }.Debugf(c, "Buffering next round of logs...") | 264 }.Debugf(c, "Buffering next round of logs...") |
265 go f.fetchLogs(c, &req) | 265 go f.fetchLogs(c, &req) |
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
355 // Will never be satisfied, since we're requesting beyon
d the terminal | 355 // Will never be satisfied, since we're requesting beyon
d the terminal |
356 // index. | 356 // index. |
357 return | 357 return |
358 | 358 |
359 default: | 359 default: |
360 // No logs this round. Sleep for more. | 360 // No logs this round. Sleep for more. |
361 log.Fields{ | 361 log.Fields{ |
362 "index": req.index, | 362 "index": req.index, |
363 "delay": f.o.Delay, | 363 "delay": f.o.Delay, |
364 }.Infof(c, "No logs returned. Sleeping...") | 364 }.Infof(c, "No logs returned. Sleeping...") |
365 » » » clock.Sleep(c, f.o.Delay) | 365 » » » clock.CancelSleep(c, f.o.Delay) |
366 } | 366 } |
367 } | 367 } |
368 } | 368 } |
369 | 369 |
370 func (f *Fetcher) sizeOf(le *logpb.LogEntry) int64 { | 370 func (f *Fetcher) sizeOf(le *logpb.LogEntry) int64 { |
371 sf := f.sizeFunc | 371 sf := f.sizeFunc |
372 if sf == nil { | 372 if sf == nil { |
373 sf = proto.Size | 373 sf = proto.Size |
374 } | 374 } |
375 return int64(sf(le)) | 375 return int64(sf(le)) |
376 } | 376 } |
377 | 377 |
378 func (f *Fetcher) applyConstraint(want, have int64) int64 { | 378 func (f *Fetcher) applyConstraint(want, have int64) int64 { |
379 switch { | 379 switch { |
380 case want <= 0: | 380 case want <= 0: |
381 // No constraint, unbounded. | 381 // No constraint, unbounded. |
382 return 0 | 382 return 0 |
383 | 383 |
384 case want > have: | 384 case want > have: |
385 // We have fewer logs buffered. Fetch the difference (including | 385 // We have fewer logs buffered. Fetch the difference (including |
386 // prefetch). | 386 // prefetch). |
387 return (want * int64(f.o.PrefetchFactor)) - have | 387 return (want * int64(f.o.PrefetchFactor)) - have |
388 | 388 |
389 default: | 389 default: |
390 // We're within our constraint. Do not fetch logs. | 390 // We're within our constraint. Do not fetch logs. |
391 return -1 | 391 return -1 |
392 } | 392 } |
393 } | 393 } |
OLD | NEW |