| OLD | NEW |
| (Empty) | |
| 1 /* |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use)) of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. |
| 5 */ |
| 6 |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> |
| 8 ///<reference path="../logdog-stream/client.ts" /> |
| 9 ///<reference path="../luci-operation/operation.ts" /> |
| 10 |
| 11 namespace LogDog { |
| 12 |
| 13 /** Options that can be passed to fetch operations. */ |
| 14 export type FetcherOptions = { |
| 15 /** |
| 16 * The maximum number of bytes to fetch. If undefined, no maximum will be |
| 17 * specified, and the service will constrain the results. |
| 18 */ |
| 19 byteCount?: number; |
| 20 /** |
| 21 * The maximum number of logs to fetch. If undefined, no maximum will be |
| 22 * specified, and the service will constrain the results. |
| 23 */ |
| 24 logCount?: number; |
| 25 /** If defined and true, allow a fetch to return non-continuous entries. */ |
| 26 sparse?: boolean; |
| 27 }; |
| 28 |
| 29 /** The Fetcher's current status. */ |
| 30 export enum FetchStatus { |
| 31 // Not doing anything. |
| 32 IDLE, |
| 33 // Attempting to load log data. |
| 34 LOADING, |
| 35 // We're waiting for the log stream to emit more logs. |
| 36 STREAMING, |
| 37 // The log stream is missing. |
| 38 MISSING, |
| 39 // The log stream encountered an error. |
| 40 ERROR, |
| 41 // The operaiton has been cancelled. |
| 42 CANCELLED, |
| 43 } |
| 44 |
| 45 /** Fetch represents a single fetch operation. */ |
| 46 export class Fetch { |
| 47 readonly op: luci.Operation; |
| 48 private stateChangedCallbacks = new Array<(f: Fetch) => void>(); |
| 49 |
| 50 constructor( |
| 51 private ctx: FetchContext, readonly p: Promise<LogDog.LogEntry[]>) { |
| 52 this.op = this.ctx.op; |
| 53 |
| 54 // We will now get notifications if our Context's state changes. |
| 55 this.ctx.stateChangedCallback = this.onStateChanged.bind(this); |
| 56 } |
| 57 |
| 58 get lastStatus(): FetchStatus { |
| 59 return this.ctx.lastStatus; |
| 60 } |
| 61 |
| 62 get lastError(): Error|undefined { |
| 63 return this.ctx.lastError; |
| 64 } |
| 65 |
| 66 addStateChangedCallback(cb: (f: Fetch) => void) { |
| 67 this.stateChangedCallbacks.push(cb); |
| 68 } |
| 69 |
| 70 private onStateChanged() { |
| 71 this.stateChangedCallbacks.forEach(cb => cb(this)); |
| 72 } |
| 73 } |
| 74 |
| 75 /** |
| 76 * FetchContext is an internal context used to pair all of the common |
| 77 * parameters involved in a fetch operation. |
| 78 */ |
| 79 class FetchContext { |
| 80 private _lastStatus = FetchStatus.IDLE; |
| 81 private _lastError?: Error; |
| 82 |
| 83 stateChangedCallback: () => void; |
| 84 |
| 85 constructor(readonly op: luci.Operation) { |
| 86 // If our operation is cancelled, update our status to note this. |
| 87 op.addCancelCallback(() => { |
| 88 this._lastStatus = FetchStatus.CANCELLED; |
| 89 this._lastError = undefined; |
| 90 }); |
| 91 } |
| 92 |
| 93 get lastStatus(): FetchStatus { |
| 94 return this._lastStatus; |
| 95 } |
| 96 |
| 97 get lastError(): Error|undefined { |
| 98 return this._lastError; |
| 99 } |
| 100 |
| 101 updateStatus(st: FetchStatus, err?: Error) { |
| 102 if (this.op.cancelled) { |
| 103 // No more status updates, force cancelled. |
| 104 st = FetchStatus.CANCELLED; |
| 105 err = undefined; |
| 106 } |
| 107 |
| 108 if (st === this._lastStatus && err === this._lastError) { |
| 109 return; |
| 110 } |
| 111 |
| 112 this._lastStatus = st; |
| 113 this._lastError = err; |
| 114 this.notifyStateChanged(); |
| 115 } |
| 116 |
| 117 notifyStateChanged() { |
| 118 // If our Fetch has assigned our callback, notify it. |
| 119 if (this.stateChangedCallback) { |
| 120 this.stateChangedCallback(); |
| 121 } |
| 122 } |
| 123 } |
| 124 |
| 125 /** |
| 126 * Fetcher is responsible for fetching LogDog log stream entries from the |
| 127 * remote service via an RPC client. |
| 128 * |
| 129 * Fetcher is responsible for wrapping the raw RPC calls and their results, |
| 130 * and retrying calls due to: |
| 131 * |
| 132 * - Transient failures (via RPC client). |
| 133 * - Missing stream (assumption is that the stream is still being ingested and |
| 134 * registered, and therefore a repeated retry is appropriate). |
| 135 * - Streaming stream (log stream is not terminated, but more records are not |
| 136 * yet available). |
| 137 * |
| 138 * The interface that Fetcher presents to its caller is a simple Promise-based |
| 139 * method to retrieve log stream data. |
| 140 * |
| 141 * Fetcher offers fetching via "get", "getAll", and "getLatest". |
| 142 */ |
| 143 export class Fetcher { |
| 144 private debug = false; |
| 145 private static maxLogsPerGet = 0; |
| 146 |
| 147 private lastDesc: LogDog.LogStreamDescriptor; |
| 148 private lastState: LogDog.LogStreamState; |
| 149 |
| 150 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000}; |
| 151 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000}; |
| 152 |
| 153 constructor( |
| 154 private client: LogDog.Client, readonly stream: LogDog.StreamPath) {} |
| 155 |
| 156 get desc() { |
| 157 return this.lastDesc; |
| 158 } |
| 159 get state() { |
| 160 return this.lastState; |
| 161 } |
| 162 |
| 163 /** |
| 164 * Returns the log stream's terminal index. |
| 165 * |
| 166 * If no terminal index is known (the log is still streaming) this will |
| 167 * return -1. |
| 168 */ |
| 169 get terminalIndex(): number { |
| 170 return ((this.lastState) ? this.lastState.terminalIndex : -1); |
| 171 } |
| 172 |
| 173 /** Archived returns true if this log stream is known to be archived. */ |
| 174 get archived(): boolean { |
| 175 return (!!(this.lastState && this.lastState.archive)); |
| 176 } |
| 177 |
| 178 /** |
| 179 * Returns a Promise that will resolve to the next block of logs in the |
| 180 * stream. |
| 181 * |
| 182 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the |
| 183 * next block of logs in the stream. |
| 184 */ |
| 185 get(op: luci.Operation, index: number, opts: FetcherOptions): Fetch { |
| 186 let ctx = new FetchContext(op); |
| 187 return new Fetch(ctx, this.getIndex(ctx, index, opts)); |
| 188 } |
| 189 |
| 190 /** |
| 191 * Returns a Promise that will resolve to "count" log entries starting at |
| 192 * "startIndex". |
| 193 * |
| 194 * If multiple RPC calls are required to retrieve "count" entries, these |
| 195 * will be scheduled, and the Promise will block until the full set of |
| 196 * requested stream entries is retrieved. |
| 197 */ |
| 198 getAll(op: luci.Operation, startIndex: number, count: number): Fetch { |
| 199 // Request the tail walkback logs. Since our request for N logs may return |
| 200 // <N logs, we will repeat the request until all requested logs have been |
| 201 // obtained. |
| 202 let allLogs: LogDog.LogEntry[] = []; |
| 203 |
| 204 let ctx = new FetchContext(op); |
| 205 let getIter = (): Promise<LogDog.LogEntry[]> => { |
| 206 op.assert(); |
| 207 |
| 208 if (count <= 0) { |
| 209 return Promise.resolve(allLogs); |
| 210 } |
| 211 |
| 212 // Perform Gets until we have the requested number of logs. We don't |
| 213 // have to constrain the "logCount" parameter b/c we automatically do |
| 214 // that in getIndex. |
| 215 let opts: FetcherOptions = { |
| 216 logCount: count, |
| 217 sparse: true, |
| 218 }; |
| 219 return this.getIndex(ctx, startIndex, opts).then(logs => { |
| 220 op.assert(); |
| 221 |
| 222 if (logs && logs.length) { |
| 223 allLogs.push.apply(allLogs, logs); |
| 224 startIndex += logs.length; |
| 225 count -= logs.length; |
| 226 } |
| 227 if (count > 0) { |
| 228 // Recurse. |
| 229 } |
| 230 return Promise.resolve(allLogs); |
| 231 }); |
| 232 }; |
| 233 return new Fetch(ctx, getIter()); |
| 234 } |
| 235 |
| 236 /** |
| 237 * Fetches the latest log entry. |
| 238 */ |
| 239 getLatest(op: luci.Operation): Fetch { |
| 240 let errNoLogs = new Error('no logs, streaming'); |
| 241 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); |
| 242 let ctx = new FetchContext(op); |
| 243 return new Fetch( |
| 244 ctx, |
| 245 streamingRetry.do( |
| 246 () => { |
| 247 return this.doTail(ctx).then(logs => { |
| 248 if (!(logs && logs.length)) { |
| 249 throw errNoLogs; |
| 250 } |
| 251 return logs; |
| 252 }); |
| 253 }, |
| 254 (err: Error, delay: number) => { |
| 255 if (err !== errNoLogs) { |
| 256 throw err; |
| 257 } |
| 258 |
| 259 // No logs were returned, and we expect logs, so we're |
| 260 // streaming. Try again after a delay. |
| 261 ctx.updateStatus(FetchStatus.STREAMING); |
| 262 console.warn( |
| 263 this.stream, |
| 264 `: No logs returned; retrying after ${delay}ms...`); |
| 265 })); |
| 266 } |
| 267 |
| 268 private getIndex(ctx: FetchContext, index: number, opts: FetcherOptions): |
| 269 Promise<LogDog.LogEntry[]> { |
| 270 // (Testing) Constrain our max logs, if set. |
| 271 if (Fetcher.maxLogsPerGet > 0) { |
| 272 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) { |
| 273 opts.logCount = Fetcher.maxLogsPerGet; |
| 274 } |
| 275 } |
| 276 |
| 277 // We will retry continuously until we get a log (streaming). |
| 278 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); |
| 279 let errNoLogs = new Error('no logs, streaming'); |
| 280 return streamingRetry |
| 281 .do( |
| 282 () => { |
| 283 // If we're asking for a log beyond our stream, don't bother. |
| 284 if (this.terminalIndex >= 0 && index > this.terminalIndex) { |
| 285 return Promise.resolve([]); |
| 286 } |
| 287 |
| 288 return this.doGet(ctx, index, opts).then(logs => { |
| 289 ctx.op.assert(); |
| 290 |
| 291 if (!(logs && logs.length)) { |
| 292 // (Retry) |
| 293 throw errNoLogs; |
| 294 } |
| 295 |
| 296 return logs; |
| 297 }); |
| 298 }, |
| 299 (err: Error, delay: number) => { |
| 300 ctx.op.assert(); |
| 301 |
| 302 if (err !== errNoLogs) { |
| 303 throw err; |
| 304 } |
| 305 |
| 306 // No logs were returned, and we expect logs, so we're |
| 307 // streaming. Try again after a delay. |
| 308 ctx.updateStatus(FetchStatus.STREAMING); |
| 309 console.warn( |
| 310 this.stream, |
| 311 `: No logs returned; retrying after ${delay}ms...`); |
| 312 }) |
| 313 .then(logs => { |
| 314 ctx.op.assert(); |
| 315 |
| 316 // Since we allow non-contiguous Get, we may get back more logs than |
| 317 // we actually expected. Prune any such additional. |
| 318 if (opts.sparse && opts.logCount && opts.logCount > 0) { |
| 319 let maxStreamIndex = index + opts.logCount - 1; |
| 320 logs = logs.filter(le => le.streamIndex <= maxStreamIndex); |
| 321 } |
| 322 return logs; |
| 323 }); |
| 324 } |
| 325 |
| 326 private doGet(ctx: FetchContext, index: number, opts: FetcherOptions): |
| 327 Promise<LogDog.LogEntry[]> { |
| 328 let request: LogDog.GetRequest = { |
| 329 project: this.stream.project, |
| 330 path: this.stream.path, |
| 331 state: (this.terminalIndex < 0), |
| 332 index: index, |
| 333 }; |
| 334 if (opts.sparse || this.archived) { |
| 335 // This log stream is archived. We will relax the contiguous requirement |
| 336 // so we can render sparse log streams. |
| 337 request.nonContiguous = true; |
| 338 } |
| 339 if (opts.byteCount && opts.byteCount > 0) { |
| 340 request.byteCount = opts.byteCount; |
| 341 } |
| 342 if (opts.logCount && opts.logCount > 0) { |
| 343 request.logCount = opts.logCount; |
| 344 } |
| 345 |
| 346 if (this.debug) { |
| 347 console.log('logdog.Logs.Get:', request); |
| 348 } |
| 349 |
| 350 // Perform our Get, waiting until the stream actually exists. |
| 351 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); |
| 352 return missingRetry |
| 353 .do( |
| 354 () => { |
| 355 ctx.updateStatus(FetchStatus.LOADING); |
| 356 return this.client.get(request); |
| 357 }, |
| 358 this.doRetryIfMissing(ctx)) |
| 359 .then((resp: GetResponse) => { |
| 360 let fr = FetchResult.make(resp, this.lastDesc); |
| 361 return this.afterProcessResult(ctx, fr); |
| 362 }); |
| 363 } |
| 364 |
| 365 private doTail(ctx: FetchContext): Promise<LogDog.LogEntry[]> { |
| 366 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); |
| 367 return missingRetry |
| 368 .do( |
| 369 () => { |
| 370 ctx.updateStatus(FetchStatus.LOADING); |
| 371 let needsState = (this.terminalIndex < 0); |
| 372 return this.client.tail(this.stream, needsState); |
| 373 }, |
| 374 this.doRetryIfMissing(ctx)) |
| 375 .then((resp: GetResponse) => { |
| 376 let fr = FetchResult.make(resp, this.lastDesc); |
| 377 return this.afterProcessResult(ctx, fr); |
| 378 }); |
| 379 } |
| 380 |
| 381 private afterProcessResult(ctx: FetchContext, fr: FetchResult): |
| 382 LogDog.LogEntry[] { |
| 383 if (this.debug) { |
| 384 if (fr.logs.length) { |
| 385 console.log( |
| 386 'Request returned:', fr.logs[0].streamIndex, '..', |
| 387 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state); |
| 388 } else { |
| 389 console.log('Request returned no logs:', fr.desc, fr.state); |
| 390 } |
| 391 } |
| 392 |
| 393 ctx.updateStatus(FetchStatus.IDLE); |
| 394 if (fr.desc) { |
| 395 this.lastDesc = fr.desc; |
| 396 } |
| 397 if (fr.state) { |
| 398 this.lastState = fr.state; |
| 399 } |
| 400 return fr.logs; |
| 401 } |
| 402 |
| 403 private doRetryIfMissing(ctx: FetchContext) { |
| 404 return (err: Error, delay: number) => { |
| 405 ctx.op.assert(); |
| 406 |
| 407 // Is this a gRPC Error? |
| 408 let grpc = luci.GrpcError.convert(err); |
| 409 if (grpc && grpc.code === luci.Code.NOT_FOUND) { |
| 410 ctx.updateStatus(FetchStatus.MISSING); |
| 411 |
| 412 console.warn( |
| 413 this.stream, ': Is not found:', err, |
| 414 `; retrying after ${delay}ms...`); |
| 415 return; |
| 416 } |
| 417 |
| 418 ctx.updateStatus(FetchStatus.ERROR, err); |
| 419 throw err; |
| 420 }; |
| 421 } |
| 422 } |
| 423 |
| 424 /** |
| 425 * The result of a log stream fetch, for internal usage. |
| 426 * |
| 427 * It will include zero or more log entries, and optionally (if requested) |
| 428 * the log stream's descriptor and state. |
| 429 */ |
| 430 class FetchResult { |
| 431 constructor( |
| 432 readonly logs: LogDog.LogEntry[], |
| 433 readonly desc?: LogDog.LogStreamDescriptor, |
| 434 readonly state?: LogDog.LogStreamState) {} |
| 435 |
| 436 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): |
| 437 FetchResult { |
| 438 let loadDesc: LogDog.LogStreamDescriptor|undefined; |
| 439 if (resp.desc) { |
| 440 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc); |
| 441 } |
| 442 |
| 443 let loadState: LogDog.LogStreamState|undefined; |
| 444 if (resp.state) { |
| 445 loadState = LogDog.LogStreamState.make(resp.state); |
| 446 } |
| 447 |
| 448 let logs = (resp.logs || []).map(le => LogDog.LogEntry.make(le, desc)); |
| 449 return new FetchResult(logs, loadDesc, loadState); |
| 450 } |
| 451 } |
| 452 } |
| OLD | NEW |