| OLD | NEW |
| 1 /* | 1 /* |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | 3 Use of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. | 4 that can be found in the LICENSE file. |
| 5 */ | 5 */ |
| 6 | 6 |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | 7 ///<reference path="../logdog-stream/logdog.ts" /> |
| 8 ///<reference path="../luci-operation/operation.ts" /> | 8 ///<reference path="../luci-operation/operation.ts" /> |
| 9 ///<reference path="../luci-sleep-promise/promise.ts" /> | 9 ///<reference path="../luci-sleep-promise/promise.ts" /> |
| 10 ///<reference path="../rpc/client.ts" /> | 10 ///<reference path="../rpc/client.ts" /> |
| (...skipping 1005 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1016 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts)); | 1016 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts)); |
| 1017 let logs = await f.p; | 1017 let logs = await f.p; |
| 1018 if (logs && logs.length) { | 1018 if (logs && logs.length) { |
| 1019 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); | 1019 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); |
| 1020 } | 1020 } |
| 1021 return logs; | 1021 return logs; |
| 1022 } | 1022 } |
| 1023 } | 1023 } |
| 1024 | 1024 |
| 1025 /** | 1025 /** |
| 1026 * LogSorter is an interface that used by AggregateLogStream to extract sorted |
| 1027 * logs from a set of BufferedLogs. |
| 1028 * |
| 1029 * It is used to compare two log entries to determine their relative order. |
| 1030 */ |
| 1031 type LogSorter = { |
| 1032 /** Returns true if "a" comes before "b". */ |
| 1033 before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; |
| 1034 |
| 1035 /** |
| 1036 * If implemented, returns an implicit next log in the buffer set. |
| 1037 * |
| 1038 * This is useful if the next log can be determined from the current |
| 1039 * buffered data, even if it is partial or incomplete. |
| 1040 */ |
| 1041 implicitNext?: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => |
| 1042 LogDog.LogEntry | null; |
| 1043 }; |
| 1044 |
| 1045 const prefixIndexLogSorter: LogSorter = { |
| 1046 before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => { |
| 1047 return (a.prefixIndex - b.prefixIndex); |
| 1048 }, |
| 1049 |
| 1050 implicitNext: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => { |
| 1051 let nextPrefixIndex = (prev.prefixIndex + 1); |
| 1052 for (let buf of buffers) { |
| 1053 let le = buf.peek(); |
| 1054 if (le && le.prefixIndex === nextPrefixIndex) { |
| 1055 return buf.next(); |
| 1056 } |
| 1057 } |
| 1058 return null; |
| 1059 }, |
| 1060 }; |
| 1061 |
| 1062 const timestampLogSorter: LogSorter = { |
| 1063 before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => { |
| 1064 if (a.timestamp) { |
| 1065 if (b.timestamp) { |
| 1066 return a.timestamp.getTime() - b.timestamp.getTime(); |
| 1067 } |
| 1068 return 1; |
| 1069 } |
| 1070 if (b.timestamp) { |
| 1071 return -1; |
| 1072 } |
| 1073 return 0; |
| 1074 }, |
| 1075 |
| 1076 // No implicit "next" with timestamp-based logs, since the next log in |
| 1077 // an empty buffer may actually be the next contiguous log. |
| 1078 implicitNext: undefined, |
| 1079 }; |
| 1080 |
| 1081 /** |
| 1026 * An aggregate log stream. It presents a single-stream view, but is really | 1082 * An aggregate log stream. It presents a single-stream view, but is really |
| 1027 * composed of several log streams interleaved based on their prefix indices | 1083 * composed of several log streams interleaved based on their prefix indices |
| 1028 * (if they share a prefix) or timestamps (if they don't). | 1084 * (if they share a prefix) or timestamps (if they don't). |
| 1029 * | 1085 * |
| 1030 * At least one log entry from each stream must be buffered before any log | 1086 * At least one log entry from each stream must be buffered before any log |
| 1031 * entries can be yielded, since we don't know what ordering to apply | 1087 * entries can be yielded, since we don't know what ordering to apply |
| 1032 * otherwise. To make this fast, we will make the first request for each | 1088 * otherwise. To make this fast, we will make the first request for each |
| 1033 * stream small so it finishes quickly and we can start rendering. Subsequent | 1089 * stream small so it finishes quickly and we can start rendering. Subsequent |
| 1034 * entries will be larger for efficiency. | 1090 * entries will be larger for efficiency. |
| 1035 * | 1091 * |
| 1036 * @param {LogStream} streams the composite streams. | 1092 * @param {LogStream} streams the composite streams. |
| 1037 */ | 1093 */ |
| 1038 class AggregateLogStream implements LogProvider { | 1094 class AggregateLogStream implements LogProvider { |
| 1039 private streams: AggregateLogStream.Entry[]; | 1095 private streams: AggregateLogStream.Entry[]; |
| 1040 private active: AggregateLogStream.Entry[]; | 1096 private active: AggregateLogStream.Entry[]; |
| 1041 private currentNextPromise: Promise<BufferedLogs[]>|null; | 1097 private currentNextPromise: Promise<BufferedLogs[]>|null; |
| 1042 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; | 1098 private readonly logSorter: LogSorter; |
| 1043 | 1099 |
| 1044 private streamStatusCallback: StreamStatusCallback; | 1100 private streamStatusCallback: StreamStatusCallback; |
| 1045 | 1101 |
| 1046 constructor(streams: LogStream[]) { | 1102 constructor(streams: LogStream[]) { |
| 1047 // Input streams, ordered by input order. | 1103 // Input streams, ordered by input order. |
| 1048 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { | 1104 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { |
| 1049 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { | 1105 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { |
| 1050 if (st) { | 1106 if (st) { |
| 1051 this.streams[i].status = st[0]; | 1107 this.streams[i].status = st[0]; |
| 1052 this.statusChanged(); | 1108 this.statusChanged(); |
| (...skipping 14 matching lines...) Expand all Loading... |
| 1067 // timestamp. | 1123 // timestamp. |
| 1068 let template: LogDog.StreamPath; | 1124 let template: LogDog.StreamPath; |
| 1069 let sharedPrefix = this.streams.every((entry) => { | 1125 let sharedPrefix = this.streams.every((entry) => { |
| 1070 if (!template) { | 1126 if (!template) { |
| 1071 template = entry.ls.stream; | 1127 template = entry.ls.stream; |
| 1072 return true; | 1128 return true; |
| 1073 } | 1129 } |
| 1074 return template.samePrefixAs(entry.ls.stream); | 1130 return template.samePrefixAs(entry.ls.stream); |
| 1075 }); | 1131 }); |
| 1076 | 1132 |
| 1077 this.compareLogs = ((sharedPrefix) ? (a, b) => { | 1133 if (sharedPrefix) { |
| 1078 return (a.prefixIndex - b.prefixIndex); | 1134 this.logSorter = prefixIndexLogSorter; |
| 1079 } : (a, b) => { | 1135 } else { |
| 1080 if (a.timestamp) { | 1136 this.logSorter = timestampLogSorter; |
| 1081 if (b.timestamp) { | 1137 } |
| 1082 return a.timestamp.getTime() - b.timestamp.getTime(); | |
| 1083 } | |
| 1084 return 1; | |
| 1085 } else if (b.timestamp) { | |
| 1086 return -1; | |
| 1087 } else { | |
| 1088 return 0; | |
| 1089 } | |
| 1090 }); | |
| 1091 } | 1138 } |
| 1092 | 1139 |
| 1093 split(): SplitLogProvider|null { | 1140 split(): SplitLogProvider|null { |
| 1094 return null; | 1141 return null; |
| 1095 } | 1142 } |
| 1096 fetchedEndOfStream(): boolean { | 1143 fetchedEndOfStream(): boolean { |
| 1097 return (!this.active.length); | 1144 return (!this.active.length); |
| 1098 } | 1145 } |
| 1099 | 1146 |
| 1100 setStreamStatusCallback(cb: StreamStatusCallback) { | 1147 setStreamStatusCallback(cb: StreamStatusCallback) { |
| (...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1216 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { | 1263 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { |
| 1217 switch (buffers.length) { | 1264 switch (buffers.length) { |
| 1218 case 0: | 1265 case 0: |
| 1219 // No buffers, so no logs. | 1266 // No buffers, so no logs. |
| 1220 return new BufferedLogs(null); | 1267 return new BufferedLogs(null); |
| 1221 case 1: | 1268 case 1: |
| 1222 // As a special case, if we only have one buffer, and we assume that | 1269 // As a special case, if we only have one buffer, and we assume that |
| 1223 // its entries are sorted, then that buffer is a return value. | 1270 // its entries are sorted, then that buffer is a return value. |
| 1224 return new BufferedLogs(buffers[0].getAll()); | 1271 return new BufferedLogs(buffers[0].getAll()); |
| 1225 default: | 1272 default: |
| 1226 // Nothing to do. | |
| 1227 break; | 1273 break; |
| 1228 } | 1274 } |
| 1229 | 1275 |
| 1230 // Preload our peek array. | 1276 // Preload our peek array. |
| 1231 let peek = new Array<LogDog.LogEntry>(buffers.length); | 1277 let peek = new Array<LogDog.LogEntry>(buffers.length); |
| 1232 peek.length = 0; | 1278 peek.length = 0; |
| 1233 for (let buf of buffers) { | 1279 for (let buf of buffers) { |
| 1234 let le = buf.peek(); | 1280 let le = buf.peek(); |
| 1235 if (!le) { | 1281 if (!le) { |
| 1236 // One of our input buffers had no log entries. | 1282 // One of our input buffers had no log entries. |
| 1237 return new BufferedLogs(null); | 1283 return new BufferedLogs(null); |
| 1238 } | 1284 } |
| 1239 peek.push(le); | 1285 peek.push(le); |
| 1240 } | 1286 } |
| 1241 | 1287 |
| 1242 // Assemble our aggregate buffer array. | 1288 // Assemble our aggregate buffer array. |
| 1289 // |
| 1290 // As we add log entries, latestAdded will be updated to point to the most |
| 1291 // recently added LogEntry. |
| 1243 let entries: LogDog.LogEntry[] = []; | 1292 let entries: LogDog.LogEntry[] = []; |
| 1293 let latestAdded: LogDog.LogEntry|null = null; |
| 1244 while (true) { | 1294 while (true) { |
| 1245 // Choose the next stream. | 1295 // Choose the next stream. |
| 1246 let earliest = 0; | 1296 let earliest = 0; |
| 1247 for (let i = 1; i < buffers.length; i++) { | 1297 for (let i = 1; i < buffers.length; i++) { |
| 1248 if (this.compareLogs(peek[i], peek[earliest]) < 0) { | 1298 if (this.logSorter.before(peek[i], peek[earliest])) { |
| 1249 earliest = i; | 1299 earliest = i; |
| 1250 } | 1300 } |
| 1251 } | 1301 } |
| 1252 | 1302 |
| 1253 // Get the next log from the earliest stream. | 1303 // Get the next log from the earliest stream. |
| 1254 let next = buffers[earliest].next(); | 1304 let next = buffers[earliest].next(); |
| 1255 if (next) { | 1305 if (next) { |
| 1256 entries.push(next); | 1306 latestAdded = next; |
| 1307 entries.push(latestAdded); |
| 1257 } | 1308 } |
| 1258 | 1309 |
| 1259 // Repopulate that buffer's "peek" value. If the buffer has no more | 1310 // Repopulate that buffer's "peek" value. If the buffer has no more |
| 1260 // entries, then we're done. | 1311 // entries, then we're done this round. |
| 1261 next = buffers[earliest].peek(); | 1312 next = buffers[earliest].peek(); |
| 1262 if (!next) { | 1313 if (!next) { |
| 1263 return new BufferedLogs(entries); | 1314 break; |
| 1264 } | 1315 } |
| 1265 peek[earliest] = next; | 1316 peek[earliest] = next; |
| 1266 } | 1317 } |
| 1318 |
| 1319 // One or more of our buffers is exhausted. If we have the ability to load |
| 1320 // implicit next logs, try and extract more using that. |
| 1321 if (latestAdded && this.logSorter.implicitNext) { |
| 1322 while (true) { |
| 1323 latestAdded = this.logSorter.implicitNext(latestAdded, buffers); |
| 1324 if (!latestAdded) { |
| 1325 break; |
| 1326 } |
| 1327 entries.push(latestAdded); |
| 1328 } |
| 1329 } |
| 1330 return new BufferedLogs(entries); |
| 1267 } | 1331 } |
| 1268 } | 1332 } |
| 1269 | 1333 |
| 1270 /** Internal namespace for AggregateLogStream types. */ | 1334 /** Internal namespace for AggregateLogStream types. */ |
| 1271 namespace AggregateLogStream { | 1335 namespace AggregateLogStream { |
| 1272 /** Entry is an entry for a single log stream and its buffered logs. */ | 1336 /** Entry is an entry for a single log stream and its buffered logs. */ |
| 1273 export class Entry { | 1337 export class Entry { |
| 1274 buffer = new BufferedLogs(null); | 1338 buffer = new BufferedLogs(null); |
| 1275 status: LogStreamStatus; | 1339 status: LogStreamStatus; |
| 1276 lastError: Error|null; | 1340 lastError: Error|null; |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1318 | 1382 |
| 1319 /** | 1383 /** |
| 1320 * Peek returns the next log in the buffer without modifying the buffer. If | 1384 * Peek returns the next log in the buffer without modifying the buffer. If |
| 1321 * there are no logs in the buffer, peek will return null. | 1385 * there are no logs in the buffer, peek will return null. |
| 1322 */ | 1386 */ |
| 1323 peek(): LogDog.LogEntry|null { | 1387 peek(): LogDog.LogEntry|null { |
| 1324 return (this.logs) ? (this.logs[this.index]) : (null); | 1388 return (this.logs) ? (this.logs[this.index]) : (null); |
| 1325 } | 1389 } |
| 1326 | 1390 |
| 1327 /** | 1391 /** |
| 1392 * Returns a copy of the remaining logs in the buffer. |
| 1393 * If there are no logs, an empty array will be returned. |
| 1394 */ |
| 1395 peekAll(): LogDog.LogEntry[] { |
| 1396 return (this.logs || []).slice(0); |
| 1397 } |
| 1398 |
| 1399 /** |
| 1328 * GetAll returns all logs in the buffer. Afterwards, the buffer will be | 1400 * GetAll returns all logs in the buffer. Afterwards, the buffer will be |
| 1329 * empty. | 1401 * empty. |
| 1330 */ | 1402 */ |
| 1331 getAll(): LogDog.LogEntry[] { | 1403 getAll(): LogDog.LogEntry[] { |
| 1332 // Pop all logs. | 1404 // Pop all logs. |
| 1333 let logs = this.logs; | 1405 let logs = this.logs; |
| 1334 this.logs = null; | 1406 this.logs = null; |
| 1335 return (logs || []); | 1407 return (logs || []); |
| 1336 } | 1408 } |
| 1337 | 1409 |
| 1338 /** | 1410 /** |
| 1339 * Next fetches the next log in the buffer, removing it from the buffer. If | 1411 * Next fetches the next log in the buffer, removing it from the buffer. If |
| 1340 * no more logs are available, it will return null. | 1412 * no more logs are available, it will return null. |
| 1341 */ | 1413 */ |
| 1342 next(): LogDog.LogEntry|null { | 1414 next(): LogDog.LogEntry|null { |
| 1343 if (!(this.logs && this.logs.length)) { | 1415 if (!(this.logs && this.logs.length)) { |
| 1344 return null; | 1416 return null; |
| 1345 } | 1417 } |
| 1346 | 1418 |
| 1347 // Get the next log and increment our index. | 1419 // Get the next log and increment our index. |
| 1348 let log = this.logs[this.index++]; | 1420 let log = this.logs[this.index++]; |
| 1349 if (this.index >= this.logs.length) { | 1421 if (this.index >= this.logs.length) { |
| 1350 this.logs = null; | 1422 this.logs = null; |
| 1351 } | 1423 } |
| 1352 return log; | 1424 return log; |
| 1353 } | 1425 } |
| 1354 } | 1426 } |
| 1355 } | 1427 } |
| OLD | NEW |