Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 /* | 1 /* |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | 3 Use of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. | 4 that can be found in the LICENSE file. |
| 5 */ | 5 */ |
| 6 | 6 |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | 7 ///<reference path="../logdog-stream/logdog.ts" /> |
| 8 ///<reference path="../luci-operation/operation.ts" /> | 8 ///<reference path="../luci-operation/operation.ts" /> |
| 9 ///<reference path="../luci-sleep-promise/promise.ts" /> | 9 ///<reference path="../luci-sleep-promise/promise.ts" /> |
| 10 ///<reference path="../rpc/client.ts" /> | 10 ///<reference path="../rpc/client.ts" /> |
| (...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 185 private currentOperation: luci.Operation|null = null; | 185 private currentOperation: luci.Operation|null = null; |
| 186 private currentFetchPromise: Promise<void>|null = null; | 186 private currentFetchPromise: Promise<void>|null = null; |
| 187 | 187 |
| 188 /** Are we in automatic mode? */ | 188 /** Are we in automatic mode? */ |
| 189 private automatic = false; | 189 private automatic = false; |
| 190 /** Are we tailing? */ | 190 /** Are we tailing? */ |
| 191 private fetchFromTail = false; | 191 private fetchFromTail = false; |
| 192 /** Are we in the middle of rendering logs? */ | 192 /** Are we in the middle of rendering logs? */ |
| 193 private rendering = true; | 193 private rendering = true; |
| 194 | 194 |
| 195 private cachedLogStreamUrl: string|undefined = undefined; | |
| 196 | |
| 195 private loadingStateValue: LoadingState = LoadingState.NONE; | 197 private loadingStateValue: LoadingState = LoadingState.NONE; |
| 196 private streamStatusValue: StreamStatusEntry[]; | 198 private streamStatusValue: StreamStatusEntry[]; |
| 197 | 199 |
| 198 /** | 200 /** |
| 199 * When rendering a Promise that will resolve when the render completes. We | 201 * When rendering a Promise that will resolve when the render completes. We |
| 200 * use this to pipeline parallel data fetching and rendering. | 202 * use this to pipeline parallel data fetching and rendering. |
| 201 */ | 203 */ |
| 202 private renderPromise: Promise<void>|null; | 204 private renderPromise: Promise<void>|null; |
| 203 | 205 |
| 204 constructor( | 206 constructor( |
| (...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 347 this.streamStatusValue = st; | 349 this.streamStatusValue = st; |
| 348 this.updateControls(); | 350 this.updateControls(); |
| 349 } | 351 } |
| 350 | 352 |
| 351 private updateControls() { | 353 private updateControls() { |
| 352 this.view.updateControls({ | 354 this.view.updateControls({ |
| 353 canSplit: this.providerCanSplit, | 355 canSplit: this.providerCanSplit, |
| 354 split: this.isSplit, | 356 split: this.isSplit, |
| 355 bottom: !this.fetchedEndOfStream, | 357 bottom: !this.fetchedEndOfStream, |
| 356 fullyLoaded: (this.fetchedFullStream && (!this.rendering)), | 358 fullyLoaded: (this.fetchedFullStream && (!this.rendering)), |
| 359 logStreamUrl: this.logStreamUrl, | |
| 357 loadingState: this.loadingState, | 360 loadingState: this.loadingState, |
| 358 streamStatus: this.streamStatus, | 361 streamStatus: this.streamStatus, |
| 359 }); | 362 }); |
| 360 } | 363 } |
| 361 | 364 |
| 362 /** | 365 /** |
| 363 * Note that the authentication state for the client has changed. This will | 366 * Note that the authentication state for the client has changed. This will |
| 364 * trigger an automatic fetch retry if our previous fetch failed due to | 367 * trigger an automatic fetch retry if our previous fetch failed due to |
| 365 * lack of authentication. | 368 * lack of authentication. |
| 366 */ | 369 */ |
| (...skipping 258 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 625 return (!!(split && split.isSplit())); | 628 return (!!(split && split.isSplit())); |
| 626 } | 629 } |
| 627 | 630 |
| 628 private get fetchedEndOfStream(): boolean { | 631 private get fetchedEndOfStream(): boolean { |
| 629 return (this.provider.fetchedEndOfStream()); | 632 return (this.provider.fetchedEndOfStream()); |
| 630 } | 633 } |
| 631 | 634 |
| 632 private get fetchedFullStream(): boolean { | 635 private get fetchedFullStream(): boolean { |
| 633 return (this.fetchedEndOfStream && (!this.isSplit)); | 636 return (this.fetchedEndOfStream && (!this.isSplit)); |
| 634 } | 637 } |
| 638 | |
| 639 private get logStreamUrl(): string|undefined { | |
| 640 if (!this.cachedLogStreamUrl) { | |
| 641 this.cachedLogStreamUrl = this.provider.getLogStreamUrl(); | |
| 642 } | |
| 643 return this.cachedLogStreamUrl; | |
| 644 } | |
| 635 } | 645 } |
| 636 | 646 |
| 637 /** Generic interface for a log provider. */ | 647 /** Generic interface for a log provider. */ |
| 638 interface LogProvider { | 648 interface LogProvider { |
| 639 setStreamStatusCallback(cb: StreamStatusCallback): void; | 649 setStreamStatusCallback(cb: StreamStatusCallback): void; |
| 640 fetch(op: luci.Operation, l: Location): Promise<BufferedLogs>; | 650 fetch(op: luci.Operation, l: Location): Promise<BufferedLogs>; |
| 651 getLogStreamUrl(): string|undefined; | |
| 641 | 652 |
| 642 /** Will return null if this LogProvider doesn't support splitting. */ | 653 /** Will return null if this LogProvider doesn't support splitting. */ |
| 643 split(): SplitLogProvider|null; | 654 split(): SplitLogProvider|null; |
| 644 fetchedEndOfStream(): boolean; | 655 fetchedEndOfStream(): boolean; |
| 645 } | 656 } |
| 646 | 657 |
| 647 /** Additional methods for log stream splitting, if supported. */ | 658 /** Additional methods for log stream splitting, if supported. */ |
| 648 interface SplitLogProvider { | 659 interface SplitLogProvider { |
| 649 canSplit(): boolean; | 660 canSplit(): boolean; |
| 650 isSplit(): boolean; | 661 isSplit(): boolean; |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 741 try { | 752 try { |
| 742 let logs = await getLogs; | 753 let logs = await getLogs; |
| 743 this.initialFetch = false; | 754 this.initialFetch = false; |
| 744 this.statusChanged(); | 755 this.statusChanged(); |
| 745 return new BufferedLogs(logs); | 756 return new BufferedLogs(logs); |
| 746 } catch (err) { | 757 } catch (err) { |
| 747 throw resolveErr(err); | 758 throw resolveErr(err); |
| 748 } | 759 } |
| 749 } | 760 } |
| 750 | 761 |
| 762 get descriptor() { | |
| 763 return this.fetcher.desc; | |
| 764 } | |
| 765 | |
| 766 getLogStreamUrl(): string|undefined { | |
| 767 let desc = this.descriptor; | |
| 768 if (desc) { | |
| 769 return (desc.tags || {})['logdog.viewer_url']; | |
| 770 } | |
| 771 return undefined; | |
| 772 } | |
| 773 | |
| 751 setStreamStatusCallback(cb: StreamStatusCallback) { | 774 setStreamStatusCallback(cb: StreamStatusCallback) { |
| 752 this.streamStatusCallback = cb; | 775 this.streamStatusCallback = cb; |
| 753 } | 776 } |
| 754 | 777 |
| 755 private statusChanged() { | 778 private statusChanged() { |
| 756 if (this.streamStatusCallback) { | 779 if (this.streamStatusCallback) { |
| 757 this.streamStatusCallback([this.getStreamStatus()]); | 780 this.streamStatusCallback([this.getStreamStatus()]); |
| 758 } | 781 } |
| 759 } | 782 } |
| 760 | 783 |
| (...skipping 232 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 993 * entries can be yielded, since we don't know what ordering to apply | 1016 * entries can be yielded, since we don't know what ordering to apply |
| 994 * otherwise. To make this fast, we will make the first request for each | 1017 * otherwise. To make this fast, we will make the first request for each |
| 995 * stream small so it finishes quickly and we can start rendering. Subsequent | 1018 * stream small so it finishes quickly and we can start rendering. Subsequent |
| 996 * entries will be larger for efficiency. | 1019 * entries will be larger for efficiency. |
| 997 * | 1020 * |
| 998 * @param {LogStream} streams the composite streams. | 1021 * @param {LogStream} streams the composite streams. |
| 999 */ | 1022 */ |
| 1000 class AggregateLogStream implements LogProvider { | 1023 class AggregateLogStream implements LogProvider { |
| 1001 private streams: AggregateLogStream.Entry[]; | 1024 private streams: AggregateLogStream.Entry[]; |
| 1002 private active: AggregateLogStream.Entry[]; | 1025 private active: AggregateLogStream.Entry[]; |
| 1026 private sharedPrefix: boolean; | |
| 1003 private currentNextPromise: Promise<BufferedLogs[]>|null; | 1027 private currentNextPromise: Promise<BufferedLogs[]>|null; |
| 1004 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; | 1028 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; |
| 1005 | 1029 |
| 1006 private streamStatusCallback: StreamStatusCallback; | 1030 private streamStatusCallback: StreamStatusCallback; |
| 1007 | 1031 |
| 1008 constructor(streams: LogStream[]) { | 1032 constructor(streams: LogStream[]) { |
| 1009 // Input streams, ordered by input order. | 1033 // Input streams, ordered by input order. |
| 1010 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { | 1034 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { |
| 1011 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { | 1035 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { |
| 1012 if (st) { | 1036 if (st) { |
| 1013 this.streams[i].status = st[0]; | 1037 this.streams[i].status = st[0]; |
| 1014 this.statusChanged(); | 1038 this.statusChanged(); |
| 1015 } | 1039 } |
| 1016 }); | 1040 }); |
| 1017 | 1041 |
| 1018 return new AggregateLogStream.Entry(ls); | 1042 return new AggregateLogStream.Entry(ls); |
| 1019 }); | 1043 }); |
| 1020 | 1044 |
| 1021 // Subset of input streams that are still active (not finished). | 1045 // Subset of input streams that are still active (not finished). |
| 1022 this.active = this.streams; | 1046 this.active = this.streams; |
| 1023 | 1047 |
| 1024 // The currently-active "next" promise. | 1048 // The currently-active "next" promise. |
| 1025 this.currentNextPromise = null; | 1049 this.currentNextPromise = null; |
| 1026 | 1050 |
| 1027 // Determine our log comparison function. If all of our logs share a | 1051 // Determine our log comparison function. If all of our logs share a |
| 1028 // prefix, we will use the prefix index. Otherwise, we will use the | 1052 // prefix, we will use the prefix index. Otherwise, we will use the |
| 1029 // timestamp. | 1053 // timestamp. |
| 1030 let template: LogDog.StreamPath; | 1054 let template: LogDog.StreamPath; |
| 1031 let sharedPrefix = this.streams.every((entry) => { | 1055 this.sharedPrefix = this.streams.every((entry) => { |
|
nodir
2017/05/03 15:36:52
I don't think this is used outside of this functio
dnj
2017/05/03 15:40:43
Correct, I originally wanted to only return links
| |
| 1032 if (!template) { | 1056 if (!template) { |
| 1033 template = entry.ls.stream; | 1057 template = entry.ls.stream; |
| 1034 return true; | 1058 return true; |
| 1035 } | 1059 } |
| 1036 return template.samePrefixAs(entry.ls.stream); | 1060 return template.samePrefixAs(entry.ls.stream); |
| 1037 }); | 1061 }); |
| 1038 | 1062 |
| 1039 this.compareLogs = ((sharedPrefix) ? (a, b) => { | 1063 this.compareLogs = ((this.sharedPrefix) ? (a, b) => { |
| 1040 return (a.prefixIndex - b.prefixIndex); | 1064 return (a.prefixIndex - b.prefixIndex); |
| 1041 } : (a, b) => { | 1065 } : (a, b) => { |
| 1042 if (a.timestamp) { | 1066 if (a.timestamp) { |
| 1043 if (b.timestamp) { | 1067 if (b.timestamp) { |
| 1044 return a.timestamp.getTime() - b.timestamp.getTime(); | 1068 return a.timestamp.getTime() - b.timestamp.getTime(); |
| 1045 } | 1069 } |
| 1046 return 1; | 1070 return 1; |
| 1047 } else if (b.timestamp) { | 1071 } else if (b.timestamp) { |
| 1048 return -1; | 1072 return -1; |
| 1049 } else { | 1073 } else { |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 1066 private statusChanged() { | 1090 private statusChanged() { |
| 1067 if (this.streamStatusCallback) { | 1091 if (this.streamStatusCallback) { |
| 1068 // Iterate through our composite stream statuses and pick the one that | 1092 // Iterate through our composite stream statuses and pick the one that |
| 1069 // we want to report. | 1093 // we want to report. |
| 1070 this.streamStatusCallback(this.streams.map((entry): LogStreamStatus => { | 1094 this.streamStatusCallback(this.streams.map((entry): LogStreamStatus => { |
| 1071 return entry.status; | 1095 return entry.status; |
| 1072 })); | 1096 })); |
| 1073 } | 1097 } |
| 1074 } | 1098 } |
| 1075 | 1099 |
| 1100 getLogStreamUrl(): string|undefined { | |
| 1101 // Return the first log stream viewer URL. IF we have a shared prefix, | |
| 1102 // this will always work. Otherwise, returning something is better than | |
| 1103 // nothing, so if any of the base streams have a URL, we will return it. | |
| 1104 for (let s of this.streams) { | |
| 1105 let url = s.ls.getLogStreamUrl(); | |
| 1106 if (url) { | |
| 1107 return url; | |
| 1108 } | |
| 1109 } | |
| 1110 return undefined; | |
| 1111 } | |
| 1112 | |
| 1076 /** | 1113 /** |
| 1077 * Implements LogProvider.next | 1114 * Implements LogProvider.next |
| 1078 */ | 1115 */ |
| 1079 async fetch(op: luci.Operation, _: Location) { | 1116 async fetch(op: luci.Operation, _: Location) { |
| 1080 // If we're already are fetching the next buffer, this is an error. | 1117 // If we're already are fetching the next buffer, this is an error. |
| 1081 if (this.currentNextPromise) { | 1118 if (this.currentNextPromise) { |
| 1082 throw new Error('In-progress next(), cannot start another.'); | 1119 throw new Error('In-progress next(), cannot start another.'); |
| 1083 } | 1120 } |
| 1084 | 1121 |
| 1085 // Filter out any finished streams from our active list. A stream is | 1122 // Filter out any finished streams from our active list. A stream is |
| (...skipping 209 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1295 | 1332 |
| 1296 // Get the next log and increment our index. | 1333 // Get the next log and increment our index. |
| 1297 let log = this.logs[this.index++]; | 1334 let log = this.logs[this.index++]; |
| 1298 if (this.index >= this.logs.length) { | 1335 if (this.index >= this.logs.length) { |
| 1299 this.logs = null; | 1336 this.logs = null; |
| 1300 } | 1337 } |
| 1301 return log; | 1338 return log; |
| 1302 } | 1339 } |
| 1303 } | 1340 } |
| 1304 } | 1341 } |
| OLD | NEW |