Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(234)

Side by Side Diff: web/inc/logdog-stream-view/viewer.ts

Issue 2570963003: Revert of Rewrite LogDog log viewer app. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « web/inc/logdog-stream-view/query.ts ('k') | web/inc/logdog-stream/logdog.ts » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file.
5 */
6
7 import {Fetcher, FetcherOptions, FetcherStatus}
8 from "logdog-stream-view/fetcher";
9 import {LogDogQuery, QueryParams, StreamType} from "logdog-stream-view/query";
10 import {LogDog} from "logdog-stream/logdog";
11 import * as luci_sleep_promise from "luci-sleep-promise/promise";
12 import {luci_rpc} from "rpc/client";
13
14 /** Sentinel error: not authenticated. */
15 let NotAuthenticatedError = new Error("Not Authenticated");
16
17 function resolveErr(err: Error) {
18 let grpc = luci_rpc.GrpcError.convert(err);
19 if ( grpc && grpc.code == luci_rpc.Code.UNAUTHENTICATED ) {
20 return NotAuthenticatedError;
21 }
22 return err;
23 }
24
/** Stream status entry, as rendered by the view's status window. */
type StreamStatusEntry = {
  /** Abbreviated stream name displayed to the user. */
  name: string;
  /** Human-readable description of the stream's current fetch state. */
  desc: string;
};
30
/** An individual log stream's status. */
type LogStreamStatus = {
  /** The log stream this status describes. */
  stream: LogDog.Stream;
  /** Human-readable state string (index ranges plus fetcher state). */
  state: string;
  /** Current status of the stream's Fetcher. */
  fetchStatus: FetcherStatus;
  /** True if this stream has been completely fetched. */
  finished: boolean;
  /** True if the last fetch failed for lack of authentication. */
  needsAuth: boolean;
}

/** Callback invoked when the statuses of the provider's streams change. */
type StreamStatusCallback = (v: LogStreamStatus[]) => void;
41
/** Identifies a region of the (possibly split) log view to fetch into. */
export enum Location {
  /**
   * Represents the upper half of a split view. Logs start at 0 and go through
   * the HEAD point.
   */
  HEAD,
  /**
   * Represents the lower half of the split view. Logs start at the TAIL point
   * and go through the BOTTOM anchor point.
   */
  TAIL,
  /**
   * Represents an anchor point where the split occurred, obtained through a
   * single "Tail()" RPC call. If the terminal index is known when the split
   * occurs, this should be the terminal index.
   */
  BOTTOM,
}
60
/** Loading state, as reflected in the view's status bar. */
export enum LoadingState {
  /** No loading activity in progress. */
  NONE,
  /** Resolving stream paths/queries into concrete log streams. */
  RESOLVING,
  /** Fetching log data from the service. */
  LOADING,
  /** Appending fetched log entries to the view. */
  RENDERING,
  /** A fetch failed because the user is not authenticated. */
  NEEDS_AUTH,
  /** Resolution or fetching failed with an error. */
  ERROR,
}
69
/** Represents control visibility in the view. */
type Controls = {
  /** Can the current log provider perform a split? */
  canSplit: boolean;
  /** Are we currently split? */
  split: boolean;
  /** Show the bottom bar? */
  bottom: boolean;
  /** Is the content fully loaded (fetched and rendered)? */
  fullyLoaded: boolean;

  /** Loading state shown in the status bar. */
  loadingState: LoadingState;
  /** Stream status entries, or null for no status window. */
  streamStatus: StreamStatusEntry[];
}
86
/** Registered callbacks from the LogDog stream view. */
type ViewBinding = {
  /** RPC client used to issue log stream requests. */
  client: luci_rpc.Client;
  /** True if rendering on a mobile device (selects smaller fetch sizes). */
  mobile: boolean;

  /** Appends log entries to the view at the given location. */
  pushLogEntries: (entries: LogDog.LogEntry[], l: Location) => void;
  /** Removes all rendered log entries from the view. */
  clearLogEntries: () => void;

  /** Pushes updated control/status state to the view. */
  updateControls: (c: Controls) => void;
  /** Returns true if the given location is currently visible on screen. */
  locationIsVisible: (l: Location) => boolean;
};
98
/** Interface of the specific Model functions used by the view. */
interface ModelInterface {
  /** Fetches the next round of logs (see Model.fetch). */
  fetch(cancel: boolean): Promise<void>;
  /** Splits the view, if the provider supports it (see Model.split). */
  split(): Promise<void>;

  /** Clears all model state and rendered log entries. */
  reset(): void;
  /** Enables/disables automatic (continuous) fetching. */
  setAutomatic(v: boolean): void;
  /** Enables/disables tailing mode. */
  setTailing(v: boolean): void;
  /** Notifies the model that authentication state has changed. */
  notifyAuthenticationChanged(): void;
}
109
/**
 * The log viewer's model: resolves stream paths/queries into a LogProvider,
 * drives fetch/render rounds (pipelined so the next fetch can overlap the
 * current render), and pushes control/status updates to the bound view.
 */
export class Model implements ModelInterface {
  /** If performing a small initial fetch, this is the size of the fetch. */
  private static SMALL_INITIAL_FETCH_SIZE = (1024 * 4);
  /** If performing a large initial fetch, this is the size of the fetch. */
  private static LARGE_INITIAL_FETCH_SIZE = (1024 * 24);
  /** If fetching on a mobile device, fetch in this chunk size. */
  private static MOBILE_FETCH_SIZE = (1024 * 256);
  /** For standard fetching, fetch with this size. */
  private static STANDARD_FETCH_SIZE = (4 * 1024 * 1024);

