Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 /* | 1 /* |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | 3 Use of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. | 4 that can be found in the LICENSE file. |
| 5 */ | 5 */ |
| 6 | 6 |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | 7 ///<reference path="../logdog-stream/logdog.ts" /> |
| 8 ///<reference path="../luci-operation/operation.ts" /> | 8 ///<reference path="../luci-operation/operation.ts" /> |
| 9 ///<reference path="../luci-sleep-promise/promise.ts" /> | 9 ///<reference path="../luci-sleep-promise/promise.ts" /> |
| 10 ///<reference path="../rpc/client.ts" /> | 10 ///<reference path="../rpc/client.ts" /> |
| (...skipping 980 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 991 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts)); | 991 this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts)); |
| 992 let logs = await f.p; | 992 let logs = await f.p; |
| 993 if (logs && logs.length) { | 993 if (logs && logs.length) { |
| 994 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); | 994 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); |
| 995 } | 995 } |
| 996 return logs; | 996 return logs; |
| 997 } | 997 } |
| 998 } | 998 } |
| 999 | 999 |
| 1000 /** | 1000 /** |
| 1001 * LogSorter is used to extract sorted logs from a set of BufferedLogs. | |
|
hinoka
2017/05/03 17:16:58
LogSorter is an interface used to....
dnj
2017/05/03 19:09:27
Done.
| |
| 1002 * | |
| 1003 * LogSorter will modify the BufferedLogs, consuming the logs that it | |
|
hinoka
2017/05/03 17:16:58
A LogSorter which implements implicitNext() will m
dnj
2017/05/03 19:09:27
Done.
| |
| 1004 * returns from the buffer. | |
| 1005 */ | |
| 1006 type LogSorter = { | |
| 1007 compare: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; | |
| 1008 implicitNext?: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => | |
| 1009 LogDog.LogEntry | null; | |
| 1010 }; | |
| 1011 | |
| 1012 const prefixIndexLogSorter: LogSorter = { | |
| 1013 compare: (a: LogDog.LogEntry, b: LogDog.LogEntry) => { | |
| 1014 return (a.prefixIndex - b.prefixIndex); | |
| 1015 }, | |
| 1016 | |
| 1017 implicitNext: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => { | |
| 1018 let nextPrefixIndex = (prev.prefixIndex + 1); | |
| 1019 for (let buf of buffers) { | |
| 1020 let le = buf.peek(); | |
| 1021 if (le && le.prefixIndex === nextPrefixIndex) { | |
| 1022 return buf.next(); | |
| 1023 } | |
| 1024 } | |
| 1025 return null; | |
| 1026 }, | |
| 1027 }; | |
| 1028 | |
| 1029 const timestampLogSorter: LogSorter = { | |
| 1030 compare: (a: LogDog.LogEntry, b: LogDog.LogEntry) => { | |
| 1031 if (a.timestamp) { | |
| 1032 if (b.timestamp) { | |
| 1033 return a.timestamp.getTime() - b.timestamp.getTime(); | |
| 1034 } | |
| 1035 return 1; | |
| 1036 } | |
| 1037 if (b.timestamp) { | |
| 1038 return -1; | |
| 1039 } | |
| 1040 return 0; | |
| 1041 }, | |
| 1042 | |
| 1043 // No implicit "next" with timestamp-based logs, since the next log in | |
| 1044 // an empty buffer may actually be the next contiguous log. | |
| 1045 implicitNext: undefined, | |
| 1046 }; | |
| 1047 | |
| 1048 /** | |
| 1001 * An aggregate log stream. It presents a single-stream view, but is really | 1049 * An aggregate log stream. It presents a single-stream view, but is really |
| 1002 * composed of several log streams interleaved based on their prefix indices | 1050 * composed of several log streams interleaved based on their prefix indices |
| 1003 * (if they share a prefix) or timestamps (if they don't). | 1051 * (if they share a prefix) or timestamps (if they don't). |
| 1004 * | 1052 * |
| 1005 * At least one log entry from each stream must be buffered before any log | 1053 * At least one log entry from each stream must be buffered before any log |
| 1006 * entries can be yielded, since we don't know what ordering to apply | 1054 * entries can be yielded, since we don't know what ordering to apply |
| 1007 * otherwise. To make this fast, we will make the first request for each | 1055 * otherwise. To make this fast, we will make the first request for each |
| 1008 * stream small so it finishes quickly and we can start rendering. Subsequent | 1056 * stream small so it finishes quickly and we can start rendering. Subsequent |
| 1009 * entries will be larger for efficiency. | 1057 * entries will be larger for efficiency. |
| 1010 * | 1058 * |
| 1011 * @param {LogStream} streams the composite streams. | 1059 * @param {LogStream} streams the composite streams. |
| 1012 */ | 1060 */ |
| 1013 class AggregateLogStream implements LogProvider { | 1061 class AggregateLogStream implements LogProvider { |
| 1014 private streams: AggregateLogStream.Entry[]; | 1062 private streams: AggregateLogStream.Entry[]; |
| 1015 private active: AggregateLogStream.Entry[]; | 1063 private active: AggregateLogStream.Entry[]; |
| 1016 private currentNextPromise: Promise<BufferedLogs[]>|null; | 1064 private currentNextPromise: Promise<BufferedLogs[]>|null; |
| 1017 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; | 1065 private readonly logSorter: LogSorter; |
| 1018 | 1066 |
| 1019 private streamStatusCallback: StreamStatusCallback; | 1067 private streamStatusCallback: StreamStatusCallback; |
| 1020 | 1068 |
| 1021 constructor(streams: LogStream[]) { | 1069 constructor(streams: LogStream[]) { |
| 1022 // Input streams, ordered by input order. | 1070 // Input streams, ordered by input order. |
| 1023 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { | 1071 this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { |
| 1024 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { | 1072 ls.setStreamStatusCallback((st: LogStreamStatus[]) => { |
| 1025 if (st) { | 1073 if (st) { |
| 1026 this.streams[i].status = st[0]; | 1074 this.streams[i].status = st[0]; |
| 1027 this.statusChanged(); | 1075 this.statusChanged(); |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 1042 // timestamp. | 1090 // timestamp. |
| 1043 let template: LogDog.StreamPath; | 1091 let template: LogDog.StreamPath; |
| 1044 let sharedPrefix = this.streams.every((entry) => { | 1092 let sharedPrefix = this.streams.every((entry) => { |
| 1045 if (!template) { | 1093 if (!template) { |
| 1046 template = entry.ls.stream; | 1094 template = entry.ls.stream; |
| 1047 return true; | 1095 return true; |
| 1048 } | 1096 } |
| 1049 return template.samePrefixAs(entry.ls.stream); | 1097 return template.samePrefixAs(entry.ls.stream); |
| 1050 }); | 1098 }); |
| 1051 | 1099 |
| 1052 this.compareLogs = ((sharedPrefix) ? (a, b) => { | 1100 if (sharedPrefix) { |
| 1053 return (a.prefixIndex - b.prefixIndex); | 1101 this.logSorter = prefixIndexLogSorter; |
| 1054 } : (a, b) => { | 1102 } else { |
| 1055 if (a.timestamp) { | 1103 this.logSorter = timestampLogSorter; |
| 1056 if (b.timestamp) { | 1104 } |
| 1057 return a.timestamp.getTime() - b.timestamp.getTime(); | |
| 1058 } | |
| 1059 return 1; | |
| 1060 } else if (b.timestamp) { | |
| 1061 return -1; | |
| 1062 } else { | |
| 1063 return 0; | |
| 1064 } | |
| 1065 }); | |
| 1066 } | 1105 } |
| 1067 | 1106 |
| 1068 split(): SplitLogProvider|null { | 1107 split(): SplitLogProvider|null { |
| 1069 return null; | 1108 return null; |
| 1070 } | 1109 } |
| 1071 fetchedEndOfStream(): boolean { | 1110 fetchedEndOfStream(): boolean { |
| 1072 return (!this.active.length); | 1111 return (!this.active.length); |
| 1073 } | 1112 } |
| 1074 | 1113 |
| 1075 setStreamStatusCallback(cb: StreamStatusCallback) { | 1114 setStreamStatusCallback(cb: StreamStatusCallback) { |
| (...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1178 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { | 1217 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { |
| 1179 switch (buffers.length) { | 1218 switch (buffers.length) { |
| 1180 case 0: | 1219 case 0: |
| 1181 // No buffers, so no logs. | 1220 // No buffers, so no logs. |
| 1182 return new BufferedLogs(null); | 1221 return new BufferedLogs(null); |
| 1183 case 1: | 1222 case 1: |
| 1184 // As a special case, if we only have one buffer, and we assume that | 1223 // As a special case, if we only have one buffer, and we assume that |
| 1185 // its entries are sorted, then that buffer is a return value. | 1224 // its entries are sorted, then that buffer is a return value. |
| 1186 return new BufferedLogs(buffers[0].getAll()); | 1225 return new BufferedLogs(buffers[0].getAll()); |
| 1187 default: | 1226 default: |
| 1188 // Nothing to do. | |
| 1189 break; | 1227 break; |
| 1190 } | 1228 } |
| 1191 | 1229 |
| 1192 // Preload our peek array. | 1230 // Preload our peek array. |
| 1193 let peek = new Array<LogDog.LogEntry>(buffers.length); | 1231 let peek = new Array<LogDog.LogEntry>(buffers.length); |
| 1194 peek.length = 0; | 1232 peek.length = 0; |
| 1195 for (let buf of buffers) { | 1233 for (let buf of buffers) { |
| 1196 let le = buf.peek(); | 1234 let le = buf.peek(); |
| 1197 if (!le) { | 1235 if (!le) { |
| 1198 // One of our input buffers had no log entries. | 1236 // One of our input buffers had no log entries. |
| 1199 return new BufferedLogs(null); | 1237 return new BufferedLogs(null); |
| 1200 } | 1238 } |
| 1201 peek.push(le); | 1239 peek.push(le); |
| 1202 } | 1240 } |
| 1203 | 1241 |
| 1204 // Assemble our aggregate buffer array. | 1242 // Assemble our aggregate buffer array. |
| 1205 let entries: LogDog.LogEntry[] = []; | 1243 let entries: LogDog.LogEntry[] = []; |
| 1244 let last: LogDog.LogEntry|null = null; | |
|
hinoka
2017/05/03 17:16:58
how about "current"? "last" often implies "final",
dnj
2017/05/03 19:09:27
Renamed to "latestAdded".
| |
| 1206 while (true) { | 1245 while (true) { |
| 1207 // Choose the next stream. | 1246 // Choose the next stream. |
| 1208 let earliest = 0; | 1247 let earliest = 0; |
| 1209 for (let i = 1; i < buffers.length; i++) { | 1248 for (let i = 1; i < buffers.length; i++) { |
| 1210 if (this.compareLogs(peek[i], peek[earliest]) < 0) { | 1249 if (this.logSorter.compare(peek[i], peek[earliest]) < 0) { |
| 1211 earliest = i; | 1250 earliest = i; |
| 1212 } | 1251 } |
| 1213 } | 1252 } |
| 1214 | 1253 |
| 1215 // Get the next log from the earliest stream. | 1254 // Get the next log from the earliest stream. |
| 1216 let next = buffers[earliest].next(); | 1255 let next = buffers[earliest].next(); |
| 1217 if (next) { | 1256 if (next) { |
| 1218 entries.push(next); | 1257 last = next; |
| 1258 entries.push(last); | |
| 1219 } | 1259 } |
| 1220 | 1260 |
| 1221 // Repopulate that buffer's "peek" value. If the buffer has no more | 1261 // Repopulate that buffer's "peek" value. If the buffer has no more |
| 1222 // entries, then we're done. | 1262 // entries, then we're done this round. |
| 1223 next = buffers[earliest].peek(); | 1263 next = buffers[earliest].peek(); |
| 1224 if (!next) { | 1264 if (!next) { |
| 1225 return new BufferedLogs(entries); | 1265 break; |
| 1226 } | 1266 } |
| 1227 peek[earliest] = next; | 1267 peek[earliest] = next; |
| 1228 } | 1268 } |
| 1269 | |
| 1270 // One or more of our buffers is exhausted. If we have the ability to load | |
| 1271 // implicit next logs, try and extract more using that. | |
| 1272 if (last && this.logSorter.implicitNext) { | |
| 1273 while (true) { | |
| 1274 last = this.logSorter.implicitNext(last, buffers); | |
| 1275 if (!last) { | |
| 1276 break; | |
| 1277 } | |
| 1278 entries.push(last); | |
| 1279 } | |
| 1280 } | |
| 1281 return new BufferedLogs(entries); | |
| 1229 } | 1282 } |
| 1230 } | 1283 } |
| 1231 | 1284 |
| 1232 /** Internal namespace for AggregateLogStream types. */ | 1285 /** Internal namespace for AggregateLogStream types. */ |
| 1233 namespace AggregateLogStream { | 1286 namespace AggregateLogStream { |
| 1234 /** Entry is an entry for a single log stream and its buffered logs. */ | 1287 /** Entry is an entry for a single log stream and its buffered logs. */ |
| 1235 export class Entry { | 1288 export class Entry { |
| 1236 buffer = new BufferedLogs(null); | 1289 buffer = new BufferedLogs(null); |
| 1237 status: LogStreamStatus; | 1290 status: LogStreamStatus; |
| 1238 lastError: Error|null; | 1291 lastError: Error|null; |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1280 | 1333 |
| 1281 /** | 1334 /** |
| 1282 * Peek returns the next log in the buffer without modifying the buffer. If | 1335 * Peek returns the next log in the buffer without modifying the buffer. If |
| 1283 * there are no logs in the buffer, peek will return null. | 1336 * there are no logs in the buffer, peek will return null. |
| 1284 */ | 1337 */ |
| 1285 peek(): LogDog.LogEntry|null { | 1338 peek(): LogDog.LogEntry|null { |
| 1286 return (this.logs) ? (this.logs[this.index]) : (null); | 1339 return (this.logs) ? (this.logs[this.index]) : (null); |
| 1287 } | 1340 } |
| 1288 | 1341 |
| 1289 /** | 1342 /** |
| 1343 * Returns a copy of the remaining logs in the buffer. | |
| 1344 * If there are no logs, an empty array will be returned. | |
| 1345 */ | |
| 1346 peekAll(): LogDog.LogEntry[] { | |
| 1347 return (this.logs || []).slice(0); | |
| 1348 } | |
| 1349 | |
| 1350 /** | |
| 1290 * GetAll returns all logs in the buffer. Afterwards, the buffer will be | 1351 * GetAll returns all logs in the buffer. Afterwards, the buffer will be |
| 1291 * empty. | 1352 * empty. |
| 1292 */ | 1353 */ |
| 1293 getAll(): LogDog.LogEntry[] { | 1354 getAll(): LogDog.LogEntry[] { |
| 1294 // Pop all logs. | 1355 // Pop all logs. |
| 1295 let logs = this.logs; | 1356 let logs = this.logs; |
| 1296 this.logs = null; | 1357 this.logs = null; |
| 1297 return (logs || []); | 1358 return (logs || []); |
| 1298 } | 1359 } |
| 1299 | 1360 |
| 1300 /** | 1361 /** |
| 1301 * Next fetches the next log in the buffer, removing it from the buffer. If | 1362 * Next fetches the next log in the buffer, removing it from the buffer. If |
| 1302 * no more logs are available, it will return null. | 1363 * no more logs are available, it will return null. |
| 1303 */ | 1364 */ |
| 1304 next(): LogDog.LogEntry|null { | 1365 next(): LogDog.LogEntry|null { |
| 1305 if (!(this.logs && this.logs.length)) { | 1366 if (!(this.logs && this.logs.length)) { |
| 1306 return null; | 1367 return null; |
| 1307 } | 1368 } |
| 1308 | 1369 |
| 1309 // Get the next log and increment our index. | 1370 // Get the next log and increment our index. |
| 1310 let log = this.logs[this.index++]; | 1371 let log = this.logs[this.index++]; |
| 1311 if (this.index >= this.logs.length) { | 1372 if (this.index >= this.logs.length) { |
| 1312 this.logs = null; | 1373 this.logs = null; |
| 1313 } | 1374 } |
| 1314 return log; | 1375 return log; |
| 1315 } | 1376 } |
| 1316 } | 1377 } |
| 1317 } | 1378 } |
| OLD | NEW |