| OLD | NEW |
| (Empty) | |
| 1 <!-- |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. |
| 5 --> |
| 6 |
| 7 <link rel="import" href="../bower_components/polymer/polymer.html"> |
| 8 <link rel="import" href="../bower_components/promise-polyfill/promise-polyfill-l
ite.html"> |
| 9 |
| 10 <link rel="import" href="../luci-sleep-promise/luci-sleep-promise.html"> |
| 11 |
| 12 <script> |
| 13 "use strict"; |
| 14 |
| 15 function LogDogFetcher(client, stream) { |
| 16 this.client = client; |
| 17 this.stream = stream; |
| 18 |
| 19 // Fetching parameters, will be updated as logs are fetched. |
| 20 this.sleepTimeSecs = 5; |
| 21 this.byteCount = null; |
| 22 this.logCount = null; |
| 23 this.reset(); |
| 24 } |
| 25 |
| 26 LogDogFetcher.prototype.reset = function() { |
| 27 this.nextIndex = 0; |
| 28 this.finished = false; |
| 29 this.desc = null; |
| 30 this.state = null; |
| 31 |
| 32 this._current = null; |
| 33 this._nextLogsPromise = null; |
| 34 }; |
| 35 |
| 36 /** |
| 37 * Returns the log stream's terminal index. |
| 38 * |
| 39 * If no terminal index is known, or if the log stream is still streaming, |
| 40 * this will return -1. |
| 41 */ |
| 42 LogDogFetcher.prototype.terminalIndex = function() { |
| 43 return (this.state) ? (this.state.terminalIndex) : (-1); |
| 44 }; |
| 45 |
| 46 /** |
| 47 * Returns a Promise that resolves to the next block of logs in the stream. |
| 48 * |
| 49 * If there are no more logs in the stream (finished), the returned Promise |
| 50 * will already be resolved and will contain a null log. |
| 51 * |
| 52 * @return {Promise[Object]} A Promise that will resolve to the next block |
| 53 * of logs in the stream. |
| 54 */ |
| 55 LogDogFetcher.prototype.next = function() { |
| 56 // If we don't have an in-progress fetch, start a new one. |
| 57 if (this._nextLogsPromise === null) { |
| 58 this._nextLogsPromise = this._fetchNextBatch(). |
| 59 then(function(result) { |
| 60 var entries = result.entries; |
| 61 if (entries && entries.length) { |
| 62 var lastIndex = entries[entries.length-1].streamIndex; |
| 63 this.nextIndex = (lastIndex + 1); |
| 64 |
| 65 var tidx = this.terminalIndex(); |
| 66 if (tidx >= 0 && tidx < this.nextIndex) { |
| 67 // We have punted the full log stream. Mark finished. |
| 68 this.finished = true; |
| 69 } |
| 70 } |
| 71 |
| 72 this._nextLogsPromise = null; |
| 73 return result; |
| 74 }.bind(this)).catch(function(error) { |
| 75 this._nextLogsPromise = null; |
| 76 throw error; |
| 77 }.bind(this)); |
| 78 } |
| 79 return this._nextLogsPromise; |
| 80 }, |
| 81 |
| 82 /** Creates and returns a Promise for the next batch of logs. */ |
| 83 LogDogFetcher.prototype._fetchNextBatch = function() { |
| 84 // If we're already finished, return the terminal result. |
| 85 if (this.finished) { |
| 86 return this._resolvedLogs(null); |
| 87 } |
| 88 |
| 89 // Fetch and return the next batch of logs. |
| 90 return this._scheduleAsyncGet().then(function(resp) { |
| 91 // Update our state/desc. |
| 92 if (resp.state) { |
| 93 this.state = resp.state; |
| 94 } |
| 95 if (resp.desc) { |
| 96 this.desc = resp.desc; |
| 97 } |
| 98 |
| 99 var logs = resp.logs; |
| 100 if (! (logs && logs.length)) { |
| 101 // No logs were loaded this round. Sleep for a bit then try again. |
| 102 // (Streaming case). |
| 103 console.log("No logs for", this.stream, "; sleeping..."); |
| 104 return new LuciSleepPromise(this.sleepTimeSecs * 1000). |
| 105 then(function() { |
| 106 return this._fetchNextBatch(); |
| 107 }.bind(this)); |
| 108 } |
| 109 |
| 110 return this._resolvedLogs(logs); |
| 111 }.bind(this)); |
| 112 }; |
| 113 |
| 114 /** Generates a structured Promise for a given block of log entries. */ |
| 115 LogDogFetcher.prototype._resolvedLogs = function(punt) { |
| 116 return Promise.resolve({ |
| 117 desc: this.desc, |
| 118 state: this.state, |
| 119 entries: punt, |
| 120 }); |
| 121 }; |
| 122 |
| 123 /** Schedules the next asynchronous fetch. */ |
| 124 LogDogFetcher.prototype._scheduleAsyncGet = function() { |
| 125 this.client.service = "logdog.Logs"; |
| 126 this.client.method = "Get"; |
| 127 this.client.request = { |
| 128 project: this.stream.project, |
| 129 path: this.stream.path, |
| 130 state: (!this.state || this.terminalIndex() < 0), |
| 131 index: this.nextIndex, |
| 132 }; |
| 133 |
| 134 if (this.byteCount !== null) { |
| 135 this.client.request.byteCount = this.byteCount; |
| 136 } |
| 137 if (this.logCount !== null) { |
| 138 this.client.request.logCount = this.logCount; |
| 139 } |
| 140 |
| 141 return this.client.call().completes.then(function(resp) { |
| 142 resp = resp.response; |
| 143 |
| 144 // Normalize the resulting logs. |
| 145 // |
| 146 // JSONPB timestamps are in the form of RFC3339 strings. |
| 147 if (resp.desc) { |
| 148 patchDescriptor(resp.desc); |
| 149 } |
| 150 if (resp.state) { |
| 151 patchState(resp.state); |
| 152 } |
| 153 if (resp.logs) { |
| 154 resp.logs.forEach(function(le) { |
| 155 patchLogEntry(le, resp.desc); |
| 156 }); |
| 157 } |
| 158 |
| 159 return resp; |
| 160 }, function(error) { |
| 161 throw LogDogError.wrapGrpc(error); |
| 162 }); |
| 163 }; |
| 164 </script> |
| OLD | NEW |