  /**
   * If >0, the maximum number of log lines to push at a time. We will sleep
   * in between these entries to allow the rest of the app to be responsive
   * during log dumping.
   */
  private static logAppendInterval = 4000;
  /** Amount of time to sleep in between log append chunks. */
  private static logAppendDelay = 0;

  /** Our log provider. */
  private provider: LogProvider = this.nullProvider();

  /**
   * Promise that is resolved when authentication state changes. When this
   * happens, a new Promise is installed, and future authentication changes
   * will resolve the new Promise.
   */
  private authChangedPromise: Promise<void> = null;
  /**
   * Retained callback (Promise resolve) to invoke when authentication state
   * changes.
   */
  private authChangedCallback: (() => void) = null;

  /** The current fetch Promise, or null if no fetch is in progress. */
  private currentFetch: Promise<void>;
  /** The current fetch token; rotated to cancel in-flight fetches. */
  private currentFetchToken: FetchToken;

  /** Are we in automatic mode? */
  private automatic = false;
  /** Are we tailing? */
  private tailing = false;
  /** Are we in the middle of rendering logs? */
  private rendering = true;

  /** Backing field for the "loadingState" property. */
  private _loadingState: LoadingState = LoadingState.NONE;
  /** Backing field for the "streamStatus" property. */
  private _streamStatus: StreamStatusEntry[];

  /**
   * When rendering, a Promise that will resolve when the render completes. We
   * use this to pipeline parallel data fetching and rendering.
   */
  private renderPromise: Promise<void>;

  constructor(readonly view: ViewBinding) {
    this.resetAuthChanged();
  }

  /**
   * Resolves each supplied path (a literal stream path or a query) into
   * concrete log streams and installs the appropriate LogProvider for them.
   *
   * A query that fails with NotAuthenticatedError is retried after the next
   * authentication change; any other resolution failure sets the ERROR state.
   *
   * @param paths stream paths and/or query strings, each prefixed with a
   *     project (split via LogDog.Stream.splitProject).
   */
  resolve(paths: string[]): Promise<void> {
    this.reset();

    // For any path that is a query, execute that query.
    this.loadingState = LoadingState.RESOLVING;
    return Promise.all( paths.map( (path): Promise<LogDog.Stream[]> => {
      let stream = LogDog.Stream.splitProject(path);
      if ( ! LogDogQuery.isQuery(stream.path) ) {
        return Promise.resolve([stream]);
      }

      // This "path" is really a query. Construct and execute.
      let query = new LogDogQuery(this.view.client);
      let doQuery = (): Promise<LogDog.Stream[]> => {
        return query.getAll({
          project: stream.project,
          path: stream.path,
          streamType: StreamType.TEXT,
        }, 100).then( (result): LogDog.Stream[] => {
          return result.map( (qr): LogDog.Stream => {
            return qr.stream;
          } );
        }).catch( (err: Error) => {
          err = resolveErr(err);
          if ( err == NotAuthenticatedError ) {
            // Wait for the next authentication change, then retry the query.
            return this.authChangedPromise.then( () => {
              return doQuery();
            } );
          }

          throw err;
        });
      }
      return doQuery();
    } ) ).then( (streamBlocks) => {
      // Flatten the per-path result arrays into one stream list.
      let streams = new Array<LogDog.Stream>();
      (streamBlocks || []).forEach( (streamBlock) => {
        streams.push.apply(streams, streamBlock);
      } );


      // A single stream can afford a larger first fetch; with many streams,
      // keep the first fetch small so rendering can start quickly.
      let initialFetchSize = ( (streams.length === 1) ?
          Model.LARGE_INITIAL_FETCH_SIZE : Model.SMALL_INITIAL_FETCH_SIZE );

      // Determine our fetch size.
      let maxFetchSize = ( (this.view.mobile) ?
          Model.MOBILE_FETCH_SIZE : Model.STANDARD_FETCH_SIZE );

      // Generate a LogStream client entry for each composite stream.
      let logStreams = streams.map( (stream) => {
        console.log("Resolved log stream:", stream);
        return new LogStream(
            this.view.client, stream, initialFetchSize, maxFetchSize);
      });

      // Reset any existing state.
      this.reset();

      // If we have exactly one stream, then use it directly. This allows it to
      // split.
      let provider: LogProvider;
      switch( logStreams.length ) {
      case 0:
        provider = this.nullProvider();
        break;
      case 1:
        provider = logStreams[0];
        break;
      default:
        provider = new AggregateLogStream(logStreams);
        break
      }
      // Only forward status updates if this provider is still the current one
      // (a later resolve() may have replaced it).
      provider.setStreamStatusCallback((st: LogStreamStatus[]) => {
        if ( this.provider === provider ) {
          this.streamStatus = this.buildStreamStatus(st);
        }
      });
      this.provider = provider;
      this.loadingState = LoadingState.NONE;
    } ).catch( (err: Error) => {
      this.loadingState = LoadingState.ERROR;
      console.error("Failed to resolve log streams:", err);
    });
  }

  /** Clears rendered logs, cancels any current fetch, and drops the provider. */
  reset() {
    this.view.clearLogEntries();
    this.clearCurrentFetch();
    this.provider = this.nullProvider();

    this.updateControls();
  }

