Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(266)

Side by Side Diff: web/inc/logdog-stream-view/viewer.ts

Issue 2861583003: Enable sequential prefix index logs to be loaded. (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved. 2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0 3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file. 4 that can be found in the LICENSE file.
5 */ 5 */
6 6
7 ///<reference path="../logdog-stream/logdog.ts" /> 7 ///<reference path="../logdog-stream/logdog.ts" />
8 ///<reference path="../luci-operation/operation.ts" /> 8 ///<reference path="../luci-operation/operation.ts" />
9 ///<reference path="../luci-sleep-promise/promise.ts" /> 9 ///<reference path="../luci-sleep-promise/promise.ts" />
10 ///<reference path="../rpc/client.ts" /> 10 ///<reference path="../rpc/client.ts" />
(...skipping 980 matching lines...) Expand 10 before | Expand all | Expand 10 after
991 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts)); 991 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts));
992 let logs = await f.p; 992 let logs = await f.p;
993 if (logs && logs.length) { 993 if (logs && logs.length) {
994 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); 994 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1);
995 } 995 }
996 return logs; 996 return logs;
997 } 997 }
998 } 998 }
999 999
1000 /** 1000 /**
1001 * LogSorter is used to extract sorted logs from a set of BufferedLogs.
hinoka 2017/05/03 17:16:58 LogSorter is an interface used to....
dnj 2017/05/03 19:09:27 Done.
1002 *
1003 * LogSorter will modify the BufferedLogs, consuming the logs that it
hinoka 2017/05/03 17:16:58 A LogSorter which implements implicitNext() will m
dnj 2017/05/03 19:09:27 Done.
1004 * returns from the buffer.
1005 */
1006 type LogSorter = {
1007 compare: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number;
1008 implicitNext?: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) =>
1009 LogDog.LogEntry | null;
1010 };
1011
1012 const prefixIndexLogSorter: LogSorter = {
1013 compare: (a: LogDog.LogEntry, b: LogDog.LogEntry) => {
1014 return (a.prefixIndex - b.prefixIndex);
1015 },
1016
1017 implicitNext: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => {
1018 let nextPrefixIndex = (prev.prefixIndex + 1);
1019 for (let buf of buffers) {
1020 let le = buf.peek();
1021 if (le && le.prefixIndex === nextPrefixIndex) {
1022 return buf.next();
1023 }
1024 }
1025 return null;
1026 },
1027 };
1028
1029 const timestampLogSorter: LogSorter = {
1030 compare: (a: LogDog.LogEntry, b: LogDog.LogEntry) => {
1031 if (a.timestamp) {
1032 if (b.timestamp) {
1033 return a.timestamp.getTime() - b.timestamp.getTime();
1034 }
1035 return 1;
1036 }
1037 if (b.timestamp) {
1038 return -1;
1039 }
1040 return 0;
1041 },
1042
1043 // No implicit "next" with timestamp-based logs, since the next log in
1044 // an empty buffer may actually be the next contiguous log.
1045 implicitNext: undefined,
1046 };
1047
1048 /**
1001 * An aggregate log stream. It presents a single-stream view, but is really 1049 * An aggregate log stream. It presents a single-stream view, but is really
1002 * composed of several log streams interleaved based on their prefix indices 1050 * composed of several log streams interleaved based on their prefix indices
1003 * (if they share a prefix) or timestamps (if they don't). 1051 * (if they share a prefix) or timestamps (if they don't).
1004 * 1052 *
1005 * At least one log entry from each stream must be buffered before any log 1053 * At least one log entry from each stream must be buffered before any log
1006 * entries can be yielded, since we don't know what ordering to apply 1054 * entries can be yielded, since we don't know what ordering to apply
1007 * otherwise. To make this fast, we will make the first request for each 1055 * otherwise. To make this fast, we will make the first request for each
1008 * stream small so it finishes quickly and we can start rendering. Subsequent 1056 * stream small so it finishes quickly and we can start rendering. Subsequent
1009 * entries will be larger for efficiency. 1057 * entries will be larger for efficiency.
1010 * 1058 *
1011 * @param {LogStream} streams the composite streams. 1059 * @param {LogStream} streams the composite streams.
1012 */ 1060 */
1013 class AggregateLogStream implements LogProvider { 1061 class AggregateLogStream implements LogProvider {
1014 private streams: AggregateLogStream.Entry[]; 1062 private streams: AggregateLogStream.Entry[];
1015 private active: AggregateLogStream.Entry[]; 1063 private active: AggregateLogStream.Entry[];
1016 private currentNextPromise: Promise<BufferedLogs[]>|null; 1064 private currentNextPromise: Promise<BufferedLogs[]>|null;
1017 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; 1065 private readonly logSorter: LogSorter;
1018 1066
1019 private streamStatusCallback: StreamStatusCallback; 1067 private streamStatusCallback: StreamStatusCallback;
1020 1068
1021 constructor(streams: LogStream[]) { 1069 constructor(streams: LogStream[]) {
1022 // Input streams, ordered by input order. 1070 // Input streams, ordered by input order.
1023 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { 1071 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => {
1024 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { 1072 ls.setStreamStatusCallback((st: LogStreamStatus[]) => {
1025 if (st) { 1073 if (st) {
1026 this.streams[i].status = st[0]; 1074 this.streams[i].status = st[0];
1027 this.statusChanged(); 1075 this.statusChanged();
(...skipping 14 matching lines...) Expand all
1042 // timestamp. 1090 // timestamp.
1043 let template: LogDog.StreamPath; 1091 let template: LogDog.StreamPath;
1044 let sharedPrefix = this.streams.every((entry) => { 1092 let sharedPrefix = this.streams.every((entry) => {
1045 if (!template) { 1093 if (!template) {
1046 template = entry.ls.stream; 1094 template = entry.ls.stream;
1047 return true; 1095 return true;
1048 } 1096 }
1049 return template.samePrefixAs(entry.ls.stream); 1097 return template.samePrefixAs(entry.ls.stream);
1050 }); 1098 });
1051 1099
1052 this.compareLogs = ((sharedPrefix) ? (a, b) => { 1100 if (sharedPrefix) {
1053 return (a.prefixIndex - b.prefixIndex); 1101 this.logSorter = prefixIndexLogSorter;
1054 } : (a, b) => { 1102 } else {
1055 if (a.timestamp) { 1103 this.logSorter = timestampLogSorter;
1056 if (b.timestamp) { 1104 }
1057 return a.timestamp.getTime() - b.timestamp.getTime();
1058 }
1059 return 1;
1060 } else if (b.timestamp) {
1061 return -1;
1062 } else {
1063 return 0;
1064 }
1065 });
1066 } 1105 }
1067 1106
1068 split(): SplitLogProvider|null { 1107 split(): SplitLogProvider|null {
1069 return null; 1108 return null;
1070 } 1109 }
1071 fetchedEndOfStream(): boolean { 1110 fetchedEndOfStream(): boolean {
1072 return (!this.active.length); 1111 return (!this.active.length);
1073 } 1112 }
1074 1113
1075 setStreamStatusCallback(cb: StreamStatusCallback) { 1114 setStreamStatusCallback(cb: StreamStatusCallback) {
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
1178 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { 1217 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs {
1179 switch (buffers.length) { 1218 switch (buffers.length) {
1180 case 0: 1219 case 0:
1181 // No buffers, so no logs. 1220 // No buffers, so no logs.
1182 return new BufferedLogs(null); 1221 return new BufferedLogs(null);
1183 case 1: 1222 case 1:
1184 // As a special case, if we only have one buffer, and we assume that 1223 // As a special case, if we only have one buffer, and we assume that
1185 // its entries are sorted, then that buffer is a return value. 1224 // its entries are sorted, then that buffer is a return value.
1186 return new BufferedLogs(buffers[0].getAll()); 1225 return new BufferedLogs(buffers[0].getAll());
1187 default: 1226 default:
1188 // Nothing to do.
1189 break; 1227 break;
1190 } 1228 }
1191 1229
1192 // Preload our peek array. 1230 // Preload our peek array.
1193 let peek = new Array<LogDog.LogEntry>(buffers.length); 1231 let peek = new Array<LogDog.LogEntry>(buffers.length);
1194 peek.length = 0; 1232 peek.length = 0;
1195 for (let buf of buffers) { 1233 for (let buf of buffers) {
1196 let le = buf.peek(); 1234 let le = buf.peek();
1197 if (!le) { 1235 if (!le) {
1198 // One of our input buffers had no log entries. 1236 // One of our input buffers had no log entries.
1199 return new BufferedLogs(null); 1237 return new BufferedLogs(null);
1200 } 1238 }
1201 peek.push(le); 1239 peek.push(le);
1202 } 1240 }
1203 1241
1204 // Assemble our aggregate buffer array. 1242 // Assemble our aggregate buffer array.
1205 let entries: LogDog.LogEntry[] = []; 1243 let entries: LogDog.LogEntry[] = [];
1244 let last: LogDog.LogEntry|null = null;
hinoka 2017/05/03 17:16:58 how about "current"? "last" often implies "final",
dnj 2017/05/03 19:09:27 Renamed to "latestAdded".
1206 while (true) { 1245 while (true) {
1207 // Choose the next stream. 1246 // Choose the next stream.
1208 let earliest = 0; 1247 let earliest = 0;
1209 for (let i = 1; i < buffers.length; i++) { 1248 for (let i = 1; i < buffers.length; i++) {
1210 if (this.compareLogs(peek[i], peek[earliest]) < 0) { 1249 if (this.logSorter.compare(peek[i], peek[earliest]) < 0) {
1211 earliest = i; 1250 earliest = i;
1212 } 1251 }
1213 } 1252 }
1214 1253
1215 // Get the next log from the earliest stream. 1254 // Get the next log from the earliest stream.
1216 let next = buffers[earliest].next(); 1255 let next = buffers[earliest].next();
1217 if (next) { 1256 if (next) {
1218 entries.push(next); 1257 last = next;
1258 entries.push(last);
1219 } 1259 }
1220 1260
1221 // Repopulate that buffer's "peek" value. If the buffer has no more 1261 // Repopulate that buffer's "peek" value. If the buffer has no more
1222 // entries, then we're done. 1262 // entries, then we're done this round.
1223 next = buffers[earliest].peek(); 1263 next = buffers[earliest].peek();
1224 if (!next) { 1264 if (!next) {
1225 return new BufferedLogs(entries); 1265 break;
1226 } 1266 }
1227 peek[earliest] = next; 1267 peek[earliest] = next;
1228 } 1268 }
1269
1270 // One or more of our buffers is exhausted. If we have the ability to load
1271 // implicit next logs, try and extract more using that.
1272 if (last && this.logSorter.implicitNext) {
1273 while (true) {
1274 last = this.logSorter.implicitNext(last, buffers);
1275 if (!last) {
1276 break;
1277 }
1278 entries.push(last);
1279 }
1280 }
1281 return new BufferedLogs(entries);
1229 } 1282 }
1230 } 1283 }
1231 1284
1232 /** Internal namespace for AggregateLogStream types. */ 1285 /** Internal namespace for AggregateLogStream types. */
1233 namespace AggregateLogStream { 1286 namespace AggregateLogStream {
1234 /** Entry is an entry for a single log stream and its buffered logs. */ 1287 /** Entry is an entry for a single log stream and its buffered logs. */
1235 export class Entry { 1288 export class Entry {
1236 buffer = new BufferedLogs(null); 1289 buffer = new BufferedLogs(null);
1237 status: LogStreamStatus; 1290 status: LogStreamStatus;
1238 lastError: Error|null; 1291 lastError: Error|null;
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
1280 1333
1281 /** 1334 /**
1282 * Peek returns the next log in the buffer without modifying the buffer. If 1335 * Peek returns the next log in the buffer without modifying the buffer. If
1283 * there are no logs in the buffer, peek will return null. 1336 * there are no logs in the buffer, peek will return null.
1284 */ 1337 */
1285 peek(): LogDog.LogEntry|null { 1338 peek(): LogDog.LogEntry|null {
1286 return (this.logs) ? (this.logs[this.index]) : (null); 1339 return (this.logs) ? (this.logs[this.index]) : (null);
1287 } 1340 }
1288 1341
1289 /** 1342 /**
1343 * Returns a copy of the remaining logs in the buffer.
1344 * If there are no logs, an empty array will be returned.
1345 */
1346 peekAll(): LogDog.LogEntry[] {
1347 return (this.logs || []).slice(0);
1348 }
1349
1350 /**
1290 * GetAll returns all logs in the buffer. Afterwards, the buffer will be 1351 * GetAll returns all logs in the buffer. Afterwards, the buffer will be
1291 * empty. 1352 * empty.
1292 */ 1353 */
1293 getAll(): LogDog.LogEntry[] { 1354 getAll(): LogDog.LogEntry[] {
1294 // Pop all logs. 1355 // Pop all logs.
1295 let logs = this.logs; 1356 let logs = this.logs;
1296 this.logs = null; 1357 this.logs = null;
1297 return (logs || []); 1358 return (logs || []);
1298 } 1359 }
1299 1360
1300 /** 1361 /**
1301 * Next fetches the next log in the buffer, removing it from the buffer. If 1362 * Next fetches the next log in the buffer, removing it from the buffer. If
1302 * no more logs are available, it will return null. 1363 * no more logs are available, it will return null.
1303 */ 1364 */
1304 next(): LogDog.LogEntry|null { 1365 next(): LogDog.LogEntry|null {
1305 if (!(this.logs && this.logs.length)) { 1366 if (!(this.logs && this.logs.length)) {
1306 return null; 1367 return null;
1307 } 1368 }
1308 1369
1309 // Get the next log and increment our index. 1370 // Get the next log and increment our index.
1310 let log = this.logs[this.index++]; 1371 let log = this.logs[this.index++];
1311 if (this.index >= this.logs.length) { 1372 if (this.index >= this.logs.length) {
1312 this.logs = null; 1373 this.logs = null;
1313 } 1374 }
1314 return log; 1375 return log;
1315 } 1376 }
1316 } 1377 }
1317 } 1378 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698