Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(758)

Side by Side Diff: common/logdog/fetcher/fetcher.go

Issue 1672833003: LogDog: Add log rendering view. Base URL: https://github.com/luci/luci-go@master
Patch Set: Clean up, add tests, little reorg. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/logdog/coordinator/stream_params.go ('k') | common/logdog/fetcher/fetcher_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package fetcher 5 package fetcher
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "io" 9 "io"
10 "time" 10 "time"
(...skipping 13 matching lines...) Expand all
24 // DefaultBufferBytes is the default number of bytes to buffer. 24 // DefaultBufferBytes is the default number of bytes to buffer.
25 DefaultBufferBytes = int64(1024 * 1024) // 1MB 25 DefaultBufferBytes = int64(1024 * 1024) // 1MB
26 ) 26 )
27 27
28 // LogRequest is a structure used by the Fetcher to request a range of logs 28 // LogRequest is a structure used by the Fetcher to request a range of logs
29 // from its Source. 29 // from its Source.
30 type LogRequest struct { 30 type LogRequest struct {
31 // Index is the starting log index to request. 31 // Index is the starting log index to request.
32 Index types.MessageIndex 32 Index types.MessageIndex
33 // Count, if >0, is the maximum number of log entries to request. 33 // Count, if >0, is the maximum number of log entries to request.
34 » Count int 34 » Count int64
35 // Bytes, if >0, is the maximum number of log bytes to request. At least one 35 // Bytes, if >0, is the maximum number of log bytes to request. At least one
36 // log must be returned regardless of the byte limit. 36 // log must be returned regardless of the byte limit.
37 Bytes int64 37 Bytes int64
38 } 38 }
39 39
40 // Source is the source of log stream and log information. 40 // Source is the source of log stream and log information.
41 // 41 //
42 // The Source is resposible for handling retries, backoff, and transient errors. 42 // The Source is resposible for handling retries, backoff, and transient errors.
43 // An error from the Source will shut down the Fetcher. 43 // An error from the Source will shut down the Fetcher.
44 type Source interface { 44 type Source interface {
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
153 } 153 }
154 if resp.log != nil { 154 if resp.log != nil {
155 le = resp.log 155 le = resp.log
156 } 156 }
157 } 157 }
158 return le, f.fetchErr 158 return le, f.fetchErr
159 } 159 }
160 160
161 type fetchRequest struct { 161 type fetchRequest struct {
162 index types.MessageIndex 162 index types.MessageIndex
163 » count int 163 » count int64
164 bytes int64 164 bytes int64
165 respC chan *fetchResponse 165 respC chan *fetchResponse
166 } 166 }
167 167
168 type fetchResponse struct { 168 type fetchResponse struct {
169 logs []*logpb.LogEntry 169 logs []*logpb.LogEntry
170 tidx types.MessageIndex 170 tidx types.MessageIndex
171 err error 171 err error
172 } 172 }
173 173
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
245 case fetchCount == 0, remaining < fetchCount: 245 case fetchCount == 0, remaining < fetchCount:
246 fetchCount = remaining 246 fetchCount = remaining
247 } 247 }
248 } 248 }
249 249
250 // Fetch logs if neither constraint has vetoed. 250 // Fetch logs if neither constraint has vetoed.
251 if fetchCount >= 0 && fetchBytes >= 0 { 251 if fetchCount >= 0 && fetchBytes >= 0 {
252 logFetchC = logFetchBaseC 252 logFetchC = logFetchBaseC
253 req := fetchRequest{ 253 req := fetchRequest{
254 index: nextFetchIndex, 254 index: nextFetchIndex,
255 » » » » » count: int(fetchCount), 255 » » » » » count: fetchCount,
256 bytes: fetchBytes, 256 bytes: fetchBytes,
257 respC: logFetchC, 257 respC: logFetchC,
258 } 258 }
259 259
260 log.Fields{ 260 log.Fields{
261 "index": req.index, 261 "index": req.index,
262 "count": req.count, 262 "count": req.count,
263 "bytes": req.bytes, 263 "bytes": req.bytes,
264 }.Debugf(c, "Buffering next round of logs...") 264 }.Debugf(c, "Buffering next round of logs...")
265 go f.fetchLogs(c, &req) 265 go f.fetchLogs(c, &req)
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 // Will never be satisfied, since we're requesting beyon d the terminal 355 // Will never be satisfied, since we're requesting beyon d the terminal
356 // index. 356 // index.
357 return 357 return
358 358
359 default: 359 default:
360 // No logs this round. Sleep for more. 360 // No logs this round. Sleep for more.
361 log.Fields{ 361 log.Fields{
362 "index": req.index, 362 "index": req.index,
363 "delay": f.o.Delay, 363 "delay": f.o.Delay,
364 }.Infof(c, "No logs returned. Sleeping...") 364 }.Infof(c, "No logs returned. Sleeping...")
365 » » » clock.Sleep(c, f.o.Delay) 365 » » » clock.CancelSleep(c, f.o.Delay)
366 } 366 }
367 } 367 }
368 } 368 }
369 369
370 func (f *Fetcher) sizeOf(le *logpb.LogEntry) int64 { 370 func (f *Fetcher) sizeOf(le *logpb.LogEntry) int64 {
371 sf := f.sizeFunc 371 sf := f.sizeFunc
372 if sf == nil { 372 if sf == nil {
373 sf = proto.Size 373 sf = proto.Size
374 } 374 }
375 return int64(sf(le)) 375 return int64(sf(le))
376 } 376 }
377 377
378 func (f *Fetcher) applyConstraint(want, have int64) int64 { 378 func (f *Fetcher) applyConstraint(want, have int64) int64 {
379 switch { 379 switch {
380 case want <= 0: 380 case want <= 0:
381 // No constraint, unbounded. 381 // No constraint, unbounded.
382 return 0 382 return 0
383 383
384 case want > have: 384 case want > have:
385 // We have fewer logs buffered. Fetch the difference (including 385 // We have fewer logs buffered. Fetch the difference (including
386 // prefetch). 386 // prefetch).
387 return (want * int64(f.o.PrefetchFactor)) - have 387 return (want * int64(f.o.PrefetchFactor)) - have
388 388
389 default: 389 default:
390 // We're within our constraint. Do not fetch logs. 390 // We're within our constraint. Do not fetch logs.
391 return -1 391 return -1
392 } 392 }
393 } 393 }
OLDNEW
« no previous file with comments | « common/logdog/coordinator/stream_params.go ('k') | common/logdog/fetcher/fetcher_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698