  /** Returns an empty provider (an AggregateLogStream over no streams). */
  private nullProvider(): LogProvider { return new AggregateLogStream([]); }

  /**
   * Installs and returns a new fetch token. A token remains valid only while
   * it is the most recently minted one, so minting implicitly invalidates any
   * outstanding fetch.
   */
  private mintFetchToken(): FetchToken {
    return (this.currentFetchToken = new FetchToken( (tok: FetchToken) => {
      return (tok === this.currentFetchToken);
    } ));
  }

  /** Drops the current fetch Promise/token and clears the rendering flag. */
  private clearCurrentFetch() {
    this.currentFetch = this.currentFetchToken = null;
    this.rendering = false;
  }

  /** The current loading state; setting a new value updates the controls. */
  private get loadingState(): LoadingState { return this._loadingState; }
  private set loadingState(v: LoadingState) {
    if( v != this._loadingState ) {
      this._loadingState = v;
      this.updateControls();
    }
  }

  /** The current stream status entries; setting them updates the controls. */
  private get streamStatus(): StreamStatusEntry[] { return this._streamStatus; }
  private set streamStatus(st: StreamStatusEntry[]) {
    this._streamStatus = st;
    this.updateControls();
  }

  /** Pushes the current derived control state to the view. */
  private updateControls() {
    this.view.updateControls({
      canSplit: this.providerCanSplit,
      split: this.isSplit,
      bottom: !this.fetchedEndOfStream,
      fullyLoaded: (this.fetchedFullStream && (! this.rendering)),
      loadingState: this.loadingState,
      streamStatus: this.streamStatus,
    });
  }

  /**
   * Note that the authentication state for the client has changed. This will
   * trigger an automatic fetch retry if our previous fetch failed due to
   * lack of authentication.
   */
  notifyAuthenticationChanged() {
    // Resolve our current "auth changed" Promise.
    this.authChangedCallback();
  }

  /** Installs a fresh "auth changed" Promise, resolving any previous one. */
  private resetAuthChanged() {
    // Resolve our previous function, if it's not already resolved.
    if ( this.authChangedCallback ) {
      this.authChangedCallback();
    }

    // Create a new Promise and install it.
    this.authChangedPromise = new Promise<void>((resolve, reject) => {
      this.authChangedCallback = resolve;
    });
  }

  /**
   * Splits the view by fetching the TAIL region; if the provider cannot
   * split, falls back to a normal fetch.
   */
  split(): Promise<void> {
    // If we haven't already split, and our provider lets us split, then go
    // ahead and do so.
    if ( this.providerCanSplit ) {
      return this.fetchLocation(Location.TAIL, true);
    }
    return this.fetch(false);
  }

  /**
   * Fetches the next round of logs from the location implied by the current
   * split/tailing state.
   *
   * NOTE(review): the "cancel" parameter is currently unused — every internal
   * fetchLocation call below passes false. Confirm whether it was intended to
   * be forwarded.
   */
  fetch(cancel: boolean): Promise<void> {
    if ( this.isSplit ) {
      if ( this.tailing ) {
        // Next fetch grabs logs from the bottom (continue tailing).
        if ( ! this.fetchedEndOfStream ) {
          return this.fetchLocation(Location.BOTTOM, false);
        } else {
          return this.fetchLocation(Location.TAIL, false);
        }
      }

      // We're split, but not tailing, so fetch logs from HEAD.
      return this.fetchLocation(Location.HEAD, false);
    }

    // We're not split. If we haven't reached end of stream, fetch logs from
    // HEAD.
    return this.fetchLocation(Location.HEAD, false);
  }

  /**
   * Fetch logs from an explicit location.
   *
   * If a fetch is already in progress and "cancel" is false, returns the
   * in-progress Promise. On NotAuthenticatedError, waits for the next
   * authentication change and retries the same location.
   */
  fetchLocation(l: Location, cancel: boolean) {
    if ( this.currentFetch && (!cancel) ) {
      return this.currentFetch;
    }

    // If our provider is finished, then do nothing.
    if ( this.fetchedFullStream ) {
      // There are no more logs.
      return Promise.resolve(null);
    }

    // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD instead.
    if ( l === Location.BOTTOM && ! this.isSplit ) {
      l = Location.HEAD;
    }

    this.loadingState = LoadingState.LOADING;

    // Rotate our fetch ID. This will effectively cancel any pending fetches.
    let token = this.mintFetchToken();
    return (this.currentFetch = this.provider.fetch(l, token).then( (buf) => {
      // Clear our fetching status.
      this.rendering = true;
      this.loadingState = LoadingState.RENDERING;
      // NOTE(review): "pushLogsPromise" is declared but never used.
      let pushLogsPromise: Promise<void>;
      let hasLogs = (buf && buf.peek());

      // Resolve any previous rendering Promise that we have. This makes sure
      // our rendering and fetching don't get more than one round out of sync.
      return (this.renderPromise || Promise.resolve(null)).then( () => {
        // Post-fetch cleanup.
        this.clearCurrentFetch();

        // Clear our loading state (updates controls automatically).
        this.loadingState = LoadingState.RENDERING;

        // Initiate the next render. This will happen in the background while
        // we enqueue our next fetch.
        this.renderPromise = this.renderLogs(buf, l).then( () => {
          this.renderPromise = null;
          if ( this.loadingState === LoadingState.RENDERING ) {
            this.loadingState = LoadingState.NONE;
          }
        });

        if ( this.fetchedFullStream ) {
          // If we're finished now, perform our finished cleanup.
          return;
        }

        // The fetch is finished. If we're automatic, and we got logs, start the
        // next.
        if ( this.automatic && hasLogs ) {
          console.log("Automatic: starting next fetch.")
          return this.fetch(false);
        }
      });
    }).catch( (err: Error) => {
      // If we've been canceled, discard this result.
      if ( ! token.valid ) {
        return
      }

      this.clearCurrentFetch();
      if ( err === NotAuthenticatedError ) {
        this.loadingState = LoadingState.NEEDS_AUTH;

        // We failed because we were not authenticated. Mark this so we can
        // retry if that state changes.
        return this.authChangedPromise.then( () => {
          // Our authentication state changed during the fetch! Retry
          // automatically.
          this.fetchLocation(l, false);
        });
      }

      console.error("Failed to load log streams:", err);
    }));
  }

