Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(157)

Unified Diff: web/inc/logdog-stream-view/logdog-stream-view.html

Issue 2335223003: LogDog: Update web code, stream/query with auth. (Closed)
Patch Set: Fix background page loading. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « web/inc/logdog-stream-view/logdog-stream-query.html ('k') | web/inc/logdog-stream/logdog-error.html » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: web/inc/logdog-stream-view/logdog-stream-view.html
diff --git a/web/inc/logdog-stream-view/logdog-stream-view.html b/web/inc/logdog-stream-view/logdog-stream-view.html
index 2da1de958fa7a476754f6c9f73466044a2e0dfcb..4de04d4ef89f3d2b3ba54fc5939ecb04f0981567 100644
--- a/web/inc/logdog-stream-view/logdog-stream-view.html
+++ b/web/inc/logdog-stream-view/logdog-stream-view.html
@@ -5,9 +5,12 @@
-->
<link rel="import" href="../bower_components/polymer/polymer.html">
+<link rel="import" href="../bower_components/google-signin/google-signin-aware.html">
<link rel="import" href="../bower_components/paper-checkbox/paper-checkbox.html">
<link rel="import" href="../logdog-stream/logdog-stream.html">
+<link rel="import" href="../logdog-stream/logdog-error.html">
+<link rel="import" href="../luci-sleep-promise/luci-sleep-promise.html">
<link rel="import" href="logdog-stream-fetcher.html">
<link rel="import" href="logdog-stream-query.html">
@@ -23,9 +26,11 @@ An element for rendering muxed LogDog log streams.
background-color: white;
}
- #counter {
+ #stream-status {
position: fixed;
right: 16px;
+ background-color: #EEEEEE;
+ opacity: 0.7;
}
#logContent {
@@ -86,8 +91,24 @@ An element for rendering muxed LogDog log streams.
height: 2px;
margin-bottom: 10px;
}
+
+ #status-bar {
+ /* Overlay at the bottom of the page. */
+ position: absolute;
+ bottom: 0;
+ left: 0;
+ width: 100%;
+
+ text-align: center;
+ font-size: 16px;
+ background-color: rgba(245, 245, 220, 0.7);
+ }
</style>
+ <google-signin-aware
+ id="aware"
+ on-google-signin-aware-success="_onSignin"></google-signin-aware>
+
<rpc-client
id="client"
auto-token
@@ -107,13 +128,13 @@ An element for rendering muxed LogDog log streams.
</div>
<!-- Display current fetching status, if stream data is still loading. -->
- <template is="dom-if" if="{{fetch}}">
- <div id="counter">
+ <template is="dom-if" if="{{streamStatus}}">
+ <div id="stream-status">
<table>
- <template is="dom-repeat" items="{{fetch.status}}">
+ <template is="dom-repeat" items="{{streamStatus}}">
<tr>
<td>{{item.name}}</td>
- <td>{{item.status}}</td>
+ <td>{{item.desc}}</td>
</tr>
</template>
</table>
@@ -146,6 +167,9 @@ An element for rendering muxed LogDog log streams.
<div id="bottom"></div>
</div>
+ <template is="dom-if" if="{{statusBar}}">
+ <div id="status-bar">{{statusBar.value}}</div>
+ </template>
</template>
</dom-module>
@@ -185,8 +209,8 @@ An element for rendering muxed LogDog log streams.
* renders in between elements.
*/
burst: {
- type: Array,
- value: 100,
+ type: Number,
+ value: 1000,
notify: true,
},
@@ -215,25 +239,32 @@ An element for rendering muxed LogDog log streams.
},
/**
- * The current log fetching context.
- *
- * The "Fetch" object is structured:
- * fatch.streams: An array of _BufferedStream instances for each muxed
- * stream.
- * fetch.status: The renderable status for a given stream.
+ * The current stream status. This is an Array of objects:
+ * obj.name is the name of the stream.
+ * obj.desc is the status description of the stream.
*/
- fetch: {
- type: Object,
+ streamStatus: {
+ type: String,
value: null,
notify: true,
readOnly: true,
},
+
+ /**
+ * The text content of the status element at the bottom of the page.
+ */
+ statusBar: {
+ type: String,
+ value: null,
+ readOnly: true,
+ },
},
ready: function() {
- this._setFetch(null);
this._scheduledWrite = null;
- this._bufferedLogs = null;
+ this._buffer = null;
+ this._currentLogBuffer = null;
+ this._authCallback = null;
},
detached: function() {
@@ -241,11 +272,41 @@ An element for rendering muxed LogDog log streams.
},
stop: function() {
- this._cancelFetch();
+ this._cancelFetch(true);
},
/** Clears state and begins fetching log data. */
reset: function() {
+ this._resetLogState();
+
+ this._resolveStreams().then(function(streams) {
+ this._resetToStreams(streams);
+ }.bind(this)).catch(function(error) {
+ this._loadStatusBar("Failed to resolve streams:" + error);
+ throw error;
+ }.bind(this));
+ },
+
+ /** Clears all current logs. */
+ _resetLogState: function() {
+ this._cancelFetch(true);
+
+ // Remove all current log elements. */
+ while (this.$.logs.hasChildNodes()) {
+ this.$.logs.removeChild(this.$.logs.lastChild);
+ }
+
+ // Clear our buffer and streamer state.
+ this._buffer = null;
+ this._currentLogBuffer = null;
+ if (this._streamer) {
+ this._streamer.shutdown();
+ }
+ this._streamer = null;
+ },
+
+ _resolveStreams: function() {
+ // Separate our configured streams into full stream paths and queries.
var parts = {
queries: [],
streams: [],
@@ -259,69 +320,108 @@ An element for rendering muxed LogDog log streams.
}
});
- Promise.all(parts.queries.map(function(v) {
- var params = new LogDogQueryParams(v.project).
- path(v.path).
- streamType("text");
- return new LogDogQuery(this.$.client, params).getAll();
- }.bind(this))).then(function(results) {
- // Add query results (if any) to streams.
- results.forEach(function(streams) {
- (streams || []).forEach(function(stream) {
- parts.streams.push(stream.stream);
+ // Resolve any outstanding queries into full stream paths.
+ //
+ // If we get an authentication error, register to have our query
+ // resolution callback invoked on signin changes until it works (or
+ // indefinitely).
+ var queries = parts.queries.map(function(v) {
+ var params = new LogDogQueryParams(v.project).
+ path(v.path).
+ streamType("text");
+ return new LogDogQuery(this.$.client, params);
+ }.bind(this));
+
+ var issueQuery = function() {
+ this._loadStatusBar("Resolving log streams from query...");
+ this._authCallback = null;
+
+ return Promise.all(queries.map(function(q) {
+ return q.getAll();
+ }.bind(this))).then(function(results) {
+ this._loadStatusBar(null);
+
+ // Add query results (if any) to streams.
+ results.forEach(function(streams) {
+ (streams || []).forEach(function(stream) {
+ parts.streams.push(stream.stream);
+ });
});
- });
+ parts.streams.sort(LogDogStream.cmp);
+
+ // Remove any duplicates.
+ var seenStreams = {};
+ var result = [];
+ parts.streams.forEach(function(s) {
+ var fullName = s.fullName();
+ if (!seenStreams[fullName]) {
+ seenStreams[fullName] = s;
+ result.push(s);
+ }
+ });
+ return result;
+ }.bind(this)).catch(function(error) {
+ if (error instanceof LogDogError && error.isPermissionDenied()) {
+ // Retry on auth event.
+ this._loadStatusBar("Not authorized to execute query. Log in " +
+ "with an authorized account.");
+ return new Promise(function(resolve) {
+ this._authCallback = resolve;
+ }.bind(this)).then(issueQuery);
+ }
- // Start loading the streams.
- this._resetToStreams(parts.streams);
- }.bind(this));
+ throw error;
+ }.bind(this));
+ }.bind(this);
+ return issueQuery();
},
_resetToStreams: function(streams) {
- this._cancelFetch();
- this._clearLogs();
-
-
// Unique streams.
if (!streams.length) {
+ this._loadStatusBar("No log streams.");
return;
}
console.log("Loading log streams:", streams);
+ this._loadStatusBar("Loading stream data...");
streams.sort(LogDogStream.cmp);
- // Construct our fetch context.
- var fetch = {};
- fetch.streams = streams.map(function(stream) {
- // TODO: Re-use fetcher if it already exists in the previous streams
- // map.
- return new _BufferedStream(stream, new LogDogFetcher(
- this.$.client, stream.project, stream.path));
+ // Create a _BufferedStream for each stream.
+ var bufStreams = streams.map(function(stream, idx) {
+ return new _BufferedStream(stream, this.$.client,
+ (streams.length > 1), function(bs) {
+ this._updateStreamStatus(bs, idx);
+ }.bind(this));
+ }.bind(this));
+ this._buffer = new _LogStreamBuffer();
+ this._buffer.setStreams(bufStreams)
+
+ this._streamer = new _LogStreamer(this._buffer, this.burst, function(v) {
+ this._loadStatusBar(v);
}.bind(this));
- fetch.status = fetch.streams.map(function(v, idx) {
- var name = v.stream.path;
- var lidx = name.lastIndexOf("/");
- if (lidx >= 0) {
- name = idx + " [.../" + name.substr(lidx+1) + "]";
- }
+ // Construct our initial status content.
+ this._setStreamStatus(bufStreams.map(function(bs, idx) {
return {
- name: name,
- status: this._buildStreamStatus(v, null),
+ name: (" [.../+/" + bs.stream.name() + "]"),
+ desc: bs.description(),
};
- }.bind(this));
- this._setFetch(fetch);
+ }.bind(this)));
// Kick off our log fetching.
this._scheduleWriteNextLogs();
},
/** Cancels any currently-executing log stream fetch. */
- _cancelFetch: function() {
- if (this.fetch) {
- this._setFetch(null);
- }
+ _cancelFetch: function(clear) {
this._cancelScheduledWrite();
+ this._authCallback = null;
+
+ if (clear) {
+ this._setStreamStatus(null);
+ this._loadStatusBar(null);
+ }
},
/** Cancels any scheduled asynchronous write. */
@@ -343,8 +443,10 @@ An element for rendering muxed LogDog log streams.
// to the bottom.
this._maybeScrollToBottom();
- if (!this._scheduledWrite) {
- this._scheduledWrite = this.async(this._writeNextLogs);
+ if (! this._scheduledWrite) {
+ this._scheduledWrite = this.async(function() {
+ this._writeNextLogs()
+ }.bind(this));
}
},
@@ -356,151 +458,45 @@ An element for rendering muxed LogDog log streams.
_writeNextLogs: function() {
this._cancelScheduledWrite();
- if (this._writeNextLogsImpl()) {
- // Yield so that our browser can refresh. We can't directly use
- // this.async since a timeout of "0" causes immediate execution instead
- // of yielding.
- setTimeout(this._scheduleWriteNextLogs.bind(this), 0);
- }
- },
-
- /**
- * Primary implementation of _writeNextLogs.
- *
- * Returns true if any logs were rendered.
- */
- _writeNextLogsImpl: function() {
- var fetch = this.fetch;
- if (!(fetch && fetch.streams.length)) {
- return false;
- }
-
- // Render any buffered logs.
- var buffer = this._getOrBuildLogBuffer(fetch.streams);
- if (buffer) {
- // We will track how many log entries that we've rendered. If we exceed
- // this amount, we will force a refresh so the logs appear streaming and
- // the app remains responsive.
- var rendered = 0;
- var updated = {};
-
- while (buffer.length && rendered < this.burst) {
- // Get the next log. The buffer is sorted descendingly, so we can use
- // pop to get it.
- var log = buffer.pop();
- rendered += this._appendLogEntry(log);
-
- // Record our last appended log entry for this stream.
- updated[log.fetchIndex] = log.streamIndex;
+ this._streamer.load().then(function(entries) {
+ // If there are no entries, then we're done.
+ if (! entries) {
+ // Cancel all fetching state. If our streamer is finished, also clear
+ // messages and status.
+ if (this._streamer.finished) {
+ if (this._streamer.someStreamsFailed) {
+ this._cancelFetch(false);
+ this._loadStatusBar("Some streams failed to load.");
+ } else {
+ this._cancelFetch(true);
+ }
+ } else {
+ // No more logs, but also we are not finished. Retry after auth.
+ this._authCallback = this._scheduleWriteNextLogs.bind(this);
+ }
+ return;
}
- Object.keys(updated).forEach(function(idx) {
- var statusKey = ("fetch.status." + idx + ".status");
- this.set(statusKey, this._buildStreamStatus(
- fetch.streams[idx], updated[idx]));
+ var logEntryChunk = document.createElement("div");
+ entries.forEach(function(le) {
+ this._appendLogEntry(logEntryChunk, le);
}.bind(this));
- // If we rendered any logs, we will finish this write round.
- if (rendered) {
- return true;
- }
- }
+ // To have styles apply correctly, we need to add it twice, see
+ // https://github.com/Polymer/polymer/issues/3100.
+ Polymer.dom(this.root).appendChild(logEntryChunk);
+ this.$.logs.appendChild(logEntryChunk);
- // We didn't have any buffered logs, so either all of our streams are
- // finished, or our buffer is empty and needs to be refreshed.
- if(fetch.streams.every(function(v) {
- return (v.finished());
- })) {
- console.log("All streams have been exhausted.");
- this._cancelFetch();
- return false;
- }
-
- // Fetch any streams' missing logs. If a stream already has buffered logs,
- // skip it in this fetch.
- Promise.all(fetch.streams.map(function(v) {
- if (v.finished() || v.peek() !== null) {
- // This stream still has buffered logs.
- return null;
- }
- return v.fetcher.next();
- })).then(function(result) {
- result.forEach(function(v, i) {
- if (v) {
- fetch.streams[i].load(v.entries);
- }
- }.bind(this));
- this._scheduleWriteNextLogs();
+ // Yield so that our browser can refresh. We can't directly use
+ // this.async since a timeout of "0" causes immediate execution instead
+ // of yielding.
+ setTimeout(function() {
+ this._scheduleWriteNextLogs();
+ }.bind(this), 0);
}.bind(this));
- return false;
- },
-
- /**
- * Examines the current buffered set of logs/streams. If sufficient logs
- * are buffered to render the next log, it will be immediately added and
- * this function will return "true". Otherwise, it will return "false",
- * indicating that log fetch must be performed.
- */
- _getOrBuildLogBuffer: function(streams) {
- if (this._bufferedLogs && this._bufferedLogs.length) {
- return this._bufferedLogs;
- }
-
- // If we have no active streams, we can't buffer anything.
- var active = [];
- streams.forEach(function(v, idx) {
- var next = v.peek();
- if (next) {
- active.push({
- stream: v,
- streamIndex: idx,
- next: next,
- });
- }
- });
- if (!active.length) {
- return null;
- }
-
- // Build our log buffer.
- //
- // TODO: A binary heap would be pretty great for this.
- var buffer = [];
- while (true) {
- // Choose the next stream.
- var earliest = 0;
- for (var i = 1; i < active.length; i++) {
- if (active[i].next.timestamp < active[earliest].next.timestamp) {
- earliest = i;
- }
- }
-
- // Get the next log from the earliest stream.
- //
- // Additionally, record the index in the original streams array that
- // this log came from. We need this to update stream status when the
- // log is consumed.
- var nextStream = active[earliest];
- var nextLog = nextStream.stream.pop();
- nextLog.fetchIndex = nextStream.streamIndex;
- buffer.push(nextLog);
-
- nextStream.next = nextStream.stream.peek();
- if (nextStream.next) {
- // This stream has more logs, so we can continue building our buffer.
- continue;
- }
-
- // This stream has no more buffered entries, so we're done.
- //
- // Reverse our log buffer so we can easily pop logs from it.
- buffer.reverse();
- this._bufferedLogs = buffer;
- return buffer;
- }
},
- _appendLogEntry: function(le) {
+ _appendLogEntry: function(root, le) {
var text = le.text;
if (!(text && text.lines)) {
return 0;
@@ -542,34 +538,17 @@ An element for rendering muxed LogDog log streams.
}
}
entryRow.appendChild(logDataBlock);
-
- // To have styles apply correctly, we need to add it twice, see
- // https://github.com/Polymer/polymer/issues/3100.
- Polymer.dom(this.root).appendChild(entryRow);
- this.$.logs.appendChild(entryRow);
+ root.appendChild(entryRow);
return le.text.lines.length;
},
- /** Clears all current logs. */
- _clearLogs: function() {
- while (this.$.logs.hasChildNodes()) {
- this.$.logs.removeChild(this.$.logs.lastChild);
- }
- this._bufferedLogs = null;
- },
-
- /** Constructs the log stream status object for a given stream. */
- _buildStreamStatus: function(stream, lastStreamIndex) {
- if (!lastStreamIndex && lastStreamIndex !== 0) {
- return "(Fetching)";
- }
-
- var tidx = stream.fetcher.terminalIndex;
- if (tidx >= 0) {
- return lastStreamIndex + " / " + tidx;
- }
- return lastStreamIndex + " (Streaming)";
+ _updateStreamStatus: function(bs, idx) {
+ var origStatus = this.streamStatus[idx];
+ this.splice("streamStatus", idx, 1, {
+ name: origStatus.name,
+ desc: bs.description(),
+ });
},
/** Scrolls to the bottom if "follow" is enabled. */
@@ -605,58 +584,488 @@ An element for rendering muxed LogDog log streams.
_handleMouseWheel: function() {
this.follow = false;
},
+
+ /**
+ * Loads text content into the status bar.
+ *
+ * If null is passed, the status bar will be cleared. If text is passed, the
+ * status bar will become visible with the supplied content.
+ */
+ _loadStatusBar: function(v) {
+ var st = null;
+ if (v) {
+ st = {
+ value: v,
+ };
+ }
+ this._setStatusBar(st);
+ },
+
+ _onSignin: function() {
+ var fn = this._authCallback;
+ if (fn) {
+ this._authCallback = null;
+ fn();
+ }
+ },
});
/**
- * Container for logs that have been punted.
+ * Continuously loads log streams from a _LogStreamBuffer and exposes them via
+ * callback.
*/
- function _BufferedStream(stream, fetcher) {
- this.stream = stream;
- this.fetcher = fetcher;
+ function _LogStreamer(buffer, burst, statusCallback) {
+ this._buffer = buffer;
+ this._burst = (burst || 0);
+ this._missingDelay = 5000;
+ this._statusCallback = statusCallback;
- this._logs = null;
+ this.finished = false;
+ this.someStreamsFailed = false;
+
+ this._currentLogBuffer = null;
+ }
+
+ _LogStreamer.prototype.shutdown = function() {
+ this.finshed = true;
+ };
+
+ _LogStreamer.prototype._setStatus = function(v) {
+ if (this._statusCallback) {
+ this._statusCallback(v);
+ }
+ };
+
+ _LogStreamer.prototype.load = function() {
+ if (this.finished) {
+ this._setStatus(null);
+ return Promise.resolve(null);
+ }
+
+ // If we have buffered logs, return them.
+ var current = this._currentLogBuffer;
+ if (current) {
+ // We will track how many log entries that we've rendered. If we exceed
+ // this amount, we will force a refresh so the logs appear streaming and
+ // the app remains responsive.
+ var rendered = 0;
+
+ var entries = [];
+ for (var le = current.next(); (le); le = current.next()) {
+ entries.push(le);
+ if (le.text && le.text.lines) {
+ rendered += le.text.lines.length;
+ }
+
+ if (this._burst > 0 && rendered >= this._burst) {
+ break;
+ }
+ }
+
+ // Have we exhausted this buffer?
+ if (! current.peek()) {
+ this._currentLogBuffer = null;
+ }
+
+ // Return the bundle of entries.
+ return Promise.resolve(entries);
+ }
+
+ // We didn't have any buffered logs, so either all of our streams are
+ // finished or our buffer is empty and needs to be refreshed.
+ this._setStatus("Loading log stream data...");
+ return this._buffer.nextBuffer().then(function(buf) {
+ this.someStreamsFailed = (!!this._buffer._failures.length);
+
+ // Check result.
+ if (buf === null) {
+ if (this._buffer.finished) {
+ // No more buffers, we are done.
+ console.log("All streams have been exhausted.");
+ this.finished = true;
+ this._setStatus(null);
+ return null;
+ }
+
+ // The buffer was incomplete. Should we retry after a delay, or do
+ // we need to wait for an explicit edge (e.g., auth)?
+ if (this._buffer.autoRetry) {
+ // Sleep for 5 seconds and try again (waiting).
+ console.log("Log stream delayed; sleeping", this._missingDelay,
+ "and retry.");
+ this._setStatus("Missing log streams, retrying after delay...");
+ return new LuciSleepPromise(this._missingDelay).then(function() {
+ if (this.finished) {
+ console.log("Streamer is deactivated, discarding.");
+ return null;
+ }
+
+ return this.load();
+ }.bind(this));
+ }
+
+ this._setStatus("Some log streams could not be loaded.");
+ return null;
+ }
+
+ // Install the new buffer and re-enter.
+ this._currentLogBuffer = buf;
+ return this.load();
+ }.bind(this)).catch(function(error) {
+ this._setStatus("[" + error.name + "] fetching streams: " +
+ error.message);
+ throw error;
+ }.bind(this));
+ };
+
+ /**
+ * Manages an aggregate log stream buffer, consisting of logs punted from a
+ * set of zero or more _BufferedStream instances.
+ */
+ function _LogStreamBuffer() {
+ this._streams = null;
+ this._active = null;
+ this._nextBufferPromise = null;
+ this._failures = [];
+
+ this.finished = false;
+ this._resetIterativeState();
+ }
+
+ _LogStreamBuffer.prototype.setStreams = function(streams) {
+ // TODO(dnj): Make this do a delta with previous streams so we don't lose
+ // their already-loaded logs if the page changes.
+ this._streams = streams.map(function(bs, i) {
+ return {
+ bs: bs,
+ active: true,
+ buffer: new _BufferedLogs(),
+ };
+ });
+ this._active = this._streams;
+ this._nextBufferPromise = null;
+ };
+
+ _LogStreamBuffer.prototype._resetIterativeState = function() {
+ this.autoRetry = false;
};
+
/**
- * Refresh the buffer with the contents of the supplied logs array.
+ * Returns a Promise that resolves into a _BufferedLogs instance containing
+ * the next set of logs, in order, from the source log streams.
+ *
+ * The _BufferedLogs bundle may have status flags set, and should be checked.
+ *
+ * The Promise will also resolve to "null" if there are no more logs in the
+ * source streams.
*
- * @param {Array[Object]} logs The LogEntry protobuf objects from the fetcher
- * to load.
+ * If there are errors fetching logs, the Promise will be rejected, and an
+ * error will be returned.
*/
- _BufferedStream.prototype.load = function(logs) {
- // Disallow a state where "logs" is not null but empty.
- if (!(logs && logs.length)) {
- this._logs = null;
- return;
+ _LogStreamBuffer.prototype.nextBuffer = function() {
+ // If we're already are fetching the next buffer, return that Promise.
+ if (this._nextBufferPromise) {
+ return this._nextBufferPromise;
+ }
+
+ // Filter our any finished streams from our active list. A stream is
+ // finished if it is finished streaming and we don't have a retained buffer
+ // from it.
+ this._active = this._active.filter(function(entry) {
+ return (entry.buffer.peek() || (! (entry.bs.finished || entry.bs.error)));
+ })
+
+ if (! this._active.length) {
+ this.finished = true;
+ }
+ if (this.finished) {
+ // No active streams, so we're finished. Permanently set our promise to
+ // the finished state.
+ this._nextBufferPromise = Promise.resolve(null);
+ return this._nextBufferPromise;
+ }
+
+ // Fill all buffers for all active streams. This may result in an RPC to
+ // load new buffer content for streams whose buffers are empty.
+ //
+ // RPC failures are handled here:
+ // - If the stream reports "not found", we will terminate early and set
+ // out status to "waiting". Our owner should retry after a delay.
+ // - Otherwise, we will set our status to "error". Our owner should report
+ // that an error has occurred while loading stream data.
+ this._resetIterativeState();
+
+ var incomplete = false;
+ this._nextBufferPromise = Promise.all(this._active.map(function(entry) {
+ // If the entry's buffer still has data, use it immediately.
+ if (entry.buffer.peek()) {
+ return entry.buffer;
+ }
+
+ // Get the next log buffer for each stream. This may result in an RPC.
+ return entry.bs.nextBuffer().then(function(buffer) {
+ // Retain this buffer, if valid. The stream may have returned a null
+ // buffer if it failed to fetch for a legitimate reason. In this case,
+ // we will not retain it (since we want entry.buffer to be valid), but
+ // will forward the "null" to our aggregate function.
+ if (buffer) {
+ entry.buffer = buffer;
+ } else {
+ incomplete = true;
+
+ // If this stream is waiting, but not on auth, mark that we should
+ // automatically retry.
+ if (entry.bs.waiting && !entry.bs.auth) {
+ this.autoRetry = true;
+ }
+ }
+ return buffer;
+ }.bind(this)).catch(function(error) {
+ // Log stream source of error. Raise a generic "failed to buffer"
+ // error. This will become a permanent failure.
+ console.error("Error loading buffer for", entry.bs.stream.fullName(),
+ "(", entry.bs, "): ", error);
+ this._failures.push(entry.bs);
+ return null;
+ }.bind(this));
+ }.bind(this))).then(function(buffers) {
+ this._nextBufferPromise = null;
+
+ // Check each buffer. If any are null, that stream failed to deliver.
+ if (incomplete) {
+ // We succeeded, but are incomplete. At least one stream failed to
+ // deliver and should have state flags set accordingly.
+ return null;
+ }
+
+ // Remove any null buffers. These would be placed here when a stream fails
+ // to load. Aggregate as much data from each of our streams as possible.
+ buffers = buffers.filter(v => (!!v));
+ return this._aggregateBuffers(buffers);
+ }.bind(this));
+ return this._nextBufferPromise;
+ };
+
+ _LogStreamBuffer.prototype._aggregateBuffers = function(buffers) {
+ switch (buffers.length) {
+ case 0:
+ // No buffers, so no logs.
+ return new _BufferedLogs(null);
+ case 1:
+ // As a special case, if we only have one buffer, and we assume that its
+ // entries are sorted, then that buffer is a return value.
+ return new _BufferedLogs(buffers[0].getAll());
+ }
+
+ // Preload our peek array.
+ var incomplete = false;
+ var peek = buffers.map(function(buf) {
+ var le = buf.peek();
+ if (! le) {
+ incomplete = true;
+ }
+ return le;
+ });
+ if (incomplete) {
+ // One of our input buffers had no log entries.
+ return new _BufferedLogs(null);
}
- // Clone and reverse the logs. This means that the last log will be the
- // earliest.
- this._logs = logs.splice(0);
- this._logs.reverse();
+ // Assemble our aggregate buffer array.
+ // TODO: A binary heap would be pretty great for this.
+ var entries = [];
+ while (true) {
+ // Choose the next stream.
+ var earliest = 0;
+ for (var i = 1; i < buffers.length; i++) {
+ if (_LogStreamBuffer.compareLogs(peek[i], peek[earliest]) < 0) {
+ earliest = i;
+ }
+ }
+
+ // Get the next log from the earliest stream.
+ entries.push(buffers[earliest].next());
+
+ // Repopulate that buffer's "peek" value. If the buffer has no more
+ // entries, then we're done.
+ var next = buffers[earliest].peek();
+ if (!next) {
+ return new _BufferedLogs(entries);
+ }
+ peek[earliest] = next;
+ }
};
- /** @returns {Object} The next buffered log, or null if none are buffered */
- _BufferedStream.prototype.peek = function() {
- return (this._logs) ? (this._logs[this._logs.length-1]) : (null);
+
+ _LogStreamBuffer.compareLogs = function(a, b) {
+ // If they are part of the same stream, compare prefix indexes.
+ if (a.source.stream.samePrefixAs(b.source.stream)) {
+ return (a.prefixIndex - b.prefixIndex);
+ }
+
+ // Compare based on timestamp.
+ return a.timestamp - b.timestamp;
};
+
+
/**
- * Returns the next buffered log, removing it from the buffer.
+ * A buffer of ordered log entries from all streams.
*
- * @return {Object} The next buffered LogEntry, or null if the buffer is
- * empty.
+ * Assumes total ownership of the input log buffer, which can be null to
+ * indicate no logs.
*/
- _BufferedStream.prototype.pop = function() {
- if (!this._logs) {
+ function _BufferedLogs(logs) {
+ this._logs = logs;
+ this._index = 0;
+ }
+
+ _BufferedLogs.prototype.getAll = function() {
+ // Pop all logs.
+ var logs = this._logs;
+ this._logs = null;
+ return logs;
+ };
+
+ _BufferedLogs.prototype.peek = function() {
+ return (this._logs) ? (this._logs[this._index]) : (null);
+ };
+
+ _BufferedLogs.prototype.next = function() {
+ if (! (this._logs && this._logs.length)) {
return null;
}
- var log = this._logs.pop();
- if (!this._logs.length) {
+ // Get the next log and increment our index.
+ var log = this._logs[this._index++];
+ if (this._index >= this._logs.length) {
this._logs = null;
}
return log;
};
- /** @returns {bool} true if the log stream is finished being fetched. */
- _BufferedStream.prototype.finished = function() {
- return this.fetcher.finished;
+
+
+ /**
+ * Stateful log fetching manager for a single log stream.
+ */
+ function _BufferedStream(stream, client, oneOfMany, statusCallback) {
+ this.stream = stream;
+
+ this.error = null;
+ this.finished = false;
+
+ this._fetcher = new LogDogFetcher(client, stream);
+ this._oneOfMany = oneOfMany;
+ this._statusCallback = statusCallback;
+ this._lastFetchIndex = null;
+ }
+
+ _BufferedStream.INITIAL_FETCH_SIZE = 4096;
+
+ _BufferedStream.prototype._resetIterativeState = function() {
+ this.waiting = false;
+ this.auth = false;
+ this._fireStatusUpdated();
+
+ this._currentFetch = null;
+ };
+
+ _BufferedStream.prototype.nextBuffer = function() {
+ if (this._currentFetch) {
+ return this._currentFetch;
+ }
+
+ // Reset per-round state and begin next round fetch.
+ this._resetIterativeState();
+
+ // If this is the first fetch, and we're not the only log stream being
+ // rendered, fetch a small amount so we can (probably) start rendering
+ // without waiting for a lot of huge chunks.
+ this._fetcher.byteCount = (
+ (this._lastFetchIndex === null) && this._oneOfMany) ?
+ (_BufferedStream.INITIAL_FETCH_SIZE) : (null);
+
+ this._currentFetch = this._fetcher.next().then(function(result) {
+ this._currentFetch = null;
+
+ // Update our stream information.
+ this.finished = this._fetcher.finished;
+
+ // Augment each returned log entry with self-descriptive metadata.
+ var logs = result.entries;
+ if (logs && logs.length) {
+ logs.forEach(function(le) {
+ le.desc = result.desc;
+ le.state = result.state;
+ le.source = this;
+ }.bind(this));
+
+ // Record the latest fetch index.
+ this._lastFetchIndex = logs[logs.length - 1].streamIndex;
+ }
+
+ this._fireStatusUpdated();
+ return new _BufferedLogs(logs);
+ }.bind(this)).catch(function(error) {;
+ // If this is a "not found" error, we assume that the stream is valid, but
+ // hasn't been ingested into LogDog yet. Return "null".
+ if (error instanceof LogDogError) {
+ if (error.isPermissionDenied()) {
+ this.waiting = true;
+ this.auth = true;
+ } else if (error.isNotFound()) {
+ this.waiting = true;
+ }
+
+ // If this is an error that we understand, recover from it, return
+ // null, and set our status flags.
+ if (this.waiting) {
+ // Recover from this error.
+ this._currentFetch = null;
+ this._fireStatusUpdated();
+ return null;
+ }
+ }
+
+ // Retain this error forever.
+ this.error = error;
+ throw error;
+ }.bind(this));
+ return this._currentFetch;
+ };
+
+ _BufferedStream.prototype._fireStatusUpdated = function() {
+ if (this._statusCallback) {
+ this._statusCallback(this);
+ }
+ };
+
+ _BufferedStream.prototype.description = function() {
+ if (this._waiting) {
+ return "(Waiting)";
+ }
+
+ var pieces = []
+ var tidx = this._fetcher.terminalIndex();
+ if (this._lastFetchIndex) {
+ if (tidx >= 0) {
+ pieces.push(this._lastFetchIndex + " / " + tidx);
+ } else {
+ pieces.push(this._lastFetchIndex + " ?");
+ }
+ }
+
+ if (this.error) {
+ pieces.push("(Error)");
+ } else if (this.auth) {
+ pieces.push("(Auth Error)");
+ } else if (this.waiting) {
+ pieces.push("(Waiting)");
+ } else if (!this._fetcher.state) {
+ pieces.push("(Fetching)");
+ } else if (this._fetcher.finished) {
+ pieces.push("(Finished)");
+ } else {
+ pieces.push("(Streaming)");
+ }
+ return pieces.join(" ");
};
</script>
« no previous file with comments | « web/inc/logdog-stream-view/logdog-stream-query.html ('k') | web/inc/logdog-stream/logdog-error.html » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698