Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 that can be found in the LICENSE file. | |
| 5 */ | |
| 6 | |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | |
| 8 ///<reference path="../rpc/client.ts" /> | |
| 9 | |
| 10 namespace LogDog { | |
| 11 | |
| 12 export type FetcherOptions = { | |
| 13 byteCount?: number; logCount?: number; sparse?: boolean; | |
| 14 }; | |
| 15 | |
| 16 // Type of a "Get" or "Tail" response (protobuf). | |
| 17 type GetResponse = {state: any; desc: any; logs: any[];}; | |
| 18 | |
| 19 /** The Fetcher's current status. */ | |
| 20 export enum FetcherStatus { | |
| 21 // Not doing anything. | |
| 22 IDLE, | |
| 23 // Attempting to load log data. | |
| 24 LOADING, | |
| 25 // We're waiting for the log stream to emit more logs. | |
| 26 STREAMING, | |
| 27 // The log stream is missing. | |
| 28 MISSING, | |
| 29 // The log stream encountered an error. | |
| 30 ERROR | |
| 31 } | |
| 32 | |
| 33 /** | |
| 34 * Fetcher is responsible for fetching LogDog log stream entries from the | |
| 35 * remote service via an RPC client. | |
| 36 * | |
| 37 * Fetcher is responsible for wrapping the raw RPC calls and their results, | |
| 38 * and retrying calls due to: | |
| 39 * | |
| 40 * - Transient failures (via RPC client). | |
| 41 * - Missing stream (assumption is that the stream is still being ingested and | |
| 42 * registered, and therefore a repeated retry is appropriate). | |
| 43 * - Streaming stream (log stream is not terminated, but more records are not | |
| 44 * yet available). | |
| 45 * | |
| 46 * The interface that Fetcher presents to its caller is a simple Promise-based | |
| 47 * method to retrieve log stream data. | |
| 48 * | |
| 49 * Fetcher offers fetching via "get", "getAll", and "getLatest". | |
| 50 */ | |
| 51 export class Fetcher { | |
| 52 private debug = false; | |
| 53 private static maxLogsPerGet = 0; | |
| 54 | |
| 55 private lastDesc: LogDog.LogStreamDescriptor; | |
| 56 private lastState: LogDog.LogStreamState; | |
| 57 | |
| 58 private lastStatus: FetcherStatus = FetcherStatus.IDLE; | |
| 59 private lastErrorValue: Error|null; | |
| 60 private statusChangedCallback: (() => void); | |
| 61 | |
| 62 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000}; | |
| 63 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000}; | |
| 64 | |
| 65 constructor( | |
| 66 private client: luci.Client, readonly stream: LogDog.StreamPath) {} | |
| 67 | |
| 68 get desc() { | |
| 69 return this.lastDesc; | |
| 70 } | |
| 71 get state() { | |
| 72 return this.lastState; | |
| 73 } | |
| 74 | |
| 75 get status() { | |
| 76 return this.lastStatus; | |
| 77 } | |
| 78 | |
| 79 /** | |
| 80 * Returns the log stream's terminal index. | |
| 81 * | |
| 82 * If no terminal index is known (the log is still streaming) this will | |
| 83 * return -1. | |
| 84 */ | |
| 85 get terminalIndex(): number { | |
| 86 return ((this.lastState) ? this.lastState.terminalIndex : -1); | |
| 87 } | |
| 88 | |
| 89 /** Archived returns true if this log stream is known to be archived. */ | |
| 90 get archived(): boolean { | |
| 91 return (!!(this.lastState && this.lastState.archive)); | |
| 92 } | |
| 93 | |
| 94 get lastError(): Error|null { | |
| 95 return this.lastErrorValue; | |
| 96 } | |
| 97 | |
| 98 private updateStatus(st: FetcherStatus, err?: Error) { | |
| 99 if (st !== this.lastStatus || err !== this.lastErrorValue) { | |
| 100 this.lastStatus = st; | |
| 101 this.lastErrorValue = (err || null); | |
| 102 | |
| 103 if (this.statusChangedCallback) { | |
| 104 this.statusChangedCallback(); | |
| 105 }; | |
| 106 } | |
| 107 } | |
| 108 | |
| 109 /** | |
| 110 * Sets the status changed callback, which will be invoked whenever the | |
| 111 * Fetcher's status has changed. | |
| 112 */ | |
| 113 setStatusChangedCallback(fn: () => void) { | |
| 114 this.statusChangedCallback = fn; | |
| 115 } | |
| 116 | |
| 117 /** | |
| 118 * Returns a Promise that will resolve to the next block of logs in the | |
| 119 * stream. | |
| 120 * | |
| 121 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the | |
| 122 * next block of logs in the stream. | |
| 123 */ | |
| 124 get(index: number, opts: FetcherOptions): Promise<LogDog.LogEntry[]> { | |
| 125 return this.getIndex(index, opts); | |
| 126 } | |
| 127 | |
| 128 /** | |
| 129 * Returns a Promise that will resolve to "count" log entries starting at | |
| 130 * "startIndex". | |
| 131 * | |
| 132 * If multiple RPC calls are required to retrieve "count" entries, these | |
| 133 * will be scheduled, and the Promise will block until the full set of | |
| 134 * requested stream entries is retrieved. | |
| 135 */ | |
| 136 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> { | |
| 137 // Request the tail walkback logs. Since our request for N logs may return | |
| 138 // <N logs, we will repeat the request until all requested logs have been | |
| 139 // obtained. | |
| 140 let allLogs: LogDog.LogEntry[] = []; | |
| 141 | |
| 142 let getIter = (): Promise<LogDog.LogEntry[]> => { | |
| 143 if (count <= 0) { | |
| 144 return Promise.resolve(allLogs); | |
| 145 } | |
| 146 | |
| 147 // Perform Gets until we have the requested number of logs. We don't | |
| 148 // have to constrain the "logCount" parameter b/c we automatically do | |
| 149 // that in getIndex. | |
| 150 let opts: FetcherOptions = { | |
| 151 logCount: count, | |
| 152 sparse: true, | |
| 153 }; | |
| 154 return this.getIndex(startIndex, opts).then((logs) => { | |
| 155 if (logs) { | |
| 156 allLogs.push.apply(allLogs, logs); | |
|
nodir
2017/03/08 07:41:28
nit, in my experience this is usually written as
A
dnj
2017/03/08 08:50:14
TypeScript complains: Property 'push' does not exi
nodir
2017/03/13 19:48:21
:(
| |
| 157 startIndex += logs.length; | |
| 158 count -= logs.length; | |
| 159 } | |
| 160 return getIter(); | |
|
nodir
2017/03/08 07:41:28
return resolved promise instead of recursive if co
dnj
2017/03/08 08:50:14
Done.
| |
| 161 }); | |
|
nodir
2017/03/08 07:41:28
then return allLogs? currently allLogs is not used
dnj
2017/03/08 08:50:14
It's used above (see line #144).
nodir
2017/03/13 19:48:21
Acknowledged.
| |
| 162 }; | |
| 163 return getIter(); | |
| 164 } | |
| 165 | |
| 166 /** | |
| 167 * Fetches the latest log entry. | |
| 168 */ | |
| 169 getLatest(): Promise<LogDog.LogEntry[]> { | |
| 170 let errNoLogs = new Error('no logs, streaming'); | |
| 171 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
| 172 return streamingRetry.do( | |
| 173 () => { | |
| 174 return this.doTail().then((logs) => { | |
| 175 if (!(logs && logs.length)) { | |
| 176 throw errNoLogs; | |
| 177 } | |
| 178 return logs; | |
| 179 }); | |
| 180 }, | |
| 181 (err: Error, delay: number) => { | |
| 182 if (err !== errNoLogs) { | |
| 183 throw err; | |
| 184 } | |
| 185 | |
| 186 // No logs were returned, and we expect logs, so we're streaming. | |
| 187 // Try again after a delay. | |
| 188 this.updateStatus(FetcherStatus.STREAMING); | |
| 189 console.warn( | |
| 190 this.stream, | |
| 191 `: No logs returned; retrying after ${delay}ms...`); | |
| 192 }); | |
| 193 } | |
| 194 | |
| 195 private getIndex(index: number, opts: FetcherOptions): | |
| 196 Promise<LogDog.LogEntry[]> { | |
| 197 // (Testing) Constrain our max logs, if set. | |
| 198 if (Fetcher.maxLogsPerGet > 0) { | |
| 199 if (!opts) { | |
|
nodir
2017/03/08 07:41:28
doGet assumes opts is not null. doGet is called be
dnj
2017/03/08 08:50:14
Actually now that I do strict null checks, I can d
| |
| 200 opts = {}; | |
| 201 } | |
| 202 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) { | |
| 203 opts.logCount = Fetcher.maxLogsPerGet; | |
| 204 } | |
| 205 } | |
| 206 | |
| 207 // We will retry continuously until we get a log (streaming). | |
| 208 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
| 209 let errNoLogs = new Error('no logs, streaming'); | |
| 210 return streamingRetry | |
| 211 .do( | |
| 212 () => { | |
| 213 // If we're asking for a log beyond our stream, don't bother. | |
| 214 if (this.terminalIndex >= 0 && index > this.terminalIndex) { | |
| 215 return Promise.resolve([]); | |
| 216 } | |
| 217 | |
| 218 return this.doGet(index, opts).then((logs) => { | |
|
nodir
2017/03/08 07:41:28
nit: parens around parameters of a lambda expressi
dnj
2017/03/08 08:50:15
Done.
| |
| 219 if (!(logs && !logs.length)) { | |
|
nodir
2017/03/08 07:41:28
this is hard to read. This is equivalent to
if (!
dnj
2017/03/08 08:50:15
Actually that inner "!" was a bug, it should be: !
| |
| 220 // (Retry) | |
| 221 throw errNoLogs; | |
| 222 } | |
| 223 | |
| 224 return logs; | |
| 225 }); | |
| 226 }, | |
| 227 (err: Error, delay: number) => { | |
| 228 if (err !== errNoLogs) { | |
| 229 throw err; | |
| 230 } | |
| 231 | |
| 232 // No logs were returned, and we expect logs, so we're | |
| 233 // streaming. Try again after a delay. | |
| 234 this.updateStatus(FetcherStatus.STREAMING); | |
|
nodir
2017/03/08 07:41:28
all mutations (like updateStatus calls) in callbac
dnj
2017/03/08 08:50:14
Originally I didn't really care, but I think you'r
nodir
2017/03/13 19:48:21
great
| |
| 235 console.warn( | |
| 236 this.stream, | |
| 237 `: No logs returned; retrying after ${delay}ms...`); | |
| 238 }) | |
| 239 .then((logs) => { | |
|
nodir
2017/03/08 07:41:28
nit: unnecessary parens
dnj
2017/03/08 08:50:15
Done.
| |
| 240 // Since we allow non-contiguous Get, we may get back more logs than | |
| 241 // we actually expected. Prune any such additional. | |
| 242 if (opts.logCount && opts.logCount > 0) { | |
|
nodir
2017/03/08 07:41:28
shouldn't this check opts.nonContiguous? currently
dnj
2017/03/08 08:50:14
Not in this function. Probably should, Done.
| |
| 243 let maxStreamIndex = index + opts.logCount - 1; | |
|
nodir
2017/03/08 07:41:28
i don't fully understand this. Can you document lo
dnj
2017/03/08 08:50:14
Done.
| |
| 244 logs = logs.filter((le) => { | |
|
nodir
2017/03/08 07:41:28
logs = logs.filter(le => le.streamIndex <= maxStre
dnj
2017/03/08 08:50:14
Oh nice, done.
| |
| 245 return le.streamIndex <= maxStreamIndex; | |
| 246 }); | |
| 247 } | |
| 248 return logs; | |
| 249 }); | |
| 250 } | |
| 251 | |
| 252 private doGet(index: number, opts: FetcherOptions): | |
| 253 Promise<LogDog.LogEntry[]> { | |
| 254 let request: { | |
| 255 project: string; path: string; state: boolean; index: number; | |
| 256 | |
| 257 nonContiguous?: boolean; | |
| 258 byteCount?: number; | |
| 259 logCount?: number; | |
| 260 } = { | |
| 261 project: this.stream.project, | |
| 262 path: this.stream.path, | |
| 263 state: (this.terminalIndex < 0), | |
| 264 index: index, | |
| 265 }; | |
| 266 if (opts.sparse || this.archived) { | |
| 267 // This log stream is archived. We will relax the contiguous requirement | |
| 268 // so we can render sparse log streams. | |
| 269 request.nonContiguous = true; | |
| 270 } | |
| 271 if (opts) { | |
|
nodir
2017/03/08 07:41:28
opts is not falsy since L266 didn't explode
dnj
2017/03/08 08:50:14
Ah yeah, it used to be allowed to be null, but not
| |
| 272 if (opts.byteCount && opts.byteCount > 0) { | |
| 273 request.byteCount = opts.byteCount; | |
| 274 } | |
| 275 if (opts.logCount && opts.logCount > 0) { | |
| 276 request.logCount = opts.logCount; | |
| 277 } | |
| 278 } | |
| 279 | |
| 280 if (this.debug) { | |
| 281 console.log('logdog.Logs.Get:', request); | |
| 282 } | |
| 283 | |
| 284 // Perform our Get, waiting until the stream actually exists. | |
| 285 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); | |
| 286 return missingRetry | |
| 287 .do( | |
| 288 () => { | |
| 289 this.updateStatus(FetcherStatus.LOADING); | |
| 290 return this.client.call('logdog.Logs', 'Get', request); | |
| 291 }, | |
| 292 this.doRetryIfMissing) | |
| 293 .then((resp: GetResponse) => { | |
| 294 let fr = FetchResult.make(resp, this.lastDesc); | |
| 295 return this.afterProcessResult(fr); | |
| 296 }); | |
| 297 } | |
| 298 | |
| 299 private doTail(): Promise<LogDog.LogEntry[]> { | |
| 300 let request: {project: string; path: string; state: boolean;} = { | |
| 301 project: this.stream.project, | |
| 302 path: this.stream.path, | |
| 303 state: (this.terminalIndex < 0), | |
| 304 }; | |
| 305 | |
| 306 if (this.debug) { | |
| 307 console.log('logdog.Logs.Tail:', request); | |
| 308 } | |
| 309 | |
| 310 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); | |
| 311 return missingRetry | |
| 312 .do( | |
| 313 () => { | |
| 314 this.updateStatus(FetcherStatus.LOADING); | |
| 315 return this.client.call('logdog.Logs', 'Tail', request); | |
| 316 }, | |
| 317 this.doRetryIfMissing) | |
| 318 .then((resp: GetResponse) => { | |
| 319 let fr = FetchResult.make(resp, this.lastDesc); | |
| 320 return this.afterProcessResult(fr); | |
| 321 }); | |
| 322 } | |
| 323 | |
| 324 private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] { | |
| 325 if (this.debug) { | |
| 326 if (fr.logs.length) { | |
| 327 console.log( | |
| 328 'Request returned:', fr.logs[0].streamIndex, '..', | |
| 329 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state); | |
| 330 } else { | |
| 331 console.log('Request returned no logs:', fr.desc, fr.state); | |
| 332 } | |
| 333 } | |
| 334 | |
| 335 this.updateStatus(FetcherStatus.IDLE); | |
|
nodir
2017/03/08 07:41:28
probably a race
dnj
2017/03/08 08:50:15
Done.
| |
| 336 if (fr.desc) { | |
| 337 this.lastDesc = fr.desc; | |
|
nodir
2017/03/08 07:41:28
here too
dnj
2017/03/08 08:50:14
Done.
| |
| 338 } | |
| 339 if (fr.state) { | |
| 340 this.lastState = fr.state; | |
| 341 } | |
| 342 return fr.logs; | |
| 343 } | |
| 344 | |
| 345 private doRetryIfMissing(err: Error, delay: number) { | |
| 346 // Is this a gRPC Error? | |
| 347 let grpc = luci.GrpcError.convert(err); | |
| 348 if (grpc && grpc.code === luci.Code.NOT_FOUND) { | |
| 349 this.updateStatus(FetcherStatus.MISSING); | |
| 350 | |
| 351 console.warn( | |
| 352 this.stream, ': Is not found:', err, | |
| 353 `; retrying after ${delay}ms...`); | |
| 354 } | |
|
nodir
2017/03/08 07:41:28
return
dnj
2017/03/08 08:50:15
Done.
| |
| 355 | |
| 356 this.updateStatus(FetcherStatus.ERROR, err); | |
| 357 throw err; | |
| 358 } | |
| 359 } | |
| 360 | |
| 361 /** | |
| 362 * The result of a log stream fetch, for internal usage. | |
| 363 * | |
| 364 * It will include zero or more log entries, and optionally (if requested) | |
| 365 * the log stream's descriptor and state. | |
| 366 */ | |
| 367 class FetchResult { | |
| 368 constructor( | |
| 369 readonly logs: LogDog.LogEntry[], | |
| 370 readonly desc?: LogDog.LogStreamDescriptor, | |
| 371 readonly state?: LogDog.LogStreamState) {} | |
| 372 | |
| 373 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): | |
| 374 FetchResult { | |
| 375 let loadDesc: LogDog.LogStreamDescriptor|undefined; | |
| 376 if (resp.desc) { | |
| 377 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc); | |
| 378 } | |
| 379 | |
| 380 let loadState: LogDog.LogStreamState|undefined; | |
| 381 if (resp.state) { | |
| 382 loadState = LogDog.LogStreamState.make(resp.state); | |
| 383 } | |
| 384 | |
| 385 let logs = (resp.logs || []).map((le) => { | |
|
nodir
2017/03/08 07:41:28
nit
let logs = (resp.logs || []).map(le =>
dnj
2017/03/08 08:50:15
Done.
| |
| 386 return LogDog.LogEntry.make(le, desc); | |
| 387 }); | |
| 388 return new FetchResult(logs, loadDesc, loadState); | |
| 389 } | |
| 390 } | |
| 391 } | |
| OLD | NEW |