Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(46)

Unified Diff: web/inc/logdog-stream-view/viewer.ts

Issue 2543323004: Rewrite LogDog log viewer app. (Closed)
Patch Set: Control all fetch sizes, fix follow on initial click, fix small fetch when auth is retrid. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: web/inc/logdog-stream-view/viewer.ts
diff --git a/web/inc/logdog-stream-view/viewer.ts b/web/inc/logdog-stream-view/viewer.ts
new file mode 100644
index 0000000000000000000000000000000000000000..60dcf8f65dd4cc27efb8e5852f4fbb7a9e9c5004
--- /dev/null
+++ b/web/inc/logdog-stream-view/viewer.ts
@@ -0,0 +1,1150 @@
+/*
+ Copyright 2016 The LUCI Authors. All rights reserved.
+ Use of this source code is governed under the Apache License, Version 2.0
+ that can be found in the LICENSE file.
+*/
+
+import {Fetcher, FetcherOptions, FetcherStatus}
+ from "logdog-stream-view/fetcher";
+import {LogDogQuery, QueryParams, StreamType} from "logdog-stream-view/query";
+import {LogDog} from "logdog-stream/logdog";
+import * as luci_sleep_promise from "luci-sleep-promise/promise";
+import {luci_rpc} from "rpc/client";
+
+/** Sentinel error: not authenticated. */
+let NotAuthenticatedError = new Error("Not Authenticated");
+
+function resolveErr(err: Error) {
+ let grpc = luci_rpc.GrpcError.convert(err);
+ if ( grpc && grpc.code == luci_rpc.Code.UNAUTHENTICATED ) {
+ return NotAuthenticatedError;
+ }
+ return err;
+}
+
+/** Stream status entry, as rendered by the view. */
+type StreamStatusEntry = {
+ name: string;
+ desc: string;
+};
+
+/** An individual log stream's status. */
+type LogStreamStatus = {
+ stream: LogDog.Stream;
+ state: string;
+ fetchStatus: FetcherStatus;
+ finished: boolean;
+ needsAuth: boolean;
+}
+
+type StreamStatusCallback = (v: LogStreamStatus[]) => void;
+
+export enum Location {
+ /**
+ * Represents the upper half of a split view. Logs start at 0 and go through
+ * the HEAD point.
+ */
+ HEAD,
+ /**
+ * Represents the lower half of the split view. Logs start at the TAIL point
+ * and go through the BOTTOM anchor point.
+ */
+ TAIL,
+ /**
+ * Represents an anchor point where the split occurred, obtained through a
+ * single "Tail()" RPC call. If the terminal index is known when the split
+ * occurs, this should be the terminal index.
+ */
+ BOTTOM,
+}
+
+export enum LoadingState {
+ NONE,
+ RESOLVING,
+ LOADING,
+ RENDERING,
+ NEEDS_AUTH,
+ ERROR,
+}
+
+/** Represents control visibility in the view. */
+type Controls = {
+ /** Are we completely finished loading stream data? */
+ canSplit: boolean;
+ /** Are we currently split? */
+ split: boolean;
+ /** Show the bottom bar? */
+ bottom: boolean;
+ /** Is the content fully loaded? */
+ fullyLoaded: boolean;
+
+ /** Text in the status bar. */
+ loadingState: LoadingState;
+ /** Stream status entries, or null for no status window. */
+ streamStatus: StreamStatusEntry[];
+}
+
+/** Registered callbacks from the LogDog stream view. */
+type ViewBinding = {
+ client: luci_rpc.Client;
+ mobile: boolean;
+
+ pushLogEntries: (entries: LogDog.LogEntry[], l: Location) => void;
+ clearLogEntries: () => void;
+
+ updateControls: (c: Controls) => void;
+ locationIsVisible: (l: Location) => boolean;
+};
+
+/** Interface of the specific Model functions used by the view. */
+interface ModelInterface {
+ fetch(cancel: boolean): Promise<void>;
+ split(): Promise<void>;
+
+ reset(): void;
+ setAutomatic(v: boolean): void;
+ setTailing(v: boolean): void;
+ notifyAuthenticationChanged(): void;
+}
+
+export class Model implements ModelInterface {
+ /** If performing a small initial fetch, this is the size of the fetch. */
+ private static SMALL_INITIAL_FETCH_SIZE = (1024 * 4);
+ /** If performing a large initial fetch, this is the size of the fetch. */
+ private static LARGE_INITIAL_FETCH_SIZE = (1024 * 24);
+ /** If fetching on a mobile device, fetch in this chunk size. */
+ private static MOBILE_FETCH_SIZE = (1024 * 256);
+ /** For standard fetching, fetch with this size. */
+ private static STANDARD_FETCH_SIZE = (4 * 1024 * 1024);
+
+ /**
+ * If >0, the maximum number of log lines to push at a time. We will sleep
+ * in between these entries to allow the rest of the app to be responsive
+ * during log dumping.
+ */
+ private static logAppendInterval = 4000;
+ /** Amount of time to sleep in between log append chunks. */
+ private static logAppendDelay = 0;
+
+ /** Our log provider. */
+ private provider: LogProvider = this.nullProvider();
+
+ /**
+ * Promise that is resolved when authentication state changes. When this
+ * happens, a new Promise is installed, and future authentication changes
+ * will resolve the new Promise.
+ */
+ private authChangedPromise: Promise<void> = null;
+ /**
+ * Retained callback (Promise resolve) to invoke when authentication state
+ * changes.
+ */
+ private authChangedCallback: (() => void) = null;
+
+ /** The current fetch Promise. */
+ private currentFetch: Promise<void>;
+ /** The current fetch token. */
+ private currentFetchToken: FetchToken;
+
+ /** Are we in automatic mode? */
+ private automatic = false;
+ /** Are we tailing? */
+ private tailing = false;
+ /** Are we in the middle of rendering logs? */
+ private rendering = true;
+
+ private _loadingState: LoadingState = LoadingState.NONE;
+ private _streamStatus: StreamStatusEntry[];
+
+ /**
+ * When rendering a Promise that will resolve when the render completes. We
+ * use this to pipeline parallel data fetching and rendering.
+ */
+ private renderPromise: Promise<void>;
+
+ constructor(readonly view: ViewBinding) {
+ this.resetAuthChanged();
+ }
+
+ resolve(paths: string[]): Promise<void> {
+ this.reset();
+
+ // For any path that is a query, execute that query.
+ this.loadingState = LoadingState.RESOLVING;
+ return Promise.all( paths.map( (path): Promise<LogDog.Stream[]> => {
+ let stream = LogDog.Stream.splitProject(path);
+ if ( ! LogDogQuery.isQuery(stream.path) ) {
+ return Promise.resolve([stream]);
+ }
+
+ // This "path" is really a query. Construct and execute.
+ let query = new LogDogQuery(this.view.client);
+ let doQuery = (): Promise<LogDog.Stream[]> => {
+ return query.getAll({
+ project: stream.project,
+ path: stream.path,
+ streamType: StreamType.TEXT,
+ }, 100).then( (result): LogDog.Stream[] => {
+ return result.map( (qr): LogDog.Stream => {
+ return qr.stream;
+ } );
+ }).catch( (err: Error) => {
+ err = resolveErr(err);
+ if ( err == NotAuthenticatedError ) {
+ return this.authChangedPromise.then( () => {
+ return doQuery();
+ } );
+ }
+
+ throw err;
+ });
+ }
+ return doQuery();
+ } ) ).then( (streamBlocks) => {
+ let streams = new Array<LogDog.Stream>();
+ (streamBlocks || []).forEach( (streamBlock) => {
+ streams.push.apply(streams, streamBlock);
+ } );
+
+
+ let initialFetchSize = ( (streams.length === 1) ?
+ Model.LARGE_INITIAL_FETCH_SIZE : Model.SMALL_INITIAL_FETCH_SIZE );
+
+ // Determine our fetch size.
+ let maxFetchSize = ( (this.view.mobile) ?
+ Model.MOBILE_FETCH_SIZE : Model.STANDARD_FETCH_SIZE );
+
+ // Generate a LogStream client entry for each composite stream.
+ let logStreams = streams.map( (stream) => {
+ console.log("Resolved log stream:", stream);
+ return new LogStream(
+ this.view.client, stream, initialFetchSize, maxFetchSize);
+ });
+
+ // Reset any existing state.
+ this.reset();
+
+ // If we have exactly one stream, then use it directly. This allows it to
+ // split.
+ let provider: LogProvider;
+ switch( logStreams.length ) {
+ case 0:
+ provider = this.nullProvider();
+ break;
+ case 1:
+ provider = logStreams[0];
+ break;
+ default:
+ provider = new AggregateLogStream(logStreams);
+ break
+ }
+ provider.setStreamStatusCallback((st: LogStreamStatus[]) => {
+ if ( this.provider === provider ) {
+ this.streamStatus = this.buildStreamStatus(st);
+ }
+ });
+ this.provider = provider;
+ this.loadingState = LoadingState.NONE;
+ } ).catch( (err: Error) => {
+ this.loadingState = LoadingState.ERROR;
+ console.error("Failed to resolve log streams:", err);
+ });
+ }
+
+ reset() {
+ this.view.clearLogEntries();
+ this.clearCurrentFetch();
+ this.provider = this.nullProvider();
+
+ this.updateControls();
+ }
+
+ private nullProvider(): LogProvider { return new AggregateLogStream([]); }
+
+ private mintFetchToken(): FetchToken {
+ return (this.currentFetchToken = new FetchToken( (tok: FetchToken) => {
+ return (tok === this.currentFetchToken);
+ } ));
+ }
+
+ private clearCurrentFetch() {
+ this.currentFetch = this.currentFetchToken = null;
+ this.rendering = false;
+ }
+
+ private get loadingState(): LoadingState { return this._loadingState; }
+ private set loadingState(v: LoadingState) {
+ if( v != this._loadingState ) {
+ this._loadingState = v;
+ this.updateControls();
+ }
+ }
+
+ private get streamStatus(): StreamStatusEntry[] { return this._streamStatus; }
+ private set streamStatus(st: StreamStatusEntry[]) {
+ this._streamStatus = st;
+ this.updateControls();
+ }
+
+ private updateControls() {
+ this.view.updateControls({
+ canSplit: this.providerCanSplit,
+ split: this.isSplit,
+ bottom: !this.fetchedEndOfStream,
+ fullyLoaded: (this.fetchedFullStream && (! this.rendering)),
+ loadingState: this.loadingState,
+ streamStatus: this.streamStatus,
+ });
+ }
+
+ /**
+ * Note that the authentication state for the client has changed. This will
+ * trigger an automatic fetch retry if our previous fetch failed due to
+ * lack of authentication.
+ */
+ notifyAuthenticationChanged() {
+ // Resolve our current "auth changed" Promise.
+ this.authChangedCallback();
+ }
+
+ private resetAuthChanged() {
+ // Resolve our previous function, if it's not already resolved.
+ if ( this.authChangedCallback ) {
+ this.authChangedCallback();
+ }
+
+ // Create a new Promise and install it.
+ this.authChangedPromise = new Promise<void>((resolve, reject) => {
+ this.authChangedCallback = resolve;
+ });
+ }
+
+ split(): Promise<void> {
+ // If we haven't already split, and our provider lets us split, then go
+ // ahead and do so.
+ if ( this.providerCanSplit ) {
+ return this.fetchLocation(Location.TAIL, true);
+ }
+ return this.fetch(false);
+ }
+
+ fetch(cancel: boolean): Promise<void> {
+ if ( this.isSplit ) {
+ if ( this.tailing ) {
+ // Next fetch grabs logs from the bottom (continue tailing).
+ if ( ! this.fetchedEndOfStream ) {
+ return this.fetchLocation(Location.BOTTOM, false);
+ } else {
+ return this.fetchLocation(Location.TAIL, false);
+ }
+ }
+
+ // We're split, but not tailing, so fetch logs from HEAD.
+ return this.fetchLocation(Location.HEAD, false);
+ }
+
+ // We're not split. If we haven't reached end of stream, fetch logs from
+ // HEAD.
+ return this.fetchLocation(Location.HEAD, false);
+ }
+
+ /** Fetch logs from an explicit location. */
+ fetchLocation(l: Location, cancel: boolean) {
+ if ( this.currentFetch && (!cancel) ) {
+ return this.currentFetch;
+ }
+
+ // If our provider is finished, then do nothing.
+ if ( this.fetchedFullStream ) {
+ // There are no more logs.
+ return Promise.resolve(null);
+ }
+
+ // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD instead.
+ if ( l === Location.BOTTOM && ! this.isSplit ) {
+ l = Location.HEAD;
+ }
+
+ // If we're not split, always fetch from BOTTOM.
+ this.loadingState = LoadingState.LOADING;
+
+ // Rotate our fetch ID. This will effectively cancel any pending fetches.
+ let token = this.mintFetchToken();
+ return (this.currentFetch = this.provider.fetch(l, token).then( (buf) => {
+ // Clear our fetching status.
+ this.rendering = true;
+ this.loadingState = LoadingState.RENDERING;
+ let pushLogsPromise: Promise<void>;
+ let hasLogs = (buf && buf.peek());
+
+ // Resolve any previous rendering Promise that we have. This makes sure
+ // our rendering and fetching don't get more than one round out of sync.
+ return (this.renderPromise || Promise.resolve(null)).then( () => {
+ // Post-fetch cleanup.
+ this.clearCurrentFetch();
+
+ // Clear our loading state (updates controls automatically).
+ this.loadingState = LoadingState.RENDERING;
+
+ // Initiate the next render. This will happen in the background while
+ // we enqueue our next fetch.
+ this.renderPromise = this.renderLogs(buf, l).then( () => {
+ this.renderPromise = null;
+ if ( this.loadingState === LoadingState.RENDERING ) {
+ this.loadingState = LoadingState.NONE;
+ }
+ });
+
+ if ( this.fetchedFullStream ) {
+ // If we're finished now, perform our finished cleanup.
+ return;
+ }
+
+ // The fetch is finished. If we're automatic, and we got logs, start the
+ // next.
+ if ( this.automatic && hasLogs ) {
+ console.log("Automatic: starting next fetch.")
+ return this.fetch(false);
+ }
+ });
+ }).catch( (err: Error) => {
+ // If we've been canceled, discard this result.
+ if ( ! token.valid ) {
+ return
+ }
+
+ this.clearCurrentFetch();
+ if ( err === NotAuthenticatedError ) {
+ this.loadingState = LoadingState.NEEDS_AUTH;
+
+ // We failed because we were not authenticated. Mark this so we can
+ // retry if that state changes.
+ return this.authChangedPromise.then( () => {
+ // Our authentication state changed during the fetch! Retry
+ // automatically.
+ this.fetchLocation(l, false);
+ });
+ }
+
+ console.error("Failed to load log streams:", err);
+ }));
+ }
+
+ private renderLogs(buf: BufferedLogs, l: Location): Promise<void> {
+ if ( ! (buf && buf.peek()) ) {
+ return Promise.resolve(null);
+ }
+
+ let lines = 0;
+ let logBlock = new Array<LogDog.LogEntry>();
+ let appendBlock = () => {
+ if ( logBlock.length ) {
+ console.log("Rendering", logBlock.length, "logs...");
+ this.view.pushLogEntries(logBlock, l);
+ logBlock.length = 0;
+ lines = 0;
+
+ // Update our status and controls.
+ this.updateControls();
+ }
+ };
+
+ // Create a promise loop to push logs at intervals.
+ let pushLogs = (): Promise<void> => {
+ return Promise.resolve().then( () => {
+ // Add logs until we reach our interval lines.
+ for ( let nextLog = buf.next(); (nextLog); nextLog = buf.next() ) {
+ // If we've exceeded our burst, then interleave a sleep (yield).
+ if (Model.logAppendInterval > 0 &&
+ lines >= Model.logAppendInterval ) {
+ appendBlock();
+
+ return luci_sleep_promise.sleep(Model.logAppendDelay).then(
+ () => {
+ // Enqueue the next push round.
+ return pushLogs();
+ } );
+ }
+
+ // Add the next log to the append block.
+ logBlock.push(nextLog);
+ if ( nextLog.text && nextLog.text.lines ) {
+ lines += nextLog.text.lines.length;
+ }
+ }
+
+ // If there are any buffered logs, append that block.
+ appendBlock();
+ });
+ }
+ return pushLogs();
+ }
+
+ setTailing(v: boolean) {
+ this.tailing = v;
+ }
+
+ setAutomatic(v: boolean) {
+ this.automatic = v;
+ if ( v ) {
+ // Passively kick off a new fetch.
+ this.fetch(false);
+ }
+ }
+
+ private buildStreamStatus(v: LogStreamStatus[]): StreamStatusEntry[] {
+ let maxStatus = FetcherStatus.IDLE;
+ let maxStatusCount = 0;
+ let needsAuth = false;
+
+ // Prune any finished entries and accumulate them for status bar change.
+ v = (v || []).filter( (st) => {
+ needsAuth = (needsAuth || st.needsAuth);
+
+ if ( st.fetchStatus > maxStatus ) {
+ maxStatus = st.fetchStatus;
+ maxStatusCount = 1;
+ } else if ( st.fetchStatus === maxStatus ) {
+ maxStatusCount++;
+ }
+
+ return (! st.finished);
+ });
+
+ return v.map( (st): StreamStatusEntry => {
+ return {
+ name: ".../+/" + st.stream.name,
+ desc: st.state,
+ };
+ } );
+ }
+
+ private get providerCanSplit(): boolean {
+ let split = this.provider.split();
+ return (!! (split && split.canSplit()));
+ }
+
+ private get isSplit(): boolean {
+ let split = this.provider.split();
+ return ( !! (split && split.isSplit()) );
+ }
+
+ private get fetchedEndOfStream(): boolean {
+ return (this.provider.fetchedEndOfStream());
+ }
+
+ private get fetchedFullStream(): boolean {
+ return (this.fetchedEndOfStream && (! this.isSplit));
+ }
+}
+
+/**
+ * A token used to repesent an individual fetch. A token can assert whether its
+ * fetch has been invalidated.
+ */
+class FetchToken {
+ private validate: (tok: FetchToken) => boolean;
+
+ constructor(validate: (tok: FetchToken) => boolean) {
+ this.validate = validate;
+ }
+
+ get valid(): boolean {
+ return this.validate(this);
+ }
+
+ do<T>(p: Promise<T>): Promise<T> {
+ return p.then( (v): T => {
+ if ( ! this.valid ) {
+ throw new Error("Token has been invalidated, discarding fetch.");
+ }
+ return v
+ } );
+ }
+}
+
+interface LogProvider {
+ setStreamStatusCallback(cb: StreamStatusCallback): void;
+ fetch(l: Location, token: FetchToken): Promise<BufferedLogs>;
+
+ /** Will return null if this LogProvider doesn't support splitting. */
+ split(): SplitLogProvider;
+ fetchedEndOfStream(): boolean;
+}
+
+/** Additional methods for log stream splitting, if supported. */
+interface SplitLogProvider {
+ canSplit(): boolean
+ isSplit(): boolean;
+}
+
+/** A LogStream is a LogProvider manages a single log stream. */
+class LogStream implements LogProvider {
+ /**
+ * Always begin with a small fetch. We'll disable this afterward the first
+ * finishes.
+ */
+ private initialFetch = true;
+
+ private fetcher: Fetcher;
+
+ /** The log stream index of the next head() log. */
+ private nextHeadIndex = 0;
+ /**
+ * The lowest log stream index of all of the tail logs. If this is <0, then
+ * it is uninitialized.
+ */
+ private firstTailIndex = -1;
+ /**
+ * The next log stream index to fetch to continue pulling logs from the
+ * bottom. If this is <0, it is uninitialized.
+ */
+ private nextBottomIndex = -1;
+
+ private streamStatusCallback: StreamStatusCallback;
+
+ /** The size of the tail walkback region. */
+ private static TAIL_WALKBACK = 500;
+
+ constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream,
+ readonly initialFetchSize: number,
+ readonly maxFetchSize: number ) {
+ this.fetcher = new Fetcher(client, stream);
+ this.fetcher.setStatusChangedCallback( () => {
+ this.statusChanged();
+ });
+ }
+
+ get fetcherStatus(): FetcherStatus { return this.fetcher.status; }
+
+ fetch(l: Location, token: FetchToken): Promise<BufferedLogs> {
+ // Determine which method to use based on the insertion point and current
+ // log stream fetch state.
+ let getLogs: Promise<LogDog.LogEntry[]>;
+ switch( l ) {
+ case Location.HEAD:
+ getLogs = this.getHead(token);
+ break;
+
+ case Location.TAIL:
+ getLogs = this.getTail(token);
+ break;
+
+ case Location.BOTTOM:
+ getLogs = this.getBottom(token);
+ break;
+ }
+
+ return getLogs.then( (logs: LogDog.LogEntry[]) => {
+ this.initialFetch = false;
+ this.statusChanged();
+ return new BufferedLogs(logs);
+ }).catch( (err: Error) => {
+ err = resolveErr(err);
+ throw err;
+ });
+ }
+
+ setStreamStatusCallback(cb: StreamStatusCallback) {
+ this.streamStatusCallback = cb;
+ }
+
+ private statusChanged() {
+ if ( this.streamStatusCallback ) {
+ this.streamStatusCallback([this.getStreamStatus()]);
+ }
+ }
+
+ getStreamStatus(): LogStreamStatus {
+ let pieces = new Array<string>();
+ let tidx = this.fetcher.terminalIndex;
+ if ( this.nextHeadIndex > 0 ) {
+ pieces.push("1.." + this.nextHeadIndex);
+ } else {
+ pieces.push("0");
+ }
+ if ( this.isSplit() ) {
+ if ( tidx >= 0 ) {
+ pieces.push("| " + this.firstTailIndex + " / " + tidx);
+ tidx = -1;
+ } else {
+ pieces.push("| " + this.firstTailIndex + ".." + this.nextBottomIndex +
+ " ...");
+ }
+ } else if (tidx >= 0) {
+ pieces.push("/ " + tidx);
+ } else {
+ pieces.push("...");
+ }
+
+ let needsAuth = false;
+ let finished = this.finished;
+ if ( finished ) {
+ pieces.push("(Finished)");
+ } else {
+ switch ( this.fetcher.status ) {
+ case FetcherStatus.IDLE:
+ case FetcherStatus.LOADING:
+ pieces.push("(Loading)");
+ break;
+
+ case FetcherStatus.STREAMING:
+ pieces.push("(Streaming)");
+ break;
+
+ case FetcherStatus.MISSING:
+ pieces.push("(Missing)");
+ break;
+
+ case FetcherStatus.ERROR:
+ let err = resolveErr(this.fetcher.lastError);
+ if (err === NotAuthenticatedError ) {
+ pieces.push("(Auth Error)");
+ needsAuth = true;
+ } else {
+ pieces.push("(Error)");
+ }
+ break;
+ }
+ }
+
+ return {
+ stream: this.stream,
+ state: pieces.join(" "),
+ finished: finished,
+ fetchStatus: this.fetcher.status,
+ needsAuth: needsAuth,
+ };
+ }
+
+ split(): SplitLogProvider {
+ return this;
+ }
+
+ isSplit(): boolean {
+ // We're split if we have a bottom and we're not finished tailing.
+ return ( this.firstTailIndex >= 0 &&
+ (this.nextHeadIndex < this.firstTailIndex) );
+ }
+
+ canSplit(): boolean {
+ return ( ! (this.isSplit() || this.caughtUp) );
+ }
+
+ private get caughtUp(): boolean {
+ // We're caught up if we have both a head and bottom index, and the head
+ // is at or past the bottom.
+ return ( this.nextHeadIndex >= 0 && this.nextBottomIndex >= 0 &&
+ this.nextHeadIndex >= this.nextBottomIndex );
+ }
+
+ fetchedEndOfStream(): boolean {
+ let tidx = this.fetcher.terminalIndex;
+ return ( tidx >= 0 && (
+ (this.nextHeadIndex > tidx) || (this.nextBottomIndex > tidx) ) );
+ }
+
+ private get finished(): boolean {
+ return ( (! this.isSplit()) && this.fetchedEndOfStream() );
+ }
+
+ private updateIndexes() {
+ if ( this.firstTailIndex >= 0 ) {
+ if ( this.nextBottomIndex < this.firstTailIndex ) {
+ this.nextBottomIndex = this.firstTailIndex + 1;
+ }
+
+ if ( this.nextHeadIndex >= this.firstTailIndex &&
+ this.nextBottomIndex >= 0) {
+ // Synchronize our head and bottom pointers.
+ this.nextHeadIndex = this.nextBottomIndex =
+ Math.max(this.nextHeadIndex, this.nextBottomIndex);
+ }
+ }
+ }
+
+ private nextFetcherOptions(): FetcherOptions {
+ let opts: FetcherOptions = {};
+ if ( this.initialFetch ) {
+ opts.byteCount = this.initialFetchSize;
+ } else if ( this.maxFetchSize > 0 ) {
+ opts.byteCount = this.maxFetchSize;
+ }
+ return opts;
+ }
+
+ private getHead(token: FetchToken): Promise<LogDog.LogEntry[]> {
+ this.updateIndexes();
+
+ if ( this.finished ) {
+ // Our HEAD region has met/surpassed our TAIL region, so there are no
+ // HEAD logs to return. Only bottom.
+ return Promise.resolve();
+ }
+
+ // If we have a tail pointer, only fetch HEAD up to that point.
+ let opts = this.nextFetcherOptions();
+ if ( this.firstTailIndex >= 0 ) {
+ opts.logCount = (this.firstTailIndex - this.nextHeadIndex);
+ }
+
+ return token.do( this.fetcher.get(this.nextHeadIndex, opts) ).then(
+ (logs) => {
+ if ( logs && logs.length ) {
+ this.nextHeadIndex = (logs[logs.length - 1].streamIndex + 1);
+ this.updateIndexes();
+ }
+ return logs;
+ } );
+ }
+
+ private getTail(token: FetchToken): Promise<LogDog.LogEntry[]> {
+ // If we haven't performed a Tail before, start with one.
+ if ( this.firstTailIndex < 0 ) {
+ let tidx = this.fetcher.terminalIndex;
+ if ( tidx < 0 ) {
+ return token.do( this.fetcher.tail() ).then( (logs) => {
+ // Mark our initial "tail" position.
+ if ( logs && logs.length ) {
+ this.firstTailIndex = logs[0].streamIndex;
+ this.updateIndexes();
+ }
+ return logs;
+ } );
+ }
+
+ this.firstTailIndex = (tidx+1);
+ this.updateIndexes();
+ }
+
+ // We're doing incremental reverse fetches. If we're finished tailing,
+ // return no logs.
+ if ( ! this.isSplit() ) {
+ return Promise.resolve(null);
+ }
+
+ // Determine our walkback region.
+ let startIndex = this.firstTailIndex - LogStream.TAIL_WALKBACK;
+ if ( this.nextHeadIndex >= 0 ) {
+ if ( startIndex < this.nextHeadIndex ) {
+ startIndex = this.nextHeadIndex;
+ }
+ } else if ( startIndex < 0 ) {
+ startIndex = 0;
+ }
+ let count = (this.firstTailIndex - startIndex);
+
+ // Fetch the full walkback region.
+ return token.do( this.fetcher.getAll(startIndex, count) ).then( (logs) => {
+ this.firstTailIndex = startIndex;
+ this.updateIndexes();
+ return logs;
+ });
+ }
+
+ private getBottom(token: FetchToken): Promise<LogDog.LogEntry[]> {
+ this.updateIndexes();
+
+ // If there are no more logs in the stream, return no logs.
+ if ( this.fetchedEndOfStream() ) {
+ return Promise.resolve(null);
+ }
+
+ // If our bottom index isn't initialized, initialize it via tail.
+ if ( this.nextBottomIndex < 0 ) {
+ return this.getTail(token);
+ }
+
+ let opts = this.nextFetcherOptions();
+ return token.do( this.fetcher.get(this.nextBottomIndex, opts) ).then(
+ (logs) => {
+ if ( logs && logs.length ) {
+ this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1);
+ }
+ return logs;
+ } );
+ }
+}
+
+/**
+ * An aggregate log stream. It presents a single-stream view, but is really
+ * composed of several log streams interleaved based on their prefix indices
+ * (if they share a prefix) or timestamps (if they don't).
+ *
+ * At least one log entry from each stream must be buffered before any log
+ * entries can be yielded, since we don't know what ordering to apply otherwise.
+ * To make this fast, we will make the first request for each stream small so
+ * it finishes quickly and we can start rendering. Subsequent entries will be
+ * larger for efficiency.
+ *
+ * @param {LogStream} streams the composite streams.
+ */
+class AggregateLogStream implements LogProvider {
+
+ private streams: AggregateLogStream.Entry[];
+ private active: AggregateLogStream.Entry[];
+ private currentNextPromise: Promise<BufferedLogs>;
+ private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number;
+
+ private streamStatusCallback: StreamStatusCallback;
+
+ constructor(streams: LogStream[]) {
+ // Input streams, ordered by input order.
+ this.streams = streams.map<AggregateLogStream.Entry>( (ls, i) => {
+ ls.setStreamStatusCallback( (st: LogStreamStatus[]) => {
+ if ( st ) {
+ this.streams[i].status = st[0];
+ this.statusChanged();
+ }
+ });
+
+ return {
+ ls: ls,
+ buffer: null,
+ needsAuth: false,
+ status: ls.getStreamStatus(),
+ };
+ } );
+
+ // Subset of input streams that are still active (not finished).
+ this.active = this.streams;
+
+ // The currently-active "next" promise.
+ this.currentNextPromise = null;
+
+ // Determine our log comparison function. If all of our logs share a prefix,
+ // we will use the prefix index. Otherwise, we will use the timestamp.
+ let template: LogDog.Stream = null;
+ let sharedPrefix = this.streams.every( (entry) => {
+ if ( ! template ) {
+ template = entry.ls.stream;
+ return true;
+ }
+ return template.samePrefixAs(entry.ls.stream);
+ });
+
+ this.compareLogs = (( sharedPrefix ) ?
+ (a, b) => {
+ return (a.prefixIndex - b.prefixIndex);
+ } :
+ (a, b) => {
+ return a.timestamp.getTime() - b.timestamp.getTime();
+ });
+ }
+
+ split(): SplitLogProvider { return null; }
+ fetchedEndOfStream(): boolean { return ( ! this.active.length ); }
+
+ setStreamStatusCallback(cb: StreamStatusCallback) {
+ this.streamStatusCallback = cb;
+ }
+
+ private statusChanged() {
+ if ( this.streamStatusCallback ) {
+ // Iterate through our composite stream statuses and pick the one that we
+ // want to report.
+ this.streamStatusCallback( this.streams.map( (entry): LogStreamStatus => {
+ return entry.status;
+ } ));
+ }
+ }
+
+ /**
+ * Implements LogProvider.next
+ */
+ fetch(l: Location, token: FetchToken): Promise<BufferedLogs> {
+ // If we're already are fetching the next buffer, this is an error.
+ if (this.currentNextPromise) {
+ throw new Error("In-progress next(), cannot start another.");
+ }
+
+ // Filter out any finished streams from our active list. A stream is
+ // finished if it is finished streaming and we don't have a retained buffer
+ // from it.
+ //
+ // This updates our "finished" property, since it's derived from the length
+ // of our active array.
+ this.active = this.active.filter( (entry) => {
+ return ( (! entry.buffer) || entry.buffer.peek() ||
+ (! entry.ls.fetchedEndOfStream()) );
+ });
+
+ if ( ! this.active.length ) {
+ // No active streams, so we're finished. Permanently set our promise to
+ // the finished state.
+ return Promise.resolve();
+ }
+
+ // Fill all buffers for all active streams. This may result in an RPC to
+ // load new buffer content for streams whose buffers are empty.
+ //
+ // If any stream doesn't currently have buffered logs, we will call their
+ // "next()" methods to pull the next set of logs. This will result in one of
+ // three possibilities:
+ // - A BufferedLogs will be returned containing the next logs for this stream.
+ // The log stream may also be finished.
+ // - null will be returned, and this log stream must now be finished.
+ // - An error will be returned.
+ //
+ // The error is interesting, since we must present a common error view to our
+ // caller. If all returned errors are "NotAuthenticatedError", we will return
+ // a NotAuthenticatedError. Otherwise, we will return a generic "streams
+ // failed" error.
+ //
+ // The outer Promise will pull logs for any streams that don't have any.
+ // On success, the "buffer" for the entry will be populated. On failure, an
+ // error will be returned. Because Promise.all fails fast, we will catch inner
+ // errors and return them as values (null if no error).
+ this.currentNextPromise = Promise.all( this.active.map( (entry) => {
+ // If the entry's buffer still has data, use it immediately.
+ if (entry.buffer && entry.buffer.peek()) {
+ return null;
+ }
+
+ // No buffered logs. Call the stream's "next()" method to get some.
+ return entry.ls.fetch(Location.HEAD, token).then(
+ (buffer): Error => {
+ // Retain this buffer.
+ entry.buffer = buffer;
+ return null;
+ }
+ ).catch( (error: Error) => {
+ // Log stream source of error. Raise a generic "failed to buffer"
+ // error. This will become a permanent failure.
+ console.error("Error loading buffer for", entry.ls.stream.fullName(),
+ "(", entry.ls, "): ", error);
+ return error;
+ });
+ })).then( (results: Error[]): BufferedLogs => {
+ // Identify any errors that we hit.
+ let buffers = new Array<BufferedLogs>(this.active.length);
+ let errors: Error[] = [];
+ results.forEach( (err, idx) => {
+ buffers[idx] = this.active[idx].buffer;
+ if ( err ) { errors[idx] = err; }
+ });
+
+ // We are done, and will return a value.
+ this.currentNextPromise = null;
+ if ( errors.length ) {
+ throw this._aggregateErrors(errors);
+ }
+ return this._aggregateBuffers(buffers);
+ });
+
+ return this.currentNextPromise;
+ }
+
+ private _aggregateErrors(errors: Error[]): Error {
+ let isNotAuthenticated = false;
+ errors.every( (err) => {
+ if ( ! err ) { return true; }
+ if ( err === NotAuthenticatedError ) {
+ isNotAuthenticated = true;
+ return true;
+ }
+ isNotAuthenticated = false;
+ return false;
+ });
+ return (( isNotAuthenticated ) ?
+ (NotAuthenticatedError) : new Error("Stream Error"));
+ }
+
+ private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs {
+ switch ( buffers.length ) {
+ case 0:
+ // No buffers, so no logs.
+ return new BufferedLogs(null);
+ case 1:
+ // As a special case, if we only have one buffer, and we assume that its
+ // entries are sorted, then that buffer is a return value.
+ return new BufferedLogs(buffers[0].getAll());
+ }
+
+ // Preload our peek array.
+ let incomplete = false;
+ let peek = buffers.map(function(buf) {
+ var le = buf.peek();
+ if (! le) {
+ incomplete = true;
+ }
+ return le;
+ });
+ if (incomplete) {
+ // One of our input buffers had no log entries.
+ return new BufferedLogs(null);
+ }
+
+ // Assemble our aggregate buffer array.
+ let entries: LogDog.LogEntry[] = [];
+ while (true) {
+ // Choose the next stream.
+ var earliest = 0;
+ for (var i = 1; i < buffers.length; i++) {
+ if (this.compareLogs(peek[i], peek[earliest]) < 0) {
+ earliest = i;
+ }
+ }
+
+ // Get the next log from the earliest stream.
+ entries.push(buffers[earliest].next());
+
+ // Repopulate that buffer's "peek" value. If the buffer has no more
+ // entries, then we're done.
+ var next = buffers[earliest].peek();
+ if (!next) {
+ return new BufferedLogs(entries);
+ }
+ peek[earliest] = next;
+ }
+ }
+}
+
+module AggregateLogStream {
+ export type Entry = {
+ ls: LogStream;
+ buffer: BufferedLogs;
+ needsAuth: boolean;
+ status: LogStreamStatus;
+ }
+}
+
+/**
+ * A buffer of ordered log entries.
+ *
+ * Assumes total ownership of the input log buffer, which can be null to
+ * indicate no logs.
+ */
+class BufferedLogs {
+ private logs: LogDog.LogEntry[] | null;
+ private index: number;
+
+ constructor(logs: LogDog.LogEntry[] | null) {
+ this.logs = logs;
+ this.index = 0;
+ }
+
+ peek(): LogDog.LogEntry | null {
+ return (this.logs) ? (this.logs[this.index]) : (null);
+ }
+
+ getAll(): LogDog.LogEntry[] {
+ // Pop all logs.
+ var logs = this.logs;
+ this.logs = null;
+ return logs;
+ }
+
+ next() : LogDog.LogEntry | null {
+ if (! (this.logs && this.logs.length)) {
+ return null;
+ }
+
+ // Get the next log and increment our index.
+ var log = this.logs[this.index++];
+ if (this.index >= this.logs.length) {
+ this.logs = null;
+ }
+ return log;
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698