Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 that can be found in the LICENSE file. | |
| 5 */ | |
| 6 | |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | |
| 8 ///<reference path="../luci-sleep-promise/promise.ts" /> | |
| 9 ///<reference path="../rpc/client.ts" /> | |
| 10 | |
| 11 namespace LogDog { | |
| 12 | |
| 13 export type FetcherOptions = { | |
| 14 byteCount?: number; logCount?: number; sparse?: boolean; | |
| 15 }; | |
| 16 | |
| 17 // Type of a "Get" or "Tail" response (protobuf). | |
| 18 type GetResponse = {state: any; desc: any; logs: any[];}; | |
| 19 | |
| 20 /** The Fetcher's current status. */ | |
| 21 export enum FetcherStatus { | |
| 22 // Not doing anything. | |
| 23 IDLE, | |
| 24 // Attempting to load log data. | |
| 25 LOADING, | |
| 26 // We're waiting for the log stream to emit more logs. | |
| 27 STREAMING, | |
| 28 // The log stream is missing. | |
| 29 MISSING, | |
| 30 // The log stream encountered an error. | |
| 31 ERROR | |
| 32 } | |
| 33 | |
| 34 /** | |
| 35 * Fetcher is responsible for fetching LogDog log stream entries from the | |
| 36 * remote service via an RPC client. | |
| 37 * | |
| 38 * Fetcher is responsible for wrapping the raw RPC calls and their results, | |
| 39 * and retrying calls due to: - Transient failures (via RPC client). - Missing | |
|
hinoka
2017/02/27 21:50:25
newline for bullet points.
dnj
2017/03/08 04:13:43
Done.
| |
| 40 * stream (assumption is that the stream is still being ingested and | |
| 41 * registered, and therefore a repeated retry is appropriate). | |
| 42 * - Streaming stream (log stream is not terminated, but more records are not | |
| 43 * yet available). | |
| 44 * | |
| 45 * The interface that Fetcher presents to its caller is a simple Promise-based | |
| 46 * method to retrieve log stream data. | |
| 47 * | |
| 48 * Fetcher offers fetching via "get", "getAll", and "tail". | |
|
hinoka
2017/02/27 21:50:25
how about "getLast" instead of "tail"?
dnj
2017/03/08 04:13:43
It's not the last though. Maybe "latest"?
| |
| 49 */ | |
| 50 export class Fetcher { | |
| 51 private debug = false; | |
| 52 private static maxLogsPerGet = 0; | |
| 53 | |
| 54 private lastDesc: LogDog.LogStreamDescriptor; | |
| 55 private lastState: LogDog.LogStreamState; | |
| 56 | |
| 57 private currentStatus: FetcherStatus = FetcherStatus.IDLE; | |
| 58 private lastErrorValue: Error|null; | |
| 59 private statusChangedCallback: (() => void); | |
| 60 | |
| 61 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000}; | |
| 62 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000}; | |
| 63 | |
| 64 constructor(private client: luci.Client, readonly stream: LogDog.Stream) {} | |
| 65 | |
| 66 get desc() { | |
| 67 return this.lastDesc; | |
| 68 } | |
| 69 get state() { | |
| 70 return this.lastState; | |
| 71 } | |
| 72 | |
| 73 get status() { | |
| 74 return this.currentStatus; | |
| 75 } | |
| 76 | |
| 77 /** | |
| 78 * Returns the log stream's terminal index. | |
| 79 * | |
| 80 * If no terminal index is known, or if the log stream is still streaming, | |
|
hinoka
2017/02/27 21:50:26
"If no terminal index is known because the log is
dnj
2017/03/08 04:13:43
Done.
| |
| 81 * this will return -1. | |
| 82 */ | |
| 83 get terminalIndex(): number { | |
| 84 return ((this.lastState) ? this.lastState.terminalIndex : -1); | |
| 85 } | |
| 86 | |
| 87 /** Archived returns true if this log stream is known to be archived. */ | |
| 88 get archived(): boolean { | |
| 89 return (!!(this.lastState && this.lastState.archive)); | |
|
hinoka
2017/02/27 21:50:25
does bool(...) exist? That would be more readable
dnj
2017/03/08 04:13:43
No :(
| |
| 90 } | |
| 91 | |
| 92 get lastError(): Error|null { | |
| 93 return this.lastErrorValue; | |
| 94 } | |
| 95 | |
| 96 private setCurrentStatus(st: FetcherStatus, err?: Error) { | |
| 97 if (st !== this.currentStatus || err !== this.lastErrorValue) { | |
| 98 this.currentStatus = st; | |
|
hinoka
2017/02/27 21:50:25
If these are set together, should they be called e
dnj
2017/03/08 04:13:43
Done.
| |
| 99 this.lastErrorValue = (err || null); | |
| 100 | |
| 101 if (this.statusChangedCallback) { | |
| 102 this.statusChangedCallback(); | |
| 103 }; | |
| 104 } | |
| 105 } | |
| 106 | |
| 107 /** | |
| 108 * Sets the status changed callback, which will be invoked whenever the | |
| 109 * Fetcher's status has changed. | |
| 110 */ | |
| 111 setStatusChangedCallback(fn: () => void) { | |
| 112 this.statusChangedCallback = fn; | |
| 113 } | |
| 114 | |
| 115 /** | |
| 116 * Returns a Promise that resolves to the next block of logs in the stream. | |
|
hinoka
2017/02/27 21:50:25
"that will resolve to"
dnj
2017/03/08 04:13:43
Done.
| |
| 117 * | |
| 118 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the | |
| 119 * next block of logs in the stream, or null if there are no logs to | |
|
hinoka
2017/02/27 21:50:25
"no logs to return currently" or "no more logs to
dnj
2017/03/08 04:13:43
Pretty sure this can't be null. Updated comment.
| |
| 120 * return. | |
| 121 */ | |
| 122 get(index: number, opts: FetcherOptions): Promise<LogDog.LogEntry[]> { | |
| 123 return this.getIndex(index, opts); | |
| 124 } | |
| 125 | |
| 126 /** | |
| 127 * Returns a Promise that resolves to "count" log entries starting at | |
| 128 * "startIndex". | |
| 129 * | |
| 130 * If multiple RPC calls are required to retrieve "count" entries, these | |
| 131 * will be scheduled, and the Promise will block until the full set of | |
| 132 * requested stream entries is retrieved. | |
| 133 */ | |
| 134 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> { | |
| 135 // Request the tail walkback logs. Since our request for N logs may return | |
| 136 // <N logs, we will repeat the request until all requested logs have been | |
| 137 // obtained. | |
| 138 let allLogs: LogDog.LogEntry[] = []; | |
| 139 | |
| 140 let getIter = (): Promise<LogDog.LogEntry[]> => { | |
| 141 if (count <= 0) { | |
| 142 return Promise.resolve(allLogs); | |
| 143 } | |
| 144 | |
| 145 // Perform Gets until we have the requested number of logs. We don't | |
| 146 // have to constrain the "logCount" parameter b/c we automatically do | |
| 147 // that in getIndex. | |
| 148 let opts: FetcherOptions = { | |
| 149 logCount: count, | |
| 150 sparse: true, | |
| 151 }; | |
| 152 return this.getIndex(startIndex, opts).then((logs) => { | |
| 153 if (logs) { | |
| 154 allLogs.push.apply(allLogs, logs); | |
| 155 startIndex += logs.length; | |
| 156 count -= logs.length; | |
| 157 } | |
| 158 return getIter(); | |
| 159 }); | |
| 160 }; | |
| 161 return getIter(); | |
| 162 } | |
| 163 | |
| 164 /** | |
| 165 * Fetches the tail log entry. | |
| 166 */ | |
| 167 tail(): Promise<LogDog.LogEntry[]> { | |
|
hinoka
2017/02/27 21:50:25
getLast()?
dnj
2017/03/08 04:13:43
getLatest
| |
| 168 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
| 169 let tryTail = (): Promise<LogDog.LogEntry[]> => { | |
| 170 return this.doTail().then((logs): Promise<LogDog.LogEntry[]> => { | |
| 171 if (logs && logs.length) { | |
| 172 return Promise.resolve(logs); | |
| 173 } | |
| 174 | |
| 175 // No logs were returned, and we expect logs, so we're streaming. Try | |
| 176 // again after a delay. | |
| 177 this.setCurrentStatus(FetcherStatus.STREAMING); | |
| 178 let delay = streamingRetry.next(); | |
| 179 console.warn( | |
| 180 this.stream, `: No logs returned; retrying after ${delay}ms...`); | |
| 181 return luci.sleepPromise(delay).then(() => { | |
| 182 return tryTail(); | |
| 183 }); | |
| 184 }); | |
| 185 }; | |
| 186 return tryTail(); | |
| 187 } | |
| 188 | |
| 189 | |
| 190 private getIndex(index: number, opts: FetcherOptions): | |
| 191 Promise<LogDog.LogEntry[]> { | |
| 192 // (Testing) Constrain our max logs, if set. | |
|
hinoka
2017/02/27 21:50:25
Is this actually testing?
dnj
2017/03/08 04:13:43
Used by another module, but yes.
| |
| 193 if (Fetcher.maxLogsPerGet > 0) { | |
| 194 if (!opts) { | |
| 195 opts = {}; | |
| 196 } | |
| 197 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) { | |
| 198 opts.logCount = Fetcher.maxLogsPerGet; | |
| 199 } | |
| 200 } | |
| 201 | |
| 202 // We will retry continuously until we get a log (streaming). | |
| 203 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
|
hinoka
2017/02/27 21:50:25
Why are there 3 layers of retries (I counted 3. t
dnj
2017/03/08 04:13:43
There should be two: "retryIterator":
streaming:
| |
| 204 let tryGet = (): Promise<LogDog.LogEntry[]> => { | |
| 205 // If we're asking for a log beyond our stream, don't bother. | |
| 206 if (this.terminalIndex >= 0 && index > this.terminalIndex) { | |
| 207 return Promise.resolve([]); | |
| 208 } | |
| 209 | |
| 210 return this.doGet(index, opts).then((logs) => { | |
| 211 if (logs && logs.length) { | |
| 212 // Since we allow non-contiguous Get, we may get back more logs than | |
| 213 // we actually expected. Prune any such additional. | |
| 214 if (opts.logCount > 0) { | |
| 215 let maxStreamIndex = index + opts.logCount - 1; | |
| 216 logs = logs.filter((le) => { | |
| 217 return le.streamIndex <= maxStreamIndex; | |
| 218 }); | |
| 219 } | |
| 220 | |
| 221 return Promise.resolve(logs); | |
| 222 } | |
| 223 | |
| 224 // No logs were returned, and we expect logs, so we're streaming. Try | |
| 225 // again after a delay. | |
| 226 this.setCurrentStatus(FetcherStatus.STREAMING); | |
| 227 let delay = streamingRetry.next(); | |
| 228 console.warn( | |
| 229 this.stream, `: No logs returned; retrying after ${delay}ms...`); | |
| 230 return luci.sleepPromise(delay).then(() => { | |
| 231 return tryGet(); | |
|
hinoka
2017/02/27 21:50:25
Wouldn't this increase the calling stack size on e
dnj
2017/03/08 04:13:43
I think so. Simplified via "do", which should not
| |
| 232 }); | |
| 233 }); | |
| 234 }; | |
| 235 return tryGet(); | |
| 236 } | |
| 237 | |
| 238 private doGet(index: number, opts: FetcherOptions): | |
| 239 Promise<LogDog.LogEntry[]> { | |
| 240 let request: { | |
| 241 project: string; path: string; state: boolean; index: number; | |
| 242 | |
| 243 nonContiguous?: boolean; | |
| 244 byteCount?: number; | |
| 245 logCount?: number; | |
| 246 } = { | |
| 247 project: this.stream.project, | |
| 248 path: this.stream.path, | |
| 249 state: (this.terminalIndex < 0), | |
| 250 index: index, | |
| 251 }; | |
| 252 if (opts.sparse || this.archived) { | |
| 253 // This log stream is archived. We will relax the contiguous requirement | |
| 254 // so we can render sparse log streams. | |
| 255 request.nonContiguous = true; | |
| 256 } | |
| 257 if (opts) { | |
| 258 if (opts.byteCount > 0) { | |
| 259 request.byteCount = opts.byteCount; | |
| 260 } | |
| 261 if (opts.logCount > 0) { | |
| 262 request.logCount = opts.logCount; | |
| 263 } | |
| 264 } | |
| 265 | |
| 266 if (this.debug) { | |
| 267 console.log('logdog.Logs.Get:', request); | |
| 268 } | |
| 269 | |
| 270 // Perform our Get, waiting until the stream actually exists. | |
| 271 return this | |
| 272 .doRetryIfMissing( | |
| 273 (): | |
| 274 Promise<FetchResult> => { | |
| 275 return this.client.call('logdog.Logs', 'Get', request) | |
| 276 .then((resp: GetResponse): FetchResult => { | |
| 277 return FetchResult.make(resp, this.lastDesc); | |
| 278 }); | |
| 279 }) | |
| 280 .then((fr) => { | |
| 281 return this.afterProcessResult(fr); | |
| 282 }); | |
| 283 } | |
| 284 | |
| 285 private doTail(): Promise<LogDog.LogEntry[]> { | |
|
hinoka
2017/02/27 21:50:25
doGetLast()?
dnj
2017/03/08 04:13:43
Since Tail is actually the name of the RPC call, I
| |
| 286 let request: {project: string; path: string; state: boolean;} = { | |
| 287 project: this.stream.project, | |
| 288 path: this.stream.path, | |
| 289 state: (this.terminalIndex < 0), | |
| 290 }; | |
| 291 | |
| 292 if (this.debug) { | |
| 293 console.log('logdog.Logs.Tail:', request); | |
| 294 } | |
| 295 | |
| 296 return this | |
| 297 .doRetryIfMissing( | |
| 298 (): | |
| 299 Promise<FetchResult> => { | |
| 300 return this.client.call('logdog.Logs', 'Tail', request) | |
| 301 .then((resp: GetResponse): FetchResult => { | |
| 302 return FetchResult.make(resp, this.lastDesc); | |
| 303 }); | |
| 304 }) | |
| 305 .then((fr) => { | |
| 306 return this.afterProcessResult(fr); | |
| 307 }); | |
| 308 } | |
| 309 | |
| 310 private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] { | |
| 311 if (this.debug) { | |
| 312 if (fr.logs.length) { | |
| 313 console.log( | |
| 314 'Request returned:', fr.logs[0].streamIndex, '..', | |
| 315 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state); | |
| 316 } else { | |
| 317 console.log('Request returned no logs:', fr.desc, fr.state); | |
| 318 } | |
| 319 } | |
| 320 | |
| 321 this.setCurrentStatus(FetcherStatus.IDLE); | |
| 322 if (fr.desc) { | |
| 323 this.lastDesc = fr.desc; | |
| 324 } | |
| 325 if (fr.state) { | |
| 326 this.lastState = fr.state; | |
| 327 } | |
| 328 return fr.logs; | |
| 329 } | |
| 330 | |
| 331 private doRetryIfMissing(fn: () => Promise<FetchResult>): | |
| 332 Promise<FetchResult> { | |
| 333 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); | |
| 334 | |
| 335 let doIt = (): Promise<FetchResult> => { | |
| 336 this.setCurrentStatus(FetcherStatus.LOADING); | |
| 337 | |
| 338 return fn().catch((err: Error) => { | |
| 339 // Is this a gRPC Error? | |
| 340 let grpc = luci.GrpcError.convert(err); | |
| 341 if (grpc && grpc.code === luci.Code.NOT_FOUND) { | |
| 342 this.setCurrentStatus(FetcherStatus.MISSING); | |
| 343 | |
| 344 let delay = missingRetry.next(); | |
| 345 console.warn( | |
| 346 this.stream, ': Is not found:', err, | |
| 347 `; retrying after ${delay}ms...`); | |
| 348 return luci.sleepPromise(delay).then(() => { | |
| 349 return doIt(); | |
| 350 }); | |
| 351 } | |
| 352 | |
| 353 this.setCurrentStatus(FetcherStatus.ERROR, err); | |
| 354 throw err; | |
| 355 }); | |
| 356 }; | |
| 357 return doIt(); | |
| 358 } | |
| 359 } | |
| 360 | |
| 361 /** | |
| 362 * The result of a log stream fetch, for internal usage. | |
| 363 * | |
| 364 * It will include zero or more log entries, and optionally (if requested) | |
| 365 * the log stream's descriptor and state. | |
| 366 */ | |
| 367 class FetchResult { | |
| 368 constructor( | |
| 369 readonly logs: LogDog.LogEntry[], | |
| 370 readonly desc?: LogDog.LogStreamDescriptor, | |
| 371 readonly state?: LogDog.LogStreamState) {} | |
| 372 | |
| 373 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): | |
| 374 FetchResult { | |
| 375 let loadDesc: LogDog.LogStreamDescriptor|undefined; | |
| 376 if (resp.desc) { | |
| 377 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc); | |
| 378 } | |
| 379 | |
| 380 let loadState: LogDog.LogStreamState|undefined; | |
| 381 if (resp.state) { | |
| 382 loadState = LogDog.LogStreamState.make(resp.state); | |
| 383 } | |
| 384 | |
| 385 let logs = (resp.logs || []).map((le) => { | |
| 386 return LogDog.LogEntry.make(le, desc); | |
| 387 }); | |
| 388 return new FetchResult(logs, loadDesc, loadState); | |
| 389 } | |
| 390 } | |
| 391 } | |
| OLD | NEW |