Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 that can be found in the LICENSE file. | |
| 5 */ | |
| 6 | |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | |
| 8 ///<reference path="../logdog-stream/client.ts" /> | |
| 9 ///<reference path="../luci-operation/operation.ts" /> | |
| 10 | |
| 11 namespace LogDog { | |
| 12 | |
/** Options that can be passed to fetch operations. */
export type FetcherOptions = {
  /**
   * The maximum number of bytes to fetch. If undefined, no maximum will be
   * specified, and the service will constrain the results.
   */
  byteCount?: number;
  /**
   * The maximum number of logs to fetch. If undefined, no maximum will be
   * specified, and the service will constrain the results.
   */
  logCount?: number;
  /** If defined and true, allow a fetch to return non-contiguous entries. */
  sparse?: boolean;
};
| 28 | |
/** The Fetcher's current status. */
export enum FetchStatus {
  /** Not doing anything. */
  IDLE,
  /** Attempting to load log data. */
  LOADING,
  /** Waiting for the (unterminated) log stream to emit more logs. */
  STREAMING,
  /** The log stream is missing (presumably still being ingested/registered). */
  MISSING,
  /** The log stream encountered an error. */
  ERROR,
  /** The operation has been cancelled. */
  CANCELLED,
}
| 44 | |
| 45 /** Fetch represents a single fetch operation. */ | |
| 46 export class Fetch { | |
| 47 readonly op: luci.Operation; | |
| 48 private stateChangedCallbacks = new Array<(f: Fetch) => void>(); | |
| 49 | |
| 50 constructor( | |
| 51 private ctx: FetchContext, readonly p: Promise<LogDog.LogEntry[]>) { | |
| 52 this.op = this.ctx.op; | |
| 53 | |
| 54 // We will now get notifications if our Context's state changes. | |
| 55 this.ctx.stateChangedCallback = this.onStateChanged.bind(this); | |
| 56 } | |
| 57 | |
| 58 get lastStatus(): FetchStatus { | |
| 59 return this.ctx.lastStatus; | |
| 60 } | |
| 61 | |
| 62 get lastError(): Error|undefined { | |
| 63 return this.ctx.lastError; | |
| 64 } | |
| 65 | |
| 66 addStateChangedCallback(cb: (f: Fetch) => void) { | |
|
nodir
2017/03/13 19:48:22
add removeStateChangedCallback?
dnj
2017/03/14 00:14:41
Not necessary, won't ever be called.
| |
| 67 this.stateChangedCallbacks.push(cb); | |
| 68 } | |
| 69 | |
| 70 private onStateChanged() { | |
| 71 this.stateChangedCallbacks.forEach(cb => cb(this)); | |
| 72 } | |
| 73 } | |
| 74 | |
| 75 /** | |
| 76 * FetchContext is an internal context used to pair all of the common | |
| 77 * parameters involved in a fetch operation. | |
| 78 */ | |
| 79 class FetchContext { | |
| 80 private lastStatusValue = FetchStatus.IDLE; | |
| 81 private lastErrorValue?: Error; | |
| 82 | |
| 83 stateChangedCallback: () => void; | |
| 84 | |
| 85 constructor(readonly op: luci.Operation) { | |
| 86 // If our operation is cancelled, update our status to note this. | |
| 87 op.addCancelCallback(() => { | |
| 88 this.lastStatusValue = FetchStatus.CANCELLED; | |
| 89 this.lastErrorValue = undefined; | |
| 90 }); | |
| 91 } | |
| 92 | |
| 93 get lastStatus(): FetchStatus { | |
| 94 return this.lastStatusValue; | |
| 95 } | |
| 96 | |
| 97 get lastError(): Error|undefined { | |
| 98 return this.lastErrorValue; | |
| 99 } | |
| 100 | |
| 101 updateStatus(st: FetchStatus, err?: Error) { | |
| 102 if (this.op.cancelled) { | |
| 103 // No more status updates, force cancelled. | |
| 104 st = FetchStatus.CANCELLED; | |
| 105 err = undefined; | |
| 106 } | |
| 107 | |
| 108 if (!(st === this.lastStatusValue && err !== this.lastErrorValue)) { | |
|
nodir
2017/03/13 19:48:22
err === this.lastErrorValue
?
dnj
2017/03/14 00:14:41
good catch, fixed.
| |
| 109 this.lastStatusValue = st; | |
| 110 this.lastErrorValue = err; | |
| 111 | |
| 112 this.notifyStateChanged(); | |
| 113 } | |
| 114 } | |
| 115 | |
| 116 notifyStateChanged() { | |
| 117 // If our Fetch has assigned our callback, notify it. | |
| 118 if (this.stateChangedCallback) { | |
| 119 this.stateChangedCallback(); | |
| 120 } | |
| 121 } | |
| 122 } | |
| 123 | |
| 124 /** | |
| 125 * Fetcher is responsible for fetching LogDog log stream entries from the | |
| 126 * remote service via an RPC client. | |
| 127 * | |
| 128 * Fetcher is responsible for wrapping the raw RPC calls and their results, | |
| 129 * and retrying calls due to: | |
| 130 * | |
| 131 * - Transient failures (via RPC client). | |
| 132 * - Missing stream (assumption is that the stream is still being ingested and | |
| 133 * registered, and therefore a repeated retry is appropriate). | |
| 134 * - Streaming stream (log stream is not terminated, but more records are not | |
| 135 * yet available). | |
| 136 * | |
| 137 * The interface that Fetcher presents to its caller is a simple Promise-based | |
| 138 * method to retrieve log stream data. | |
| 139 * | |
| 140 * Fetcher offers fetching via "get", "getAll", and "getLatest". | |
| 141 */ | |
| 142 export class Fetcher { | |
| 143 private debug = false; | |
| 144 private static maxLogsPerGet = 0; | |
| 145 | |
| 146 private lastDesc: LogDog.LogStreamDescriptor; | |
| 147 private lastState: LogDog.LogStreamState; | |
| 148 | |
| 149 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000}; | |
| 150 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000}; | |
| 151 | |
| 152 constructor( | |
| 153 private client: LogDog.Client, readonly stream: LogDog.StreamPath) {} | |
| 154 | |
| 155 get desc() { | |
| 156 return this.lastDesc; | |
| 157 } | |
| 158 get state() { | |
| 159 return this.lastState; | |
| 160 } | |
| 161 | |
| 162 /** | |
| 163 * Returns the log stream's terminal index. | |
| 164 * | |
| 165 * If no terminal index is known (the log is still streaming) this will | |
| 166 * return -1. | |
| 167 */ | |
| 168 get terminalIndex(): number { | |
| 169 return ((this.lastState) ? this.lastState.terminalIndex : -1); | |
| 170 } | |
| 171 | |
| 172 /** Archived returns true if this log stream is known to be archived. */ | |
| 173 get archived(): boolean { | |
| 174 return (!!(this.lastState && this.lastState.archive)); | |
| 175 } | |
| 176 | |
| 177 /** | |
| 178 * Returns a Promise that will resolve to the next block of logs in the | |
| 179 * stream. | |
| 180 * | |
| 181 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the | |
| 182 * next block of logs in the stream. | |
| 183 */ | |
| 184 get(op: luci.Operation, index: number, opts: FetcherOptions): Fetch { | |
| 185 let ctx = new FetchContext(op); | |
| 186 return new Fetch(ctx, this.getIndex(ctx, index, opts)); | |
| 187 } | |
| 188 | |
| 189 /** | |
| 190 * Returns a Promise that will resolve to "count" log entries starting at | |
| 191 * "startIndex". | |
| 192 * | |
| 193 * If multiple RPC calls are required to retrieve "count" entries, these | |
| 194 * will be scheduled, and the Promise will block until the full set of | |
| 195 * requested stream entries is retrieved. | |
| 196 */ | |
| 197 getAll(op: luci.Operation, startIndex: number, count: number): Fetch { | |
| 198 // Request the tail walkback logs. Since our request for N logs may return | |
| 199 // <N logs, we will repeat the request until all requested logs have been | |
| 200 // obtained. | |
| 201 let allLogs: LogDog.LogEntry[] = []; | |
| 202 | |
| 203 let ctx = new FetchContext(op); | |
| 204 let getIter = (): Promise<LogDog.LogEntry[]> => { | |
| 205 op.assert(); | |
| 206 | |
| 207 if (count <= 0) { | |
| 208 return Promise.resolve(allLogs); | |
| 209 } | |
| 210 | |
| 211 // Perform Gets until we have the requested number of logs. We don't | |
| 212 // have to constrain the "logCount" parameter b/c we automatically do | |
| 213 // that in getIndex. | |
| 214 let opts: FetcherOptions = { | |
| 215 logCount: count, | |
| 216 sparse: true, | |
| 217 }; | |
| 218 return this.getIndex(ctx, startIndex, opts).then(logs => { | |
| 219 op.assert(); | |
| 220 | |
| 221 if (logs && logs.length) { | |
| 222 allLogs.push.apply(allLogs, logs); | |
| 223 startIndex += logs.length; | |
| 224 count -= logs.length; | |
| 225 } | |
| 226 if (count > 0) { | |
| 227 // Recurse. | |
| 228 } | |
| 229 return Promise.resolve(allLogs); | |
| 230 }); | |
| 231 }; | |
| 232 return new Fetch(ctx, getIter()); | |
| 233 } | |
| 234 | |
| 235 /** | |
| 236 * Fetches the latest log entry. | |
| 237 */ | |
| 238 getLatest(op: luci.Operation): Fetch { | |
| 239 let errNoLogs = new Error('no logs, streaming'); | |
| 240 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
| 241 let ctx = new FetchContext(op); | |
| 242 return new Fetch( | |
| 243 ctx, | |
| 244 streamingRetry.do( | |
| 245 () => { | |
| 246 return this.doTail(ctx).then(logs => { | |
| 247 if (!(logs && logs.length)) { | |
| 248 throw errNoLogs; | |
| 249 } | |
| 250 return logs; | |
| 251 }); | |
| 252 }, | |
| 253 (err: Error, delay: number) => { | |
| 254 if (err !== errNoLogs) { | |
| 255 throw err; | |
| 256 } | |
| 257 | |
| 258 // No logs were returned, and we expect logs, so we're | |
| 259 // streaming. Try again after a delay. | |
| 260 ctx.updateStatus(FetchStatus.STREAMING); | |
| 261 console.warn( | |
| 262 this.stream, | |
| 263 `: No logs returned; retrying after ${delay}ms...`); | |
| 264 })); | |
| 265 } | |
| 266 | |
| 267 private getIndex(ctx: FetchContext, index: number, opts: FetcherOptions): | |
| 268 Promise<LogDog.LogEntry[]> { | |
| 269 // (Testing) Constrain our max logs, if set. | |
| 270 if (Fetcher.maxLogsPerGet > 0) { | |
| 271 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) { | |
| 272 opts.logCount = Fetcher.maxLogsPerGet; | |
| 273 } | |
| 274 } | |
| 275 | |
| 276 // We will retry continuously until we get a log (streaming). | |
| 277 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
| 278 let errNoLogs = new Error('no logs, streaming'); | |
| 279 return streamingRetry | |
| 280 .do( | |
| 281 () => { | |
| 282 // If we're asking for a log beyond our stream, don't bother. | |
| 283 if (this.terminalIndex >= 0 && index > this.terminalIndex) { | |
| 284 return Promise.resolve([]); | |
| 285 } | |
| 286 | |
| 287 return this.doGet(ctx, index, opts).then(logs => { | |
| 288 ctx.op.assert(); | |
| 289 | |
| 290 if (!(logs && logs.length)) { | |
| 291 // (Retry) | |
| 292 throw errNoLogs; | |
| 293 } | |
| 294 | |
| 295 return logs; | |
| 296 }); | |
| 297 }, | |
| 298 (err: Error, delay: number) => { | |
| 299 ctx.op.assert(); | |
| 300 | |
| 301 if (err !== errNoLogs) { | |
| 302 throw err; | |
| 303 } | |
| 304 | |
| 305 // No logs were returned, and we expect logs, so we're | |
| 306 // streaming. Try again after a delay. | |
| 307 ctx.updateStatus(FetchStatus.STREAMING); | |
| 308 console.warn( | |
| 309 this.stream, | |
| 310 `: No logs returned; retrying after ${delay}ms...`); | |
| 311 }) | |
| 312 .then(logs => { | |
| 313 ctx.op.assert(); | |
| 314 | |
| 315 // Since we allow non-contiguous Get, we may get back more logs than | |
| 316 // we actually expected. Prune any such additional. | |
| 317 if (opts.sparse && opts.logCount && opts.logCount > 0) { | |
| 318 let maxStreamIndex = index + opts.logCount - 1; | |
| 319 logs = logs.filter(le => le.streamIndex <= maxStreamIndex); | |
| 320 } | |
| 321 return logs; | |
| 322 }); | |
| 323 } | |
| 324 | |
| 325 private doGet(ctx: FetchContext, index: number, opts: FetcherOptions): | |
| 326 Promise<LogDog.LogEntry[]> { | |
| 327 let request: LogDog.GetRequest = { | |
| 328 project: this.stream.project, | |
| 329 path: this.stream.path, | |
| 330 state: (this.terminalIndex < 0), | |
| 331 index: index, | |
| 332 }; | |
| 333 if (opts.sparse || this.archived) { | |
| 334 // This log stream is archived. We will relax the contiguous requirement | |
| 335 // so we can render sparse log streams. | |
| 336 request.nonContiguous = true; | |
| 337 } | |
| 338 if (opts.byteCount && opts.byteCount > 0) { | |
| 339 request.byteCount = opts.byteCount; | |
| 340 } | |
| 341 if (opts.logCount && opts.logCount > 0) { | |
| 342 request.logCount = opts.logCount; | |
| 343 } | |
| 344 | |
| 345 if (this.debug) { | |
| 346 console.log('logdog.Logs.Get:', request); | |
| 347 } | |
| 348 | |
| 349 // Perform our Get, waiting until the stream actually exists. | |
| 350 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); | |
| 351 return missingRetry | |
| 352 .do( | |
| 353 () => { | |
| 354 ctx.updateStatus(FetchStatus.LOADING); | |
| 355 return this.client.get(request); | |
| 356 }, | |
| 357 this.doRetryIfMissing(ctx)) | |
| 358 .then((resp: GetResponse) => { | |
| 359 let fr = FetchResult.make(resp, this.lastDesc); | |
| 360 return this.afterProcessResult(ctx, fr); | |
| 361 }); | |
| 362 } | |
| 363 | |
| 364 private doTail(ctx: FetchContext): Promise<LogDog.LogEntry[]> { | |
| 365 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); | |
| 366 return missingRetry | |
| 367 .do( | |
| 368 () => { | |
| 369 ctx.updateStatus(FetchStatus.LOADING); | |
| 370 let needsState = (this.terminalIndex < 0); | |
| 371 return this.client.tail(this.stream, needsState); | |
| 372 }, | |
| 373 this.doRetryIfMissing(ctx)) | |
| 374 .then((resp: GetResponse) => { | |
| 375 let fr = FetchResult.make(resp, this.lastDesc); | |
| 376 return this.afterProcessResult(ctx, fr); | |
| 377 }); | |
| 378 } | |
| 379 | |
| 380 private afterProcessResult(ctx: FetchContext, fr: FetchResult): | |
| 381 LogDog.LogEntry[] { | |
| 382 if (this.debug) { | |
| 383 if (fr.logs.length) { | |
| 384 console.log( | |
| 385 'Request returned:', fr.logs[0].streamIndex, '..', | |
| 386 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state); | |
| 387 } else { | |
| 388 console.log('Request returned no logs:', fr.desc, fr.state); | |
| 389 } | |
| 390 } | |
| 391 | |
| 392 ctx.updateStatus(FetchStatus.IDLE); | |
| 393 if (fr.desc) { | |
| 394 this.lastDesc = fr.desc; | |
|
nodir
2017/03/13 19:48:22
AFAIU no race here because desc is going to be the
dnj
2017/03/14 00:14:41
Correct.
| |
| 395 } | |
| 396 if (fr.state) { | |
| 397 this.lastState = fr.state; | |
| 398 } | |
| 399 return fr.logs; | |
| 400 } | |
| 401 | |
| 402 private doRetryIfMissing(ctx: FetchContext) { | |
| 403 return (err: Error, delay: number) => { | |
| 404 ctx.op.assert(); | |
| 405 | |
| 406 // Is this a gRPC Error? | |
| 407 let grpc = luci.GrpcError.convert(err); | |
| 408 if (grpc && grpc.code === luci.Code.NOT_FOUND) { | |
| 409 ctx.updateStatus(FetchStatus.MISSING); | |
| 410 | |
| 411 console.warn( | |
| 412 this.stream, ': Is not found:', err, | |
| 413 `; retrying after ${delay}ms...`); | |
| 414 return; | |
| 415 } | |
| 416 | |
| 417 ctx.updateStatus(FetchStatus.ERROR, err); | |
| 418 throw err; | |
| 419 }; | |
| 420 } | |
| 421 } | |
| 422 | |
| 423 /** | |
| 424 * The result of a log stream fetch, for internal usage. | |
| 425 * | |
| 426 * It will include zero or more log entries, and optionally (if requested) | |
| 427 * the log stream's descriptor and state. | |
| 428 */ | |
| 429 class FetchResult { | |
| 430 constructor( | |
| 431 readonly logs: LogDog.LogEntry[], | |
| 432 readonly desc?: LogDog.LogStreamDescriptor, | |
| 433 readonly state?: LogDog.LogStreamState) {} | |
| 434 | |
| 435 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): | |
| 436 FetchResult { | |
| 437 let loadDesc: LogDog.LogStreamDescriptor|undefined; | |
| 438 if (resp.desc) { | |
| 439 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc); | |
| 440 } | |
| 441 | |
| 442 let loadState: LogDog.LogStreamState|undefined; | |
| 443 if (resp.state) { | |
| 444 loadState = LogDog.LogStreamState.make(resp.state); | |
| 445 } | |
| 446 | |
| 447 let logs = (resp.logs || []).map(le => LogDog.LogEntry.make(le, desc)); | |
| 448 return new FetchResult(logs, loadDesc, loadState); | |
| 449 } | |
| 450 } | |
| 451 } | |
| OLD | NEW |