Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(557)

Side by Side Diff: web/inc/logdog-stream-view/viewer.ts

Issue 2861583003: Enable sequential prefix index logs to be loaded. (Closed)
Patch Set: comments Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved. 2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0 3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file. 4 that can be found in the LICENSE file.
5 */ 5 */
6 6
7 ///<reference path="../logdog-stream/logdog.ts" /> 7 ///<reference path="../logdog-stream/logdog.ts" />
8 ///<reference path="../luci-operation/operation.ts" /> 8 ///<reference path="../luci-operation/operation.ts" />
9 ///<reference path="../luci-sleep-promise/promise.ts" /> 9 ///<reference path="../luci-sleep-promise/promise.ts" />
10 ///<reference path="../rpc/client.ts" /> 10 ///<reference path="../rpc/client.ts" />
(...skipping 1005 matching lines...) Expand 10 before | Expand all | Expand 10 after
1016 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts)); 1016 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts));
1017 let logs = await f.p; 1017 let logs = await f.p;
1018 if (logs && logs.length) { 1018 if (logs && logs.length) {
1019 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); 1019 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1);
1020 } 1020 }
1021 return logs; 1021 return logs;
1022 } 1022 }
1023 } 1023 }
1024 1024
1025 /** 1025 /**
1026 * LogSorter is an interface that used by AggregateLogStream to extract sorted
1027 * logs from a set of BufferedLogs.
1028 *
1029 * It is used to compare two log entries to determine their relative order.
1030 */
1031 type LogSorter = {
1032 /** Returns true if "a" comes before "b". */
1033 before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number;
1034
1035 /**
1036 * If implemented, returns an implicit next log in the buffer set.
1037 *
1038 * This is useful if the next log can be determined from the current
1039 * buffered data, even if it is partial or incomplete.
1040 */
1041 implicitNext?: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) =>
1042 LogDog.LogEntry | null;
1043 };
1044
1045 const prefixIndexLogSorter: LogSorter = {
1046 before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => {
1047 return (a.prefixIndex - b.prefixIndex);
1048 },
1049
1050 implicitNext: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => {
1051 let nextPrefixIndex = (prev.prefixIndex + 1);
1052 for (let buf of buffers) {
1053 let le = buf.peek();
1054 if (le && le.prefixIndex === nextPrefixIndex) {
1055 return buf.next();
1056 }
1057 }
1058 return null;
1059 },
1060 };
1061
1062 const timestampLogSorter: LogSorter = {
1063 before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => {
1064 if (a.timestamp) {
1065 if (b.timestamp) {
1066 return a.timestamp.getTime() - b.timestamp.getTime();
1067 }
1068 return 1;
1069 }
1070 if (b.timestamp) {
1071 return -1;
1072 }
1073 return 0;
1074 },
1075
1076 // No implicit "next" with timestamp-based logs, since the next log in
1077 // an empty buffer may actually be the next contiguous log.
1078 implicitNext: undefined,
1079 };
1080
1081 /**
1026 * An aggregate log stream. It presents a single-stream view, but is really 1082 * An aggregate log stream. It presents a single-stream view, but is really
1027 * composed of several log streams interleaved based on their prefix indices 1083 * composed of several log streams interleaved based on their prefix indices
1028 * (if they share a prefix) or timestamps (if they don't). 1084 * (if they share a prefix) or timestamps (if they don't).
1029 * 1085 *
1030 * At least one log entry from each stream must be buffered before any log 1086 * At least one log entry from each stream must be buffered before any log
1031 * entries can be yielded, since we don't know what ordering to apply 1087 * entries can be yielded, since we don't know what ordering to apply
1032 * otherwise. To make this fast, we will make the first request for each 1088 * otherwise. To make this fast, we will make the first request for each
1033 * stream small so it finishes quickly and we can start rendering. Subsequent 1089 * stream small so it finishes quickly and we can start rendering. Subsequent
1034 * entries will be larger for efficiency. 1090 * entries will be larger for efficiency.
1035 * 1091 *
1036 * @param {LogStream} streams the composite streams. 1092 * @param {LogStream} streams the composite streams.
1037 */ 1093 */
1038 class AggregateLogStream implements LogProvider { 1094 class AggregateLogStream implements LogProvider {
1039 private streams: AggregateLogStream.Entry[]; 1095 private streams: AggregateLogStream.Entry[];
1040 private active: AggregateLogStream.Entry[]; 1096 private active: AggregateLogStream.Entry[];
1041 private currentNextPromise: Promise<BufferedLogs[]>|null; 1097 private currentNextPromise: Promise<BufferedLogs[]>|null;
1042 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; 1098 private readonly logSorter: LogSorter;
1043 1099
1044 private streamStatusCallback: StreamStatusCallback; 1100 private streamStatusCallback: StreamStatusCallback;
1045 1101
1046 constructor(streams: LogStream[]) { 1102 constructor(streams: LogStream[]) {
1047 // Input streams, ordered by input order. 1103 // Input streams, ordered by input order.
1048 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { 1104 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => {
1049 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { 1105 ls.setStreamStatusCallback((st: LogStreamStatus[]) => {
1050 if (st) { 1106 if (st) {
1051 this.streams[i].status = st[0]; 1107 this.streams[i].status = st[0];
1052 this.statusChanged(); 1108 this.statusChanged();
(...skipping 14 matching lines...) Expand all
1067 // timestamp. 1123 // timestamp.
1068 let template: LogDog.StreamPath; 1124 let template: LogDog.StreamPath;
1069 let sharedPrefix = this.streams.every((entry) => { 1125 let sharedPrefix = this.streams.every((entry) => {
1070 if (!template) { 1126 if (!template) {
1071 template = entry.ls.stream; 1127 template = entry.ls.stream;
1072 return true; 1128 return true;
1073 } 1129 }
1074 return template.samePrefixAs(entry.ls.stream); 1130 return template.samePrefixAs(entry.ls.stream);
1075 }); 1131 });
1076 1132
1077 this.compareLogs = ((sharedPrefix) ? (a, b) => { 1133 if (sharedPrefix) {
1078 return (a.prefixIndex - b.prefixIndex); 1134 this.logSorter = prefixIndexLogSorter;
1079 } : (a, b) => { 1135 } else {
1080 if (a.timestamp) { 1136 this.logSorter = timestampLogSorter;
1081 if (b.timestamp) { 1137 }
1082 return a.timestamp.getTime() - b.timestamp.getTime();
1083 }
1084 return 1;
1085 } else if (b.timestamp) {
1086 return -1;
1087 } else {
1088 return 0;
1089 }
1090 });
1091 } 1138 }
1092 1139
1093 split(): SplitLogProvider|null { 1140 split(): SplitLogProvider|null {
1094 return null; 1141 return null;
1095 } 1142 }
1096 fetchedEndOfStream(): boolean { 1143 fetchedEndOfStream(): boolean {
1097 return (!this.active.length); 1144 return (!this.active.length);
1098 } 1145 }
1099 1146
1100 setStreamStatusCallback(cb: StreamStatusCallback) { 1147 setStreamStatusCallback(cb: StreamStatusCallback) {
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
1216 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { 1263 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs {
1217 switch (buffers.length) { 1264 switch (buffers.length) {
1218 case 0: 1265 case 0:
1219 // No buffers, so no logs. 1266 // No buffers, so no logs.
1220 return new BufferedLogs(null); 1267 return new BufferedLogs(null);
1221 case 1: 1268 case 1:
1222 // As a special case, if we only have one buffer, and we assume that 1269 // As a special case, if we only have one buffer, and we assume that
1223 // its entries are sorted, then that buffer is a return value. 1270 // its entries are sorted, then that buffer is a return value.
1224 return new BufferedLogs(buffers[0].getAll()); 1271 return new BufferedLogs(buffers[0].getAll());
1225 default: 1272 default:
1226 // Nothing to do.
1227 break; 1273 break;
1228 } 1274 }
1229 1275
1230 // Preload our peek array. 1276 // Preload our peek array.
1231 let peek = new Array<LogDog.LogEntry>(buffers.length); 1277 let peek = new Array<LogDog.LogEntry>(buffers.length);
1232 peek.length = 0; 1278 peek.length = 0;
1233 for (let buf of buffers) { 1279 for (let buf of buffers) {
1234 let le = buf.peek(); 1280 let le = buf.peek();
1235 if (!le) { 1281 if (!le) {
1236 // One of our input buffers had no log entries. 1282 // One of our input buffers had no log entries.
1237 return new BufferedLogs(null); 1283 return new BufferedLogs(null);
1238 } 1284 }
1239 peek.push(le); 1285 peek.push(le);
1240 } 1286 }
1241 1287
1242 // Assemble our aggregate buffer array. 1288 // Assemble our aggregate buffer array.
1289 //
1290 // As we add log entries, latestAdded will be updated to point to the most
1291 // recently added LogEntry.
1243 let entries: LogDog.LogEntry[] = []; 1292 let entries: LogDog.LogEntry[] = [];
1293 let latestAdded: LogDog.LogEntry|null = null;
1244 while (true) { 1294 while (true) {
1245 // Choose the next stream. 1295 // Choose the next stream.
1246 let earliest = 0; 1296 let earliest = 0;
1247 for (let i = 1; i < buffers.length; i++) { 1297 for (let i = 1; i < buffers.length; i++) {
1248 if (this.compareLogs(peek[i], peek[earliest]) < 0) { 1298 if (this.logSorter.before(peek[i], peek[earliest])) {
1249 earliest = i; 1299 earliest = i;
1250 } 1300 }
1251 } 1301 }
1252 1302
1253 // Get the next log from the earliest stream. 1303 // Get the next log from the earliest stream.
1254 let next = buffers[earliest].next(); 1304 let next = buffers[earliest].next();
1255 if (next) { 1305 if (next) {
1256 entries.push(next); 1306 latestAdded = next;
1307 entries.push(latestAdded);
1257 } 1308 }
1258 1309
1259 // Repopulate that buffer's "peek" value. If the buffer has no more 1310 // Repopulate that buffer's "peek" value. If the buffer has no more
1260 // entries, then we're done. 1311 // entries, then we're done this round.
1261 next = buffers[earliest].peek(); 1312 next = buffers[earliest].peek();
1262 if (!next) { 1313 if (!next) {
1263 return new BufferedLogs(entries); 1314 break;
1264 } 1315 }
1265 peek[earliest] = next; 1316 peek[earliest] = next;
1266 } 1317 }
1318
1319 // One or more of our buffers is exhausted. If we have the ability to load
1320 // implicit next logs, try and extract more using that.
1321 if (latestAdded && this.logSorter.implicitNext) {
1322 while (true) {
1323 latestAdded = this.logSorter.implicitNext(latestAdded, buffers);
1324 if (!latestAdded) {
1325 break;
1326 }
1327 entries.push(latestAdded);
1328 }
1329 }
1330 return new BufferedLogs(entries);
1267 } 1331 }
1268 } 1332 }
1269 1333
1270 /** Internal namespace for AggregateLogStream types. */ 1334 /** Internal namespace for AggregateLogStream types. */
1271 namespace AggregateLogStream { 1335 namespace AggregateLogStream {
1272 /** Entry is an entry for a single log stream and its buffered logs. */ 1336 /** Entry is an entry for a single log stream and its buffered logs. */
1273 export class Entry { 1337 export class Entry {
1274 buffer = new BufferedLogs(null); 1338 buffer = new BufferedLogs(null);
1275 status: LogStreamStatus; 1339 status: LogStreamStatus;
1276 lastError: Error|null; 1340 lastError: Error|null;
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
1318 1382
1319 /** 1383 /**
1320 * Peek returns the next log in the buffer without modifying the buffer. If 1384 * Peek returns the next log in the buffer without modifying the buffer. If
1321 * there are no logs in the buffer, peek will return null. 1385 * there are no logs in the buffer, peek will return null.
1322 */ 1386 */
1323 peek(): LogDog.LogEntry|null { 1387 peek(): LogDog.LogEntry|null {
1324 return (this.logs) ? (this.logs[this.index]) : (null); 1388 return (this.logs) ? (this.logs[this.index]) : (null);
1325 } 1389 }
1326 1390
1327 /** 1391 /**
1392 * Returns a copy of the remaining logs in the buffer.
1393 * If there are no logs, an empty array will be returned.
1394 */
1395 peekAll(): LogDog.LogEntry[] {
1396 return (this.logs || []).slice(0);
1397 }
1398
1399 /**
1328 * GetAll returns all logs in the buffer. Afterwards, the buffer will be 1400 * GetAll returns all logs in the buffer. Afterwards, the buffer will be
1329 * empty. 1401 * empty.
1330 */ 1402 */
1331 getAll(): LogDog.LogEntry[] { 1403 getAll(): LogDog.LogEntry[] {
1332 // Pop all logs. 1404 // Pop all logs.
1333 let logs = this.logs; 1405 let logs = this.logs;
1334 this.logs = null; 1406 this.logs = null;
1335 return (logs || []); 1407 return (logs || []);
1336 } 1408 }
1337 1409
1338 /** 1410 /**
1339 * Next fetches the next log in the buffer, removing it from the buffer. If 1411 * Next fetches the next log in the buffer, removing it from the buffer. If
1340 * no more logs are available, it will return null. 1412 * no more logs are available, it will return null.
1341 */ 1413 */
1342 next(): LogDog.LogEntry|null { 1414 next(): LogDog.LogEntry|null {
1343 if (!(this.logs && this.logs.length)) { 1415 if (!(this.logs && this.logs.length)) {
1344 return null; 1416 return null;
1345 } 1417 }
1346 1418
1347 // Get the next log and increment our index. 1419 // Get the next log and increment our index.
1348 let log = this.logs[this.index++]; 1420 let log = this.logs[this.index++];
1349 if (this.index >= this.logs.length) { 1421 if (this.index >= this.logs.length) {
1350 this.logs = null; 1422 this.logs = null;
1351 } 1423 }
1352 return log; 1424 return log;
1353 } 1425 }
1354 } 1426 }
1355 } 1427 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698