Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(247)

Unified Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2570963003: Revert of Rewrite LogDog log viewer app. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « web/inc/logdog-app-base/logdog-app-base.html ('k') | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: web/inc/logdog-stream-view/fetcher.ts
diff --git a/web/inc/logdog-stream-view/fetcher.ts b/web/inc/logdog-stream-view/fetcher.ts
deleted file mode 100644
index 86ee70db22af790241c0c61934a08cd75f5dc57d..0000000000000000000000000000000000000000
--- a/web/inc/logdog-stream-view/fetcher.ts
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- Copyright 2016 The LUCI Authors. All rights reserved.
- Use of this source code is governed under the Apache License, Version 2.0
- that can be found in the LICENSE file.
-*/
-
-import {luci_rpc} from "rpc/client";
-import * as luci_sleep_promise from "luci-sleep-promise/promise";
-import {LogDog} from "logdog-stream/logdog";
-
-export type FetcherOptions = {
- byteCount?: number;
- logCount?: number;
- sparse?: boolean;
-}
-
-// Type of a "Get" or "Tail" response (protobuf).
-type GetResponse = {
- state: any;
- desc: any;
- logs: any[];
-};
-
-/** The Fetcher's current status. */
-export enum FetcherStatus {
- // Not doing anything.
- IDLE,
- // Attempting to load log data.
- LOADING,
- // We're waiting for the log stream to emit more logs.
- STREAMING,
- // The log stream is missing.
- MISSING,
- // The log stream encountered an error.
- ERROR
-}
-
-export class Fetcher {
- private client: luci_rpc.Client;
-
- private debug: boolean = false;
- private static maxLogsPerGet = 0;
-
- private lastDesc: LogDog.LogStreamDescriptor;
- private lastState: LogDog.LogStreamState;
- private activePromise: Promise<LogDog.LogEntry[]>;
-
- private currentStatus: FetcherStatus = FetcherStatus.IDLE;
- private _lastError: Error;
- private statusChangedCallback: (() => void);
-
- private static missingRetry: luci_rpc.Retry = new luci_rpc.Retry(
- new luci_rpc.RetryIterator(null, 5000, 15000));
-
- private static streamingRetry = new luci_rpc.Retry(
- new luci_rpc.RetryIterator(null, 1000, 5000));
-
- constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream) {
- this.client = client;
- }
-
- get desc() { return this.lastDesc; }
- get state() { return this.lastState; }
-
- get status() { return this.currentStatus; }
-
- /**
- * Returns the log stream's terminal index.
- *
- * If no terminal index is known, or if the log stream is still streaming,
- * this will return -1.
- */
- get terminalIndex(): number {
- return ( ( this.lastState ) ? this.lastState.terminalIndex : -1 );
- }
-
- /** Archived returns true if this log stream is known to be archived. */
- get archived(): boolean {
- return ( !! (this.lastState && this.lastState.archive) );
- }
-
- get lastError(): Error { return this._lastError; }
-
- private setCurrentStatus(st: FetcherStatus, err?: Error) {
- if ( st !== this.currentStatus || err !== this._lastError ) {
- this.currentStatus = st;
- this._lastError = err;
-
- if ( this.statusChangedCallback ) {
- this.statusChangedCallback();
- };
- }
- }
-
- setStatusChangedCallback(fn: () => void) {
- this.statusChangedCallback = fn;
- }
-
- /**
- * Returns a Promise that resolves to the next block of logs in the stream.
- *
- * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
- * next block of logs in the stream, or null if there are no logs to
- * return.
- */
- get(index: number, opts?: FetcherOptions): Promise<LogDog.LogEntry[]> {
- return this.getIndex(index, opts);
- }
-
- getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> {
- // Request the tail walkback logs. Since our request for N logs may return
- // <N logs, we will repeat the request until all requested logs have been
- // obtained.
- let allLogs = new Array<LogDog.LogEntry>();
-
- let getIter = (): Promise<LogDog.LogEntry[]> => {
- if ( count <= 0 ) {
- return Promise.resolve(allLogs);
- }
-
- // Perform Gets until we have the requested number of logs. We don't have
- // to constrain the "logCount" parameter b/c we automatically do that in
- // getIndex.
- let opts: FetcherOptions = {
- logCount: count,
- sparse: true,
- };
- return this.getIndex(startIndex, opts).then( (logs) => {
- if ( logs ) {
- allLogs.push.apply(allLogs, logs);
- startIndex += logs.length;
- count -= logs.length;
- }
- return getIter();
- });
- };
- return getIter();
- }
-
- tail(): Promise<LogDog.LogEntry[]> {
- let streamingRetry = Fetcher.streamingRetry.iterator();
- let tryTail = (): Promise<LogDog.LogEntry[]> => {
- return this.doTail().then( (logs): Promise<LogDog.LogEntry[]> => {
- if ( logs && logs.length ) {
- return Promise.resolve(logs);
- }
-
- // No logs were returned, and we expect logs, so we're streaming. Try
- // again after a delay.
- this.setCurrentStatus(FetcherStatus.STREAMING);
- let delay = streamingRetry.next();
- console.warn(this.stream,
- `: No logs returned; retrying after ${delay}ms...`);
- return luci_sleep_promise.sleep(delay).then( () => {
- return tryTail();
- });
- });
- };
- return tryTail();
- }
-
-
- private getIndex(index: number, opts: FetcherOptions):
- Promise<LogDog.LogEntry[]> {
-
- // (Testing) Constrain our max logs, if set.
- if ( Fetcher.maxLogsPerGet > 0 ) {
- if ( ! opts ) {
- opts = {};
- }
- if ( (!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet ) {
- opts.logCount = Fetcher.maxLogsPerGet;
- }
- }
-
- // We will retry continuously until we get a log (streaming).
- let streamingRetry = Fetcher.streamingRetry.iterator();
- let tryGet = (): Promise<LogDog.LogEntry[]> => {
- // If we're asking for a log beyond our stream, don't bother.
- if ( this.terminalIndex >= 0 && index > this.terminalIndex ) {
- return Promise.resolve(null);
- }
-
- return this.doGet(index, opts).
- then( (logs) => {
- if ( logs && logs.length ) {
- // Since we allow non-contiguous Get, we may get back more logs than
- // we actually expected. Prune any such additional.
- if ( opts.logCount > 0 ) {
- let maxStreamIndex = index + opts.logCount - 1;
- logs = logs.filter( (le) => {
- return le.streamIndex <= maxStreamIndex;
- } );
- }
-
- return Promise.resolve(logs);
- }
-
- // No logs were returned, and we expect logs, so we're streaming. Try
- // again after a delay.
- this.setCurrentStatus(FetcherStatus.STREAMING);
- let delay = streamingRetry.next();
- console.warn(this.stream,
- `: No logs returned; retrying after ${delay}ms...`);
- return luci_sleep_promise.sleep(delay).then( () => {
- return tryGet();
- });
- });
- };
- return tryGet();
- }
-
- private doGet(index: number, opts: FetcherOptions):
- Promise<LogDog.LogEntry[]> {
-
- let request: {
- project: string;
- path: string;
- state: boolean;
- index: number;
-
- nonContiguous?: boolean;
- byteCount?: number;
- logCount?: number;
- } = {
- project: this.stream.project,
- path: this.stream.path,
- state: (this.terminalIndex < 0),
- index: index,
- };
- if ( opts.sparse || this.archived ) {
- // This log stream is archived. We will relax the contiguous requirement
- // so we can render sparse log streams.
- request.nonContiguous = true;
- }
- if ( opts ) {
- if ( opts.byteCount > 0 ) {
- request.byteCount = opts.byteCount;
- }
- if ( opts.logCount > 0 ) {
- request.logCount = opts.logCount;
- }
- }
-
- if ( this.debug ) {
- console.log("logdog.Logs.Get:", request);
- }
-
- // Perform our Get, waiting until the stream actually exists.
- return this.doRetryIfMissing( (): Promise<FetchResult> => {
- return this.client.call("logdog.Logs", "Get", request).
- then( (resp: GetResponse): FetchResult => {
- return FetchResult.make(resp, this.lastDesc);
- });
- }).then( (fr) => {
- return this.afterProcessResult(fr);
- });
- }
-
- private doTail(): Promise<LogDog.LogEntry[]> {
- let request: {
- project: string;
- path: string;
- state: boolean;
- } = {
- project: this.stream.project,
- path: this.stream.path,
- state: (this.terminalIndex < 0),
- };
-
- if ( this.debug ) {
- console.log("logdog.Logs.Tail:", request);
- }
-
- return this.doRetryIfMissing( (): Promise<FetchResult> => {
- return this.client.call("logdog.Logs", "Tail", request).
- then( (resp: GetResponse): FetchResult => {
- return FetchResult.make(resp, this.lastDesc);
- });
- }).then( (fr) => {
- return this.afterProcessResult(fr);
- });
- }
-
- private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] {
- if ( this.debug ) {
- if ( fr.logs.length ) {
- console.log("Request returned:", fr.logs[0].streamIndex, "..",
- fr.logs[fr.logs.length-1].streamIndex, fr.desc, fr.state);
- } else {
- console.log("Request returned no logs:", fr.desc, fr.state);
- }
- }
-
- this.setCurrentStatus(FetcherStatus.IDLE);
- if ( fr.desc ) {
- this.lastDesc = fr.desc;
- }
- if ( fr.state ) {
- this.lastState = fr.state;
- }
- return fr.logs;
- }
-
- private doRetryIfMissing(fn: () => Promise<FetchResult>):
- Promise<FetchResult> {
-
- let missingRetry = Fetcher.missingRetry.iterator();
-
- let doIt = (): Promise<FetchResult> => {
- this.setCurrentStatus(FetcherStatus.LOADING);
-
- return fn().catch( (err: Error) => {
- // Is this a gRPC Error?
- let grpc = luci_rpc.GrpcError.convert(err);
- if ( grpc && grpc.code === luci_rpc.Code.NOT_FOUND ) {
- this.setCurrentStatus(FetcherStatus.MISSING);
-
- let delay = missingRetry.next();
- console.warn(this.stream, ": Is not found:", err,
- `; retrying after ${delay}ms...`);
- return luci_sleep_promise.sleep(delay).then( () => {
- return doIt();
- });
- }
-
- this.setCurrentStatus(FetcherStatus.ERROR, err);
- throw err;
- });
- };
- return doIt();
- }
-}
-
-export class FetchResult {
- constructor(readonly logs: LogDog.LogEntry[],
- readonly desc?: LogDog.LogStreamDescriptor,
- readonly state?: LogDog.LogStreamState) {}
-
- static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
- FetchResult {
-
- let loadDesc: LogDog.LogStreamDescriptor;
- if ( resp.desc ) {
- desc = loadDesc = LogDog.makeLogStreamDescriptor(resp.desc);
- }
-
- let loadState: LogDog.LogStreamState;
- if ( resp.state ) {
- loadState = LogDog.makeLogStreamState( resp.state );
- }
-
- let logs = (resp.logs || []).map( (le) => {
- return LogDog.makeLogEntry(le, desc);
- });
- return new FetchResult(logs, loadDesc, loadState);
- }
-
-}
« no previous file with comments | « web/inc/logdog-app-base/logdog-app-base.html ('k') | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698