Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(557)

Unified Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2543323004: Rewrite LogDog log viewer app. (Closed)
Patch Set: Control all fetch sizes, fix follow on initial click, fix small fetch when auth is retrid. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: web/inc/logdog-stream-view/fetcher.ts
diff --git a/web/inc/logdog-stream-view/fetcher.ts b/web/inc/logdog-stream-view/fetcher.ts
new file mode 100644
index 0000000000000000000000000000000000000000..86ee70db22af790241c0c61934a08cd75f5dc57d
--- /dev/null
+++ b/web/inc/logdog-stream-view/fetcher.ts
@@ -0,0 +1,359 @@
+/*
+ Copyright 2016 The LUCI Authors. All rights reserved.
+ Use of this source code is governed under the Apache License, Version 2.0
+ that can be found in the LICENSE file.
+*/
+
+import {luci_rpc} from "rpc/client";
+import * as luci_sleep_promise from "luci-sleep-promise/promise";
+import {LogDog} from "logdog-stream/logdog";
+
+export type FetcherOptions = {
+ byteCount?: number;
+ logCount?: number;
+ sparse?: boolean;
+}
+
+// Type of a "Get" or "Tail" response (protobuf).
+type GetResponse = {
+ state: any;
+ desc: any;
+ logs: any[];
+};
+
+/** The Fetcher's current status. */
+export enum FetcherStatus {
+ // Not doing anything.
+ IDLE,
+ // Attempting to load log data.
+ LOADING,
+ // We're waiting for the log stream to emit more logs.
+ STREAMING,
+ // The log stream is missing.
+ MISSING,
+ // The log stream encountered an error.
+ ERROR
+}
+
+export class Fetcher {
+ private client: luci_rpc.Client;
+
+ private debug: boolean = false;
+ private static maxLogsPerGet = 0;
+
+ private lastDesc: LogDog.LogStreamDescriptor;
+ private lastState: LogDog.LogStreamState;
+ private activePromise: Promise<LogDog.LogEntry[]>;
+
+ private currentStatus: FetcherStatus = FetcherStatus.IDLE;
+ private _lastError: Error;
+ private statusChangedCallback: (() => void);
+
+ private static missingRetry: luci_rpc.Retry = new luci_rpc.Retry(
+ new luci_rpc.RetryIterator(null, 5000, 15000));
+
+ private static streamingRetry = new luci_rpc.Retry(
+ new luci_rpc.RetryIterator(null, 1000, 5000));
+
+ constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream) {
+ this.client = client;
+ }
+
+ get desc() { return this.lastDesc; }
+ get state() { return this.lastState; }
+
+ get status() { return this.currentStatus; }
+
+ /**
+ * Returns the log stream's terminal index.
+ *
+ * If no terminal index is known, or if the log stream is still streaming,
+ * this will return -1.
+ */
+ get terminalIndex(): number {
+ return ( ( this.lastState ) ? this.lastState.terminalIndex : -1 );
+ }
+
+ /** Archived returns true if this log stream is known to be archived. */
+ get archived(): boolean {
+ return ( !! (this.lastState && this.lastState.archive) );
+ }
+
+ get lastError(): Error { return this._lastError; }
+
+ private setCurrentStatus(st: FetcherStatus, err?: Error) {
+ if ( st !== this.currentStatus || err !== this._lastError ) {
+ this.currentStatus = st;
+ this._lastError = err;
+
+ if ( this.statusChangedCallback ) {
+ this.statusChangedCallback();
+ };
+ }
+ }
+
+ setStatusChangedCallback(fn: () => void) {
+ this.statusChangedCallback = fn;
+ }
+
+ /**
+ * Returns a Promise that resolves to the next block of logs in the stream.
+ *
+ * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
+ * next block of logs in the stream, or null if there are no logs to
+ * return.
+ */
+ get(index: number, opts?: FetcherOptions): Promise<LogDog.LogEntry[]> {
+ return this.getIndex(index, opts);
+ }
+
+ getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> {
+ // Request the tail walkback logs. Since our request for N logs may return
+ // <N logs, we will repeat the request until all requested logs have been
+ // obtained.
+ let allLogs = new Array<LogDog.LogEntry>();
+
+ let getIter = (): Promise<LogDog.LogEntry[]> => {
+ if ( count <= 0 ) {
+ return Promise.resolve(allLogs);
+ }
+
+ // Perform Gets until we have the requested number of logs. We don't have
+ // to constrain the "logCount" parameter b/c we automatically do that in
+ // getIndex.
+ let opts: FetcherOptions = {
+ logCount: count,
+ sparse: true,
+ };
+ return this.getIndex(startIndex, opts).then( (logs) => {
+ if ( logs ) {
+ allLogs.push.apply(allLogs, logs);
+ startIndex += logs.length;
+ count -= logs.length;
+ }
+ return getIter();
+ });
+ };
+ return getIter();
+ }
+
+ tail(): Promise<LogDog.LogEntry[]> {
+ let streamingRetry = Fetcher.streamingRetry.iterator();
+ let tryTail = (): Promise<LogDog.LogEntry[]> => {
+ return this.doTail().then( (logs): Promise<LogDog.LogEntry[]> => {
+ if ( logs && logs.length ) {
+ return Promise.resolve(logs);
+ }
+
+ // No logs were returned, and we expect logs, so we're streaming. Try
+ // again after a delay.
+ this.setCurrentStatus(FetcherStatus.STREAMING);
+ let delay = streamingRetry.next();
+ console.warn(this.stream,
+ `: No logs returned; retrying after ${delay}ms...`);
+ return luci_sleep_promise.sleep(delay).then( () => {
+ return tryTail();
+ });
+ });
+ };
+ return tryTail();
+ }
+
+
+ private getIndex(index: number, opts: FetcherOptions):
+ Promise<LogDog.LogEntry[]> {
+
+ // (Testing) Constrain our max logs, if set.
+ if ( Fetcher.maxLogsPerGet > 0 ) {
+ if ( ! opts ) {
+ opts = {};
+ }
+ if ( (!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet ) {
+ opts.logCount = Fetcher.maxLogsPerGet;
+ }
+ }
+
+ // We will retry continuously until we get a log (streaming).
+ let streamingRetry = Fetcher.streamingRetry.iterator();
+ let tryGet = (): Promise<LogDog.LogEntry[]> => {
+ // If we're asking for a log beyond our stream, don't bother.
+ if ( this.terminalIndex >= 0 && index > this.terminalIndex ) {
+ return Promise.resolve(null);
+ }
+
+ return this.doGet(index, opts).
+ then( (logs) => {
+ if ( logs && logs.length ) {
+ // Since we allow non-contiguous Get, we may get back more logs than
+ // we actually expected. Prune any such additional.
+ if ( opts.logCount > 0 ) {
+ let maxStreamIndex = index + opts.logCount - 1;
+ logs = logs.filter( (le) => {
+ return le.streamIndex <= maxStreamIndex;
+ } );
+ }
+
+ return Promise.resolve(logs);
+ }
+
+ // No logs were returned, and we expect logs, so we're streaming. Try
+ // again after a delay.
+ this.setCurrentStatus(FetcherStatus.STREAMING);
+ let delay = streamingRetry.next();
+ console.warn(this.stream,
+ `: No logs returned; retrying after ${delay}ms...`);
+ return luci_sleep_promise.sleep(delay).then( () => {
+ return tryGet();
+ });
+ });
+ };
+ return tryGet();
+ }
+
+ private doGet(index: number, opts: FetcherOptions):
+ Promise<LogDog.LogEntry[]> {
+
+ let request: {
+ project: string;
+ path: string;
+ state: boolean;
+ index: number;
+
+ nonContiguous?: boolean;
+ byteCount?: number;
+ logCount?: number;
+ } = {
+ project: this.stream.project,
+ path: this.stream.path,
+ state: (this.terminalIndex < 0),
+ index: index,
+ };
+ if ( opts.sparse || this.archived ) {
+ // This log stream is archived. We will relax the contiguous requirement
+ // so we can render sparse log streams.
+ request.nonContiguous = true;
+ }
+ if ( opts ) {
+ if ( opts.byteCount > 0 ) {
+ request.byteCount = opts.byteCount;
+ }
+ if ( opts.logCount > 0 ) {
+ request.logCount = opts.logCount;
+ }
+ }
+
+ if ( this.debug ) {
+ console.log("logdog.Logs.Get:", request);
+ }
+
+ // Perform our Get, waiting until the stream actually exists.
+ return this.doRetryIfMissing( (): Promise<FetchResult> => {
+ return this.client.call("logdog.Logs", "Get", request).
+ then( (resp: GetResponse): FetchResult => {
+ return FetchResult.make(resp, this.lastDesc);
+ });
+ }).then( (fr) => {
+ return this.afterProcessResult(fr);
+ });
+ }
+
+ private doTail(): Promise<LogDog.LogEntry[]> {
+ let request: {
+ project: string;
+ path: string;
+ state: boolean;
+ } = {
+ project: this.stream.project,
+ path: this.stream.path,
+ state: (this.terminalIndex < 0),
+ };
+
+ if ( this.debug ) {
+ console.log("logdog.Logs.Tail:", request);
+ }
+
+ return this.doRetryIfMissing( (): Promise<FetchResult> => {
+ return this.client.call("logdog.Logs", "Tail", request).
+ then( (resp: GetResponse): FetchResult => {
+ return FetchResult.make(resp, this.lastDesc);
+ });
+ }).then( (fr) => {
+ return this.afterProcessResult(fr);
+ });
+ }
+
+ private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] {
+ if ( this.debug ) {
+ if ( fr.logs.length ) {
+ console.log("Request returned:", fr.logs[0].streamIndex, "..",
+ fr.logs[fr.logs.length-1].streamIndex, fr.desc, fr.state);
+ } else {
+ console.log("Request returned no logs:", fr.desc, fr.state);
+ }
+ }
+
+ this.setCurrentStatus(FetcherStatus.IDLE);
+ if ( fr.desc ) {
+ this.lastDesc = fr.desc;
+ }
+ if ( fr.state ) {
+ this.lastState = fr.state;
+ }
+ return fr.logs;
+ }
+
+ private doRetryIfMissing(fn: () => Promise<FetchResult>):
+ Promise<FetchResult> {
+
+ let missingRetry = Fetcher.missingRetry.iterator();
+
+ let doIt = (): Promise<FetchResult> => {
+ this.setCurrentStatus(FetcherStatus.LOADING);
+
+ return fn().catch( (err: Error) => {
+ // Is this a gRPC Error?
+ let grpc = luci_rpc.GrpcError.convert(err);
+ if ( grpc && grpc.code === luci_rpc.Code.NOT_FOUND ) {
+ this.setCurrentStatus(FetcherStatus.MISSING);
+
+ let delay = missingRetry.next();
+ console.warn(this.stream, ": Is not found:", err,
+ `; retrying after ${delay}ms...`);
+ return luci_sleep_promise.sleep(delay).then( () => {
+ return doIt();
+ });
+ }
+
+ this.setCurrentStatus(FetcherStatus.ERROR, err);
+ throw err;
+ });
+ };
+ return doIt();
+ }
+}
+
+export class FetchResult {
+ constructor(readonly logs: LogDog.LogEntry[],
+ readonly desc?: LogDog.LogStreamDescriptor,
+ readonly state?: LogDog.LogStreamState) {}
+
+ static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
+ FetchResult {
+
+ let loadDesc: LogDog.LogStreamDescriptor;
+ if ( resp.desc ) {
+ desc = loadDesc = LogDog.makeLogStreamDescriptor(resp.desc);
+ }
+
+ let loadState: LogDog.LogStreamState;
+ if ( resp.state ) {
+ loadState = LogDog.makeLogStreamState( resp.state );
+ }
+
+ let logs = (resp.logs || []).map( (le) => {
+ return LogDog.makeLogEntry(le, desc);
+ });
+ return new FetchResult(logs, loadDesc, loadState);
+ }
+
+}

Powered by Google App Engine
This is Rietveld 408576698