  /**
   * Pushes the buffered logs to the view in chunks of at most
   * logAppendInterval lines, sleeping logAppendDelay between chunks so the
   * app stays responsive during large dumps.
   */
  private renderLogs(buf: BufferedLogs, l: Location): Promise<void> {
    if ( ! (buf && buf.peek()) ) {
      return Promise.resolve(null);
    }

    let lines = 0;
    let logBlock = new Array<LogDog.LogEntry>();
    // Flushes the accumulated block of logs to the view.
    let appendBlock = () => {
      if ( logBlock.length ) {
        console.log("Rendering", logBlock.length, "logs...");
        this.view.pushLogEntries(logBlock, l);
        logBlock.length = 0;
        lines = 0;

        // Update our status and controls.
        this.updateControls();
      }
    };

    // Create a promise loop to push logs at intervals.
    let pushLogs = (): Promise<void> => {
      return Promise.resolve().then( () => {
        // Add logs until we reach our interval lines.
        for ( let nextLog = buf.next(); (nextLog); nextLog = buf.next() ) {
          // If we've exceeded our burst, then interleave a sleep (yield).
          if (Model.logAppendInterval > 0 &&
              lines >= Model.logAppendInterval ) {
            appendBlock();

            return luci_sleep_promise.sleep(Model.logAppendDelay).then(
                () => {
                  // Enqueue the next push round.
                  return pushLogs();
                } );
          }

          // Add the next log to the append block.
          logBlock.push(nextLog);
          if ( nextLog.text && nextLog.text.lines ) {
            lines += nextLog.text.lines.length;
          }
        }

        // If there are any buffered logs, append that block.
        appendBlock();
      });
    }
    return pushLogs();
  }

  /** Sets tailing mode; takes effect on the next fetch. */
  setTailing(v: boolean) {
    this.tailing = v;
  }

  /** Sets automatic mode; enabling it passively kicks off a fetch. */
  setAutomatic(v: boolean) {
    this.automatic = v;
    if ( v ) {
      // Passively kick off a new fetch.
      this.fetch(false);
    }
  }

  /**
   * Converts raw per-stream statuses into view status entries, dropping
   * finished streams.
   *
   * NOTE(review): maxStatus, maxStatusCount, and needsAuth are accumulated
   * here but never used in the returned value — confirm whether a status-bar
   * summary was meant to be derived from them.
   */
  private buildStreamStatus(v: LogStreamStatus[]): StreamStatusEntry[] {
    let maxStatus = FetcherStatus.IDLE;
    let maxStatusCount = 0;
    let needsAuth = false;

    // Prune any finished entries and accumulate them for status bar change.
    v = (v || []).filter( (st) => {
      needsAuth = (needsAuth || st.needsAuth);

      if ( st.fetchStatus > maxStatus ) {
        maxStatus = st.fetchStatus;
        maxStatusCount = 1;
      } else if ( st.fetchStatus === maxStatus ) {
        maxStatusCount++;
      }

      return (! st.finished);
    });

    return v.map( (st): StreamStatusEntry => {
      return {
        name: ".../+/" + st.stream.name,
        desc: st.state,
      };
    } );
  }

  /** True if the current provider supports splitting and can split now. */
  private get providerCanSplit(): boolean {
    let split = this.provider.split();
    return (!! (split && split.canSplit()));
  }

  /** True if the current provider is currently split. */
  private get isSplit(): boolean {
    let split = this.provider.split();
    return ( !! (split && split.isSplit()) );
  }

  /** True if the provider has fetched through the end of its stream(s). */
  private get fetchedEndOfStream(): boolean {
    return (this.provider.fetchedEndOfStream());
  }

  /** True if the full stream is fetched and the view is not split. */
  private get fetchedFullStream(): boolean {
    return (this.fetchedEndOfStream && (! this.isSplit));
  }
}
540
541 /**
542 * A token used to repesent an individual fetch. A token can assert whether its
543 * fetch has been invalidated.
544 */
545 class FetchToken {
546 private validate: (tok: FetchToken) => boolean;
547
548 constructor(validate: (tok: FetchToken) => boolean) {
549 this.validate = validate;
550 }
551
552 get valid(): boolean {
553 return this.validate(this);
554 }
555
556 do<T>(p: Promise<T>): Promise<T> {
557 return p.then( (v): T => {
558 if ( ! this.valid ) {
559 throw new Error("Token has been invalidated, discarding fetch.");
560 }
561 return v
562 } );
563 }
564 }
565
/** Interface for a provider of log entries (single or aggregate stream). */
interface LogProvider {
  /** Registers a callback invoked when the provider's stream status changes. */
  setStreamStatusCallback(cb: StreamStatusCallback): void;
  /** Fetches the next batch of logs for the given view location. */
  fetch(l: Location, token: FetchToken): Promise<BufferedLogs>;

