| Index: web/inc/logdog-stream-view/logdog-stream-fetcher.html
|
| diff --git a/web/inc/logdog-stream-view/logdog-stream-fetcher.html b/web/inc/logdog-stream-view/logdog-stream-fetcher.html
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..f0d89bb9c19813b4b06c1945d5316fb936edd53e
|
| --- /dev/null
|
| +++ b/web/inc/logdog-stream-view/logdog-stream-fetcher.html
|
| @@ -0,0 +1,164 @@
|
| +<!--
|
| + Copyright 2016 The LUCI Authors. All rights reserved.
|
| + Use of this source code is governed under the Apache License, Version 2.0
|
| + that can be found in the LICENSE file.
|
| + -->
|
| +
|
| +<link rel="import" href="../bower_components/polymer/polymer.html">
|
| +<link rel="import" href="../bower_components/promise-polyfill/promise-polyfill-lite.html">
|
| +
|
| +<link rel="import" href="../luci-sleep-promise/luci-sleep-promise.html">
|
| +
|
| +<script>
|
| + "use strict";
|
| +
|
| + function LogDogFetcher(client, stream) {
|
| + this.client = client;
|
| + this.stream = stream;
|
| +
|
| + // Fetching parameters, will be updated as logs are fetched.
|
| + this.sleepTimeSecs = 5;
|
| + this.byteCount = null;
|
| + this.logCount = null;
|
| + this.reset();
|
| + }
|
| +
|
| + LogDogFetcher.prototype.reset = function() {
|
| + this.nextIndex = 0;
|
| + this.finished = false;
|
| + this.desc = null;
|
| + this.state = null;
|
| +
|
| + this._current = null;
|
| + this._nextLogsPromise = null;
|
| + };
|
| +
|
| + /**
|
| + * Returns the log stream's terminal index.
|
| + *
|
| + * If no terminal index is known, or if the log stream is still streaming,
|
| + * this will return -1.
|
| + */
|
| + LogDogFetcher.prototype.terminalIndex = function() {
|
| + return (this.state) ? (this.state.terminalIndex) : (-1);
|
| + };
|
| +
|
| + /**
|
| + * Returns a Promise that resolves to the next block of logs in the stream.
|
| + *
|
| + * If there are no more logs in the stream (finished), the returned Promise
|
| + * will already be resolved and will contain a null log.
|
| + *
|
| + * @return {Promise[Object]} A Promise that will resolve to the next block
|
| + * of logs in the stream.
|
| + */
|
| + LogDogFetcher.prototype.next = function() {
|
| + // If we don't have an in-progress fetch, start a new one.
|
| + if (this._nextLogsPromise === null) {
|
| + this._nextLogsPromise = this._fetchNextBatch().
|
| + then(function(result) {
|
| + var entries = result.entries;
|
| + if (entries && entries.length) {
|
| + var lastIndex = entries[entries.length-1].streamIndex;
|
| + this.nextIndex = (lastIndex + 1);
|
| +
|
| + var tidx = this.terminalIndex();
|
| + if (tidx >= 0 && tidx < this.nextIndex) {
|
| + // We have punted the full log stream. Mark finished.
|
| + this.finished = true;
|
| + }
|
| + }
|
| +
|
| + this._nextLogsPromise = null;
|
| + return result;
|
| + }.bind(this)).catch(function(error) {
|
| + this._nextLogsPromise = null;
|
| + throw error;
|
| + }.bind(this));
|
| + }
|
| + return this._nextLogsPromise;
|
| + },
|
| +
|
| + /** Creates and returns a Promise for the next batch of logs. */
|
| + LogDogFetcher.prototype._fetchNextBatch = function() {
|
| + // If we're already finished, return the terminal result.
|
| + if (this.finished) {
|
| + return this._resolvedLogs(null);
|
| + }
|
| +
|
| + // Fetch and return the next batch of logs.
|
| + return this._scheduleAsyncGet().then(function(resp) {
|
| + // Update our state/desc.
|
| + if (resp.state) {
|
| + this.state = resp.state;
|
| + }
|
| + if (resp.desc) {
|
| + this.desc = resp.desc;
|
| + }
|
| +
|
| + var logs = resp.logs;
|
| + if (! (logs && logs.length)) {
|
| + // No logs were loaded this round. Sleep for a bit then try again.
|
| + // (Streaming case).
|
| + console.log("No logs for", this.stream, "; sleeping...");
|
| + return new LuciSleepPromise(this.sleepTimeSecs * 1000).
|
| + then(function() {
|
| + return this._fetchNextBatch();
|
| + }.bind(this));
|
| + }
|
| +
|
| + return this._resolvedLogs(logs);
|
| + }.bind(this));
|
| + };
|
| +
|
| + /** Generates a structured Promise for a given block of log entries. */
|
| + LogDogFetcher.prototype._resolvedLogs = function(punt) {
|
| + return Promise.resolve({
|
| + desc: this.desc,
|
| + state: this.state,
|
| + entries: punt,
|
| + });
|
| + };
|
| +
|
| + /** Schedules the next asynchronous fetch. */
|
| + LogDogFetcher.prototype._scheduleAsyncGet = function() {
|
| + this.client.service = "logdog.Logs";
|
| + this.client.method = "Get";
|
| + this.client.request = {
|
| + project: this.stream.project,
|
| + path: this.stream.path,
|
| + state: (!this.state || this.terminalIndex() < 0),
|
| + index: this.nextIndex,
|
| + };
|
| +
|
| + if (this.byteCount !== null) {
|
| + this.client.request.byteCount = this.byteCount;
|
| + }
|
| + if (this.logCount !== null) {
|
| + this.client.request.logCount = this.logCount;
|
| + }
|
| +
|
| + return this.client.call().completes.then(function(resp) {
|
| + resp = resp.response;
|
| +
|
| + // Normalize the resulting logs.
|
| + //
|
| + // JSONPB timestamps are in the form of RFC3339 strings.
|
| + if (resp.desc) {
|
| + patchDescriptor(resp.desc);
|
| + }
|
| + if (resp.state) {
|
| + patchState(resp.state);
|
| + }
|
| + if (resp.logs) {
|
| + resp.logs.forEach(function(le) {
|
| + patchLogEntry(le, resp.desc);
|
| + });
|
| + }
|
| +
|
| + return resp;
|
| + }, function(error) {
|
| + throw LogDogError.wrapGrpc(error);
|
| + });
|
| + };
|
| +</script>
|
|
|