Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 that can be found in the LICENSE file. | |
| 5 */ | |
| 6 | |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | |
| 8 ///<reference path="../rpc/client.ts" /> | |
| 9 | |
| 10 namespace LogDog { | |
| 11 | |
| 12 /** Options that can be passed to fetch operations. */ | |
| 13 export type FetcherOptions = { | |
| 14 /** | |
| 15 * The maximum number of bytes to fetch. If undefined, no maximum will be | |
| 16 * specified, and the service will constrain the results. | |
| 17 */ | |
| 18 byteCount?: number; | |
| 19 /** | |
| 20 * The maximum number of logs to fetch. If undefined, no maximum will be | |
| 21 * specified, and the service will constrain the results. | |
| 22 */ | |
| 23 logCount?: number; | |
| 24 /** If defined and true, allow a fetch to return non-continuous entries. */ | |
| 25 sparse?: boolean; | |
| 26 }; | |
| 27 | |
| 28 // Type of a "Get" or "Tail" response (protobuf). | |
| 29 type GetResponse = {state: any; desc: any; logs: any[];}; | |
| 30 | |
| 31 /** The Fetcher's current status. */ | |
| 32 export enum FetchStatus { | |
| 33 // Not doing anything. | |
| 34 IDLE, | |
| 35 // Attempting to load log data. | |
| 36 LOADING, | |
| 37 // We're waiting for the log stream to emit more logs. | |
| 38 STREAMING, | |
| 39 // The log stream is missing. | |
| 40 MISSING, | |
| 41 // The log stream encountered an error. | |
| 42 ERROR, | |
| 43 // The operaiton has been cancelled. | |
| 44 CANCELLED, | |
| 45 } | |
| 46 | |
| 47 /** Operation is a cancellable operation. */ | |
| 48 class Operation { | |
| 49 static CANCELLED = new Error('operation is cancelled'); | |
| 50 | |
| 51 /** If set, a callback to invoke if the status changes. */ | |
| 52 stateChanged: (op: Operation) => void; | |
| 53 | |
| 54 private cancelledValue = false; | |
|
nodir
2017/03/13 19:48:22
nit: _cancelled
?
_foo is a typical name when foo
dnj
2017/03/14 00:14:41
Had to modify tslint rules to allow this, but done
| |
| 55 private lastStatusValue = FetchStatus.IDLE; | |
| 56 private lastErrorValue: Error|undefined; | |
| 57 | |
| 58 /** | |
| 59 * Cancels the Fetch operation. If the operation completes or returns an | |
| 60 * error after it is cancelled, the result will be ignored. | |
| 61 * | |
| 62 * Additionally, no status callbacks will be invoked after a Fetch is | |
| 63 * cancelled. | |
| 64 * | |
| 65 * Calling cancel multiple times is safe. | |
| 66 */ | |
| 67 cancel() { | |
| 68 this.updateStatus(FetchStatus.CANCELLED); | |
| 69 this.cancelledValue = true; | |
| 70 } | |
| 71 | |
| 72 /** | |
| 73 * Assert will throw Operation.CANCELLED if the operation has been | |
| 74 * cancelled. Otherwise, it will do nothing. | |
| 75 */ | |
| 76 assert() { | |
| 77 if (this.cancelledValue) { | |
| 78 throw Operation.CANCELLED; | |
| 79 } | |
| 80 } | |
| 81 | |
| 82 get cancelled() { | |
| 83 return this.cancelledValue; | |
| 84 } | |
| 85 | |
| 86 get lastStatus(): FetchStatus { | |
| 87 return this.lastStatusValue; | |
| 88 } | |
| 89 | |
| 90 get lastError(): Error|undefined { | |
| 91 return this.lastErrorValue; | |
| 92 } | |
| 93 | |
| 94 updateStatus(st: FetchStatus, err?: Error) { | |
| 95 if (this.cancelled) { | |
| 96 // No more status updates. | |
| 97 return; | |
| 98 } | |
| 99 | |
| 100 this.lastStatusValue = st; | |
| 101 this.lastErrorValue = err; | |
| 102 if (this.stateChanged) { | |
| 103 this.stateChanged(this); | |
| 104 } | |
| 105 } | |
| 106 } | |
| 107 | |
| 108 /** Fetch represents a single fetch operation. */ | |
| 109 class Fetch<T> { | |
| 110 readonly promise: Promise<FetchResult>; | |
| 111 | |
| 112 constructor(readonly op: Operation, p: Promise<T>) { | |
| 113 this.promise = p.then( | |
| 114 result => { | |
| 115 this.op.updateStatus(FetchStatus.IDLE); | |
| 116 return result; | |
| 117 }, | |
| 118 err => { | |
| 119 this.op.updateStatus(FetchStatus.ERROR, err); | |
| 120 return Promise.reject(err); | |
| 121 }); | |
| 122 } | |
| 123 } | |
| 124 | |
| 125 /** | |
| 126 * Fetcher is responsible for fetching LogDog log stream entries from the | |
| 127 * remote service via an RPC client. | |
| 128 * | |
| 129 * Fetcher is responsible for wrapping the raw RPC calls and their results, | |
| 130 * and retrying calls due to: | |
| 131 * | |
| 132 * - Transient failures (via RPC client). | |
| 133 * - Missing stream (assumption is that the stream is still being ingested and | |
| 134 * registered, and therefore a repeated retry is appropriate). | |
| 135 * - Streaming stream (log stream is not terminated, but more records are not | |
| 136 * yet available). | |
| 137 * | |
| 138 * The interface that Fetcher presents to its caller is a simple Promise-based | |
| 139 * method to retrieve log stream data. | |
| 140 * | |
| 141 * Fetcher offers fetching via "get", "getAll", and "getLatest". | |
| 142 */ | |
| 143 export class Fetcher { | |
| 144 private debug = false; | |
| 145 private static maxLogsPerGet = 0; | |
| 146 | |
| 147 private lastDesc: LogDog.LogStreamDescriptor; | |
| 148 private lastState: LogDog.LogStreamState; | |
| 149 | |
| 150 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000}; | |
| 151 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000}; | |
| 152 | |
| 153 constructor( | |
| 154 private client: luci.Client, readonly stream: LogDog.StreamPath) {} | |
| 155 | |
| 156 get desc() { | |
| 157 return this.lastDesc; | |
| 158 } | |
| 159 get state() { | |
| 160 return this.lastState; | |
| 161 } | |
| 162 | |
| 163 /** | |
| 164 * Returns the log stream's terminal index. | |
| 165 * | |
| 166 * If no terminal index is known (the log is still streaming) this will | |
| 167 * return -1. | |
| 168 */ | |
| 169 get terminalIndex(): number { | |
| 170 return ((this.lastState) ? this.lastState.terminalIndex : -1); | |
| 171 } | |
| 172 | |
| 173 /** Archived returns true if this log stream is known to be archived. */ | |
| 174 get archived(): boolean { | |
| 175 return (!!(this.lastState && this.lastState.archive)); | |
| 176 } | |
| 177 | |
| 178 /** | |
| 179 * Returns a Promise that will resolve to the next block of logs in the | |
| 180 * stream. | |
| 181 * | |
| 182 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the | |
| 183 * next block of logs in the stream. | |
| 184 */ | |
| 185 get(index: number, opts: FetcherOptions): Fetch<LogDog.LogEntry[]> { | |
| 186 let op = new Operation(); | |
| 187 return new Fetch(op, this.getIndex(index, opts, op)); | |
| 188 } | |
| 189 | |
| 190 /** | |
| 191 * Returns a Promise that will resolve to "count" log entries starting at | |
| 192 * "startIndex". | |
| 193 * | |
| 194 * If multiple RPC calls are required to retrieve "count" entries, these | |
| 195 * will be scheduled, and the Promise will block until the full set of | |
| 196 * requested stream entries is retrieved. | |
| 197 */ | |
| 198 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> { | |
| 199 // Request the tail walkback logs. Since our request for N logs may return | |
| 200 // <N logs, we will repeat the request until all requested logs have been | |
| 201 // obtained. | |
| 202 let allLogs: LogDog.LogEntry[] = []; | |
| 203 | |
| 204 let op = new Operation(); | |
| 205 let getIter = (): Promise<LogDog.LogEntry[]> => { | |
| 206 if (count <= 0) { | |
| 207 return Promise.resolve(allLogs); | |
| 208 } | |
| 209 | |
| 210 // Perform Gets until we have the requested number of logs. We don't | |
| 211 // have to constrain the "logCount" parameter b/c we automatically do | |
| 212 // that in getIndex. | |
| 213 let opts: FetcherOptions = { | |
| 214 logCount: count, | |
| 215 sparse: true, | |
| 216 }; | |
| 217 return this.getIndex(startIndex, opts, op).then(logs => { | |
| 218 if (logs && logs.length) { | |
| 219 allLogs.push.apply(allLogs, logs); | |
| 220 startIndex += logs.length; | |
| 221 count -= logs.length; | |
| 222 } | |
| 223 if (count > 0) { | |
| 224 // Recurse. | |
| 225 } | |
|
nodir
2017/03/13 19:48:22
return getIter()
| |
| 226 return Promise.resolve(allLogs); | |
| 227 }); | |
| 228 }; | |
| 229 return getIter(); | |
| 230 } | |
| 231 | |
| 232 /** | |
| 233 * Fetches the latest log entry. | |
| 234 */ | |
| 235 getLatest(): Fetch<LogDog.LogEntry[]> { | |
| 236 let errNoLogs = new Error('no logs, streaming'); | |
| 237 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
| 238 let op = new Operation(); | |
| 239 return new Fetch( | |
| 240 op, | |
| 241 streamingRetry.do( | |
| 242 () => { | |
| 243 return this.doTail(op).then(logs => { | |
| 244 if (!(logs && logs.length)) { | |
| 245 throw errNoLogs; | |
| 246 } | |
| 247 return logs; | |
| 248 }); | |
| 249 }, | |
| 250 (err: Error, delay: number) => { | |
| 251 if (err !== errNoLogs) { | |
| 252 throw err; | |
| 253 } | |
| 254 | |
| 255 // No logs were returned, and we expect logs, so we're | |
| 256 // streaming. Try again after a delay. | |
| 257 op.updateStatus(FetchStatus.STREAMING); | |
| 258 console.warn( | |
| 259 this.stream, | |
| 260 `: No logs returned; retrying after ${delay}ms...`); | |
| 261 })); | |
| 262 } | |
| 263 | |
| 264 private getIndex(index: number, opts: FetcherOptions, op: Operation): | |
| 265 Promise<LogDog.LogEntry[]> { | |
| 266 // (Testing) Constrain our max logs, if set. | |
| 267 if (Fetcher.maxLogsPerGet > 0) { | |
| 268 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) { | |
| 269 opts.logCount = Fetcher.maxLogsPerGet; | |
| 270 } | |
| 271 } | |
| 272 | |
| 273 // We will retry continuously until we get a log (streaming). | |
| 274 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); | |
| 275 let errNoLogs = new Error('no logs, streaming'); | |
| 276 return streamingRetry | |
| 277 .do( | |
| 278 () => { | |
| 279 // If we're asking for a log beyond our stream, don't bother. | |
| 280 if (this.terminalIndex >= 0 && index > this.terminalIndex) { | |
| 281 return Promise.resolve([]); | |
| 282 } | |
| 283 | |
| 284 return this.doGet(index, opts, op).then(logs => { | |
| 285 op.assert(); | |
| 286 | |
| 287 if (!(logs && logs.length)) { | |
| 288 // (Retry) | |
| 289 throw errNoLogs; | |
| 290 } | |
| 291 | |
| 292 return logs; | |
| 293 }); | |
| 294 }, | |
| 295 (err: Error, delay: number) => { | |
| 296 op.assert(); | |
| 297 | |
| 298 if (err !== errNoLogs) { | |
| 299 throw err; | |
| 300 } | |
| 301 | |
| 302 // No logs were returned, and we expect logs, so we're | |
| 303 // streaming. Try again after a delay. | |
| 304 op.updateStatus(FetchStatus.STREAMING); | |
| 305 console.warn( | |
| 306 this.stream, | |
| 307 `: No logs returned; retrying after ${delay}ms...`); | |
| 308 }) | |
| 309 .then(logs => { | |
| 310 op.assert(); | |
| 311 | |
| 312 // Since we allow non-contiguous Get, we may get back more logs than | |
| 313 // we actually expected. Prune any such additional. | |
| 314 if (opts.sparse && opts.logCount && opts.logCount > 0) { | |
| 315 let maxStreamIndex = index + opts.logCount - 1; | |
| 316 logs = logs.filter(le => le.streamIndex <= maxStreamIndex); | |
| 317 } | |
| 318 return logs; | |
| 319 }); | |
| 320 } | |
| 321 | |
| 322 private doGet(index: number, opts: FetcherOptions, op: Operation): | |
| 323 Promise<LogDog.LogEntry[]> { | |
| 324 let request: { | |
| 325 project: string; path: string; state: boolean; index: number; | |
| 326 | |
| 327 nonContiguous?: boolean; | |
| 328 byteCount?: number; | |
| 329 logCount?: number; | |
| 330 } = { | |
| 331 project: this.stream.project, | |
| 332 path: this.stream.path, | |
| 333 state: (this.terminalIndex < 0), | |
| 334 index: index, | |
| 335 }; | |
| 336 if (opts.sparse || this.archived) { | |
| 337 // This log stream is archived. We will relax the contiguous requirement | |
| 338 // so we can render sparse log streams. | |
| 339 request.nonContiguous = true; | |
| 340 } | |
| 341 if (opts.byteCount && opts.byteCount > 0) { | |
| 342 request.byteCount = opts.byteCount; | |
| 343 } | |
| 344 if (opts.logCount && opts.logCount > 0) { | |
| 345 request.logCount = opts.logCount; | |
| 346 } | |
| 347 | |
| 348 if (this.debug) { | |
| 349 console.log('logdog.Logs.Get:', request); | |
| 350 } | |
| 351 | |
| 352 // Perform our Get, waiting until the stream actually exists. | |
| 353 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); | |
| 354 return missingRetry | |
| 355 .do( | |
| 356 () => { | |
| 357 op.updateStatus(FetchStatus.LOADING); | |
| 358 return this.client.call('logdog.Logs', 'Get', request); | |
| 359 }, | |
| 360 this.doRetryIfMissing(op)) | |
| 361 .then((resp: GetResponse) => { | |
| 362 let fr = FetchResult.make(resp, this.lastDesc); | |
| 363 return this.afterProcessResult(fr, op); | |
| 364 }); | |
| 365 } | |
| 366 | |
| 367 private doTail(op: Operation): Promise<LogDog.LogEntry[]> { | |
| 368 let request: {project: string; path: string; state: boolean;} = { | |
| 369 project: this.stream.project, | |
| 370 path: this.stream.path, | |
| 371 state: (this.terminalIndex < 0), | |
| 372 }; | |
| 373 | |
| 374 if (this.debug) { | |
| 375 console.log('logdog.Logs.Tail:', request); | |
| 376 } | |
| 377 | |
| 378 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); | |
| 379 return missingRetry | |
| 380 .do( | |
| 381 () => { | |
| 382 op.updateStatus(FetchStatus.LOADING); | |
| 383 return this.client.call('logdog.Logs', 'Tail', request); | |
| 384 }, | |
| 385 this.doRetryIfMissing(op)) | |
| 386 .then((resp: GetResponse) => { | |
| 387 let fr = FetchResult.make(resp, this.lastDesc); | |
| 388 return this.afterProcessResult(fr, op); | |
| 389 }); | |
| 390 } | |
| 391 | |
| 392 private afterProcessResult(fr: FetchResult, op: Operation): | |
| 393 LogDog.LogEntry[] { | |
| 394 if (this.debug) { | |
| 395 if (fr.logs.length) { | |
| 396 console.log( | |
| 397 'Request returned:', fr.logs[0].streamIndex, '..', | |
| 398 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state); | |
| 399 } else { | |
| 400 console.log('Request returned no logs:', fr.desc, fr.state); | |
| 401 } | |
| 402 } | |
| 403 | |
| 404 op.updateStatus(FetchStatus.IDLE); | |
| 405 if (fr.desc) { | |
| 406 this.lastDesc = fr.desc; | |
| 407 } | |
| 408 if (fr.state) { | |
| 409 this.lastState = fr.state; | |
| 410 } | |
| 411 return fr.logs; | |
| 412 } | |
| 413 | |
| 414 private doRetryIfMissing(op: Operation) { | |
| 415 return (err: Error, delay: number) => { | |
| 416 op.assert(); | |
| 417 | |
| 418 // Is this a gRPC Error? | |
| 419 let grpc = luci.GrpcError.convert(err); | |
| 420 if (grpc && grpc.code === luci.Code.NOT_FOUND) { | |
| 421 op.updateStatus(FetchStatus.MISSING); | |
| 422 | |
| 423 console.warn( | |
| 424 this.stream, ': Is not found:', err, | |
| 425 `; retrying after ${delay}ms...`); | |
| 426 return; | |
| 427 } | |
| 428 | |
| 429 op.updateStatus(FetchStatus.ERROR, err); | |
| 430 throw err; | |
| 431 }; | |
| 432 } | |
| 433 } | |
| 434 | |
| 435 /** | |
| 436 * The result of a log stream fetch, for internal usage. | |
| 437 * | |
| 438 * It will include zero or more log entries, and optionally (if requested) | |
| 439 * the log stream's descriptor and state. | |
| 440 */ | |
| 441 class FetchResult { | |
| 442 constructor( | |
| 443 readonly logs: LogDog.LogEntry[], | |
| 444 readonly desc?: LogDog.LogStreamDescriptor, | |
| 445 readonly state?: LogDog.LogStreamState) {} | |
| 446 | |
| 447 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): | |
| 448 FetchResult { | |
| 449 let loadDesc: LogDog.LogStreamDescriptor|undefined; | |
| 450 if (resp.desc) { | |
| 451 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc); | |
| 452 } | |
| 453 | |
| 454 let loadState: LogDog.LogStreamState|undefined; | |
| 455 if (resp.state) { | |
| 456 loadState = LogDog.LogStreamState.make(resp.state); | |
| 457 } | |
| 458 | |
| 459 let logs = (resp.logs || []).map(le => LogDog.LogEntry.make(le, desc)); | |
| 460 return new FetchResult(logs, loadDesc, loadState); | |
| 461 } | |
| 462 } | |
| 463 } | |
| OLD | NEW |