| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package fetcher | 5 package fetcher |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "io" | 9 "io" |
| 10 "time" | 10 "time" |
| (...skipping 13 matching lines...) Expand all Loading... |
| 24 // DefaultBufferBytes is the default number of bytes to buffer. | 24 // DefaultBufferBytes is the default number of bytes to buffer. |
| 25 DefaultBufferBytes = int64(1024 * 1024) // 1MB | 25 DefaultBufferBytes = int64(1024 * 1024) // 1MB |
| 26 ) | 26 ) |
| 27 | 27 |
| 28 // LogRequest is a structure used by the Fetcher to request a range of logs | 28 // LogRequest is a structure used by the Fetcher to request a range of logs |
| 29 // from its Source. | 29 // from its Source. |
| 30 type LogRequest struct { | 30 type LogRequest struct { |
| 31 // Index is the starting log index to request. | 31 // Index is the starting log index to request. |
| 32 Index types.MessageIndex | 32 Index types.MessageIndex |
| 33 // Count, if >0, is the maximum number of log entries to request. | 33 // Count, if >0, is the maximum number of log entries to request. |
| 34 » Count int | 34 » Count int64 |
| 35 // Bytes, if >0, is the maximum number of log bytes to request. At least
one | 35 // Bytes, if >0, is the maximum number of log bytes to request. At least
one |
| 36 // log must be returned regardless of the byte limit. | 36 // log must be returned regardless of the byte limit. |
| 37 Bytes int64 | 37 Bytes int64 |
| 38 } | 38 } |
| 39 | 39 |
| 40 // Source is the source of log stream and log information. | 40 // Source is the source of log stream and log information. |
| 41 // | 41 // |
| 42 // The Source is resposible for handling retries, backoff, and transient errors. | 42 // The Source is resposible for handling retries, backoff, and transient errors. |
| 43 // An error from the Source will shut down the Fetcher. | 43 // An error from the Source will shut down the Fetcher. |
| 44 type Source interface { | 44 type Source interface { |
| (...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 153 } | 153 } |
| 154 if resp.log != nil { | 154 if resp.log != nil { |
| 155 le = resp.log | 155 le = resp.log |
| 156 } | 156 } |
| 157 } | 157 } |
| 158 return le, f.fetchErr | 158 return le, f.fetchErr |
| 159 } | 159 } |
| 160 | 160 |
| 161 type fetchRequest struct { | 161 type fetchRequest struct { |
| 162 index types.MessageIndex | 162 index types.MessageIndex |
| 163 » count int | 163 » count int64 |
| 164 bytes int64 | 164 bytes int64 |
| 165 respC chan *fetchResponse | 165 respC chan *fetchResponse |
| 166 } | 166 } |
| 167 | 167 |
| 168 type fetchResponse struct { | 168 type fetchResponse struct { |
| 169 logs []*logpb.LogEntry | 169 logs []*logpb.LogEntry |
| 170 tidx types.MessageIndex | 170 tidx types.MessageIndex |
| 171 err error | 171 err error |
| 172 } | 172 } |
| 173 | 173 |
| (...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 245 case fetchCount == 0, remaining < fetchCount: | 245 case fetchCount == 0, remaining < fetchCount: |
| 246 fetchCount = remaining | 246 fetchCount = remaining |
| 247 } | 247 } |
| 248 } | 248 } |
| 249 | 249 |
| 250 // Fetch logs if neither constraint has vetoed. | 250 // Fetch logs if neither constraint has vetoed. |
| 251 if fetchCount >= 0 && fetchBytes >= 0 { | 251 if fetchCount >= 0 && fetchBytes >= 0 { |
| 252 logFetchC = logFetchBaseC | 252 logFetchC = logFetchBaseC |
| 253 req := fetchRequest{ | 253 req := fetchRequest{ |
| 254 index: nextFetchIndex, | 254 index: nextFetchIndex, |
| 255 » » » » » count: int(fetchCount), | 255 » » » » » count: fetchCount, |
| 256 bytes: fetchBytes, | 256 bytes: fetchBytes, |
| 257 respC: logFetchC, | 257 respC: logFetchC, |
| 258 } | 258 } |
| 259 | 259 |
| 260 log.Fields{ | 260 log.Fields{ |
| 261 "index": req.index, | 261 "index": req.index, |
| 262 "count": req.count, | 262 "count": req.count, |
| 263 "bytes": req.bytes, | 263 "bytes": req.bytes, |
| 264 }.Debugf(c, "Buffering next round of logs...") | 264 }.Debugf(c, "Buffering next round of logs...") |
| 265 go f.fetchLogs(c, &req) | 265 go f.fetchLogs(c, &req) |
| (...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 384 case want > have: | 384 case want > have: |
| 385 // We have fewer logs buffered. Fetch the difference (including | 385 // We have fewer logs buffered. Fetch the difference (including |
| 386 // prefetch). | 386 // prefetch). |
| 387 return (want * int64(f.o.PrefetchFactor)) - have | 387 return (want * int64(f.o.PrefetchFactor)) - have |
| 388 | 388 |
| 389 default: | 389 default: |
| 390 // We're within our constraint. Do not fetch logs. | 390 // We're within our constraint. Do not fetch logs. |
| 391 return -1 | 391 return -1 |
| 392 } | 392 } |
| 393 } | 393 } |
| OLD | NEW |