  /** Will return null if this LogProvider doesn't support splitting. */
  split(): SplitLogProvider;
  /** True if the end of the log stream(s) has been fetched. */
  fetchedEndOfStream(): boolean;
}
574
/** Additional methods for log stream splitting, if supported. */
interface SplitLogProvider {
  /** True if a split can currently be performed. */
  canSplit(): boolean
  /** True if the view is currently split. */
  isSplit(): boolean;
}
580
/**
 * A LogStream is a LogProvider that manages a single log stream.
 *
 * It tracks three index cursors into the stream: the next HEAD index (logs
 * fetched forward from 0), the first TAIL index (start of the tail walkback
 * region), and the next BOTTOM index (logs fetched forward from the tail
 * anchor).
 */
class LogStream implements LogProvider {
  /**
   * Always begin with a small fetch. We'll disable this after the first
   * finishes.
   */
  private initialFetch = true;

  /** Fetcher used to pull log data for this stream. */
  private fetcher: Fetcher;

  /** The log stream index of the next head() log. */
  private nextHeadIndex = 0;
  /**
   * The lowest log stream index of all of the tail logs. If this is <0, then
   * it is uninitialized.
   */
  private firstTailIndex = -1;
  /**
   * The next log stream index to fetch to continue pulling logs from the
   * bottom. If this is <0, it is uninitialized.
   */
  private nextBottomIndex = -1;

  /** Callback invoked when this stream's status changes. */
  private streamStatusCallback: StreamStatusCallback;

  /** The size of the tail walkback region. */
  private static TAIL_WALKBACK = 500;

  constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream,
              readonly initialFetchSize: number,
              readonly maxFetchSize: number ) {
    this.fetcher = new Fetcher(client, stream);
    this.fetcher.setStatusChangedCallback( () => {
      this.statusChanged();
    });
  }

  /** Current status of the underlying Fetcher. */
  get fetcherStatus(): FetcherStatus { return this.fetcher.status; }

  /**
   * Fetches the next batch of logs for the given location, wrapping the
   * result in a BufferedLogs. Errors are normalized via resolveErr so auth
   * failures surface as NotAuthenticatedError.
   */
  fetch(l: Location, token: FetchToken): Promise<BufferedLogs> {
    // Determine which method to use based on the insertion point and current
    // log stream fetch state.
    let getLogs: Promise<LogDog.LogEntry[]>;
    switch( l ) {
    case Location.HEAD:
      getLogs = this.getHead(token);
      break;

    case Location.TAIL:
      getLogs = this.getTail(token);
      break;

    case Location.BOTTOM:
      getLogs = this.getBottom(token);
      break;
    }

    return getLogs.then( (logs: LogDog.LogEntry[]) => {
      this.initialFetch = false;
      this.statusChanged();
      return new BufferedLogs(logs);
    }).catch( (err: Error) => {
      err = resolveErr(err);
      throw err;
    });
  }

  /** Registers a callback invoked when this stream's status changes. */
  setStreamStatusCallback(cb: StreamStatusCallback) {
    this.streamStatusCallback = cb;
  }

  /** Invokes the registered status callback with this stream's status. */
  private statusChanged() {
    if ( this.streamStatusCallback ) {
      this.streamStatusCallback([this.getStreamStatus()]);
    }
  }

  /**
   * Builds this stream's LogStreamStatus: a human-readable state string
   * describing the fetched index ranges plus the fetcher's current state.
   */
  getStreamStatus(): LogStreamStatus {
    let pieces = new Array<string>();
    let tidx = this.fetcher.terminalIndex;
    if ( this.nextHeadIndex > 0 ) {
      pieces.push("1.." + this.nextHeadIndex);
    } else {
      pieces.push("0");
    }
    if ( this.isSplit() ) {
      if ( tidx >= 0 ) {
        pieces.push("| " + this.firstTailIndex + " / " + tidx);
        // Consume the terminal index so it isn't reported again below.
        tidx = -1;
      } else {
        pieces.push("| " + this.firstTailIndex + ".." + this.nextBottomIndex +
            " ...");
      }
    } else if (tidx >= 0) {
      pieces.push("/ " + tidx);
    } else {
      pieces.push("...");
    }

    let needsAuth = false;
    let finished = this.finished;
    if ( finished ) {
      pieces.push("(Finished)");
    } else {
      switch ( this.fetcher.status ) {
      case FetcherStatus.IDLE:
      case FetcherStatus.LOADING:
        pieces.push("(Loading)");
        break;

      case FetcherStatus.STREAMING:
        pieces.push("(Streaming)");
        break;

      case FetcherStatus.MISSING:
        pieces.push("(Missing)");
        break;

      case FetcherStatus.ERROR:
        let err = resolveErr(this.fetcher.lastError);
        if (err === NotAuthenticatedError ) {
          pieces.push("(Auth Error)");
          needsAuth = true;
        } else {
          pieces.push("(Error)");
        }
        break;
      }
    }

    return {
      stream: this.stream,
      state: pieces.join(" "),
      finished: finished,
      fetchStatus: this.fetcher.status,
      needsAuth: needsAuth,
    };
  }

  /** A single stream supports splitting; this object is its own splitter. */
  split(): SplitLogProvider {
    return this;
  }

  isSplit(): boolean {
    // We're split if we have a bottom and we're not finished tailing.
    return ( this.firstTailIndex >= 0 &&
             (this.nextHeadIndex < this.firstTailIndex) );
  }

  /** A split is possible while not already split and not caught up. */
  canSplit(): boolean {
    return ( ! (this.isSplit() || this.caughtUp) );
  }

  private get caughtUp(): boolean {
    // We're caught up if we have both a head and bottom index, and the head
    // is at or past the bottom.
    return ( this.nextHeadIndex >= 0 && this.nextBottomIndex >= 0 &&
             this.nextHeadIndex >= this.nextBottomIndex );
  }

  /** True if either cursor has advanced past the known terminal index. */
  fetchedEndOfStream(): boolean {
    let tidx = this.fetcher.terminalIndex;
    return ( tidx >= 0 && (
        (this.nextHeadIndex > tidx) || (this.nextBottomIndex > tidx) ) );
  }

  /** Fully finished: end of stream fetched and no outstanding split region. */
  private get finished(): boolean {
    return ( (! this.isSplit()) && this.fetchedEndOfStream() );
  }

  /**
   * Re-establishes the invariants between the head, tail, and bottom cursors
   * after any of them has moved.
   */
  private updateIndexes() {
    if ( this.firstTailIndex >= 0 ) {
      // NOTE(review): this seeds the bottom cursor one past the first tail
      // index — presumably because the initial tail block is a single entry;
      // confirm against Fetcher.tail() semantics.
      if ( this.nextBottomIndex < this.firstTailIndex ) {
        this.nextBottomIndex = this.firstTailIndex + 1;
      }

      if ( this.nextHeadIndex >= this.firstTailIndex &&
           this.nextBottomIndex >= 0) {
        // Synchronize our head and bottom pointers.
        this.nextHeadIndex = this.nextBottomIndex =
            Math.max(this.nextHeadIndex, this.nextBottomIndex);
      }
    }
  }

  /**
   * Builds FetcherOptions for the next fetch: a small byte count for the
   * very first fetch (fast first paint), the configured maximum thereafter.
   */
  private nextFetcherOptions(): FetcherOptions {
    let opts: FetcherOptions = {};
    if ( this.initialFetch ) {
      opts.byteCount = this.initialFetchSize;
    } else if ( this.maxFetchSize > 0 ) {
      opts.byteCount = this.maxFetchSize;
    }
    return opts;
  }

  /** Fetches the next block of logs forward from the head cursor. */
  private getHead(token: FetchToken): Promise<LogDog.LogEntry[]> {
    this.updateIndexes();

    if ( this.finished ) {
      // Our HEAD region has met/surpassed our TAIL region, so there are no
      // HEAD logs to return. Only bottom.
      return Promise.resolve();
    }

    // If we have a tail pointer, only fetch HEAD up to that point.
    let opts = this.nextFetcherOptions();
    if ( this.firstTailIndex >= 0 ) {
      opts.logCount = (this.firstTailIndex - this.nextHeadIndex);
    }

    return token.do( this.fetcher.get(this.nextHeadIndex, opts) ).then(
        (logs) => {
          if ( logs && logs.length ) {
            this.nextHeadIndex = (logs[logs.length - 1].streamIndex + 1);
            this.updateIndexes();
          }
          return logs;
        } );
  }

  /**
   * Fetches the tail region: an initial Tail() call to anchor the split,
   * then incremental backward walkbacks of up to TAIL_WALKBACK entries.
   */
  private getTail(token: FetchToken): Promise<LogDog.LogEntry[]> {
    // If we haven't performed a Tail before, start with one.
    if ( this.firstTailIndex < 0 ) {
      let tidx = this.fetcher.terminalIndex;
      if ( tidx < 0 ) {
        return token.do( this.fetcher.tail() ).then( (logs) => {
          // Mark our initial "tail" position.
          if ( logs && logs.length ) {
            this.firstTailIndex = logs[0].streamIndex;
            this.updateIndexes();
          }
          return logs;
        } );
      }

      // The terminal index is already known; anchor the tail just past it.
      this.firstTailIndex = (tidx+1);
      this.updateIndexes();
    }

    // We're doing incremental reverse fetches. If we're finished tailing,
    // return no logs.
    if ( ! this.isSplit() ) {
      return Promise.resolve(null);
    }

    // Determine our walkback region.
    let startIndex = this.firstTailIndex - LogStream.TAIL_WALKBACK;
    if ( this.nextHeadIndex >= 0 ) {
      if ( startIndex < this.nextHeadIndex ) {
        startIndex = this.nextHeadIndex;
      }
    } else if ( startIndex < 0 ) {
      startIndex = 0;
    }
    let count = (this.firstTailIndex - startIndex);

    // Fetch the full walkback region.
    return token.do( this.fetcher.getAll(startIndex, count) ).then( (logs) => {
      this.firstTailIndex = startIndex;
      this.updateIndexes();
      return logs;
    });
  }

  /** Fetches the next block of logs forward from the bottom cursor. */
  private getBottom(token: FetchToken): Promise<LogDog.LogEntry[]> {
    this.updateIndexes();

    // If there are no more logs in the stream, return no logs.
    if ( this.fetchedEndOfStream() ) {
      return Promise.resolve(null);
    }

    // If our bottom index isn't initialized, initialize it via tail.
    if ( this.nextBottomIndex < 0 ) {
      return this.getTail(token);
    }

    let opts = this.nextFetcherOptions();
    return token.do( this.fetcher.get(this.nextBottomIndex, opts) ).then(
        (logs) => {
          if ( logs && logs.length ) {
            this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1);
          }
          return logs;
        } );
  }
}
868
/**
 * An aggregate log stream. It presents a single-stream view, but is really
 * composed of several log streams interleaved based on their prefix indices
 * (if they share a prefix) or timestamps (if they don't).
 *
 * At least one log entry from each stream must be buffered before any log
 * entries can be yielded, since we don't know what ordering to apply
 * otherwise. To make this fast, we will make the first request for each
 * stream small so it finishes quickly and we can start rendering. Subsequent
 * entries will be larger for efficiency.
 *
 * @param {LogStream} streams the composite streams.
 */
