Chromium Code Reviews| Index: web/inc/logdog-stream-view/fetcher.ts |
| diff --git a/web/inc/logdog-stream-view/fetcher.ts b/web/inc/logdog-stream-view/fetcher.ts |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..c4c4a080d043daad784106d4201385b8761e1dcc |
| --- /dev/null |
| +++ b/web/inc/logdog-stream-view/fetcher.ts |
| @@ -0,0 +1,463 @@ |
| +/* |
| + Copyright 2016 The LUCI Authors. All rights reserved. |
| + Use of this source code is governed under the Apache License, Version 2.0 |
| + that can be found in the LICENSE file. |
| +*/ |
| + |
| +///<reference path="../logdog-stream/logdog.ts" /> |
| +///<reference path="../rpc/client.ts" /> |
| + |
| +namespace LogDog { |
| + |
| + /** Options that can be passed to fetch operations. */ |
| + export type FetcherOptions = { |
| + /** |
| + * The maximum number of bytes to fetch. If undefined, no maximum will be |
| + * specified, and the service will constrain the results. |
| + */ |
| + byteCount?: number; |
| + /** |
| + * The maximum number of logs to fetch. If undefined, no maximum will be |
| + * specified, and the service will constrain the results. |
| + */ |
| + logCount?: number; |
| + /** If defined and true, allow a fetch to return non-continuous entries. */ |
| + sparse?: boolean; |
| + }; |
| + |
| + // Type of a "Get" or "Tail" response (protobuf). |
| + type GetResponse = {state: any; desc: any; logs: any[];}; |
| + |
| + /** The Fetcher's current status. */ |
| + export enum FetchStatus { |
| + // Not doing anything. |
| + IDLE, |
| + // Attempting to load log data. |
| + LOADING, |
| + // We're waiting for the log stream to emit more logs. |
| + STREAMING, |
| + // The log stream is missing. |
| + MISSING, |
| + // The log stream encountered an error. |
| + ERROR, |
| + // The operaiton has been cancelled. |
| + CANCELLED, |
| + } |
| + |
| + /** Operation is a cancellable operation. */ |
| + class Operation { |
| + static CANCELLED = new Error('operation is cancelled'); |
| + |
| + /** If set, a callback to invoke if the status changes. */ |
| + stateChanged: (op: Operation) => void; |
| + |
| + private cancelledValue = false; |
|
nodir
2017/03/13 19:48:22
nit: _cancelled
?
_foo is a typical name when foo
dnj
2017/03/14 00:14:41
Had to modify tslint rules to allow this, but done
|
| + private lastStatusValue = FetchStatus.IDLE; |
| + private lastErrorValue: Error|undefined; |
| + |
| + /** |
| + * Cancels the Fetch operation. If the operation completes or returns an |
| + * error after it is cancelled, the result will be ignored. |
| + * |
| + * Additionally, no status callbacks will be invoked after a Fetch is |
| + * cancelled. |
| + * |
| + * Calling cancel multiple times is safe. |
| + */ |
| + cancel() { |
| + this.updateStatus(FetchStatus.CANCELLED); |
| + this.cancelledValue = true; |
| + } |
| + |
| + /** |
| + * Assert will throw Operation.CANCELLED if the operation has been |
| + * cancelled. Otherwise, it will do nothing. |
| + */ |
| + assert() { |
| + if (this.cancelledValue) { |
| + throw Operation.CANCELLED; |
| + } |
| + } |
| + |
| + get cancelled() { |
| + return this.cancelledValue; |
| + } |
| + |
| + get lastStatus(): FetchStatus { |
| + return this.lastStatusValue; |
| + } |
| + |
| + get lastError(): Error|undefined { |
| + return this.lastErrorValue; |
| + } |
| + |
| + updateStatus(st: FetchStatus, err?: Error) { |
| + if (this.cancelled) { |
| + // No more status updates. |
| + return; |
| + } |
| + |
| + this.lastStatusValue = st; |
| + this.lastErrorValue = err; |
| + if (this.stateChanged) { |
| + this.stateChanged(this); |
| + } |
| + } |
| + } |
| + |
| + /** Fetch represents a single fetch operation. */ |
| + class Fetch<T> { |
| + readonly promise: Promise<FetchResult>; |
| + |
| + constructor(readonly op: Operation, p: Promise<T>) { |
| + this.promise = p.then( |
| + result => { |
| + this.op.updateStatus(FetchStatus.IDLE); |
| + return result; |
| + }, |
| + err => { |
| + this.op.updateStatus(FetchStatus.ERROR, err); |
| + return Promise.reject(err); |
| + }); |
| + } |
| + } |
| + |
| + /** |
| + * Fetcher is responsible for fetching LogDog log stream entries from the |
| + * remote service via an RPC client. |
| + * |
| + * Fetcher is responsible for wrapping the raw RPC calls and their results, |
| + * and retrying calls due to: |
| + * |
| + * - Transient failures (via RPC client). |
| + * - Missing stream (assumption is that the stream is still being ingested and |
| + * registered, and therefore a repeated retry is appropriate). |
| + * - Streaming stream (log stream is not terminated, but more records are not |
| + * yet available). |
| + * |
| + * The interface that Fetcher presents to its caller is a simple Promise-based |
| + * method to retrieve log stream data. |
| + * |
| + * Fetcher offers fetching via "get", "getAll", and "getLatest". |
| + */ |
| + export class Fetcher { |
| + private debug = false; |
| + private static maxLogsPerGet = 0; |
| + |
| + private lastDesc: LogDog.LogStreamDescriptor; |
| + private lastState: LogDog.LogStreamState; |
| + |
| + private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000}; |
| + private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000}; |
| + |
| + constructor( |
| + private client: luci.Client, readonly stream: LogDog.StreamPath) {} |
| + |
| + get desc() { |
| + return this.lastDesc; |
| + } |
| + get state() { |
| + return this.lastState; |
| + } |
| + |
| + /** |
| + * Returns the log stream's terminal index. |
| + * |
| + * If no terminal index is known (the log is still streaming) this will |
| + * return -1. |
| + */ |
| + get terminalIndex(): number { |
| + return ((this.lastState) ? this.lastState.terminalIndex : -1); |
| + } |
| + |
| + /** Archived returns true if this log stream is known to be archived. */ |
| + get archived(): boolean { |
| + return (!!(this.lastState && this.lastState.archive)); |
| + } |
| + |
| + /** |
| + * Returns a Promise that will resolve to the next block of logs in the |
| + * stream. |
| + * |
| + * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the |
| + * next block of logs in the stream. |
| + */ |
| + get(index: number, opts: FetcherOptions): Fetch<LogDog.LogEntry[]> { |
| + let op = new Operation(); |
| + return new Fetch(op, this.getIndex(index, opts, op)); |
| + } |
| + |
| + /** |
| + * Returns a Promise that will resolve to "count" log entries starting at |
| + * "startIndex". |
| + * |
| + * If multiple RPC calls are required to retrieve "count" entries, these |
| + * will be scheduled, and the Promise will block until the full set of |
| + * requested stream entries is retrieved. |
| + */ |
| + getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> { |
| + // Request the tail walkback logs. Since our request for N logs may return |
| + // <N logs, we will repeat the request until all requested logs have been |
| + // obtained. |
| + let allLogs: LogDog.LogEntry[] = []; |
| + |
| + let op = new Operation(); |
| + let getIter = (): Promise<LogDog.LogEntry[]> => { |
| + if (count <= 0) { |
| + return Promise.resolve(allLogs); |
| + } |
| + |
| + // Perform Gets until we have the requested number of logs. We don't |
| + // have to constrain the "logCount" parameter b/c we automatically do |
| + // that in getIndex. |
| + let opts: FetcherOptions = { |
| + logCount: count, |
| + sparse: true, |
| + }; |
| + return this.getIndex(startIndex, opts, op).then(logs => { |
| + if (logs && logs.length) { |
| + allLogs.push.apply(allLogs, logs); |
| + startIndex += logs.length; |
| + count -= logs.length; |
| + } |
| + if (count > 0) { |
| + // Recurse. |
| + } |
|
nodir
2017/03/13 19:48:22
return getIter()
|
| + return Promise.resolve(allLogs); |
| + }); |
| + }; |
| + return getIter(); |
| + } |
| + |
| + /** |
| + * Fetches the latest log entry. |
| + */ |
| + getLatest(): Fetch<LogDog.LogEntry[]> { |
| + let errNoLogs = new Error('no logs, streaming'); |
| + let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); |
| + let op = new Operation(); |
| + return new Fetch( |
| + op, |
| + streamingRetry.do( |
| + () => { |
| + return this.doTail(op).then(logs => { |
| + if (!(logs && logs.length)) { |
| + throw errNoLogs; |
| + } |
| + return logs; |
| + }); |
| + }, |
| + (err: Error, delay: number) => { |
| + if (err !== errNoLogs) { |
| + throw err; |
| + } |
| + |
| + // No logs were returned, and we expect logs, so we're |
| + // streaming. Try again after a delay. |
| + op.updateStatus(FetchStatus.STREAMING); |
| + console.warn( |
| + this.stream, |
| + `: No logs returned; retrying after ${delay}ms...`); |
| + })); |
| + } |
| + |
| + private getIndex(index: number, opts: FetcherOptions, op: Operation): |
| + Promise<LogDog.LogEntry[]> { |
| + // (Testing) Constrain our max logs, if set. |
| + if (Fetcher.maxLogsPerGet > 0) { |
| + if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) { |
| + opts.logCount = Fetcher.maxLogsPerGet; |
| + } |
| + } |
| + |
| + // We will retry continuously until we get a log (streaming). |
| + let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry); |
| + let errNoLogs = new Error('no logs, streaming'); |
| + return streamingRetry |
| + .do( |
| + () => { |
| + // If we're asking for a log beyond our stream, don't bother. |
| + if (this.terminalIndex >= 0 && index > this.terminalIndex) { |
| + return Promise.resolve([]); |
| + } |
| + |
| + return this.doGet(index, opts, op).then(logs => { |
| + op.assert(); |
| + |
| + if (!(logs && logs.length)) { |
| + // (Retry) |
| + throw errNoLogs; |
| + } |
| + |
| + return logs; |
| + }); |
| + }, |
| + (err: Error, delay: number) => { |
| + op.assert(); |
| + |
| + if (err !== errNoLogs) { |
| + throw err; |
| + } |
| + |
| + // No logs were returned, and we expect logs, so we're |
| + // streaming. Try again after a delay. |
| + op.updateStatus(FetchStatus.STREAMING); |
| + console.warn( |
| + this.stream, |
| + `: No logs returned; retrying after ${delay}ms...`); |
| + }) |
| + .then(logs => { |
| + op.assert(); |
| + |
| + // Since we allow non-contiguous Get, we may get back more logs than |
| + // we actually expected. Prune any such additional. |
| + if (opts.sparse && opts.logCount && opts.logCount > 0) { |
| + let maxStreamIndex = index + opts.logCount - 1; |
| + logs = logs.filter(le => le.streamIndex <= maxStreamIndex); |
| + } |
| + return logs; |
| + }); |
| + } |
| + |
| + private doGet(index: number, opts: FetcherOptions, op: Operation): |
| + Promise<LogDog.LogEntry[]> { |
| + let request: { |
| + project: string; path: string; state: boolean; index: number; |
| + |
| + nonContiguous?: boolean; |
| + byteCount?: number; |
| + logCount?: number; |
| + } = { |
| + project: this.stream.project, |
| + path: this.stream.path, |
| + state: (this.terminalIndex < 0), |
| + index: index, |
| + }; |
| + if (opts.sparse || this.archived) { |
| + // This log stream is archived. We will relax the contiguous requirement |
| + // so we can render sparse log streams. |
| + request.nonContiguous = true; |
| + } |
| + if (opts.byteCount && opts.byteCount > 0) { |
| + request.byteCount = opts.byteCount; |
| + } |
| + if (opts.logCount && opts.logCount > 0) { |
| + request.logCount = opts.logCount; |
| + } |
| + |
| + if (this.debug) { |
| + console.log('logdog.Logs.Get:', request); |
| + } |
| + |
| + // Perform our Get, waiting until the stream actually exists. |
| + let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); |
| + return missingRetry |
| + .do( |
| + () => { |
| + op.updateStatus(FetchStatus.LOADING); |
| + return this.client.call('logdog.Logs', 'Get', request); |
| + }, |
| + this.doRetryIfMissing(op)) |
| + .then((resp: GetResponse) => { |
| + let fr = FetchResult.make(resp, this.lastDesc); |
| + return this.afterProcessResult(fr, op); |
| + }); |
| + } |
| + |
| + private doTail(op: Operation): Promise<LogDog.LogEntry[]> { |
| + let request: {project: string; path: string; state: boolean;} = { |
| + project: this.stream.project, |
| + path: this.stream.path, |
| + state: (this.terminalIndex < 0), |
| + }; |
| + |
| + if (this.debug) { |
| + console.log('logdog.Logs.Tail:', request); |
| + } |
| + |
| + let missingRetry = new luci.RetryIterator(Fetcher.missingRetry); |
| + return missingRetry |
| + .do( |
| + () => { |
| + op.updateStatus(FetchStatus.LOADING); |
| + return this.client.call('logdog.Logs', 'Tail', request); |
| + }, |
| + this.doRetryIfMissing(op)) |
| + .then((resp: GetResponse) => { |
| + let fr = FetchResult.make(resp, this.lastDesc); |
| + return this.afterProcessResult(fr, op); |
| + }); |
| + } |
| + |
| + private afterProcessResult(fr: FetchResult, op: Operation): |
| + LogDog.LogEntry[] { |
| + if (this.debug) { |
| + if (fr.logs.length) { |
| + console.log( |
| + 'Request returned:', fr.logs[0].streamIndex, '..', |
| + fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state); |
| + } else { |
| + console.log('Request returned no logs:', fr.desc, fr.state); |
| + } |
| + } |
| + |
| + op.updateStatus(FetchStatus.IDLE); |
| + if (fr.desc) { |
| + this.lastDesc = fr.desc; |
| + } |
| + if (fr.state) { |
| + this.lastState = fr.state; |
| + } |
| + return fr.logs; |
| + } |
| + |
| + private doRetryIfMissing(op: Operation) { |
| + return (err: Error, delay: number) => { |
| + op.assert(); |
| + |
| + // Is this a gRPC Error? |
| + let grpc = luci.GrpcError.convert(err); |
| + if (grpc && grpc.code === luci.Code.NOT_FOUND) { |
| + op.updateStatus(FetchStatus.MISSING); |
| + |
| + console.warn( |
| + this.stream, ': Is not found:', err, |
| + `; retrying after ${delay}ms...`); |
| + return; |
| + } |
| + |
| + op.updateStatus(FetchStatus.ERROR, err); |
| + throw err; |
| + }; |
| + } |
| + } |
| + |
| + /** |
| + * The result of a log stream fetch, for internal usage. |
| + * |
| + * It will include zero or more log entries, and optionally (if requested) |
| + * the log stream's descriptor and state. |
| + */ |
| + class FetchResult { |
| + constructor( |
| + readonly logs: LogDog.LogEntry[], |
| + readonly desc?: LogDog.LogStreamDescriptor, |
| + readonly state?: LogDog.LogStreamState) {} |
| + |
| + static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): |
| + FetchResult { |
| + let loadDesc: LogDog.LogStreamDescriptor|undefined; |
| + if (resp.desc) { |
| + desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc); |
| + } |
| + |
| + let loadState: LogDog.LogStreamState|undefined; |
| + if (resp.state) { |
| + loadState = LogDog.LogStreamState.make(resp.state); |
| + } |
| + |
| + let logs = (resp.logs || []).map(le => LogDog.LogEntry.make(le, desc)); |
| + return new FetchResult(logs, loadDesc, loadState); |
| + } |
| + } |
| +} |