| Index: web/inc/logdog-stream-view/logdog-stream-view.html
|
| diff --git a/web/inc/logdog-stream-view/logdog-stream-view.html b/web/inc/logdog-stream-view/logdog-stream-view.html
|
| index 2da1de958fa7a476754f6c9f73466044a2e0dfcb..4de04d4ef89f3d2b3ba54fc5939ecb04f0981567 100644
|
| --- a/web/inc/logdog-stream-view/logdog-stream-view.html
|
| +++ b/web/inc/logdog-stream-view/logdog-stream-view.html
|
| @@ -5,9 +5,12 @@
|
| -->
|
|
|
| <link rel="import" href="../bower_components/polymer/polymer.html">
|
| +<link rel="import" href="../bower_components/google-signin/google-signin-aware.html">
|
| <link rel="import" href="../bower_components/paper-checkbox/paper-checkbox.html">
|
|
|
| <link rel="import" href="../logdog-stream/logdog-stream.html">
|
| +<link rel="import" href="../logdog-stream/logdog-error.html">
|
| +<link rel="import" href="../luci-sleep-promise/luci-sleep-promise.html">
|
| <link rel="import" href="logdog-stream-fetcher.html">
|
| <link rel="import" href="logdog-stream-query.html">
|
|
|
| @@ -23,9 +26,11 @@ An element for rendering muxed LogDog log streams.
|
| background-color: white;
|
| }
|
|
|
| - #counter {
|
| + #stream-status {
|
| position: fixed;
|
| right: 16px;
|
| + background-color: #EEEEEE;
|
| + opacity: 0.7;
|
| }
|
|
|
| #logContent {
|
| @@ -86,8 +91,24 @@ An element for rendering muxed LogDog log streams.
|
| height: 2px;
|
| margin-bottom: 10px;
|
| }
|
| +
|
| + #status-bar {
|
| + /* Overlay at the bottom of the page. */
|
| + position: absolute;
|
| + bottom: 0;
|
| + left: 0;
|
| + width: 100%;
|
| +
|
| + text-align: center;
|
| + font-size: 16px;
|
| + background-color: rgba(245, 245, 220, 0.7);
|
| + }
|
| </style>
|
|
|
| + <google-signin-aware
|
| + id="aware"
|
| + on-google-signin-aware-success="_onSignin"></google-signin-aware>
|
| +
|
| <rpc-client
|
| id="client"
|
| auto-token
|
| @@ -107,13 +128,13 @@ An element for rendering muxed LogDog log streams.
|
| </div>
|
|
|
| <!-- Display current fetching status, if stream data is still loading. -->
|
| - <template is="dom-if" if="{{fetch}}">
|
| - <div id="counter">
|
| + <template is="dom-if" if="{{streamStatus}}">
|
| + <div id="stream-status">
|
| <table>
|
| - <template is="dom-repeat" items="{{fetch.status}}">
|
| + <template is="dom-repeat" items="{{streamStatus}}">
|
| <tr>
|
| <td>{{item.name}}</td>
|
| - <td>{{item.status}}</td>
|
| + <td>{{item.desc}}</td>
|
| </tr>
|
| </template>
|
| </table>
|
| @@ -146,6 +167,9 @@ An element for rendering muxed LogDog log streams.
|
| <div id="bottom"></div>
|
| </div>
|
|
|
| + <template is="dom-if" if="{{statusBar}}">
|
| + <div id="status-bar">{{statusBar.value}}</div>
|
| + </template>
|
| </template>
|
|
|
| </dom-module>
|
| @@ -185,8 +209,8 @@ An element for rendering muxed LogDog log streams.
|
| * renders in between elements.
|
| */
|
| burst: {
|
| - type: Array,
|
| - value: 100,
|
| + type: Number,
|
| + value: 1000,
|
| notify: true,
|
| },
|
|
|
| @@ -215,25 +239,32 @@ An element for rendering muxed LogDog log streams.
|
| },
|
|
|
| /**
|
| - * The current log fetching context.
|
| - *
|
| - * The "Fetch" object is structured:
|
| - * fatch.streams: An array of _BufferedStream instances for each muxed
|
| - * stream.
|
| - * fetch.status: The renderable status for a given stream.
|
| + * The current stream status. This is an Array of objects:
|
| + * obj.name is the name of the stream.
|
| + * obj.desc is the status description of the stream.
|
| */
|
| - fetch: {
|
| - type: Object,
|
| + streamStatus: {
|
| + type: String,
|
| value: null,
|
| notify: true,
|
| readOnly: true,
|
| },
|
| +
|
| + /**
|
| + * The text content of the status element at the bottom of the page.
|
| + */
|
| + statusBar: {
|
| + type: String,
|
| + value: null,
|
| + readOnly: true,
|
| + },
|
| },
|
|
|
| ready: function() {
|
| - this._setFetch(null);
|
| this._scheduledWrite = null;
|
| - this._bufferedLogs = null;
|
| + this._buffer = null;
|
| + this._currentLogBuffer = null;
|
| + this._authCallback = null;
|
| },
|
|
|
| detached: function() {
|
| @@ -241,11 +272,41 @@ An element for rendering muxed LogDog log streams.
|
| },
|
|
|
| stop: function() {
|
| - this._cancelFetch();
|
| + this._cancelFetch(true);
|
| },
|
|
|
| /** Clears state and begins fetching log data. */
|
| reset: function() {
|
| + this._resetLogState();
|
| +
|
| + this._resolveStreams().then(function(streams) {
|
| + this._resetToStreams(streams);
|
| + }.bind(this)).catch(function(error) {
|
| + this._loadStatusBar("Failed to resolve streams:" + error);
|
| + throw error;
|
| + }.bind(this));
|
| + },
|
| +
|
| + /** Clears all current logs. */
|
| + _resetLogState: function() {
|
| + this._cancelFetch(true);
|
| +
|
| + // Remove all current log elements. */
|
| + while (this.$.logs.hasChildNodes()) {
|
| + this.$.logs.removeChild(this.$.logs.lastChild);
|
| + }
|
| +
|
| + // Clear our buffer and streamer state.
|
| + this._buffer = null;
|
| + this._currentLogBuffer = null;
|
| + if (this._streamer) {
|
| + this._streamer.shutdown();
|
| + }
|
| + this._streamer = null;
|
| + },
|
| +
|
| + _resolveStreams: function() {
|
| + // Separate our configured streams into full stream paths and queries.
|
| var parts = {
|
| queries: [],
|
| streams: [],
|
| @@ -259,69 +320,108 @@ An element for rendering muxed LogDog log streams.
|
| }
|
| });
|
|
|
| - Promise.all(parts.queries.map(function(v) {
|
| - var params = new LogDogQueryParams(v.project).
|
| - path(v.path).
|
| - streamType("text");
|
| - return new LogDogQuery(this.$.client, params).getAll();
|
| - }.bind(this))).then(function(results) {
|
| - // Add query results (if any) to streams.
|
| - results.forEach(function(streams) {
|
| - (streams || []).forEach(function(stream) {
|
| - parts.streams.push(stream.stream);
|
| + // Resolve any outstanding queries into full stream paths.
|
| + //
|
| + // If we get an authentication error, register to have our query
|
| + // resolution callback invoked on signin changes until it works (or
|
| + // indefinitely).
|
| + var queries = parts.queries.map(function(v) {
|
| + var params = new LogDogQueryParams(v.project).
|
| + path(v.path).
|
| + streamType("text");
|
| + return new LogDogQuery(this.$.client, params);
|
| + }.bind(this));
|
| +
|
| + var issueQuery = function() {
|
| + this._loadStatusBar("Resolving log streams from query...");
|
| + this._authCallback = null;
|
| +
|
| + return Promise.all(queries.map(function(q) {
|
| + return q.getAll();
|
| + }.bind(this))).then(function(results) {
|
| + this._loadStatusBar(null);
|
| +
|
| + // Add query results (if any) to streams.
|
| + results.forEach(function(streams) {
|
| + (streams || []).forEach(function(stream) {
|
| + parts.streams.push(stream.stream);
|
| + });
|
| });
|
| - });
|
| + parts.streams.sort(LogDogStream.cmp);
|
| +
|
| + // Remove any duplicates.
|
| + var seenStreams = {};
|
| + var result = [];
|
| + parts.streams.forEach(function(s) {
|
| + var fullName = s.fullName();
|
| + if (!seenStreams[fullName]) {
|
| + seenStreams[fullName] = s;
|
| + result.push(s);
|
| + }
|
| + });
|
| + return result;
|
| + }.bind(this)).catch(function(error) {
|
| + if (error instanceof LogDogError && error.isPermissionDenied()) {
|
| + // Retry on auth event.
|
| + this._loadStatusBar("Not authorized to execute query. Log in " +
|
| + "with an authorized account.");
|
| + return new Promise(function(resolve) {
|
| + this._authCallback = resolve;
|
| + }.bind(this)).then(issueQuery);
|
| + }
|
|
|
| - // Start loading the streams.
|
| - this._resetToStreams(parts.streams);
|
| - }.bind(this));
|
| + throw error;
|
| + }.bind(this));
|
| + }.bind(this);
|
| + return issueQuery();
|
| },
|
|
|
| _resetToStreams: function(streams) {
|
| - this._cancelFetch();
|
| - this._clearLogs();
|
| -
|
| -
|
| // Unique streams.
|
| if (!streams.length) {
|
| + this._loadStatusBar("No log streams.");
|
| return;
|
| }
|
|
|
| console.log("Loading log streams:", streams);
|
| + this._loadStatusBar("Loading stream data...");
|
| streams.sort(LogDogStream.cmp);
|
|
|
| - // Construct our fetch context.
|
| - var fetch = {};
|
| - fetch.streams = streams.map(function(stream) {
|
| - // TODO: Re-use fetcher if it already exists in the previous streams
|
| - // map.
|
| - return new _BufferedStream(stream, new LogDogFetcher(
|
| - this.$.client, stream.project, stream.path));
|
| + // Create a _BufferedStream for each stream.
|
| + var bufStreams = streams.map(function(stream, idx) {
|
| + return new _BufferedStream(stream, this.$.client,
|
| + (streams.length > 1), function(bs) {
|
| + this._updateStreamStatus(bs, idx);
|
| + }.bind(this));
|
| + }.bind(this));
|
| + this._buffer = new _LogStreamBuffer();
|
| + this._buffer.setStreams(bufStreams)
|
| +
|
| + this._streamer = new _LogStreamer(this._buffer, this.burst, function(v) {
|
| + this._loadStatusBar(v);
|
| }.bind(this));
|
| - fetch.status = fetch.streams.map(function(v, idx) {
|
| - var name = v.stream.path;
|
| - var lidx = name.lastIndexOf("/");
|
| - if (lidx >= 0) {
|
| - name = idx + " [.../" + name.substr(lidx+1) + "]";
|
| - }
|
|
|
| + // Construct our initial status content.
|
| + this._setStreamStatus(bufStreams.map(function(bs, idx) {
|
| return {
|
| - name: name,
|
| - status: this._buildStreamStatus(v, null),
|
| + name: (" [.../+/" + bs.stream.name() + "]"),
|
| + desc: bs.description(),
|
| };
|
| - }.bind(this));
|
| - this._setFetch(fetch);
|
| + }.bind(this)));
|
|
|
| // Kick off our log fetching.
|
| this._scheduleWriteNextLogs();
|
| },
|
|
|
| /** Cancels any currently-executing log stream fetch. */
|
| - _cancelFetch: function() {
|
| - if (this.fetch) {
|
| - this._setFetch(null);
|
| - }
|
| + _cancelFetch: function(clear) {
|
| this._cancelScheduledWrite();
|
| + this._authCallback = null;
|
| +
|
| + if (clear) {
|
| + this._setStreamStatus(null);
|
| + this._loadStatusBar(null);
|
| + }
|
| },
|
|
|
| /** Cancels any scheduled asynchronous write. */
|
| @@ -343,8 +443,10 @@ An element for rendering muxed LogDog log streams.
|
| // to the bottom.
|
| this._maybeScrollToBottom();
|
|
|
| - if (!this._scheduledWrite) {
|
| - this._scheduledWrite = this.async(this._writeNextLogs);
|
| + if (! this._scheduledWrite) {
|
| + this._scheduledWrite = this.async(function() {
|
| + this._writeNextLogs()
|
| + }.bind(this));
|
| }
|
| },
|
|
|
| @@ -356,151 +458,45 @@ An element for rendering muxed LogDog log streams.
|
| _writeNextLogs: function() {
|
| this._cancelScheduledWrite();
|
|
|
| - if (this._writeNextLogsImpl()) {
|
| - // Yield so that our browser can refresh. We can't directly use
|
| - // this.async since a timeout of "0" causes immediate execution instead
|
| - // of yielding.
|
| - setTimeout(this._scheduleWriteNextLogs.bind(this), 0);
|
| - }
|
| - },
|
| -
|
| - /**
|
| - * Primary implementation of _writeNextLogs.
|
| - *
|
| - * Returns true if any logs were rendered.
|
| - */
|
| - _writeNextLogsImpl: function() {
|
| - var fetch = this.fetch;
|
| - if (!(fetch && fetch.streams.length)) {
|
| - return false;
|
| - }
|
| -
|
| - // Render any buffered logs.
|
| - var buffer = this._getOrBuildLogBuffer(fetch.streams);
|
| - if (buffer) {
|
| - // We will track how many log entries that we've rendered. If we exceed
|
| - // this amount, we will force a refresh so the logs appear streaming and
|
| - // the app remains responsive.
|
| - var rendered = 0;
|
| - var updated = {};
|
| -
|
| - while (buffer.length && rendered < this.burst) {
|
| - // Get the next log. The buffer is sorted descendingly, so we can use
|
| - // pop to get it.
|
| - var log = buffer.pop();
|
| - rendered += this._appendLogEntry(log);
|
| -
|
| - // Record our last appended log entry for this stream.
|
| - updated[log.fetchIndex] = log.streamIndex;
|
| + this._streamer.load().then(function(entries) {
|
| + // If there are no entries, then we're done.
|
| + if (! entries) {
|
| + // Cancel all fetching state. If our streamer is finished, also clear
|
| + // messages and status.
|
| + if (this._streamer.finished) {
|
| + if (this._streamer.someStreamsFailed) {
|
| + this._cancelFetch(false);
|
| + this._loadStatusBar("Some streams failed to load.");
|
| + } else {
|
| + this._cancelFetch(true);
|
| + }
|
| + } else {
|
| + // No more logs, but also we are not finished. Retry after auth.
|
| + this._authCallback = this._scheduleWriteNextLogs.bind(this);
|
| + }
|
| + return;
|
| }
|
|
|
| - Object.keys(updated).forEach(function(idx) {
|
| - var statusKey = ("fetch.status." + idx + ".status");
|
| - this.set(statusKey, this._buildStreamStatus(
|
| - fetch.streams[idx], updated[idx]));
|
| + var logEntryChunk = document.createElement("div");
|
| + entries.forEach(function(le) {
|
| + this._appendLogEntry(logEntryChunk, le);
|
| }.bind(this));
|
|
|
| - // If we rendered any logs, we will finish this write round.
|
| - if (rendered) {
|
| - return true;
|
| - }
|
| - }
|
| + // To have styles apply correctly, we need to add it twice, see
|
| + // https://github.com/Polymer/polymer/issues/3100.
|
| + Polymer.dom(this.root).appendChild(logEntryChunk);
|
| + this.$.logs.appendChild(logEntryChunk);
|
|
|
| - // We didn't have any buffered logs, so either all of our streams are
|
| - // finished, or our buffer is empty and needs to be refreshed.
|
| - if(fetch.streams.every(function(v) {
|
| - return (v.finished());
|
| - })) {
|
| - console.log("All streams have been exhausted.");
|
| - this._cancelFetch();
|
| - return false;
|
| - }
|
| -
|
| - // Fetch any streams' missing logs. If a stream already has buffered logs,
|
| - // skip it in this fetch.
|
| - Promise.all(fetch.streams.map(function(v) {
|
| - if (v.finished() || v.peek() !== null) {
|
| - // This stream still has buffered logs.
|
| - return null;
|
| - }
|
| - return v.fetcher.next();
|
| - })).then(function(result) {
|
| - result.forEach(function(v, i) {
|
| - if (v) {
|
| - fetch.streams[i].load(v.entries);
|
| - }
|
| - }.bind(this));
|
| - this._scheduleWriteNextLogs();
|
| + // Yield so that our browser can refresh. We can't directly use
|
| + // this.async since a timeout of "0" causes immediate execution instead
|
| + // of yielding.
|
| + setTimeout(function() {
|
| + this._scheduleWriteNextLogs();
|
| + }.bind(this), 0);
|
| }.bind(this));
|
| - return false;
|
| - },
|
| -
|
| - /**
|
| - * Examines the current buffered set of logs/streams. If sufficient logs
|
| - * are buffered to render the next log, it will be immediately added and
|
| - * this function will return "true". Otherwise, it will return "false",
|
| - * indicating that log fetch must be performed.
|
| - */
|
| - _getOrBuildLogBuffer: function(streams) {
|
| - if (this._bufferedLogs && this._bufferedLogs.length) {
|
| - return this._bufferedLogs;
|
| - }
|
| -
|
| - // If we have no active streams, we can't buffer anything.
|
| - var active = [];
|
| - streams.forEach(function(v, idx) {
|
| - var next = v.peek();
|
| - if (next) {
|
| - active.push({
|
| - stream: v,
|
| - streamIndex: idx,
|
| - next: next,
|
| - });
|
| - }
|
| - });
|
| - if (!active.length) {
|
| - return null;
|
| - }
|
| -
|
| - // Build our log buffer.
|
| - //
|
| - // TODO: A binary heap would be pretty great for this.
|
| - var buffer = [];
|
| - while (true) {
|
| - // Choose the next stream.
|
| - var earliest = 0;
|
| - for (var i = 1; i < active.length; i++) {
|
| - if (active[i].next.timestamp < active[earliest].next.timestamp) {
|
| - earliest = i;
|
| - }
|
| - }
|
| -
|
| - // Get the next log from the earliest stream.
|
| - //
|
| - // Additionally, record the index in the original streams array that
|
| - // this log came from. We need this to update stream status when the
|
| - // log is consumed.
|
| - var nextStream = active[earliest];
|
| - var nextLog = nextStream.stream.pop();
|
| - nextLog.fetchIndex = nextStream.streamIndex;
|
| - buffer.push(nextLog);
|
| -
|
| - nextStream.next = nextStream.stream.peek();
|
| - if (nextStream.next) {
|
| - // This stream has more logs, so we can continue building our buffer.
|
| - continue;
|
| - }
|
| -
|
| - // This stream has no more buffered entries, so we're done.
|
| - //
|
| - // Reverse our log buffer so we can easily pop logs from it.
|
| - buffer.reverse();
|
| - this._bufferedLogs = buffer;
|
| - return buffer;
|
| - }
|
| },
|
|
|
| - _appendLogEntry: function(le) {
|
| + _appendLogEntry: function(root, le) {
|
| var text = le.text;
|
| if (!(text && text.lines)) {
|
| return 0;
|
| @@ -542,34 +538,17 @@ An element for rendering muxed LogDog log streams.
|
| }
|
| }
|
| entryRow.appendChild(logDataBlock);
|
| -
|
| - // To have styles apply correctly, we need to add it twice, see
|
| - // https://github.com/Polymer/polymer/issues/3100.
|
| - Polymer.dom(this.root).appendChild(entryRow);
|
| - this.$.logs.appendChild(entryRow);
|
| + root.appendChild(entryRow);
|
|
|
| return le.text.lines.length;
|
| },
|
|
|
| - /** Clears all current logs. */
|
| - _clearLogs: function() {
|
| - while (this.$.logs.hasChildNodes()) {
|
| - this.$.logs.removeChild(this.$.logs.lastChild);
|
| - }
|
| - this._bufferedLogs = null;
|
| - },
|
| -
|
| - /** Constructs the log stream status object for a given stream. */
|
| - _buildStreamStatus: function(stream, lastStreamIndex) {
|
| - if (!lastStreamIndex && lastStreamIndex !== 0) {
|
| - return "(Fetching)";
|
| - }
|
| -
|
| - var tidx = stream.fetcher.terminalIndex;
|
| - if (tidx >= 0) {
|
| - return lastStreamIndex + " / " + tidx;
|
| - }
|
| - return lastStreamIndex + " (Streaming)";
|
| + _updateStreamStatus: function(bs, idx) {
|
| + var origStatus = this.streamStatus[idx];
|
| + this.splice("streamStatus", idx, 1, {
|
| + name: origStatus.name,
|
| + desc: bs.description(),
|
| + });
|
| },
|
|
|
| /** Scrolls to the bottom if "follow" is enabled. */
|
| @@ -605,58 +584,488 @@ An element for rendering muxed LogDog log streams.
|
| _handleMouseWheel: function() {
|
| this.follow = false;
|
| },
|
| +
|
| + /**
|
| + * Loads text content into the status bar.
|
| + *
|
| + * If null is passed, the status bar will be cleared. If text is passed, the
|
| + * status bar will become visible with the supplied content.
|
| + */
|
| + _loadStatusBar: function(v) {
|
| + var st = null;
|
| + if (v) {
|
| + st = {
|
| + value: v,
|
| + };
|
| + }
|
| + this._setStatusBar(st);
|
| + },
|
| +
|
| + _onSignin: function() {
|
| + var fn = this._authCallback;
|
| + if (fn) {
|
| + this._authCallback = null;
|
| + fn();
|
| + }
|
| + },
|
| });
|
|
|
| /**
|
| - * Container for logs that have been punted.
|
| + * Continuously loads log streams from a _LogStreamBuffer and exposes them via
|
| + * callback.
|
| */
|
| - function _BufferedStream(stream, fetcher) {
|
| - this.stream = stream;
|
| - this.fetcher = fetcher;
|
| + function _LogStreamer(buffer, burst, statusCallback) {
|
| + this._buffer = buffer;
|
| + this._burst = (burst || 0);
|
| + this._missingDelay = 5000;
|
| + this._statusCallback = statusCallback;
|
|
|
| - this._logs = null;
|
| + this.finished = false;
|
| + this.someStreamsFailed = false;
|
| +
|
| + this._currentLogBuffer = null;
|
| + }
|
| +
|
| + _LogStreamer.prototype.shutdown = function() {
|
| + this.finshed = true;
|
| + };
|
| +
|
| + _LogStreamer.prototype._setStatus = function(v) {
|
| + if (this._statusCallback) {
|
| + this._statusCallback(v);
|
| + }
|
| + };
|
| +
|
| + _LogStreamer.prototype.load = function() {
|
| + if (this.finished) {
|
| + this._setStatus(null);
|
| + return Promise.resolve(null);
|
| + }
|
| +
|
| + // If we have buffered logs, return them.
|
| + var current = this._currentLogBuffer;
|
| + if (current) {
|
| + // We will track how many log entries that we've rendered. If we exceed
|
| + // this amount, we will force a refresh so the logs appear streaming and
|
| + // the app remains responsive.
|
| + var rendered = 0;
|
| +
|
| + var entries = [];
|
| + for (var le = current.next(); (le); le = current.next()) {
|
| + entries.push(le);
|
| + if (le.text && le.text.lines) {
|
| + rendered += le.text.lines.length;
|
| + }
|
| +
|
| + if (this._burst > 0 && rendered >= this._burst) {
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // Have we exhausted this buffer?
|
| + if (! current.peek()) {
|
| + this._currentLogBuffer = null;
|
| + }
|
| +
|
| + // Return the bundle of entries.
|
| + return Promise.resolve(entries);
|
| + }
|
| +
|
| + // We didn't have any buffered logs, so either all of our streams are
|
| + // finished or our buffer is empty and needs to be refreshed.
|
| + this._setStatus("Loading log stream data...");
|
| + return this._buffer.nextBuffer().then(function(buf) {
|
| + this.someStreamsFailed = (!!this._buffer._failures.length);
|
| +
|
| + // Check result.
|
| + if (buf === null) {
|
| + if (this._buffer.finished) {
|
| + // No more buffers, we are done.
|
| + console.log("All streams have been exhausted.");
|
| + this.finished = true;
|
| + this._setStatus(null);
|
| + return null;
|
| + }
|
| +
|
| + // The buffer was incomplete. Should we retry after a delay, or do
|
| + // we need to wait for an explicit edge (e.g., auth)?
|
| + if (this._buffer.autoRetry) {
|
| + // Sleep for 5 seconds and try again (waiting).
|
| + console.log("Log stream delayed; sleeping", this._missingDelay,
|
| + "and retry.");
|
| + this._setStatus("Missing log streams, retrying after delay...");
|
| + return new LuciSleepPromise(this._missingDelay).then(function() {
|
| + if (this.finished) {
|
| + console.log("Streamer is deactivated, discarding.");
|
| + return null;
|
| + }
|
| +
|
| + return this.load();
|
| + }.bind(this));
|
| + }
|
| +
|
| + this._setStatus("Some log streams could not be loaded.");
|
| + return null;
|
| + }
|
| +
|
| + // Install the new buffer and re-enter.
|
| + this._currentLogBuffer = buf;
|
| + return this.load();
|
| + }.bind(this)).catch(function(error) {
|
| + this._setStatus("[" + error.name + "] fetching streams: " +
|
| + error.message);
|
| + throw error;
|
| + }.bind(this));
|
| + };
|
| +
|
| + /**
|
| + * Manages an aggregate log stream buffer, consisting of logs punted from a
|
| + * set of zero or more _BufferedStream instances.
|
| + */
|
| + function _LogStreamBuffer() {
|
| + this._streams = null;
|
| + this._active = null;
|
| + this._nextBufferPromise = null;
|
| + this._failures = [];
|
| +
|
| + this.finished = false;
|
| + this._resetIterativeState();
|
| + }
|
| +
|
| + _LogStreamBuffer.prototype.setStreams = function(streams) {
|
| + // TODO(dnj): Make this do a delta with previous streams so we don't lose
|
| + // their already-loaded logs if the page changes.
|
| + this._streams = streams.map(function(bs, i) {
|
| + return {
|
| + bs: bs,
|
| + active: true,
|
| + buffer: new _BufferedLogs(),
|
| + };
|
| + });
|
| + this._active = this._streams;
|
| + this._nextBufferPromise = null;
|
| + };
|
| +
|
| + _LogStreamBuffer.prototype._resetIterativeState = function() {
|
| + this.autoRetry = false;
|
| };
|
| +
|
| /**
|
| - * Refresh the buffer with the contents of the supplied logs array.
|
| + * Returns a Promise that resolves into a _BufferedLogs instance containing
|
| + * the next set of logs, in order, from the source log streams.
|
| + *
|
| + * The _BufferedLogs bundle may have status flags set, and should be checked.
|
| + *
|
| + * The Promise will also resolve to "null" if there are no more logs in the
|
| + * source streams.
|
| *
|
| - * @param {Array[Object]} logs The LogEntry protobuf objects from the fetcher
|
| - * to load.
|
| + * If there are errors fetching logs, the Promise will be rejected, and an
|
| + * error will be returned.
|
| */
|
| - _BufferedStream.prototype.load = function(logs) {
|
| - // Disallow a state where "logs" is not null but empty.
|
| - if (!(logs && logs.length)) {
|
| - this._logs = null;
|
| - return;
|
| + _LogStreamBuffer.prototype.nextBuffer = function() {
|
| + // If we're already are fetching the next buffer, return that Promise.
|
| + if (this._nextBufferPromise) {
|
| + return this._nextBufferPromise;
|
| + }
|
| +
|
| + // Filter our any finished streams from our active list. A stream is
|
| + // finished if it is finished streaming and we don't have a retained buffer
|
| + // from it.
|
| + this._active = this._active.filter(function(entry) {
|
| + return (entry.buffer.peek() || (! (entry.bs.finished || entry.bs.error)));
|
| + })
|
| +
|
| + if (! this._active.length) {
|
| + this.finished = true;
|
| + }
|
| + if (this.finished) {
|
| + // No active streams, so we're finished. Permanently set our promise to
|
| + // the finished state.
|
| + this._nextBufferPromise = Promise.resolve(null);
|
| + return this._nextBufferPromise;
|
| + }
|
| +
|
| + // Fill all buffers for all active streams. This may result in an RPC to
|
| + // load new buffer content for streams whose buffers are empty.
|
| + //
|
| + // RPC failures are handled here:
|
| + // - If the stream reports "not found", we will terminate early and set
|
| + // out status to "waiting". Our owner should retry after a delay.
|
| + // - Otherwise, we will set our status to "error". Our owner should report
|
| + // that an error has occurred while loading stream data.
|
| + this._resetIterativeState();
|
| +
|
| + var incomplete = false;
|
| + this._nextBufferPromise = Promise.all(this._active.map(function(entry) {
|
| + // If the entry's buffer still has data, use it immediately.
|
| + if (entry.buffer.peek()) {
|
| + return entry.buffer;
|
| + }
|
| +
|
| + // Get the next log buffer for each stream. This may result in an RPC.
|
| + return entry.bs.nextBuffer().then(function(buffer) {
|
| + // Retain this buffer, if valid. The stream may have returned a null
|
| + // buffer if it failed to fetch for a legitimate reason. In this case,
|
| + // we will not retain it (since we want entry.buffer to be valid), but
|
| + // will forward the "null" to our aggregate function.
|
| + if (buffer) {
|
| + entry.buffer = buffer;
|
| + } else {
|
| + incomplete = true;
|
| +
|
| + // If this stream is waiting, but not on auth, mark that we should
|
| + // automatically retry.
|
| + if (entry.bs.waiting && !entry.bs.auth) {
|
| + this.autoRetry = true;
|
| + }
|
| + }
|
| + return buffer;
|
| + }.bind(this)).catch(function(error) {
|
| + // Log stream source of error. Raise a generic "failed to buffer"
|
| + // error. This will become a permanent failure.
|
| + console.error("Error loading buffer for", entry.bs.stream.fullName(),
|
| + "(", entry.bs, "): ", error);
|
| + this._failures.push(entry.bs);
|
| + return null;
|
| + }.bind(this));
|
| + }.bind(this))).then(function(buffers) {
|
| + this._nextBufferPromise = null;
|
| +
|
| + // Check each buffer. If any are null, that stream failed to deliver.
|
| + if (incomplete) {
|
| + // We succeeded, but are incomplete. At least one stream failed to
|
| + // deliver and should have state flags set accordingly.
|
| + return null;
|
| + }
|
| +
|
| + // Remove any null buffers. These would be placed here when a stream fails
|
| + // to load. Aggregate as much data from each of our streams as possible.
|
| + buffers = buffers.filter(v => (!!v));
|
| + return this._aggregateBuffers(buffers);
|
| + }.bind(this));
|
| + return this._nextBufferPromise;
|
| + };
|
| +
|
| + _LogStreamBuffer.prototype._aggregateBuffers = function(buffers) {
|
| + switch (buffers.length) {
|
| + case 0:
|
| + // No buffers, so no logs.
|
| + return new _BufferedLogs(null);
|
| + case 1:
|
| + // As a special case, if we only have one buffer, and we assume that its
|
| + // entries are sorted, then that buffer is a return value.
|
| + return new _BufferedLogs(buffers[0].getAll());
|
| + }
|
| +
|
| + // Preload our peek array.
|
| + var incomplete = false;
|
| + var peek = buffers.map(function(buf) {
|
| + var le = buf.peek();
|
| + if (! le) {
|
| + incomplete = true;
|
| + }
|
| + return le;
|
| + });
|
| + if (incomplete) {
|
| + // One of our input buffers had no log entries.
|
| + return new _BufferedLogs(null);
|
| }
|
|
|
| - // Clone and reverse the logs. This means that the last log will be the
|
| - // earliest.
|
| - this._logs = logs.splice(0);
|
| - this._logs.reverse();
|
| + // Assemble our aggregate buffer array.
|
| + // TODO: A binary heap would be pretty great for this.
|
| + var entries = [];
|
| + while (true) {
|
| + // Choose the next stream.
|
| + var earliest = 0;
|
| + for (var i = 1; i < buffers.length; i++) {
|
| + if (_LogStreamBuffer.compareLogs(peek[i], peek[earliest]) < 0) {
|
| + earliest = i;
|
| + }
|
| + }
|
| +
|
| + // Get the next log from the earliest stream.
|
| + entries.push(buffers[earliest].next());
|
| +
|
| + // Repopulate that buffer's "peek" value. If the buffer has no more
|
| + // entries, then we're done.
|
| + var next = buffers[earliest].peek();
|
| + if (!next) {
|
| + return new _BufferedLogs(entries);
|
| + }
|
| + peek[earliest] = next;
|
| + }
|
| };
|
| - /** @returns {Object} The next buffered log, or null if none are buffered */
|
| - _BufferedStream.prototype.peek = function() {
|
| - return (this._logs) ? (this._logs[this._logs.length-1]) : (null);
|
| +
|
| + _LogStreamBuffer.compareLogs = function(a, b) {
|
| + // If they are part of the same stream, compare prefix indexes.
|
| + if (a.source.stream.samePrefixAs(b.source.stream)) {
|
| + return (a.prefixIndex - b.prefixIndex);
|
| + }
|
| +
|
| + // Compare based on timestamp.
|
| + return a.timestamp - b.timestamp;
|
| };
|
| +
|
| +
|
| /**
|
| - * Returns the next buffered log, removing it from the buffer.
|
| + * A buffer of ordered log entries from all streams.
|
| *
|
| - * @return {Object} The next buffered LogEntry, or null if the buffer is
|
| - * empty.
|
| + * Assumes total ownership of the input log buffer, which can be null to
|
| + * indicate no logs.
|
| */
|
| - _BufferedStream.prototype.pop = function() {
|
| - if (!this._logs) {
|
| + function _BufferedLogs(logs) {
|
| + this._logs = logs;
|
| + this._index = 0;
|
| + }
|
| +
|
| + _BufferedLogs.prototype.getAll = function() {
|
| + // Pop all logs.
|
| + var logs = this._logs;
|
| + this._logs = null;
|
| + return logs;
|
| + };
|
| +
|
| + _BufferedLogs.prototype.peek = function() {
|
| + return (this._logs) ? (this._logs[this._index]) : (null);
|
| + };
|
| +
|
| + _BufferedLogs.prototype.next = function() {
|
| + if (! (this._logs && this._logs.length)) {
|
| return null;
|
| }
|
|
|
| - var log = this._logs.pop();
|
| - if (!this._logs.length) {
|
| + // Get the next log and increment our index.
|
| + var log = this._logs[this._index++];
|
| + if (this._index >= this._logs.length) {
|
| this._logs = null;
|
| }
|
| return log;
|
| };
|
| - /** @returns {bool} true if the log stream is finished being fetched. */
|
| - _BufferedStream.prototype.finished = function() {
|
| - return this.fetcher.finished;
|
| +
|
| +
|
| + /**
|
| + * Stateful log fetching manager for a single log stream.
|
| + */
|
| + function _BufferedStream(stream, client, oneOfMany, statusCallback) {
|
| + this.stream = stream;
|
| +
|
| + this.error = null;
|
| + this.finished = false;
|
| +
|
| + this._fetcher = new LogDogFetcher(client, stream);
|
| + this._oneOfMany = oneOfMany;
|
| + this._statusCallback = statusCallback;
|
| + this._lastFetchIndex = null;
|
| + }
|
| +
|
| + _BufferedStream.INITIAL_FETCH_SIZE = 4096;
|
| +
|
| + _BufferedStream.prototype._resetIterativeState = function() {
|
| + this.waiting = false;
|
| + this.auth = false;
|
| + this._fireStatusUpdated();
|
| +
|
| + this._currentFetch = null;
|
| + };
|
| +
|
| + _BufferedStream.prototype.nextBuffer = function() {
|
| + if (this._currentFetch) {
|
| + return this._currentFetch;
|
| + }
|
| +
|
| + // Reset per-round state and begin next round fetch.
|
| + this._resetIterativeState();
|
| +
|
| + // If this is the first fetch, and we're not the only log stream being
|
| + // rendered, fetch a small amount so we can (probably) start rendering
|
| + // without waiting for a lot of huge chunks.
|
| + this._fetcher.byteCount = (
|
| + (this._lastFetchIndex === null) && this._oneOfMany) ?
|
| + (_BufferedStream.INITIAL_FETCH_SIZE) : (null);
|
| +
|
| + this._currentFetch = this._fetcher.next().then(function(result) {
|
| + this._currentFetch = null;
|
| +
|
| + // Update our stream information.
|
| + this.finished = this._fetcher.finished;
|
| +
|
| + // Augment each returned log entry with self-descriptive metadata.
|
| + var logs = result.entries;
|
| + if (logs && logs.length) {
|
| + logs.forEach(function(le) {
|
| + le.desc = result.desc;
|
| + le.state = result.state;
|
| + le.source = this;
|
| + }.bind(this));
|
| +
|
| + // Record the latest fetch index.
|
| + this._lastFetchIndex = logs[logs.length - 1].streamIndex;
|
| + }
|
| +
|
| + this._fireStatusUpdated();
|
| + return new _BufferedLogs(logs);
|
| + }.bind(this)).catch(function(error) {;
|
| + // If this is a "not found" error, we assume that the stream is valid, but
|
| + // hasn't been ingested into LogDog yet. Return "null".
|
| + if (error instanceof LogDogError) {
|
| + if (error.isPermissionDenied()) {
|
| + this.waiting = true;
|
| + this.auth = true;
|
| + } else if (error.isNotFound()) {
|
| + this.waiting = true;
|
| + }
|
| +
|
| + // If this is an error that we understand, recover from it, return
|
| + // null, and set our status flags.
|
| + if (this.waiting) {
|
| + // Recover from this error.
|
| + this._currentFetch = null;
|
| + this._fireStatusUpdated();
|
| + return null;
|
| + }
|
| + }
|
| +
|
| + // Retain this error forever.
|
| + this.error = error;
|
| + throw error;
|
| + }.bind(this));
|
| + return this._currentFetch;
|
| + };
|
| +
|
| + _BufferedStream.prototype._fireStatusUpdated = function() {
|
| + if (this._statusCallback) {
|
| + this._statusCallback(this);
|
| + }
|
| + };
|
| +
|
| + _BufferedStream.prototype.description = function() {
|
| + if (this._waiting) {
|
| + return "(Waiting)";
|
| + }
|
| +
|
| + var pieces = []
|
| + var tidx = this._fetcher.terminalIndex();
|
| + if (this._lastFetchIndex) {
|
| + if (tidx >= 0) {
|
| + pieces.push(this._lastFetchIndex + " / " + tidx);
|
| + } else {
|
| + pieces.push(this._lastFetchIndex + " ?");
|
| + }
|
| + }
|
| +
|
| + if (this.error) {
|
| + pieces.push("(Error)");
|
| + } else if (this.auth) {
|
| + pieces.push("(Auth Error)");
|
| + } else if (this.waiting) {
|
| + pieces.push("(Waiting)");
|
| + } else if (!this._fetcher.state) {
|
| + pieces.push("(Fetching)");
|
| + } else if (this._fetcher.finished) {
|
| + pieces.push("(Finished)");
|
| + } else {
|
| + pieces.push("(Streaming)");
|
| + }
|
| + return pieces.join(" ");
|
| };
|
| </script>
|
|
|