class AggregateLogStream implements LogProvider {

  /** All input streams, in input order. */
  private streams: AggregateLogStream.Entry[];
  /** Subset of input streams that are still active (not finished). */
  private active: AggregateLogStream.Entry[];
  /** The in-flight fetch Promise, or null when no fetch is in progress. */
  private currentNextPromise: Promise<BufferedLogs>;
  /** Ordering function used to interleave entries from different streams. */
  private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number;

  /** Callback invoked when any composite stream's status changes. */
  private streamStatusCallback: StreamStatusCallback;

  constructor(streams: LogStream[]) {
    // Input streams, ordered by input order.
    this.streams = streams.map<AggregateLogStream.Entry>( (ls, i) => {
      ls.setStreamStatusCallback( (st: LogStreamStatus[]) => {
        if ( st ) {
          this.streams[i].status = st[0];
          this.statusChanged();
        }
      });

      return {
        ls: ls,
        buffer: null,
        needsAuth: false,
        status: ls.getStreamStatus(),
      };
    } );

    // Subset of input streams that are still active (not finished).
    this.active = this.streams;

    // The currently-active "next" promise.
    this.currentNextPromise = null;

    // Determine our log comparison function. If all of our logs share a prefix,
    // we will use the prefix index. Otherwise, we will use the timestamp.
    let template: LogDog.Stream = null;
    let sharedPrefix = this.streams.every( (entry) => {
      if ( ! template ) {
        template = entry.ls.stream;
        return true;
      }
      return template.samePrefixAs(entry.ls.stream);
    });

    this.compareLogs = (( sharedPrefix ) ?
        (a, b) => {
          return (a.prefixIndex - b.prefixIndex);
        } :
        (a, b) => {
          return a.timestamp.getTime() - b.timestamp.getTime();
        });
  }

  /** Aggregate streams do not support splitting. */
  split(): SplitLogProvider { return null; }
  /** Finished when no composite stream remains active. */
  fetchedEndOfStream(): boolean { return ( ! this.active.length ); }

  /** Registers a callback invoked when composite stream statuses change. */
  setStreamStatusCallback(cb: StreamStatusCallback) {
    this.streamStatusCallback = cb;
  }

  /** Forwards the composite streams' statuses to the registered callback. */
  private statusChanged() {
    if ( this.streamStatusCallback ) {
      // Iterate through our composite stream statuses and pick the one that we
      // want to report.
      this.streamStatusCallback( this.streams.map( (entry): LogStreamStatus => {
        return entry.status;
      } ));
    }
  }

  /**
   * Implements LogProvider.fetch.
   *
   * Fills the per-stream buffers (issuing a HEAD fetch for any stream with an
   * empty buffer), then merges all buffers into a single ordered BufferedLogs.
   * The "l" parameter is not consulted; composite streams are always fetched
   * from HEAD.
   */
  fetch(l: Location, token: FetchToken): Promise<BufferedLogs> {
    // If we're already fetching the next buffer, this is an error.
    if (this.currentNextPromise) {
      throw new Error("In-progress next(), cannot start another.");
    }

    // Filter out any finished streams from our active list. A stream is
    // finished if it is finished streaming and we don't have a retained buffer
    // from it.
    //
    // This updates our "finished" property, since it's derived from the length
    // of our active array.
    this.active = this.active.filter( (entry) => {
      return ( (! entry.buffer) || entry.buffer.peek() ||
               (! entry.ls.fetchedEndOfStream()) );
    });

    if ( ! this.active.length ) {
      // No active streams, so we're finished. Permanently set our promise to
      // the finished state.
      return Promise.resolve();
    }

    // Fill all buffers for all active streams. This may result in an RPC to
    // load new buffer content for streams whose buffers are empty.
    //
    // If any stream doesn't currently have buffered logs, we will call their
    // "next()" methods to pull the next set of logs. This will result in one
    // of three possibilities:
    // - A BufferedLogs will be returned containing the next logs for this
    //   stream. The log stream may also be finished.
    // - null will be returned, and this log stream must now be finished.
    // - An error will be returned.
    //
    // The error is interesting, since we must present a common error view to
    // our caller. If all returned errors are "NotAuthenticatedError", we will
    // return a NotAuthenticatedError. Otherwise, we will return a generic
    // "streams failed" error.
    //
    // The outer Promise will pull logs for any streams that don't have any.
    // On success, the "buffer" for the entry will be populated. On failure, an
    // error will be returned. Because Promise.all fails fast, we will catch
    // inner errors and return them as values (null if no error).
    this.currentNextPromise = Promise.all( this.active.map( (entry) => {
      // If the entry's buffer still has data, use it immediately.
      if (entry.buffer && entry.buffer.peek()) {
        return null;
      }

      // No buffered logs. Call the stream's "next()" method to get some.
      return entry.ls.fetch(Location.HEAD, token).then(
          (buffer): Error => {
            // Retain this buffer.
            entry.buffer = buffer;
            return null;
          }
      ).catch( (error: Error) => {
        // Log stream source of error. Raise a generic "failed to buffer"
        // error. This will become a permanent failure.
        console.error("Error loading buffer for", entry.ls.stream.fullName(),
            "(", entry.ls, "): ", error);
        return error;
      });
    })).then( (results: Error[]): BufferedLogs => {
      // Identify any errors that we hit.
      let buffers = new Array<BufferedLogs>(this.active.length);
      let errors: Error[] = [];
      results.forEach( (err, idx) => {
        buffers[idx] = this.active[idx].buffer;
        if ( err ) { errors[idx] = err; }
      });

      // We are done, and will return a value.
      this.currentNextPromise = null;
      if ( errors.length ) {
        throw this._aggregateErrors(errors);
      }
      return this._aggregateBuffers(buffers);
    });

    return this.currentNextPromise;
  }

