Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Side by Side Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2717043002: Add LogDog log stream fetcher code. (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file.
5 */
6
7 ///<reference path="../logdog-stream/logdog.ts" />
8 ///<reference path="../luci-sleep-promise/promise.ts" />
9 ///<reference path="../rpc/client.ts" />
10
11 namespace LogDog {
12
/**
 * Options constraining a single fetch.
 *
 * byteCount: if >0, ask the service to cap the response at this many bytes.
 * logCount: if >0, ask the service to cap the response at this many entries.
 * sparse: if true, relax the contiguous-index requirement on "Get" so that
 *     sparse (non-contiguous) log streams can still be rendered.
 */
export type FetcherOptions = {
  byteCount?: number; logCount?: number; sparse?: boolean;
};

// Type of a "Get" or "Tail" response (protobuf). "state" and "desc" are only
// populated when the request asked for them; "logs" may be empty.
type GetResponse = {state: any; desc: any; logs: any[];};
19
/** The Fetcher's current status, observable via Fetcher.status. */
export enum FetcherStatus {
  // Not doing anything.
  IDLE,
  // Attempting to load log data.
  LOADING,
  // We're waiting for the log stream to emit more logs.
  STREAMING,
  // The log stream is missing (NOT_FOUND); it may still be ingesting.
  MISSING,
  // The log stream encountered a non-retryable error (see Fetcher.lastError).
  ERROR
}
33
34 /**
35 * Fetcher is responsible for fetching LogDog log stream entries from the
36 * remote service via an RPC client.
37 *
38 * Fetcher is responsible for wrapping the raw RPC calls and their results,
39 * and retrying calls due to: - Transient failures (via RPC client). - Missing
hinoka 2017/02/27 21:50:25 newline for bullet points.
dnj 2017/03/08 04:13:43 Done.
40 * stream (assumption is that the stream is still being ingested and
41 * registered, and therefore a repeated retry is appropriate).
42 * - Streaming stream (log stream is not terminated, but more records are not
43 * yet available).
44 *
45 * The interface that Fetcher presents to its caller is a simple Promise-based
46 * method to retrieve log stream data.
47 *
48 * Fetcher offers fetching via "get", "getAll", and "tail".
hinoka 2017/02/27 21:50:25 how about "getLast" instead of "tail"?
dnj 2017/03/08 04:13:43 It's not the last though. Maybe "latest"?
49 */
50 export class Fetcher {
51 private debug = false;
52 private static maxLogsPerGet = 0;
53
54 private lastDesc: LogDog.LogStreamDescriptor;
55 private lastState: LogDog.LogStreamState;
56
57 private currentStatus: FetcherStatus = FetcherStatus.IDLE;
58 private lastErrorValue: Error|null;
59 private statusChangedCallback: (() => void);
60
61 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000};
62 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000};
63
64 constructor(private client: luci.Client, readonly stream: LogDog.Stream) {}
65
66 get desc() {
67 return this.lastDesc;
68 }
69 get state() {
70 return this.lastState;
71 }
72
73 get status() {
74 return this.currentStatus;
75 }
76
77 /**
78 * Returns the log stream's terminal index.
79 *
80 * If no terminal index is known, or if the log stream is still streaming,
hinoka 2017/02/27 21:50:26 "If no terminal index is known because the log is
dnj 2017/03/08 04:13:43 Done.
81 * this will return -1.
82 */
83 get terminalIndex(): number {
84 return ((this.lastState) ? this.lastState.terminalIndex : -1);
85 }
86
87 /** Archived returns true if this log stream is known to be archived. */
88 get archived(): boolean {
89 return (!!(this.lastState && this.lastState.archive));
hinoka 2017/02/27 21:50:25 does bool(...) exist? That would be more readable
dnj 2017/03/08 04:13:43 No :(
90 }
91
92 get lastError(): Error|null {
93 return this.lastErrorValue;
94 }
95
96 private setCurrentStatus(st: FetcherStatus, err?: Error) {
97 if (st !== this.currentStatus || err !== this.lastErrorValue) {
98 this.currentStatus = st;
hinoka 2017/02/27 21:50:25 If these are set together, should they be called e
dnj 2017/03/08 04:13:43 Done.
99 this.lastErrorValue = (err || null);
100
101 if (this.statusChangedCallback) {
102 this.statusChangedCallback();
103 };
104 }
105 }
106
107 /**
108 * Sets the status changed callback, which will be invoked whenever the
109 * Fetcher's status has changed.
110 */
111 setStatusChangedCallback(fn: () => void) {
112 this.statusChangedCallback = fn;
113 }
114
115 /**
116 * Returns a Promise that resolves to the next block of logs in the stream.
hinoka 2017/02/27 21:50:25 "that will resolve to"
dnj 2017/03/08 04:13:43 Done.
117 *
118 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
119 * next block of logs in the stream, or null if there are no logs to
hinoka 2017/02/27 21:50:25 "no logs to return currently" or "no more logs to
dnj 2017/03/08 04:13:43 Pretty sure this can't be null. Updated comment.
120 * return.
121 */
122 get(index: number, opts: FetcherOptions): Promise<LogDog.LogEntry[]> {
123 return this.getIndex(index, opts);
124 }
125
126 /**
127 * Returns a Promise that resolves to "count" log entries starting at
128 * "startIndex".
129 *
130 * If multiple RPC calls are required to retrieve "count" entries, these
131 * will be scheduled, and the Promise will block until the full set of
132 * requested stream entries is retrieved.
133 */
134 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> {
135 // Request the tail walkback logs. Since our request for N logs may return
136 // <N logs, we will repeat the request until all requested logs have been
137 // obtained.
138 let allLogs: LogDog.LogEntry[] = [];
139
140 let getIter = (): Promise<LogDog.LogEntry[]> => {
141 if (count <= 0) {
142 return Promise.resolve(allLogs);
143 }
144
145 // Perform Gets until we have the requested number of logs. We don't
146 // have to constrain the "logCount" parameter b/c we automatically do
147 // that in getIndex.
148 let opts: FetcherOptions = {
149 logCount: count,
150 sparse: true,
151 };
152 return this.getIndex(startIndex, opts).then((logs) => {
153 if (logs) {
154 allLogs.push.apply(allLogs, logs);
155 startIndex += logs.length;
156 count -= logs.length;
157 }
158 return getIter();
159 });
160 };
161 return getIter();
162 }
163
164 /**
165 * Fetches the tail log entry.
166 */
167 tail(): Promise<LogDog.LogEntry[]> {
hinoka 2017/02/27 21:50:25 getLast()?
dnj 2017/03/08 04:13:43 getLatest
168 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
169 let tryTail = (): Promise<LogDog.LogEntry[]> => {
170 return this.doTail().then((logs): Promise<LogDog.LogEntry[]> => {
171 if (logs && logs.length) {
172 return Promise.resolve(logs);
173 }
174
175 // No logs were returned, and we expect logs, so we're streaming. Try
176 // again after a delay.
177 this.setCurrentStatus(FetcherStatus.STREAMING);
178 let delay = streamingRetry.next();
179 console.warn(
180 this.stream, `: No logs returned; retrying after ${delay}ms...`);
181 return luci.sleepPromise(delay).then(() => {
182 return tryTail();
183 });
184 });
185 };
186 return tryTail();
187 }
188
189
190 private getIndex(index: number, opts: FetcherOptions):
191 Promise<LogDog.LogEntry[]> {
192 // (Testing) Constrain our max logs, if set.
hinoka 2017/02/27 21:50:25 Is this actually testing?
dnj 2017/03/08 04:13:43 Used by another module, but yes.
193 if (Fetcher.maxLogsPerGet > 0) {
194 if (!opts) {
195 opts = {};
196 }
197 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) {
198 opts.logCount = Fetcher.maxLogsPerGet;
199 }
200 }
201
202 // We will retry continuously until we get a log (streaming).
203 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
hinoka 2017/02/27 21:50:25 Why are there 3 layers of retries (I counted 3. t
dnj 2017/03/08 04:13:43 There should be two: "retryIterator": streaming:
204 let tryGet = (): Promise<LogDog.LogEntry[]> => {
205 // If we're asking for a log beyond our stream, don't bother.
206 if (this.terminalIndex >= 0 && index > this.terminalIndex) {
207 return Promise.resolve([]);
208 }
209
210 return this.doGet(index, opts).then((logs) => {
211 if (logs && logs.length) {
212 // Since we allow non-contiguous Get, we may get back more logs than
213 // we actually expected. Prune any such additional.
214 if (opts.logCount > 0) {
215 let maxStreamIndex = index + opts.logCount - 1;
216 logs = logs.filter((le) => {
217 return le.streamIndex <= maxStreamIndex;
218 });
219 }
220
221 return Promise.resolve(logs);
222 }
223
224 // No logs were returned, and we expect logs, so we're streaming. Try
225 // again after a delay.
226 this.setCurrentStatus(FetcherStatus.STREAMING);
227 let delay = streamingRetry.next();
228 console.warn(
229 this.stream, `: No logs returned; retrying after ${delay}ms...`);
230 return luci.sleepPromise(delay).then(() => {
231 return tryGet();
hinoka 2017/02/27 21:50:25 Wouldn't this increase the calling stack size on e
dnj 2017/03/08 04:13:43 I think so. Simplified via "do", which should not
232 });
233 });
234 };
235 return tryGet();
236 }
237
238 private doGet(index: number, opts: FetcherOptions):
239 Promise<LogDog.LogEntry[]> {
240 let request: {
241 project: string; path: string; state: boolean; index: number;
242
243 nonContiguous?: boolean;
244 byteCount?: number;
245 logCount?: number;
246 } = {
247 project: this.stream.project,
248 path: this.stream.path,
249 state: (this.terminalIndex < 0),
250 index: index,
251 };
252 if (opts.sparse || this.archived) {
253 // This log stream is archived. We will relax the contiguous requirement
254 // so we can render sparse log streams.
255 request.nonContiguous = true;
256 }
257 if (opts) {
258 if (opts.byteCount > 0) {
259 request.byteCount = opts.byteCount;
260 }
261 if (opts.logCount > 0) {
262 request.logCount = opts.logCount;
263 }
264 }
265
266 if (this.debug) {
267 console.log('logdog.Logs.Get:', request);
268 }
269
270 // Perform our Get, waiting until the stream actually exists.
271 return this
272 .doRetryIfMissing(
273 ():
274 Promise<FetchResult> => {
275 return this.client.call('logdog.Logs', 'Get', request)
276 .then((resp: GetResponse): FetchResult => {
277 return FetchResult.make(resp, this.lastDesc);
278 });
279 })
280 .then((fr) => {
281 return this.afterProcessResult(fr);
282 });
283 }
284
285 private doTail(): Promise<LogDog.LogEntry[]> {
hinoka 2017/02/27 21:50:25 doGetLast()?
dnj 2017/03/08 04:13:43 Since Tail is actually the name of the RPC call, I
286 let request: {project: string; path: string; state: boolean;} = {
287 project: this.stream.project,
288 path: this.stream.path,
289 state: (this.terminalIndex < 0),
290 };
291
292 if (this.debug) {
293 console.log('logdog.Logs.Tail:', request);
294 }
295
296 return this
297 .doRetryIfMissing(
298 ():
299 Promise<FetchResult> => {
300 return this.client.call('logdog.Logs', 'Tail', request)
301 .then((resp: GetResponse): FetchResult => {
302 return FetchResult.make(resp, this.lastDesc);
303 });
304 })
305 .then((fr) => {
306 return this.afterProcessResult(fr);
307 });
308 }
309
310 private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] {
311 if (this.debug) {
312 if (fr.logs.length) {
313 console.log(
314 'Request returned:', fr.logs[0].streamIndex, '..',
315 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state);
316 } else {
317 console.log('Request returned no logs:', fr.desc, fr.state);
318 }
319 }
320
321 this.setCurrentStatus(FetcherStatus.IDLE);
322 if (fr.desc) {
323 this.lastDesc = fr.desc;
324 }
325 if (fr.state) {
326 this.lastState = fr.state;
327 }
328 return fr.logs;
329 }
330
331 private doRetryIfMissing(fn: () => Promise<FetchResult>):
332 Promise<FetchResult> {
333 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
334
335 let doIt = (): Promise<FetchResult> => {
336 this.setCurrentStatus(FetcherStatus.LOADING);
337
338 return fn().catch((err: Error) => {
339 // Is this a gRPC Error?
340 let grpc = luci.GrpcError.convert(err);
341 if (grpc && grpc.code === luci.Code.NOT_FOUND) {
342 this.setCurrentStatus(FetcherStatus.MISSING);
343
344 let delay = missingRetry.next();
345 console.warn(
346 this.stream, ': Is not found:', err,
347 `; retrying after ${delay}ms...`);
348 return luci.sleepPromise(delay).then(() => {
349 return doIt();
350 });
351 }
352
353 this.setCurrentStatus(FetcherStatus.ERROR, err);
354 throw err;
355 });
356 };
357 return doIt();
358 }
359 }
360
361 /**
362 * The result of a log stream fetch, for internal usage.
363 *
364 * It will include zero or more log entries, and optionally (if requested)
365 * the log stream's descriptor and state.
366 */
367 class FetchResult {
368 constructor(
369 readonly logs: LogDog.LogEntry[],
370 readonly desc?: LogDog.LogStreamDescriptor,
371 readonly state?: LogDog.LogStreamState) {}
372
373 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
374 FetchResult {
375 let loadDesc: LogDog.LogStreamDescriptor|undefined;
376 if (resp.desc) {
377 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc);
378 }
379
380 let loadState: LogDog.LogStreamState|undefined;
381 if (resp.state) {
382 loadState = LogDog.LogStreamState.make(resp.state);
383 }
384
385 let logs = (resp.logs || []).map((le) => {
386 return LogDog.LogEntry.make(le, desc);
387 });
388 return new FetchResult(logs, loadDesc, loadState);
389 }
390 }
391 }
OLDNEW
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698