Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: web/inc/logdog-stream-view/viewer.ts

Issue 2570963003: Revert of Rewrite LogDog log viewer app. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « web/inc/logdog-stream-view/query.ts ('k') | web/inc/logdog-stream/logdog.ts » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: web/inc/logdog-stream-view/viewer.ts
diff --git a/web/inc/logdog-stream-view/viewer.ts b/web/inc/logdog-stream-view/viewer.ts
deleted file mode 100644
index 60dcf8f65dd4cc27efb8e5852f4fbb7a9e9c5004..0000000000000000000000000000000000000000
--- a/web/inc/logdog-stream-view/viewer.ts
+++ /dev/null
@@ -1,1150 +0,0 @@
-/*
- Copyright 2016 The LUCI Authors. All rights reserved.
- Use of this source code is governed under the Apache License, Version 2.0
- that can be found in the LICENSE file.
-*/
-
-import {Fetcher, FetcherOptions, FetcherStatus}
- from "logdog-stream-view/fetcher";
-import {LogDogQuery, QueryParams, StreamType} from "logdog-stream-view/query";
-import {LogDog} from "logdog-stream/logdog";
-import * as luci_sleep_promise from "luci-sleep-promise/promise";
-import {luci_rpc} from "rpc/client";
-
-/** Sentinel error: not authenticated. */
-let NotAuthenticatedError = new Error("Not Authenticated");
-
-function resolveErr(err: Error) {
- let grpc = luci_rpc.GrpcError.convert(err);
- if ( grpc && grpc.code == luci_rpc.Code.UNAUTHENTICATED ) {
- return NotAuthenticatedError;
- }
- return err;
-}
-
-/** Stream status entry, as rendered by the view. */
-type StreamStatusEntry = {
- name: string;
- desc: string;
-};
-
-/** An individual log stream's status. */
-type LogStreamStatus = {
- stream: LogDog.Stream;
- state: string;
- fetchStatus: FetcherStatus;
- finished: boolean;
- needsAuth: boolean;
-}
-
-type StreamStatusCallback = (v: LogStreamStatus[]) => void;
-
-export enum Location {
- /**
- * Represents the upper half of a split view. Logs start at 0 and go through
- * the HEAD point.
- */
- HEAD,
- /**
- * Represents the lower half of the split view. Logs start at the TAIL point
- * and go through the BOTTOM anchor point.
- */
- TAIL,
- /**
- * Represents an anchor point where the split occurred, obtained through a
- * single "Tail()" RPC call. If the terminal index is known when the split
- * occurs, this should be the terminal index.
- */
- BOTTOM,
-}
-
-export enum LoadingState {
- NONE,
- RESOLVING,
- LOADING,
- RENDERING,
- NEEDS_AUTH,
- ERROR,
-}
-
-/** Represents control visibility in the view. */
-type Controls = {
- /** Are we completely finished loading stream data? */
- canSplit: boolean;
- /** Are we currently split? */
- split: boolean;
- /** Show the bottom bar? */
- bottom: boolean;
- /** Is the content fully loaded? */
- fullyLoaded: boolean;
-
- /** Text in the status bar. */
- loadingState: LoadingState;
- /** Stream status entries, or null for no status window. */
- streamStatus: StreamStatusEntry[];
-}
-
-/** Registered callbacks from the LogDog stream view. */
-type ViewBinding = {
- client: luci_rpc.Client;
- mobile: boolean;
-
- pushLogEntries: (entries: LogDog.LogEntry[], l: Location) => void;
- clearLogEntries: () => void;
-
- updateControls: (c: Controls) => void;
- locationIsVisible: (l: Location) => boolean;
-};
-
-/** Interface of the specific Model functions used by the view. */
-interface ModelInterface {
- fetch(cancel: boolean): Promise<void>;
- split(): Promise<void>;
-
- reset(): void;
- setAutomatic(v: boolean): void;
- setTailing(v: boolean): void;
- notifyAuthenticationChanged(): void;
-}
-
-export class Model implements ModelInterface {
- /** If performing a small initial fetch, this is the size of the fetch. */
- private static SMALL_INITIAL_FETCH_SIZE = (1024 * 4);
- /** If performing a large initial fetch, this is the size of the fetch. */
- private static LARGE_INITIAL_FETCH_SIZE = (1024 * 24);
- /** If fetching on a mobile device, fetch in this chunk size. */
- private static MOBILE_FETCH_SIZE = (1024 * 256);
- /** For standard fetching, fetch with this size. */
- private static STANDARD_FETCH_SIZE = (4 * 1024 * 1024);
-
- /**
- * If >0, the maximum number of log lines to push at a time. We will sleep
- * in between these entries to allow the rest of the app to be responsive
- * during log dumping.
- */
- private static logAppendInterval = 4000;
- /** Amount of time to sleep in between log append chunks. */
- private static logAppendDelay = 0;
-
- /** Our log provider. */
- private provider: LogProvider = this.nullProvider();
-
- /**
- * Promise that is resolved when authentication state changes. When this
- * happens, a new Promise is installed, and future authentication changes
- * will resolve the new Promise.
- */
- private authChangedPromise: Promise<void> = null;
- /**
- * Retained callback (Promise resolve) to invoke when authentication state
- * changes.
- */
- private authChangedCallback: (() => void) = null;
-
- /** The current fetch Promise. */
- private currentFetch: Promise<void>;
- /** The current fetch token. */
- private currentFetchToken: FetchToken;
-
- /** Are we in automatic mode? */
- private automatic = false;
- /** Are we tailing? */
- private tailing = false;
- /** Are we in the middle of rendering logs? */
- private rendering = true;
-
- private _loadingState: LoadingState = LoadingState.NONE;
- private _streamStatus: StreamStatusEntry[];
-
- /**
- * When rendering a Promise that will resolve when the render completes. We
- * use this to pipeline parallel data fetching and rendering.
- */
- private renderPromise: Promise<void>;
-
- constructor(readonly view: ViewBinding) {
- this.resetAuthChanged();
- }
-
- resolve(paths: string[]): Promise<void> {
- this.reset();
-
- // For any path that is a query, execute that query.
- this.loadingState = LoadingState.RESOLVING;
- return Promise.all( paths.map( (path): Promise<LogDog.Stream[]> => {
- let stream = LogDog.Stream.splitProject(path);
- if ( ! LogDogQuery.isQuery(stream.path) ) {
- return Promise.resolve([stream]);
- }
-
- // This "path" is really a query. Construct and execute.
- let query = new LogDogQuery(this.view.client);
- let doQuery = (): Promise<LogDog.Stream[]> => {
- return query.getAll({
- project: stream.project,
- path: stream.path,
- streamType: StreamType.TEXT,
- }, 100).then( (result): LogDog.Stream[] => {
- return result.map( (qr): LogDog.Stream => {
- return qr.stream;
- } );
- }).catch( (err: Error) => {
- err = resolveErr(err);
- if ( err == NotAuthenticatedError ) {
- return this.authChangedPromise.then( () => {
- return doQuery();
- } );
- }
-
- throw err;
- });
- }
- return doQuery();
- } ) ).then( (streamBlocks) => {
- let streams = new Array<LogDog.Stream>();
- (streamBlocks || []).forEach( (streamBlock) => {
- streams.push.apply(streams, streamBlock);
- } );
-
-
- let initialFetchSize = ( (streams.length === 1) ?
- Model.LARGE_INITIAL_FETCH_SIZE : Model.SMALL_INITIAL_FETCH_SIZE );
-
- // Determine our fetch size.
- let maxFetchSize = ( (this.view.mobile) ?
- Model.MOBILE_FETCH_SIZE : Model.STANDARD_FETCH_SIZE );
-
- // Generate a LogStream client entry for each composite stream.
- let logStreams = streams.map( (stream) => {
- console.log("Resolved log stream:", stream);
- return new LogStream(
- this.view.client, stream, initialFetchSize, maxFetchSize);
- });
-
- // Reset any existing state.
- this.reset();
-
- // If we have exactly one stream, then use it directly. This allows it to
- // split.
- let provider: LogProvider;
- switch( logStreams.length ) {
- case 0:
- provider = this.nullProvider();
- break;
- case 1:
- provider = logStreams[0];
- break;
- default:
- provider = new AggregateLogStream(logStreams);
- break
- }
- provider.setStreamStatusCallback((st: LogStreamStatus[]) => {
- if ( this.provider === provider ) {
- this.streamStatus = this.buildStreamStatus(st);
- }
- });
- this.provider = provider;
- this.loadingState = LoadingState.NONE;
- } ).catch( (err: Error) => {
- this.loadingState = LoadingState.ERROR;
- console.error("Failed to resolve log streams:", err);
- });
- }
-
- reset() {
- this.view.clearLogEntries();
- this.clearCurrentFetch();
- this.provider = this.nullProvider();
-
- this.updateControls();
- }
-
- private nullProvider(): LogProvider { return new AggregateLogStream([]); }
-
- private mintFetchToken(): FetchToken {
- return (this.currentFetchToken = new FetchToken( (tok: FetchToken) => {
- return (tok === this.currentFetchToken);
- } ));
- }
-
- private clearCurrentFetch() {
- this.currentFetch = this.currentFetchToken = null;
- this.rendering = false;
- }
-
- private get loadingState(): LoadingState { return this._loadingState; }
- private set loadingState(v: LoadingState) {
- if( v != this._loadingState ) {
- this._loadingState = v;
- this.updateControls();
- }
- }
-
- private get streamStatus(): StreamStatusEntry[] { return this._streamStatus; }
- private set streamStatus(st: StreamStatusEntry[]) {
- this._streamStatus = st;
- this.updateControls();
- }
-
- private updateControls() {
- this.view.updateControls({
- canSplit: this.providerCanSplit,
- split: this.isSplit,
- bottom: !this.fetchedEndOfStream,
- fullyLoaded: (this.fetchedFullStream && (! this.rendering)),
- loadingState: this.loadingState,
- streamStatus: this.streamStatus,
- });
- }
-
- /**
- * Note that the authentication state for the client has changed. This will
- * trigger an automatic fetch retry if our previous fetch failed due to
- * lack of authentication.
- */
- notifyAuthenticationChanged() {
- // Resolve our current "auth changed" Promise.
- this.authChangedCallback();
- }
-
- private resetAuthChanged() {
- // Resolve our previous function, if it's not already resolved.
- if ( this.authChangedCallback ) {
- this.authChangedCallback();
- }
-
- // Create a new Promise and install it.
- this.authChangedPromise = new Promise<void>((resolve, reject) => {
- this.authChangedCallback = resolve;
- });
- }
-
- split(): Promise<void> {
- // If we haven't already split, and our provider lets us split, then go
- // ahead and do so.
- if ( this.providerCanSplit ) {
- return this.fetchLocation(Location.TAIL, true);
- }
- return this.fetch(false);
- }
-
- fetch(cancel: boolean): Promise<void> {
- if ( this.isSplit ) {
- if ( this.tailing ) {
- // Next fetch grabs logs from the bottom (continue tailing).
- if ( ! this.fetchedEndOfStream ) {
- return this.fetchLocation(Location.BOTTOM, false);
- } else {
- return this.fetchLocation(Location.TAIL, false);
- }
- }
-
- // We're split, but not tailing, so fetch logs from HEAD.
- return this.fetchLocation(Location.HEAD, false);
- }
-
- // We're not split. If we haven't reached end of stream, fetch logs from
- // HEAD.
- return this.fetchLocation(Location.HEAD, false);
- }
-
- /** Fetch logs from an explicit location. */
- fetchLocation(l: Location, cancel: boolean) {
- if ( this.currentFetch && (!cancel) ) {
- return this.currentFetch;
- }
-
- // If our provider is finished, then do nothing.
- if ( this.fetchedFullStream ) {
- // There are no more logs.
- return Promise.resolve(null);
- }
-
- // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD instead.
- if ( l === Location.BOTTOM && ! this.isSplit ) {
- l = Location.HEAD;
- }
-
- // If we're not split, always fetch from BOTTOM.
- this.loadingState = LoadingState.LOADING;
-
- // Rotate our fetch ID. This will effectively cancel any pending fetches.
- let token = this.mintFetchToken();
- return (this.currentFetch = this.provider.fetch(l, token).then( (buf) => {
- // Clear our fetching status.
- this.rendering = true;
- this.loadingState = LoadingState.RENDERING;
- let pushLogsPromise: Promise<void>;
- let hasLogs = (buf && buf.peek());
-
- // Resolve any previous rendering Promise that we have. This makes sure
- // our rendering and fetching don't get more than one round out of sync.
- return (this.renderPromise || Promise.resolve(null)).then( () => {
- // Post-fetch cleanup.
- this.clearCurrentFetch();
-
- // Clear our loading state (updates controls automatically).
- this.loadingState = LoadingState.RENDERING;
-
- // Initiate the next render. This will happen in the background while
- // we enqueue our next fetch.
- this.renderPromise = this.renderLogs(buf, l).then( () => {
- this.renderPromise = null;
- if ( this.loadingState === LoadingState.RENDERING ) {
- this.loadingState = LoadingState.NONE;
- }
- });
-
- if ( this.fetchedFullStream ) {
- // If we're finished now, perform our finished cleanup.
- return;
- }
-
- // The fetch is finished. If we're automatic, and we got logs, start the
- // next.
- if ( this.automatic && hasLogs ) {
- console.log("Automatic: starting next fetch.")
- return this.fetch(false);
- }
- });
- }).catch( (err: Error) => {
- // If we've been canceled, discard this result.
- if ( ! token.valid ) {
- return
- }
-
- this.clearCurrentFetch();
- if ( err === NotAuthenticatedError ) {
- this.loadingState = LoadingState.NEEDS_AUTH;
-
- // We failed because we were not authenticated. Mark this so we can
- // retry if that state changes.
- return this.authChangedPromise.then( () => {
- // Our authentication state changed during the fetch! Retry
- // automatically.
- this.fetchLocation(l, false);
- });
- }
-
- console.error("Failed to load log streams:", err);
- }));
- }
-
- private renderLogs(buf: BufferedLogs, l: Location): Promise<void> {
- if ( ! (buf && buf.peek()) ) {
- return Promise.resolve(null);
- }
-
- let lines = 0;
- let logBlock = new Array<LogDog.LogEntry>();
- let appendBlock = () => {
- if ( logBlock.length ) {
- console.log("Rendering", logBlock.length, "logs...");
- this.view.pushLogEntries(logBlock, l);
- logBlock.length = 0;
- lines = 0;
-
- // Update our status and controls.
- this.updateControls();
- }
- };
-
- // Create a promise loop to push logs at intervals.
- let pushLogs = (): Promise<void> => {
- return Promise.resolve().then( () => {
- // Add logs until we reach our interval lines.
- for ( let nextLog = buf.next(); (nextLog); nextLog = buf.next() ) {
- // If we've exceeded our burst, then interleave a sleep (yield).
- if (Model.logAppendInterval > 0 &&
- lines >= Model.logAppendInterval ) {
- appendBlock();
-
- return luci_sleep_promise.sleep(Model.logAppendDelay).then(
- () => {
- // Enqueue the next push round.
- return pushLogs();
- } );
- }
-
- // Add the next log to the append block.
- logBlock.push(nextLog);
- if ( nextLog.text && nextLog.text.lines ) {
- lines += nextLog.text.lines.length;
- }
- }
-
- // If there are any buffered logs, append that block.
- appendBlock();
- });
- }
- return pushLogs();
- }
-
- setTailing(v: boolean) {
- this.tailing = v;
- }
-
- setAutomatic(v: boolean) {
- this.automatic = v;
- if ( v ) {
- // Passively kick off a new fetch.
- this.fetch(false);
- }
- }
-
- private buildStreamStatus(v: LogStreamStatus[]): StreamStatusEntry[] {
- let maxStatus = FetcherStatus.IDLE;
- let maxStatusCount = 0;
- let needsAuth = false;
-
- // Prune any finished entries and accumulate them for status bar change.
- v = (v || []).filter( (st) => {
- needsAuth = (needsAuth || st.needsAuth);
-
- if ( st.fetchStatus > maxStatus ) {
- maxStatus = st.fetchStatus;
- maxStatusCount = 1;
- } else if ( st.fetchStatus === maxStatus ) {
- maxStatusCount++;
- }
-
- return (! st.finished);
- });
-
- return v.map( (st): StreamStatusEntry => {
- return {
- name: ".../+/" + st.stream.name,
- desc: st.state,
- };
- } );
- }
-
- private get providerCanSplit(): boolean {
- let split = this.provider.split();
- return (!! (split && split.canSplit()));
- }
-
- private get isSplit(): boolean {
- let split = this.provider.split();
- return ( !! (split && split.isSplit()) );
- }
-
- private get fetchedEndOfStream(): boolean {
- return (this.provider.fetchedEndOfStream());
- }
-
- private get fetchedFullStream(): boolean {
- return (this.fetchedEndOfStream && (! this.isSplit));
- }
-}
-
-/**
- * A token used to repesent an individual fetch. A token can assert whether its
- * fetch has been invalidated.
- */
-class FetchToken {
- private validate: (tok: FetchToken) => boolean;
-
- constructor(validate: (tok: FetchToken) => boolean) {
- this.validate = validate;
- }
-
- get valid(): boolean {
- return this.validate(this);
- }
-
- do<T>(p: Promise<T>): Promise<T> {
- return p.then( (v): T => {
- if ( ! this.valid ) {
- throw new Error("Token has been invalidated, discarding fetch.");
- }
- return v
- } );
- }
-}
-
-interface LogProvider {
- setStreamStatusCallback(cb: StreamStatusCallback): void;
- fetch(l: Location, token: FetchToken): Promise<BufferedLogs>;
-
- /** Will return null if this LogProvider doesn't support splitting. */
- split(): SplitLogProvider;
- fetchedEndOfStream(): boolean;
-}
-
-/** Additional methods for log stream splitting, if supported. */
-interface SplitLogProvider {
- canSplit(): boolean
- isSplit(): boolean;
-}
-
-/** A LogStream is a LogProvider manages a single log stream. */
-class LogStream implements LogProvider {
- /**
- * Always begin with a small fetch. We'll disable this afterward the first
- * finishes.
- */
- private initialFetch = true;
-
- private fetcher: Fetcher;
-
- /** The log stream index of the next head() log. */
- private nextHeadIndex = 0;
- /**
- * The lowest log stream index of all of the tail logs. If this is <0, then
- * it is uninitialized.
- */
- private firstTailIndex = -1;
- /**
- * The next log stream index to fetch to continue pulling logs from the
- * bottom. If this is <0, it is uninitialized.
- */
- private nextBottomIndex = -1;
-
- private streamStatusCallback: StreamStatusCallback;
-
- /** The size of the tail walkback region. */
- private static TAIL_WALKBACK = 500;
-
- constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream,
- readonly initialFetchSize: number,
- readonly maxFetchSize: number ) {
- this.fetcher = new Fetcher(client, stream);
- this.fetcher.setStatusChangedCallback( () => {
- this.statusChanged();
- });
- }
-
- get fetcherStatus(): FetcherStatus { return this.fetcher.status; }
-
- fetch(l: Location, token: FetchToken): Promise<BufferedLogs> {
- // Determine which method to use based on the insertion point and current
- // log stream fetch state.
- let getLogs: Promise<LogDog.LogEntry[]>;
- switch( l ) {
- case Location.HEAD:
- getLogs = this.getHead(token);
- break;
-
- case Location.TAIL:
- getLogs = this.getTail(token);
- break;
-
- case Location.BOTTOM:
- getLogs = this.getBottom(token);
- break;
- }
-
- return getLogs.then( (logs: LogDog.LogEntry[]) => {
- this.initialFetch = false;
- this.statusChanged();
- return new BufferedLogs(logs);
- }).catch( (err: Error) => {
- err = resolveErr(err);
- throw err;
- });
- }
-
- setStreamStatusCallback(cb: StreamStatusCallback) {
- this.streamStatusCallback = cb;
- }
-
- private statusChanged() {
- if ( this.streamStatusCallback ) {
- this.streamStatusCallback([this.getStreamStatus()]);
- }
- }
-
- getStreamStatus(): LogStreamStatus {
- let pieces = new Array<string>();
- let tidx = this.fetcher.terminalIndex;
- if ( this.nextHeadIndex > 0 ) {
- pieces.push("1.." + this.nextHeadIndex);
- } else {
- pieces.push("0");
- }
- if ( this.isSplit() ) {
- if ( tidx >= 0 ) {
- pieces.push("| " + this.firstTailIndex + " / " + tidx);
- tidx = -1;
- } else {
- pieces.push("| " + this.firstTailIndex + ".." + this.nextBottomIndex +
- " ...");
- }
- } else if (tidx >= 0) {
- pieces.push("/ " + tidx);
- } else {
- pieces.push("...");
- }
-
- let needsAuth = false;
- let finished = this.finished;
- if ( finished ) {
- pieces.push("(Finished)");
- } else {
- switch ( this.fetcher.status ) {
- case FetcherStatus.IDLE:
- case FetcherStatus.LOADING:
- pieces.push("(Loading)");
- break;
-
- case FetcherStatus.STREAMING:
- pieces.push("(Streaming)");
- break;
-
- case FetcherStatus.MISSING:
- pieces.push("(Missing)");
- break;
-
- case FetcherStatus.ERROR:
- let err = resolveErr(this.fetcher.lastError);
- if (err === NotAuthenticatedError ) {
- pieces.push("(Auth Error)");
- needsAuth = true;
- } else {
- pieces.push("(Error)");
- }
- break;
- }
- }
-
- return {
- stream: this.stream,
- state: pieces.join(" "),
- finished: finished,
- fetchStatus: this.fetcher.status,
- needsAuth: needsAuth,
- };
- }
-
- split(): SplitLogProvider {
- return this;
- }
-
- isSplit(): boolean {
- // We're split if we have a bottom and we're not finished tailing.
- return ( this.firstTailIndex >= 0 &&
- (this.nextHeadIndex < this.firstTailIndex) );
- }
-
- canSplit(): boolean {
- return ( ! (this.isSplit() || this.caughtUp) );
- }
-
- private get caughtUp(): boolean {
- // We're caught up if we have both a head and bottom index, and the head
- // is at or past the bottom.
- return ( this.nextHeadIndex >= 0 && this.nextBottomIndex >= 0 &&
- this.nextHeadIndex >= this.nextBottomIndex );
- }
-
- fetchedEndOfStream(): boolean {
- let tidx = this.fetcher.terminalIndex;
- return ( tidx >= 0 && (
- (this.nextHeadIndex > tidx) || (this.nextBottomIndex > tidx) ) );
- }
-
- private get finished(): boolean {
- return ( (! this.isSplit()) && this.fetchedEndOfStream() );
- }
-
- private updateIndexes() {
- if ( this.firstTailIndex >= 0 ) {
- if ( this.nextBottomIndex < this.firstTailIndex ) {
- this.nextBottomIndex = this.firstTailIndex + 1;
- }
-
- if ( this.nextHeadIndex >= this.firstTailIndex &&
- this.nextBottomIndex >= 0) {
- // Synchronize our head and bottom pointers.
- this.nextHeadIndex = this.nextBottomIndex =
- Math.max(this.nextHeadIndex, this.nextBottomIndex);
- }
- }
- }
-
- private nextFetcherOptions(): FetcherOptions {
- let opts: FetcherOptions = {};
- if ( this.initialFetch ) {
- opts.byteCount = this.initialFetchSize;
- } else if ( this.maxFetchSize > 0 ) {
- opts.byteCount = this.maxFetchSize;
- }
- return opts;
- }
-
- private getHead(token: FetchToken): Promise<LogDog.LogEntry[]> {
- this.updateIndexes();
-
- if ( this.finished ) {
- // Our HEAD region has met/surpassed our TAIL region, so there are no
- // HEAD logs to return. Only bottom.
- return Promise.resolve();
- }
-
- // If we have a tail pointer, only fetch HEAD up to that point.
- let opts = this.nextFetcherOptions();
- if ( this.firstTailIndex >= 0 ) {
- opts.logCount = (this.firstTailIndex - this.nextHeadIndex);
- }
-
- return token.do( this.fetcher.get(this.nextHeadIndex, opts) ).then(
- (logs) => {
- if ( logs && logs.length ) {
- this.nextHeadIndex = (logs[logs.length - 1].streamIndex + 1);
- this.updateIndexes();
- }
- return logs;
- } );
- }
-
- private getTail(token: FetchToken): Promise<LogDog.LogEntry[]> {
- // If we haven't performed a Tail before, start with one.
- if ( this.firstTailIndex < 0 ) {
- let tidx = this.fetcher.terminalIndex;
- if ( tidx < 0 ) {
- return token.do( this.fetcher.tail() ).then( (logs) => {
- // Mark our initial "tail" position.
- if ( logs && logs.length ) {
- this.firstTailIndex = logs[0].streamIndex;
- this.updateIndexes();
- }
- return logs;
- } );
- }
-
- this.firstTailIndex = (tidx+1);
- this.updateIndexes();
- }
-
- // We're doing incremental reverse fetches. If we're finished tailing,
- // return no logs.
- if ( ! this.isSplit() ) {
- return Promise.resolve(null);
- }
-
- // Determine our walkback region.
- let startIndex = this.firstTailIndex - LogStream.TAIL_WALKBACK;
- if ( this.nextHeadIndex >= 0 ) {
- if ( startIndex < this.nextHeadIndex ) {
- startIndex = this.nextHeadIndex;
- }
- } else if ( startIndex < 0 ) {
- startIndex = 0;
- }
- let count = (this.firstTailIndex - startIndex);
-
- // Fetch the full walkback region.
- return token.do( this.fetcher.getAll(startIndex, count) ).then( (logs) => {
- this.firstTailIndex = startIndex;
- this.updateIndexes();
- return logs;
- });
- }
-
- private getBottom(token: FetchToken): Promise<LogDog.LogEntry[]> {
- this.updateIndexes();
-
- // If there are no more logs in the stream, return no logs.
- if ( this.fetchedEndOfStream() ) {
- return Promise.resolve(null);
- }
-
- // If our bottom index isn't initialized, initialize it via tail.
- if ( this.nextBottomIndex < 0 ) {
- return this.getTail(token);
- }
-
- let opts = this.nextFetcherOptions();
- return token.do( this.fetcher.get(this.nextBottomIndex, opts) ).then(
- (logs) => {
- if ( logs && logs.length ) {
- this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1);
- }
- return logs;
- } );
- }
-}
-
-/**
- * An aggregate log stream. It presents a single-stream view, but is really
- * composed of several log streams interleaved based on their prefix indices
- * (if they share a prefix) or timestamps (if they don't).
- *
- * At least one log entry from each stream must be buffered before any log
- * entries can be yielded, since we don't know what ordering to apply otherwise.
- * To make this fast, we will make the first request for each stream small so
- * it finishes quickly and we can start rendering. Subsequent entries will be
- * larger for efficiency.
- *
- * @param {LogStream} streams the composite streams.
- */
-class AggregateLogStream implements LogProvider {
-
- private streams: AggregateLogStream.Entry[];
- private active: AggregateLogStream.Entry[];
- private currentNextPromise: Promise<BufferedLogs>;
- private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number;
-
- private streamStatusCallback: StreamStatusCallback;
-
- constructor(streams: LogStream[]) {
- // Input streams, ordered by input order.
- this.streams = streams.map<AggregateLogStream.Entry>( (ls, i) => {
- ls.setStreamStatusCallback( (st: LogStreamStatus[]) => {
- if ( st ) {
- this.streams[i].status = st[0];
- this.statusChanged();
- }
- });
-
- return {
- ls: ls,
- buffer: null,
- needsAuth: false,
- status: ls.getStreamStatus(),
- };
- } );
-
- // Subset of input streams that are still active (not finished).
- this.active = this.streams;
-
- // The currently-active "next" promise.
- this.currentNextPromise = null;
-
- // Determine our log comparison function. If all of our logs share a prefix,
- // we will use the prefix index. Otherwise, we will use the timestamp.
- let template: LogDog.Stream = null;
- let sharedPrefix = this.streams.every( (entry) => {
- if ( ! template ) {
- template = entry.ls.stream;
- return true;
- }
- return template.samePrefixAs(entry.ls.stream);
- });
-
- this.compareLogs = (( sharedPrefix ) ?
- (a, b) => {
- return (a.prefixIndex - b.prefixIndex);
- } :
- (a, b) => {
- return a.timestamp.getTime() - b.timestamp.getTime();
- });
- }
-
- split(): SplitLogProvider { return null; }
- fetchedEndOfStream(): boolean { return ( ! this.active.length ); }
-
- setStreamStatusCallback(cb: StreamStatusCallback) {
- this.streamStatusCallback = cb;
- }
-
- private statusChanged() {
- if ( this.streamStatusCallback ) {
- // Iterate through our composite stream statuses and pick the one that we
- // want to report.
- this.streamStatusCallback( this.streams.map( (entry): LogStreamStatus => {
- return entry.status;
- } ));
- }
- }
-
- /**
- * Implements LogProvider.next
- */
- fetch(l: Location, token: FetchToken): Promise<BufferedLogs> {
- // If we're already are fetching the next buffer, this is an error.
- if (this.currentNextPromise) {
- throw new Error("In-progress next(), cannot start another.");
- }
-
- // Filter out any finished streams from our active list. A stream is
- // finished if it is finished streaming and we don't have a retained buffer
- // from it.
- //
- // This updates our "finished" property, since it's derived from the length
- // of our active array.
- this.active = this.active.filter( (entry) => {
- return ( (! entry.buffer) || entry.buffer.peek() ||
- (! entry.ls.fetchedEndOfStream()) );
- });
-
- if ( ! this.active.length ) {
- // No active streams, so we're finished. Permanently set our promise to
- // the finished state.
- return Promise.resolve();
- }
-
- // Fill all buffers for all active streams. This may result in an RPC to
- // load new buffer content for streams whose buffers are empty.
- //
- // If any stream doesn't currently have buffered logs, we will call their
- // "next()" methods to pull the next set of logs. This will result in one of
- // three possibilities:
- // - A BufferedLogs will be returned containing the next logs for this stream.
- // The log stream may also be finished.
- // - null will be returned, and this log stream must now be finished.
- // - An error will be returned.
- //
- // The error is interesting, since we must present a common error view to our
- // caller. If all returned errors are "NotAuthenticatedError", we will return
- // a NotAuthenticatedError. Otherwise, we will return a generic "streams
- // failed" error.
- //
- // The outer Promise will pull logs for any streams that don't have any.
- // On success, the "buffer" for the entry will be populated. On failure, an
- // error will be returned. Because Promise.all fails fast, we will catch inner
- // errors and return them as values (null if no error).
- this.currentNextPromise = Promise.all( this.active.map( (entry) => {
- // If the entry's buffer still has data, use it immediately.
- if (entry.buffer && entry.buffer.peek()) {
- return null;
- }
-
- // No buffered logs. Call the stream's "next()" method to get some.
- return entry.ls.fetch(Location.HEAD, token).then(
- (buffer): Error => {
- // Retain this buffer.
- entry.buffer = buffer;
- return null;
- }
- ).catch( (error: Error) => {
- // Log stream source of error. Raise a generic "failed to buffer"
- // error. This will become a permanent failure.
- console.error("Error loading buffer for", entry.ls.stream.fullName(),
- "(", entry.ls, "): ", error);
- return error;
- });
- })).then( (results: Error[]): BufferedLogs => {
- // Identify any errors that we hit.
- let buffers = new Array<BufferedLogs>(this.active.length);
- let errors: Error[] = [];
- results.forEach( (err, idx) => {
- buffers[idx] = this.active[idx].buffer;
- if ( err ) { errors[idx] = err; }
- });
-
- // We are done, and will return a value.
- this.currentNextPromise = null;
- if ( errors.length ) {
- throw this._aggregateErrors(errors);
- }
- return this._aggregateBuffers(buffers);
- });
-
- return this.currentNextPromise;
- }
-
- private _aggregateErrors(errors: Error[]): Error {
- let isNotAuthenticated = false;
- errors.every( (err) => {
- if ( ! err ) { return true; }
- if ( err === NotAuthenticatedError ) {
- isNotAuthenticated = true;
- return true;
- }
- isNotAuthenticated = false;
- return false;
- });
- return (( isNotAuthenticated ) ?
- (NotAuthenticatedError) : new Error("Stream Error"));
- }
-
- private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs {
- switch ( buffers.length ) {
- case 0:
- // No buffers, so no logs.
- return new BufferedLogs(null);
- case 1:
- // As a special case, if we only have one buffer, and we assume that its
- // entries are sorted, then that buffer is a return value.
- return new BufferedLogs(buffers[0].getAll());
- }
-
- // Preload our peek array.
- let incomplete = false;
- let peek = buffers.map(function(buf) {
- var le = buf.peek();
- if (! le) {
- incomplete = true;
- }
- return le;
- });
- if (incomplete) {
- // One of our input buffers had no log entries.
- return new BufferedLogs(null);
- }
-
- // Assemble our aggregate buffer array.
- let entries: LogDog.LogEntry[] = [];
- while (true) {
- // Choose the next stream.
- var earliest = 0;
- for (var i = 1; i < buffers.length; i++) {
- if (this.compareLogs(peek[i], peek[earliest]) < 0) {
- earliest = i;
- }
- }
-
- // Get the next log from the earliest stream.
- entries.push(buffers[earliest].next());
-
- // Repopulate that buffer's "peek" value. If the buffer has no more
- // entries, then we're done.
- var next = buffers[earliest].peek();
- if (!next) {
- return new BufferedLogs(entries);
- }
- peek[earliest] = next;
- }
- }
-}
-
-module AggregateLogStream {
- export type Entry = {
- ls: LogStream;
- buffer: BufferedLogs;
- needsAuth: boolean;
- status: LogStreamStatus;
- }
-}
-
-/**
- * A buffer of ordered log entries.
- *
- * Assumes total ownership of the input log buffer, which can be null to
- * indicate no logs.
- */
-class BufferedLogs {
- private logs: LogDog.LogEntry[] | null;
- private index: number;
-
- constructor(logs: LogDog.LogEntry[] | null) {
- this.logs = logs;
- this.index = 0;
- }
-
- peek(): LogDog.LogEntry | null {
- return (this.logs) ? (this.logs[this.index]) : (null);
- }
-
- getAll(): LogDog.LogEntry[] {
- // Pop all logs.
- var logs = this.logs;
- this.logs = null;
- return logs;
- }
-
- next() : LogDog.LogEntry | null {
- if (! (this.logs && this.logs.length)) {
- return null;
- }
-
- // Get the next log and increment our index.
- var log = this.logs[this.index++];
- if (this.index >= this.logs.length) {
- this.logs = null;
- }
- return log;
- }
-}
« no previous file with comments | « web/inc/logdog-stream-view/query.ts ('k') | web/inc/logdog-stream/logdog.ts » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698