Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(718)

Side by Side Diff: web/inc/logdog-stream-view/logdog-stream-view.html

Issue 2335223003: LogDog: Update web code, stream/query with auth. (Closed)
Patch Set: Fix background page loading. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 <!-- 1 <!--
2 Copyright 2016 The LUCI Authors. All rights reserved. 2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0 3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file. 4 that can be found in the LICENSE file.
5 --> 5 -->
6 6
7 <link rel="import" href="../bower_components/polymer/polymer.html"> 7 <link rel="import" href="../bower_components/polymer/polymer.html">
8 <link rel="import" href="../bower_components/google-signin/google-signin-aware.h tml">
8 <link rel="import" href="../bower_components/paper-checkbox/paper-checkbox.html" > 9 <link rel="import" href="../bower_components/paper-checkbox/paper-checkbox.html" >
9 10
10 <link rel="import" href="../logdog-stream/logdog-stream.html"> 11 <link rel="import" href="../logdog-stream/logdog-stream.html">
12 <link rel="import" href="../logdog-stream/logdog-error.html">
13 <link rel="import" href="../luci-sleep-promise/luci-sleep-promise.html">
11 <link rel="import" href="logdog-stream-fetcher.html"> 14 <link rel="import" href="logdog-stream-fetcher.html">
12 <link rel="import" href="logdog-stream-query.html"> 15 <link rel="import" href="logdog-stream-query.html">
13 16
14 <!-- 17 <!--
15 An element for rendering muxed LogDog log streams. 18 An element for rendering muxed LogDog log streams.
16 --> 19 -->
17 <dom-module id="logdog-stream-view"> 20 <dom-module id="logdog-stream-view">
18 21
19 <template> 22 <template>
20 <style> 23 <style>
21 .buttons { 24 .buttons {
22 position: fixed; 25 position: fixed;
23 background-color: white; 26 background-color: white;
24 } 27 }
25 28
26 #counter { 29 #stream-status {
27 position: fixed; 30 position: fixed;
28 right: 16px; 31 right: 16px;
32 background-color: #EEEEEE;
33 opacity: 0.7;
29 } 34 }
30 35
31 #logContent { 36 #logContent {
32 padding-top: 20px; 37 padding-top: 20px;
33 } 38 }
34 39
35 .log-entry { 40 .log-entry {
36 padding: 0 0 0 0; 41 padding: 0 0 0 0;
37 clear: left; 42 clear: left;
38 } 43 }
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 84
80 .log-entry-line:nth-last-child(2) { 85 .log-entry-line:nth-last-child(2) {
81 border-bottom: 1px solid #CCCCCC; 86 border-bottom: 1px solid #CCCCCC;
82 } 87 }
83 88
84 #bottom { 89 #bottom {
85 background-color: lightcoral; 90 background-color: lightcoral;
86 height: 2px; 91 height: 2px;
87 margin-bottom: 10px; 92 margin-bottom: 10px;
88 } 93 }
94
95 #status-bar {
96 /* Overlay at the bottom of the page. */
97 position: absolute;
98 bottom: 0;
99 left: 0;
100 width: 100%;
101
102 text-align: center;
103 font-size: 16px;
104 background-color: rgba(245, 245, 220, 0.7);
105 }
89 </style> 106 </style>
90 107
108 <google-signin-aware
109 id="aware"
110 on-google-signin-aware-success="_onSignin"></google-signin-aware>
111
91 <rpc-client 112 <rpc-client
92 id="client" 113 id="client"
93 auto-token 114 auto-token
94 host="[[host]]"></rpc-client> 115 host="[[host]]"></rpc-client>
95 116
96 <!-- Stream view options. --> 117 <!-- Stream view options. -->
97 <div class="buttons"> 118 <div class="buttons">
98 <paper-checkbox checked="{{showMetadata}}"> 119 <paper-checkbox checked="{{showMetadata}}">
99 Show Metadata 120 Show Metadata
100 </paper-checkbox> 121 </paper-checkbox>
101 <paper-checkbox checked="{{wrapLines}}"> 122 <paper-checkbox checked="{{wrapLines}}">
102 Wrap Lines 123 Wrap Lines
103 </paper-checkbox> 124 </paper-checkbox>
104 <paper-checkbox checked="{{follow}}"> 125 <paper-checkbox checked="{{follow}}">
105 Follow 126 Follow
106 </paper-checkbox> 127 </paper-checkbox>
107 </div> 128 </div>
108 129
109 <!-- Display current fetching status, if stream data is still loading. --> 130 <!-- Display current fetching status, if stream data is still loading. -->
110 <template is="dom-if" if="{{fetch}}"> 131 <template is="dom-if" if="{{streamStatus}}">
111 <div id="counter"> 132 <div id="stream-status">
112 <table> 133 <table>
113 <template is="dom-repeat" items="{{fetch.status}}"> 134 <template is="dom-repeat" items="{{streamStatus}}">
114 <tr> 135 <tr>
115 <td>{{item.name}}</td> 136 <td>{{item.name}}</td>
116 <td>{{item.status}}</td> 137 <td>{{item.desc}}</td>
117 </tr> 138 </tr>
118 </template> 139 </template>
119 </table> 140 </table>
120 </div> 141 </div>
121 </template> 142 </template>
122 143
123 <!-- Muxed log content. --> 144 <!-- Muxed log content. -->
124 <div id="logContent" on-mousewheel="_handleMouseWheel"> 145 <div id="logContent" on-mousewheel="_handleMouseWheel">
125 <div id="logs"> 146 <div id="logs">
126 <!-- Content will be populated with JavaScript as logs are loaded. 147 <!-- Content will be populated with JavaScript as logs are loaded.
(...skipping 12 matching lines...) Expand all
139 </div> 160 </div>
140 ... 161 ...
141 162
142 --> 163 -->
143 </div> 164 </div>
144 165
145 <!-- Current red bottom line. --> 166 <!-- Current red bottom line. -->
146 <div id="bottom"></div> 167 <div id="bottom"></div>
147 </div> 168 </div>
148 169
170 <template is="dom-if" if="{{statusBar}}">
171 <div id="status-bar">{{statusBar.value}}</div>
172 </template>
149 </template> 173 </template>
150 174
151 </dom-module> 175 </dom-module>
152 176
153 <script> 177 <script>
154 "use strict"; 178 "use strict";
155 179
156 Polymer({ 180 Polymer({
157 is: "logdog-stream-view", 181 is: "logdog-stream-view",
158 182
(...skipping 19 matching lines...) Expand all
178 }, 202 },
179 203
180 /** 204 /**
181 * The number of logs to load before forcing a page refresh. 205 * The number of logs to load before forcing a page refresh.
182 * 206 *
183 * The smaller the value, the smoother the page will behave while logs are 207 * The smaller the value, the smoother the page will behave while logs are
184 * loading. However, the logs will also load slower because of forced 208 * loading. However, the logs will also load slower because of forced
185 * renders in between elements. 209 * renders in between elements.
186 */ 210 */
187 burst: { 211 burst: {
188 type: Array, 212 type: Number,
189 value: 100, 213 value: 1000,
190 notify: true, 214 notify: true,
191 }, 215 },
192 216
193 /** If true, show log metadata column. */ 217 /** If true, show log metadata column. */
194 showMetadata: { 218 showMetadata: {
195 type: Boolean, 219 type: Boolean,
196 value: false, 220 value: false,
197 observer: "_showMetadataChanged", 221 observer: "_showMetadataChanged",
198 }, 222 },
199 223
200 /** If true, wrap log lines to the screen. */ 224 /** If true, wrap log lines to the screen. */
201 wrapLines: { 225 wrapLines: {
202 type: Boolean, 226 type: Boolean,
203 value: false, 227 value: false,
204 observer: "_wrapLinesChanged", 228 observer: "_wrapLinesChanged",
205 }, 229 },
206 230
207 /** 231 /**
208 * If true, automatically scroll the page to the bottom of the logs 232 * If true, automatically scroll the page to the bottom of the logs
209 * while they are streaming. 233 * while they are streaming.
210 */ 234 */
211 follow: { 235 follow: {
212 type: Boolean, 236 type: Boolean,
213 value: false, 237 value: false,
214 observer: "_followChanged", 238 observer: "_followChanged",
215 }, 239 },
216 240
217 /** 241 /**
218 * The current log fetching context. 242 * The current stream status. This is an Array of objects:
219 * 243 * obj.name is the name of the stream.
220 * The "Fetch" object is structured: 244 * obj.desc is the status description of the stream.
221 * fatch.streams: An array of _BufferedStream instances for each muxed
222 * stream.
223 * fetch.status: The renderable status for a given stream.
224 */ 245 */
225 fetch: { 246 streamStatus: {
226 type: Object, 247 type: String,
227 value: null, 248 value: null,
228 notify: true, 249 notify: true,
229 readOnly: true, 250 readOnly: true,
230 }, 251 },
252
253 /**
254 * The text content of the status element at the bottom of the page.
255 */
256 statusBar: {
257 type: String,
258 value: null,
259 readOnly: true,
260 },
231 }, 261 },
232 262
233 ready: function() { 263 ready: function() {
234 this._setFetch(null);
235 this._scheduledWrite = null; 264 this._scheduledWrite = null;
236 this._bufferedLogs = null; 265 this._buffer = null;
266 this._currentLogBuffer = null;
267 this._authCallback = null;
237 }, 268 },
238 269
239 detached: function() { 270 detached: function() {
240 this.stop(); 271 this.stop();
241 }, 272 },
242 273
243 stop: function() { 274 stop: function() {
244 this._cancelFetch(); 275 this._cancelFetch(true);
245 }, 276 },
246 277
247 /** Clears state and begins fetching log data. */ 278 /** Clears state and begins fetching log data. */
248 reset: function() { 279 reset: function() {
280 this._resetLogState();
281
282 this._resolveStreams().then(function(streams) {
283 this._resetToStreams(streams);
284 }.bind(this)).catch(function(error) {
285 this._loadStatusBar("Failed to resolve streams:" + error);
286 throw error;
287 }.bind(this));
288 },
289
290 /** Clears all current logs. */
291 _resetLogState: function() {
292 this._cancelFetch(true);
293
294 // Remove all current log elements. */
295 while (this.$.logs.hasChildNodes()) {
296 this.$.logs.removeChild(this.$.logs.lastChild);
297 }
298
299 // Clear our buffer and streamer state.
300 this._buffer = null;
301 this._currentLogBuffer = null;
302 if (this._streamer) {
303 this._streamer.shutdown();
304 }
305 this._streamer = null;
306 },
307
308 _resolveStreams: function() {
309 // Separate our configured streams into full stream paths and queries.
249 var parts = { 310 var parts = {
250 queries: [], 311 queries: [],
251 streams: [], 312 streams: [],
252 }; 313 };
253 var query = new LogDogQuery(this.project); 314 var query = new LogDogQuery(this.project);
254 this.streams.map(LogDogStream.splitProject).forEach(function(v) { 315 this.streams.map(LogDogStream.splitProject).forEach(function(v) {
255 if (LogDogQuery.isQuery(v.path)) { 316 if (LogDogQuery.isQuery(v.path)) {
256 parts.queries.push(v); 317 parts.queries.push(v);
257 } else { 318 } else {
258 parts.streams.push(v); 319 parts.streams.push(v);
259 } 320 }
260 }); 321 });
261 322
262 Promise.all(parts.queries.map(function(v) { 323 // Resolve any outstanding queries into full stream paths.
263 var params = new LogDogQueryParams(v.project). 324 //
264 path(v.path). 325 // If we get an authentication error, register to have our query
265 streamType("text"); 326 // resolution callback invoked on signin changes until it works (or
266 return new LogDogQuery(this.$.client, params).getAll(); 327 // indefinitely).
267 }.bind(this))).then(function(results) { 328 var queries = parts.queries.map(function(v) {
268 // Add query results (if any) to streams. 329 var params = new LogDogQueryParams(v.project).
269 results.forEach(function(streams) { 330 path(v.path).
270 (streams || []).forEach(function(stream) { 331 streamType("text");
271 parts.streams.push(stream.stream); 332 return new LogDogQuery(this.$.client, params);
333 }.bind(this));
334
335 var issueQuery = function() {
336 this._loadStatusBar("Resolving log streams from query...");
337 this._authCallback = null;
338
339 return Promise.all(queries.map(function(q) {
340 return q.getAll();
341 }.bind(this))).then(function(results) {
342 this._loadStatusBar(null);
343
344 // Add query results (if any) to streams.
345 results.forEach(function(streams) {
346 (streams || []).forEach(function(stream) {
347 parts.streams.push(stream.stream);
348 });
272 }); 349 });
273 }); 350 parts.streams.sort(LogDogStream.cmp);
274 351
275 // Start loading the streams. 352 // Remove any duplicates.
276 this._resetToStreams(parts.streams); 353 var seenStreams = {};
277 }.bind(this)); 354 var result = [];
355 parts.streams.forEach(function(s) {
356 var fullName = s.fullName();
357 if (!seenStreams[fullName]) {
358 seenStreams[fullName] = s;
359 result.push(s);
360 }
361 });
362 return result;
363 }.bind(this)).catch(function(error) {
364 if (error instanceof LogDogError && error.isPermissionDenied()) {
365 // Retry on auth event.
366 this._loadStatusBar("Not authorized to execute query. Log in " +
367 "with an authorized account.");
368 return new Promise(function(resolve) {
369 this._authCallback = resolve;
370 }.bind(this)).then(issueQuery);
371 }
372
373 throw error;
374 }.bind(this));
375 }.bind(this);
376 return issueQuery();
278 }, 377 },
279 378
280 _resetToStreams: function(streams) { 379 _resetToStreams: function(streams) {
281 this._cancelFetch();
282 this._clearLogs();
283
284
285 // Unique streams. 380 // Unique streams.
286 if (!streams.length) { 381 if (!streams.length) {
382 this._loadStatusBar("No log streams.");
287 return; 383 return;
288 } 384 }
289 385
290 console.log("Loading log streams:", streams); 386 console.log("Loading log streams:", streams);
387 this._loadStatusBar("Loading stream data...");
291 streams.sort(LogDogStream.cmp); 388 streams.sort(LogDogStream.cmp);
292 389
293 // Construct our fetch context. 390 // Create a _BufferedStream for each stream.
294 var fetch = {}; 391 var bufStreams = streams.map(function(stream, idx) {
295 fetch.streams = streams.map(function(stream) { 392 return new _BufferedStream(stream, this.$.client,
296 // TODO: Re-use fetcher if it already exists in the previous streams 393 (streams.length > 1), function(bs) {
297 // map. 394 this._updateStreamStatus(bs, idx);
298 return new _BufferedStream(stream, new LogDogFetcher( 395 }.bind(this));
299 this.$.client, stream.project, stream.path));
300 }.bind(this)); 396 }.bind(this));
301 fetch.status = fetch.streams.map(function(v, idx) { 397 this._buffer = new _LogStreamBuffer();
302 var name = v.stream.path; 398 this._buffer.setStreams(bufStreams)
303 var lidx = name.lastIndexOf("/");
304 if (lidx >= 0) {
305 name = idx + " [.../" + name.substr(lidx+1) + "]";
306 }
307 399
400 this._streamer = new _LogStreamer(this._buffer, this.burst, function(v) {
401 this._loadStatusBar(v);
402 }.bind(this));
403
404 // Construct our initial status content.
405 this._setStreamStatus(bufStreams.map(function(bs, idx) {
308 return { 406 return {
309 name: name, 407 name: (" [.../+/" + bs.stream.name() + "]"),
310 status: this._buildStreamStatus(v, null), 408 desc: bs.description(),
311 }; 409 };
312 }.bind(this)); 410 }.bind(this)));
313 this._setFetch(fetch);
314 411
315 // Kick off our log fetching. 412 // Kick off our log fetching.
316 this._scheduleWriteNextLogs(); 413 this._scheduleWriteNextLogs();
317 }, 414 },
318 415
319 /** Cancels any currently-executing log stream fetch. */ 416 /** Cancels any currently-executing log stream fetch. */
320 _cancelFetch: function() { 417 _cancelFetch: function(clear) {
321 if (this.fetch) { 418 this._cancelScheduledWrite();
322 this._setFetch(null); 419 this._authCallback = null;
420
421 if (clear) {
422 this._setStreamStatus(null);
423 this._loadStatusBar(null);
323 } 424 }
324 this._cancelScheduledWrite();
325 }, 425 },
326 426
327 /** Cancels any scheduled asynchronous write. */ 427 /** Cancels any scheduled asynchronous write. */
328 _cancelScheduledWrite: function() { 428 _cancelScheduledWrite: function() {
329 if (this._scheduledWrite) { 429 if (this._scheduledWrite) {
330 this.cancelAsync(this._scheduledWrite); 430 this.cancelAsync(this._scheduledWrite);
331 this._scheduledWrite = null; 431 this._scheduledWrite = null;
332 } 432 }
333 }, 433 },
334 434
335 /** Called when the bound log stream variables has changed. */ 435 /** Called when the bound log stream variables has changed. */
336 _streamsChanged: function(v, old) { 436 _streamsChanged: function(v, old) {
337 this.reset(); 437 this.reset();
338 }, 438 },
339 439
340 /** Schedules the next asynchronous log write. */ 440 /** Schedules the next asynchronous log write. */
341 _scheduleWriteNextLogs: function() { 441 _scheduleWriteNextLogs: function() {
342 // This is called after refresh, so use this opportunity to maybe scroll 442 // This is called after refresh, so use this opportunity to maybe scroll
343 // to the bottom. 443 // to the bottom.
344 this._maybeScrollToBottom(); 444 this._maybeScrollToBottom();
345 445
346 if (!this._scheduledWrite) { 446 if (! this._scheduledWrite) {
347 this._scheduledWrite = this.async(this._writeNextLogs); 447 this._scheduledWrite = this.async(function() {
448 this._writeNextLogs()
449 }.bind(this));
348 } 450 }
349 }, 451 },
350 452
351 /** 453 /**
352 * This is an iterative function that grabs the next set of logs and renders 454 * This is an iterative function that grabs the next set of logs and renders
353 * them. Afterwards, it will continue rescheduling itself until there are 455 * them. Afterwards, it will continue rescheduling itself until there are
354 * no more logs to render. 456 * no more logs to render.
355 */ 457 */
356 _writeNextLogs: function() { 458 _writeNextLogs: function() {
357 this._cancelScheduledWrite(); 459 this._cancelScheduledWrite();
358 460
359 if (this._writeNextLogsImpl()) { 461 this._streamer.load().then(function(entries) {
462 // If there are no entries, then we're done.
463 if (! entries) {
464 // Cancel all fetching state. If our streamer is finished, also clear
465 // messages and status.
466 if (this._streamer.finished) {
467 if (this._streamer.someStreamsFailed) {
468 this._cancelFetch(false);
469 this._loadStatusBar("Some streams failed to load.");
470 } else {
471 this._cancelFetch(true);
472 }
473 } else {
474 // No more logs, but also we are not finished. Retry after auth.
475 this._authCallback = this._scheduleWriteNextLogs.bind(this);
476 }
477 return;
478 }
479
480 var logEntryChunk = document.createElement("div");
481 entries.forEach(function(le) {
482 this._appendLogEntry(logEntryChunk, le);
483 }.bind(this));
484
485 // To have styles apply correctly, we need to add it twice, see
486 // https://github.com/Polymer/polymer/issues/3100.
487 Polymer.dom(this.root).appendChild(logEntryChunk);
488 this.$.logs.appendChild(logEntryChunk);
489
360 // Yield so that our browser can refresh. We can't directly use 490 // Yield so that our browser can refresh. We can't directly use
361 // this.async since a timeout of "0" causes immediate execution instead 491 // this.async since a timeout of "0" causes immediate execution instead
362 // of yielding. 492 // of yielding.
363 setTimeout(this._scheduleWriteNextLogs.bind(this), 0); 493 setTimeout(function() {
364 } 494 this._scheduleWriteNextLogs();
495 }.bind(this), 0);
496 }.bind(this));
365 }, 497 },
366 498
367 /** 499 _appendLogEntry: function(root, le) {
368 * Primary implementation of _writeNextLogs.
369 *
370 * Returns true if any logs were rendered.
371 */
372 _writeNextLogsImpl: function() {
373 var fetch = this.fetch;
374 if (!(fetch && fetch.streams.length)) {
375 return false;
376 }
377
378 // Render any buffered logs.
379 var buffer = this._getOrBuildLogBuffer(fetch.streams);
380 if (buffer) {
381 // We will track how many log entries that we've rendered. If we exceed
382 // this amount, we will force a refresh so the logs appear streaming and
383 // the app remains responsive.
384 var rendered = 0;
385 var updated = {};
386
387 while (buffer.length && rendered < this.burst) {
388 // Get the next log. The buffer is sorted descendingly, so we can use
389 // pop to get it.
390 var log = buffer.pop();
391 rendered += this._appendLogEntry(log);
392
393 // Record our last appended log entry for this stream.
394 updated[log.fetchIndex] = log.streamIndex;
395 }
396
397 Object.keys(updated).forEach(function(idx) {
398 var statusKey = ("fetch.status." + idx + ".status");
399 this.set(statusKey, this._buildStreamStatus(
400 fetch.streams[idx], updated[idx]));
401 }.bind(this));
402
403 // If we rendered any logs, we will finish this write round.
404 if (rendered) {
405 return true;
406 }
407 }
408
409 // We didn't have any buffered logs, so either all of our streams are
410 // finished, or our buffer is empty and needs to be refreshed.
411 if(fetch.streams.every(function(v) {
412 return (v.finished());
413 })) {
414 console.log("All streams have been exhausted.");
415 this._cancelFetch();
416 return false;
417 }
418
419 // Fetch any streams' missing logs. If a stream already has buffered logs,
420 // skip it in this fetch.
421 Promise.all(fetch.streams.map(function(v) {
422 if (v.finished() || v.peek() !== null) {
423 // This stream still has buffered logs.
424 return null;
425 }
426 return v.fetcher.next();
427 })).then(function(result) {
428 result.forEach(function(v, i) {
429 if (v) {
430 fetch.streams[i].load(v.entries);
431 }
432 }.bind(this));
433 this._scheduleWriteNextLogs();
434 }.bind(this));
435 return false;
436 },
437
438 /**
439 * Examines the current buffered set of logs/streams. If sufficient logs
440 * are buffered to render the next log, it will be immediately added and
441 * this function will return "true". Otherwise, it will return "false",
442 * indicating that log fetch must be performed.
443 */
444 _getOrBuildLogBuffer: function(streams) {
445 if (this._bufferedLogs && this._bufferedLogs.length) {
446 return this._bufferedLogs;
447 }
448
449 // If we have no active streams, we can't buffer anything.
450 var active = [];
451 streams.forEach(function(v, idx) {
452 var next = v.peek();
453 if (next) {
454 active.push({
455 stream: v,
456 streamIndex: idx,
457 next: next,
458 });
459 }
460 });
461 if (!active.length) {
462 return null;
463 }
464
465 // Build our log buffer.
466 //
467 // TODO: A binary heap would be pretty great for this.
468 var buffer = [];
469 while (true) {
470 // Choose the next stream.
471 var earliest = 0;
472 for (var i = 1; i < active.length; i++) {
473 if (active[i].next.timestamp < active[earliest].next.timestamp) {
474 earliest = i;
475 }
476 }
477
478 // Get the next log from the earliest stream.
479 //
480 // Additionally, record the index in the original streams array that
481 // this log came from. We need this to update stream status when the
482 // log is consumed.
483 var nextStream = active[earliest];
484 var nextLog = nextStream.stream.pop();
485 nextLog.fetchIndex = nextStream.streamIndex;
486 buffer.push(nextLog);
487
488 nextStream.next = nextStream.stream.peek();
489 if (nextStream.next) {
490 // This stream has more logs, so we can continue building our buffer.
491 continue;
492 }
493
494 // This stream has no more buffered entries, so we're done.
495 //
496 // Reverse our log buffer so we can easily pop logs from it.
497 buffer.reverse();
498 this._bufferedLogs = buffer;
499 return buffer;
500 }
501 },
502
503 _appendLogEntry: function(le) {
504 var text = le.text; 500 var text = le.text;
505 if (!(text && text.lines)) { 501 if (!(text && text.lines)) {
506 return 0; 502 return 0;
507 } 503 }
508 504
509 // Create elements manually to avoid Polymer overhead for large logs. 505 // Create elements manually to avoid Polymer overhead for large logs.
510 var entryRow = document.createElement("div"); 506 var entryRow = document.createElement("div");
511 entryRow.className = "log-entry"; 507 entryRow.className = "log-entry";
512 508
513 // Metadata column. 509 // Metadata column.
(...skipping 21 matching lines...) Expand all
535 logDataBlock.className = "log-entry-content"; 531 logDataBlock.className = "log-entry-content";
536 if (le.text) { 532 if (le.text) {
537 for (var i = 0; i < le.text.lines.length; i++) { 533 for (var i = 0; i < le.text.lines.length; i++) {
538 var lineDiv = document.createElement("div"); 534 var lineDiv = document.createElement("div");
539 lineDiv.className = "log-entry-line"; 535 lineDiv.className = "log-entry-line";
540 lineDiv.textContent = le.text.lines[i].value; 536 lineDiv.textContent = le.text.lines[i].value;
541 logDataBlock.appendChild(lineDiv); 537 logDataBlock.appendChild(lineDiv);
542 } 538 }
543 } 539 }
544 entryRow.appendChild(logDataBlock); 540 entryRow.appendChild(logDataBlock);
545 541 root.appendChild(entryRow);
546 // To have styles apply correctly, we need to add it twice, see
547 // https://github.com/Polymer/polymer/issues/3100.
548 Polymer.dom(this.root).appendChild(entryRow);
549 this.$.logs.appendChild(entryRow);
550 542
551 return le.text.lines.length; 543 return le.text.lines.length;
552 }, 544 },
553 545
554 /** Clears all current logs. */ 546 _updateStreamStatus: function(bs, idx) {
555 _clearLogs: function() { 547 var origStatus = this.streamStatus[idx];
556 while (this.$.logs.hasChildNodes()) { 548 this.splice("streamStatus", idx, 1, {
557 this.$.logs.removeChild(this.$.logs.lastChild); 549 name: origStatus.name,
558 } 550 desc: bs.description(),
559 this._bufferedLogs = null; 551 });
560 },
561
562 /** Constructs the log stream status object for a given stream. */
563 _buildStreamStatus: function(stream, lastStreamIndex) {
564 if (!lastStreamIndex && lastStreamIndex !== 0) {
565 return "(Fetching)";
566 }
567
568 var tidx = stream.fetcher.terminalIndex;
569 if (tidx >= 0) {
570 return lastStreamIndex + " / " + tidx;
571 }
572 return lastStreamIndex + " (Streaming)";
573 }, 552 },
574 553
575 /** Scrolls to the bottom if "follow" is enabled. */ 554 /** Scrolls to the bottom if "follow" is enabled. */
576 _maybeScrollToBottom: function() { 555 _maybeScrollToBottom: function() {
577 if (this.follow) { 556 if (this.follow) {
578 this.$.bottom.scrollIntoView({ 557 this.$.bottom.scrollIntoView({
579 "behavior": "smooth", 558 "behavior": "smooth",
580 "block": "end", 559 "block": "end",
581 }); 560 });
582 } 561 }
(...skipping 15 matching lines...) Expand all
598 }, 577 },
599 /** Callback when "follow" has changed. */ 578 /** Callback when "follow" has changed. */
600 _followChanged: function(v) { 579 _followChanged: function(v) {
601 this._maybeScrollToBottom(); 580 this._maybeScrollToBottom();
602 }, 581 },
603 582
604 /** Callback for when the mouse wheel has scrolled. Disables follow. */ 583 /** Callback for when the mouse wheel has scrolled. Disables follow. */
605 _handleMouseWheel: function() { 584 _handleMouseWheel: function() {
606 this.follow = false; 585 this.follow = false;
607 }, 586 },
587
588 /**
589 * Loads text content into the status bar.
590 *
591 * If null is passed, the status bar will be cleared. If text is passed, the
592 * status bar will become visible with the supplied content.
593 */
594 _loadStatusBar: function(v) {
595 var st = null;
596 if (v) {
597 st = {
598 value: v,
599 };
600 }
601 this._setStatusBar(st);
602 },
603
604 _onSignin: function() {
605 var fn = this._authCallback;
606 if (fn) {
607 this._authCallback = null;
608 fn();
609 }
610 },
608 }); 611 });
609 612
610 /** 613 /**
611 * Container for logs that have been punted. 614 * Continuously loads log streams from a _LogStreamBuffer and exposes them via
615 * callback.
612 */ 616 */
613 function _BufferedStream(stream, fetcher) { 617 function _LogStreamer(buffer, burst, statusCallback) {
618 this._buffer = buffer;
619 this._burst = (burst || 0);
620 this._missingDelay = 5000;
621 this._statusCallback = statusCallback;
622
623 this.finished = false;
624 this.someStreamsFailed = false;
625
626 this._currentLogBuffer = null;
627 }
628
629 _LogStreamer.prototype.shutdown = function() {
630 this.finshed = true;
631 };
632
633 _LogStreamer.prototype._setStatus = function(v) {
634 if (this._statusCallback) {
635 this._statusCallback(v);
636 }
637 };
638
639 _LogStreamer.prototype.load = function() {
640 if (this.finished) {
641 this._setStatus(null);
642 return Promise.resolve(null);
643 }
644
645 // If we have buffered logs, return them.
646 var current = this._currentLogBuffer;
647 if (current) {
648 // We will track how many log entries that we've rendered. If we exceed
649 // this amount, we will force a refresh so the logs appear streaming and
650 // the app remains responsive.
651 var rendered = 0;
652
653 var entries = [];
654 for (var le = current.next(); (le); le = current.next()) {
655 entries.push(le);
656 if (le.text && le.text.lines) {
657 rendered += le.text.lines.length;
658 }
659
660 if (this._burst > 0 && rendered >= this._burst) {
661 break;
662 }
663 }
664
665 // Have we exhausted this buffer?
666 if (! current.peek()) {
667 this._currentLogBuffer = null;
668 }
669
670 // Return the bundle of entries.
671 return Promise.resolve(entries);
672 }
673
674 // We didn't have any buffered logs, so either all of our streams are
675 // finished or our buffer is empty and needs to be refreshed.
676 this._setStatus("Loading log stream data...");
677 return this._buffer.nextBuffer().then(function(buf) {
678 this.someStreamsFailed = (!!this._buffer._failures.length);
679
680 // Check result.
681 if (buf === null) {
682 if (this._buffer.finished) {
683 // No more buffers, we are done.
684 console.log("All streams have been exhausted.");
685 this.finished = true;
686 this._setStatus(null);
687 return null;
688 }
689
690 // The buffer was incomplete. Should we retry after a delay, or do
691 // we need to wait for an explicit edge (e.g., auth)?
692 if (this._buffer.autoRetry) {
693 // Sleep for 5 seconds and try again (waiting).
694 console.log("Log stream delayed; sleeping", this._missingDelay,
695 "and retry.");
696 this._setStatus("Missing log streams, retrying after delay...");
697 return new LuciSleepPromise(this._missingDelay).then(function() {
698 if (this.finished) {
699 console.log("Streamer is deactivated, discarding.");
700 return null;
701 }
702
703 return this.load();
704 }.bind(this));
705 }
706
707 this._setStatus("Some log streams could not be loaded.");
708 return null;
709 }
710
711 // Install the new buffer and re-enter.
712 this._currentLogBuffer = buf;
713 return this.load();
714 }.bind(this)).catch(function(error) {
715 this._setStatus("[" + error.name + "] fetching streams: " +
716 error.message);
717 throw error;
718 }.bind(this));
719 };
720
721 /**
722 * Manages an aggregate log stream buffer, consisting of logs punted from a
723 * set of zero or more _BufferedStream instances.
724 */
725 function _LogStreamBuffer() {
726 this._streams = null;
727 this._active = null;
728 this._nextBufferPromise = null;
729 this._failures = [];
730
731 this.finished = false;
732 this._resetIterativeState();
733 }
734
735 _LogStreamBuffer.prototype.setStreams = function(streams) {
736 // TODO(dnj): Make this do a delta with previous streams so we don't lose
737 // their already-loaded logs if the page changes.
738 this._streams = streams.map(function(bs, i) {
739 return {
740 bs: bs,
741 active: true,
742 buffer: new _BufferedLogs(),
743 };
744 });
745 this._active = this._streams;
746 this._nextBufferPromise = null;
747 };
748
749 _LogStreamBuffer.prototype._resetIterativeState = function() {
750 this.autoRetry = false;
751 };
752
753 /**
754 * Returns a Promise that resolves into a _BufferedLogs instance containing
755 * the next set of logs, in order, from the source log streams.
756 *
757 * The _BufferedLogs bundle may have status flags set, and should be checked.
758 *
759 * The Promise will also resolve to "null" if there are no more logs in the
760 * source streams.
761 *
762 * If there are errors fetching logs, the Promise will be rejected, and an
763 * error will be returned.
764 */
765 _LogStreamBuffer.prototype.nextBuffer = function() {
766 // If we're already are fetching the next buffer, return that Promise.
767 if (this._nextBufferPromise) {
768 return this._nextBufferPromise;
769 }
770
771 // Filter our any finished streams from our active list. A stream is
772 // finished if it is finished streaming and we don't have a retained buffer
773 // from it.
774 this._active = this._active.filter(function(entry) {
775 return (entry.buffer.peek() || (! (entry.bs.finished || entry.bs.error)));
776 })
777
778 if (! this._active.length) {
779 this.finished = true;
780 }
781 if (this.finished) {
782 // No active streams, so we're finished. Permanently set our promise to
783 // the finished state.
784 this._nextBufferPromise = Promise.resolve(null);
785 return this._nextBufferPromise;
786 }
787
788 // Fill all buffers for all active streams. This may result in an RPC to
789 // load new buffer content for streams whose buffers are empty.
790 //
791 // RPC failures are handled here:
792 // - If the stream reports "not found", we will terminate early and set
793 // out status to "waiting". Our owner should retry after a delay.
794 // - Otherwise, we will set our status to "error". Our owner should report
795 // that an error has occurred while loading stream data.
796 this._resetIterativeState();
797
798 var incomplete = false;
799 this._nextBufferPromise = Promise.all(this._active.map(function(entry) {
800 // If the entry's buffer still has data, use it immediately.
801 if (entry.buffer.peek()) {
802 return entry.buffer;
803 }
804
805 // Get the next log buffer for each stream. This may result in an RPC.
806 return entry.bs.nextBuffer().then(function(buffer) {
807 // Retain this buffer, if valid. The stream may have returned a null
808 // buffer if it failed to fetch for a legitimate reason. In this case,
809 // we will not retain it (since we want entry.buffer to be valid), but
810 // will forward the "null" to our aggregate function.
811 if (buffer) {
812 entry.buffer = buffer;
813 } else {
814 incomplete = true;
815
816 // If this stream is waiting, but not on auth, mark that we should
817 // automatically retry.
818 if (entry.bs.waiting && !entry.bs.auth) {
819 this.autoRetry = true;
820 }
821 }
822 return buffer;
823 }.bind(this)).catch(function(error) {
824 // Log stream source of error. Raise a generic "failed to buffer"
825 // error. This will become a permanent failure.
826 console.error("Error loading buffer for", entry.bs.stream.fullName(),
827 "(", entry.bs, "): ", error);
828 this._failures.push(entry.bs);
829 return null;
830 }.bind(this));
831 }.bind(this))).then(function(buffers) {
832 this._nextBufferPromise = null;
833
834 // Check each buffer. If any are null, that stream failed to deliver.
835 if (incomplete) {
836 // We succeeded, but are incomplete. At least one stream failed to
837 // deliver and should have state flags set accordingly.
838 return null;
839 }
840
841 // Remove any null buffers. These would be placed here when a stream fails
842 // to load. Aggregate as much data from each of our streams as possible.
843 buffers = buffers.filter(v => (!!v));
844 return this._aggregateBuffers(buffers);
845 }.bind(this));
846 return this._nextBufferPromise;
847 };
848
849 _LogStreamBuffer.prototype._aggregateBuffers = function(buffers) {
850 switch (buffers.length) {
851 case 0:
852 // No buffers, so no logs.
853 return new _BufferedLogs(null);
854 case 1:
855 // As a special case, if we only have one buffer, and we assume that its
856 // entries are sorted, then that buffer is a return value.
857 return new _BufferedLogs(buffers[0].getAll());
858 }
859
860 // Preload our peek array.
861 var incomplete = false;
862 var peek = buffers.map(function(buf) {
863 var le = buf.peek();
864 if (! le) {
865 incomplete = true;
866 }
867 return le;
868 });
869 if (incomplete) {
870 // One of our input buffers had no log entries.
871 return new _BufferedLogs(null);
872 }
873
874 // Assemble our aggregate buffer array.
875 // TODO: A binary heap would be pretty great for this.
876 var entries = [];
877 while (true) {
878 // Choose the next stream.
879 var earliest = 0;
880 for (var i = 1; i < buffers.length; i++) {
881 if (_LogStreamBuffer.compareLogs(peek[i], peek[earliest]) < 0) {
882 earliest = i;
883 }
884 }
885
886 // Get the next log from the earliest stream.
887 entries.push(buffers[earliest].next());
888
889 // Repopulate that buffer's "peek" value. If the buffer has no more
890 // entries, then we're done.
891 var next = buffers[earliest].peek();
892 if (!next) {
893 return new _BufferedLogs(entries);
894 }
895 peek[earliest] = next;
896 }
897 };
898
899 _LogStreamBuffer.compareLogs = function(a, b) {
900 // If they are part of the same stream, compare prefix indexes.
901 if (a.source.stream.samePrefixAs(b.source.stream)) {
902 return (a.prefixIndex - b.prefixIndex);
903 }
904
905 // Compare based on timestamp.
906 return a.timestamp - b.timestamp;
907 };
908
909
910 /**
911 * A buffer of ordered log entries from all streams.
912 *
913 * Assumes total ownership of the input log buffer, which can be null to
914 * indicate no logs.
915 */
916 function _BufferedLogs(logs) {
917 this._logs = logs;
918 this._index = 0;
919 }
920
921 _BufferedLogs.prototype.getAll = function() {
922 // Pop all logs.
923 var logs = this._logs;
924 this._logs = null;
925 return logs;
926 };
927
928 _BufferedLogs.prototype.peek = function() {
929 return (this._logs) ? (this._logs[this._index]) : (null);
930 };
931
932 _BufferedLogs.prototype.next = function() {
933 if (! (this._logs && this._logs.length)) {
934 return null;
935 }
936
937 // Get the next log and increment our index.
938 var log = this._logs[this._index++];
939 if (this._index >= this._logs.length) {
940 this._logs = null;
941 }
942 return log;
943 };
944
945
946 /**
947 * Stateful log fetching manager for a single log stream.
948 */
949 function _BufferedStream(stream, client, oneOfMany, statusCallback) {
614 this.stream = stream; 950 this.stream = stream;
615 this.fetcher = fetcher; 951
616 952 this.error = null;
617 this._logs = null; 953 this.finished = false;
618 }; 954
619 /** 955 this._fetcher = new LogDogFetcher(client, stream);
620 * Refresh the buffer with the contents of the supplied logs array. 956 this._oneOfMany = oneOfMany;
621 * 957 this._statusCallback = statusCallback;
622 * @param {Array[Object]} logs The LogEntry protobuf objects from the fetcher 958 this._lastFetchIndex = null;
623 * to load. 959 }
624 */ 960
625 _BufferedStream.prototype.load = function(logs) { 961 _BufferedStream.INITIAL_FETCH_SIZE = 4096;
626 // Disallow a state where "logs" is not null but empty. 962
627 if (!(logs && logs.length)) { 963 _BufferedStream.prototype._resetIterativeState = function() {
628 this._logs = null; 964 this.waiting = false;
629 return; 965 this.auth = false;
630 } 966 this._fireStatusUpdated();
631 967
632 // Clone and reverse the logs. This means that the last log will be the 968 this._currentFetch = null;
633 // earliest. 969 };
634 this._logs = logs.splice(0); 970
635 this._logs.reverse(); 971 _BufferedStream.prototype.nextBuffer = function() {
636 }; 972 if (this._currentFetch) {
637 /** @returns {Object} The next buffered log, or null if none are buffered */ 973 return this._currentFetch;
638 _BufferedStream.prototype.peek = function() { 974 }
639 return (this._logs) ? (this._logs[this._logs.length-1]) : (null); 975
640 }; 976 // Reset per-round state and begin next round fetch.
641 /** 977 this._resetIterativeState();
642 * Returns the next buffered log, removing it from the buffer. 978
643 * 979 // If this is the first fetch, and we're not the only log stream being
644 * @return {Object} The next buffered LogEntry, or null if the buffer is 980 // rendered, fetch a small amount so we can (probably) start rendering
645 * empty. 981 // without waiting for a lot of huge chunks.
646 */ 982 this._fetcher.byteCount = (
647 _BufferedStream.prototype.pop = function() { 983 (this._lastFetchIndex === null) && this._oneOfMany) ?
648 if (!this._logs) { 984 (_BufferedStream.INITIAL_FETCH_SIZE) : (null);
649 return null; 985
650 } 986 this._currentFetch = this._fetcher.next().then(function(result) {
651 987 this._currentFetch = null;
652 var log = this._logs.pop(); 988
653 if (!this._logs.length) { 989 // Update our stream information.
654 this._logs = null; 990 this.finished = this._fetcher.finished;
655 } 991
656 return log; 992 // Augment each returned log entry with self-descriptive metadata.
657 }; 993 var logs = result.entries;
658 /** @returns {bool} true if the log stream is finished being fetched. */ 994 if (logs && logs.length) {
659 _BufferedStream.prototype.finished = function() { 995 logs.forEach(function(le) {
660 return this.fetcher.finished; 996 le.desc = result.desc;
997 le.state = result.state;
998 le.source = this;
999 }.bind(this));
1000
1001 // Record the latest fetch index.
1002 this._lastFetchIndex = logs[logs.length - 1].streamIndex;
1003 }
1004
1005 this._fireStatusUpdated();
1006 return new _BufferedLogs(logs);
1007 }.bind(this)).catch(function(error) {;
1008 // If this is a "not found" error, we assume that the stream is valid, but
1009 // hasn't been ingested into LogDog yet. Return "null".
1010 if (error instanceof LogDogError) {
1011 if (error.isPermissionDenied()) {
1012 this.waiting = true;
1013 this.auth = true;
1014 } else if (error.isNotFound()) {
1015 this.waiting = true;
1016 }
1017
1018 // If this is an error that we understand, recover from it, return
1019 // null, and set our status flags.
1020 if (this.waiting) {
1021 // Recover from this error.
1022 this._currentFetch = null;
1023 this._fireStatusUpdated();
1024 return null;
1025 }
1026 }
1027
1028 // Retain this error forever.
1029 this.error = error;
1030 throw error;
1031 }.bind(this));
1032 return this._currentFetch;
1033 };
1034
1035 _BufferedStream.prototype._fireStatusUpdated = function() {
1036 if (this._statusCallback) {
1037 this._statusCallback(this);
1038 }
1039 };
1040
1041 _BufferedStream.prototype.description = function() {
1042 if (this._waiting) {
1043 return "(Waiting)";
1044 }
1045
1046 var pieces = []
1047 var tidx = this._fetcher.terminalIndex();
1048 if (this._lastFetchIndex) {
1049 if (tidx >= 0) {
1050 pieces.push(this._lastFetchIndex + " / " + tidx);
1051 } else {
1052 pieces.push(this._lastFetchIndex + " ?");
1053 }
1054 }
1055
1056 if (this.error) {
1057 pieces.push("(Error)");
1058 } else if (this.auth) {
1059 pieces.push("(Auth Error)");
1060 } else if (this.waiting) {
1061 pieces.push("(Waiting)");
1062 } else if (!this._fetcher.state) {
1063 pieces.push("(Fetching)");
1064 } else if (this._fetcher.finished) {
1065 pieces.push("(Finished)");
1066 } else {
1067 pieces.push("(Streaming)");
1068 }
1069 return pieces.join(" ");
661 }; 1070 };
662 </script> 1071 </script>
OLDNEW
« no previous file with comments | « web/inc/logdog-stream-view/logdog-stream-query.html ('k') | web/inc/logdog-stream/logdog-error.html » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698