  /**
   * Collapses per-stream errors into a single error: NotAuthenticatedError
   * if every present error is the auth sentinel, otherwise a generic error.
   */
  private _aggregateErrors(errors: Error[]): Error {
    let isNotAuthenticated = false;
    errors.every( (err) => {
      if ( ! err ) { return true; }
      if ( err === NotAuthenticatedError ) {
        isNotAuthenticated = true;
        return true;
      }
      // A non-auth error was encountered; stop and report a generic failure.
      isNotAuthenticated = false;
      return false;
    });
    return (( isNotAuthenticated ) ?
        (NotAuthenticatedError) : new Error("Stream Error"));
  }

  /**
   * Merges the per-stream buffers into one ordered BufferedLogs using
   * compareLogs, yielding entries until any one buffer is exhausted (the
   * remaining entries stay buffered for the next round).
   */
  private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs {
    switch ( buffers.length ) {
    case 0:
      // No buffers, so no logs.
      return new BufferedLogs(null);
    case 1:
      // As a special case, if we only have one buffer, and we assume that its
      // entries are sorted, then that buffer is a return value.
      return new BufferedLogs(buffers[0].getAll());
    }

    // Preload our peek array.
    let incomplete = false;
    let peek = buffers.map(function(buf) {
      var le = buf.peek();
      if (! le) {
        incomplete = true;
      }
      return le;
    });
    if (incomplete) {
      // One of our input buffers had no log entries.
      return new BufferedLogs(null);
    }

    // Assemble our aggregate buffer array.
    let entries: LogDog.LogEntry[] = [];
    while (true) {
      // Choose the next stream.
      var earliest = 0;
      for (var i = 1; i < buffers.length; i++) {
        if (this.compareLogs(peek[i], peek[earliest]) < 0) {
          earliest = i;
        }
      }

      // Get the next log from the earliest stream.
      entries.push(buffers[earliest].next());

      // Repopulate that buffer's "peek" value. If the buffer has no more
      // entries, then we're done.
      var next = buffers[earliest].peek();
      if (!next) {
        return new BufferedLogs(entries);
      }
      peek[earliest] = next;
    }
  }
}
1102
module AggregateLogStream {
  /** Per-stream bookkeeping entry for an aggregated log stream. */
  export type Entry = {
    /** The underlying log stream. */
    ls: LogStream;
    /** Fetched-but-unconsumed logs retained between merge rounds. */
    buffer: BufferedLogs;
    /** True if this stream's last fetch required authentication. */
    needsAuth: boolean;
    /** The most recently reported status for this stream. */
    status: LogStreamStatus;
  }
}
1111
1112 /**
1113 * A buffer of ordered log entries.
1114 *
1115 * Assumes total ownership of the input log buffer, which can be null to
1116 * indicate no logs.
1117 */
1118 class BufferedLogs {
1119 private logs: LogDog.LogEntry[] | null;
1120 private index: number;
1121
1122 constructor(logs: LogDog.LogEntry[] | null) {
1123 this.logs = logs;
1124 this.index = 0;
1125 }
1126
1127 peek(): LogDog.LogEntry | null {
1128 return (this.logs) ? (this.logs[this.index]) : (null);
1129 }
1130
1131 getAll(): LogDog.LogEntry[] {
1132 // Pop all logs.
1133 var logs = this.logs;
1134 this.logs = null;
1135 return logs;
1136 }
1137
1138 next() : LogDog.LogEntry | null {
1139 if (! (this.logs && this.logs.length)) {
1140 return null;
1141 }
1142
1143 // Get the next log and increment our index.
1144 var log = this.logs[this.index++];
1145 if (this.index >= this.logs.length) {
1146 this.logs = null;
1147 }
1148 return log;
1149 }
1150 }
OLDNEW
« no previous file with comments | « web/inc/logdog-stream-view/query.ts ('k') | web/inc/logdog-stream/logdog.ts » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698