Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(25)

Side by Side Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2717043002: Add LogDog log stream fetcher code. (Closed)
Patch Set: operations, comments Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file.
5 */
6
7 ///<reference path="../logdog-stream/logdog.ts" />
8 ///<reference path="../rpc/client.ts" />
9
10 namespace LogDog {
11
12 /** Options that can be passed to fetch operations. */
13 export type FetcherOptions = {
14 /**
15 * The maximum number of bytes to fetch. If undefined, no maximum will be
16 * specified, and the service will constrain the results.
17 */
18 byteCount?: number;
19 /**
20 * The maximum number of logs to fetch. If undefined, no maximum will be
21 * specified, and the service will constrain the results.
22 */
23 logCount?: number;
24 /** If defined and true, allow a fetch to return non-continuous entries. */
25 sparse?: boolean;
26 };
27
28 // Type of a "Get" or "Tail" response (protobuf).
29 type GetResponse = {state: any; desc: any; logs: any[];};
30
31 /** The Fetcher's current status. */
32 export enum FetchStatus {
33 // Not doing anything.
34 IDLE,
35 // Attempting to load log data.
36 LOADING,
37 // We're waiting for the log stream to emit more logs.
38 STREAMING,
39 // The log stream is missing.
40 MISSING,
41 // The log stream encountered an error.
42 ERROR,
43 // The operaiton has been cancelled.
44 CANCELLED,
45 }
46
47 /** Operation is a cancellable operation. */
48 class Operation {
49 static CANCELLED = new Error('operation is cancelled');
50
51 /** If set, a callback to invoke if the status changes. */
52 stateChanged: (op: Operation) => void;
53
54 private cancelledValue = false;
nodir 2017/03/13 19:48:22 nit: _cancelled ? _foo is a typical name when foo
dnj 2017/03/14 00:14:41 Had to modify tslint rules to allow this, but done
55 private lastStatusValue = FetchStatus.IDLE;
56 private lastErrorValue: Error|undefined;
57
58 /**
59 * Cancels the Fetch operation. If the operation completes or returns an
60 * error after it is cancelled, the result will be ignored.
61 *
62 * Additionally, no status callbacks will be invoked after a Fetch is
63 * cancelled.
64 *
65 * Calling cancel multiple times is safe.
66 */
67 cancel() {
68 this.updateStatus(FetchStatus.CANCELLED);
69 this.cancelledValue = true;
70 }
71
72 /**
73 * Assert will throw Operation.CANCELLED if the operation has been
74 * cancelled. Otherwise, it will do nothing.
75 */
76 assert() {
77 if (this.cancelledValue) {
78 throw Operation.CANCELLED;
79 }
80 }
81
82 get cancelled() {
83 return this.cancelledValue;
84 }
85
86 get lastStatus(): FetchStatus {
87 return this.lastStatusValue;
88 }
89
90 get lastError(): Error|undefined {
91 return this.lastErrorValue;
92 }
93
94 updateStatus(st: FetchStatus, err?: Error) {
95 if (this.cancelled) {
96 // No more status updates.
97 return;
98 }
99
100 this.lastStatusValue = st;
101 this.lastErrorValue = err;
102 if (this.stateChanged) {
103 this.stateChanged(this);
104 }
105 }
106 }
107
108 /** Fetch represents a single fetch operation. */
109 class Fetch<T> {
110 readonly promise: Promise<FetchResult>;
111
112 constructor(readonly op: Operation, p: Promise<T>) {
113 this.promise = p.then(
114 result => {
115 this.op.updateStatus(FetchStatus.IDLE);
116 return result;
117 },
118 err => {
119 this.op.updateStatus(FetchStatus.ERROR, err);
120 return Promise.reject(err);
121 });
122 }
123 }
124
125 /**
126 * Fetcher is responsible for fetching LogDog log stream entries from the
127 * remote service via an RPC client.
128 *
129 * Fetcher is responsible for wrapping the raw RPC calls and their results,
130 * and retrying calls due to:
131 *
132 * - Transient failures (via RPC client).
133 * - Missing stream (assumption is that the stream is still being ingested and
134 * registered, and therefore a repeated retry is appropriate).
135 * - Streaming stream (log stream is not terminated, but more records are not
136 * yet available).
137 *
138 * The interface that Fetcher presents to its caller is a simple Promise-based
139 * method to retrieve log stream data.
140 *
141 * Fetcher offers fetching via "get", "getAll", and "getLatest".
142 */
143 export class Fetcher {
144 private debug = false;
145 private static maxLogsPerGet = 0;
146
147 private lastDesc: LogDog.LogStreamDescriptor;
148 private lastState: LogDog.LogStreamState;
149
150 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000};
151 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000};
152
153 constructor(
154 private client: luci.Client, readonly stream: LogDog.StreamPath) {}
155
156 get desc() {
157 return this.lastDesc;
158 }
159 get state() {
160 return this.lastState;
161 }
162
163 /**
164 * Returns the log stream's terminal index.
165 *
166 * If no terminal index is known (the log is still streaming) this will
167 * return -1.
168 */
169 get terminalIndex(): number {
170 return ((this.lastState) ? this.lastState.terminalIndex : -1);
171 }
172
173 /** Archived returns true if this log stream is known to be archived. */
174 get archived(): boolean {
175 return (!!(this.lastState && this.lastState.archive));
176 }
177
178 /**
179 * Returns a Promise that will resolve to the next block of logs in the
180 * stream.
181 *
182 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
183 * next block of logs in the stream.
184 */
185 get(index: number, opts: FetcherOptions): Fetch<LogDog.LogEntry[]> {
186 let op = new Operation();
187 return new Fetch(op, this.getIndex(index, opts, op));
188 }
189
190 /**
191 * Returns a Promise that will resolve to "count" log entries starting at
192 * "startIndex".
193 *
194 * If multiple RPC calls are required to retrieve "count" entries, these
195 * will be scheduled, and the Promise will block until the full set of
196 * requested stream entries is retrieved.
197 */
198 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> {
199 // Request the tail walkback logs. Since our request for N logs may return
200 // <N logs, we will repeat the request until all requested logs have been
201 // obtained.
202 let allLogs: LogDog.LogEntry[] = [];
203
204 let op = new Operation();
205 let getIter = (): Promise<LogDog.LogEntry[]> => {
206 if (count <= 0) {
207 return Promise.resolve(allLogs);
208 }
209
210 // Perform Gets until we have the requested number of logs. We don't
211 // have to constrain the "logCount" parameter b/c we automatically do
212 // that in getIndex.
213 let opts: FetcherOptions = {
214 logCount: count,
215 sparse: true,
216 };
217 return this.getIndex(startIndex, opts, op).then(logs => {
218 if (logs && logs.length) {
219 allLogs.push.apply(allLogs, logs);
220 startIndex += logs.length;
221 count -= logs.length;
222 }
223 if (count > 0) {
224 // Recurse.
225 }
nodir 2017/03/13 19:48:22 return getIter()
226 return Promise.resolve(allLogs);
227 });
228 };
229 return getIter();
230 }
231
232 /**
233 * Fetches the latest log entry.
234 */
235 getLatest(): Fetch<LogDog.LogEntry[]> {
236 let errNoLogs = new Error('no logs, streaming');
237 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
238 let op = new Operation();
239 return new Fetch(
240 op,
241 streamingRetry.do(
242 () => {
243 return this.doTail(op).then(logs => {
244 if (!(logs && logs.length)) {
245 throw errNoLogs;
246 }
247 return logs;
248 });
249 },
250 (err: Error, delay: number) => {
251 if (err !== errNoLogs) {
252 throw err;
253 }
254
255 // No logs were returned, and we expect logs, so we're
256 // streaming. Try again after a delay.
257 op.updateStatus(FetchStatus.STREAMING);
258 console.warn(
259 this.stream,
260 `: No logs returned; retrying after ${delay}ms...`);
261 }));
262 }
263
264 private getIndex(index: number, opts: FetcherOptions, op: Operation):
265 Promise<LogDog.LogEntry[]> {
266 // (Testing) Constrain our max logs, if set.
267 if (Fetcher.maxLogsPerGet > 0) {
268 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) {
269 opts.logCount = Fetcher.maxLogsPerGet;
270 }
271 }
272
273 // We will retry continuously until we get a log (streaming).
274 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
275 let errNoLogs = new Error('no logs, streaming');
276 return streamingRetry
277 .do(
278 () => {
279 // If we're asking for a log beyond our stream, don't bother.
280 if (this.terminalIndex >= 0 && index > this.terminalIndex) {
281 return Promise.resolve([]);
282 }
283
284 return this.doGet(index, opts, op).then(logs => {
285 op.assert();
286
287 if (!(logs && logs.length)) {
288 // (Retry)
289 throw errNoLogs;
290 }
291
292 return logs;
293 });
294 },
295 (err: Error, delay: number) => {
296 op.assert();
297
298 if (err !== errNoLogs) {
299 throw err;
300 }
301
302 // No logs were returned, and we expect logs, so we're
303 // streaming. Try again after a delay.
304 op.updateStatus(FetchStatus.STREAMING);
305 console.warn(
306 this.stream,
307 `: No logs returned; retrying after ${delay}ms...`);
308 })
309 .then(logs => {
310 op.assert();
311
312 // Since we allow non-contiguous Get, we may get back more logs than
313 // we actually expected. Prune any such additional.
314 if (opts.sparse && opts.logCount && opts.logCount > 0) {
315 let maxStreamIndex = index + opts.logCount - 1;
316 logs = logs.filter(le => le.streamIndex <= maxStreamIndex);
317 }
318 return logs;
319 });
320 }
321
322 private doGet(index: number, opts: FetcherOptions, op: Operation):
323 Promise<LogDog.LogEntry[]> {
324 let request: {
325 project: string; path: string; state: boolean; index: number;
326
327 nonContiguous?: boolean;
328 byteCount?: number;
329 logCount?: number;
330 } = {
331 project: this.stream.project,
332 path: this.stream.path,
333 state: (this.terminalIndex < 0),
334 index: index,
335 };
336 if (opts.sparse || this.archived) {
337 // This log stream is archived. We will relax the contiguous requirement
338 // so we can render sparse log streams.
339 request.nonContiguous = true;
340 }
341 if (opts.byteCount && opts.byteCount > 0) {
342 request.byteCount = opts.byteCount;
343 }
344 if (opts.logCount && opts.logCount > 0) {
345 request.logCount = opts.logCount;
346 }
347
348 if (this.debug) {
349 console.log('logdog.Logs.Get:', request);
350 }
351
352 // Perform our Get, waiting until the stream actually exists.
353 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
354 return missingRetry
355 .do(
356 () => {
357 op.updateStatus(FetchStatus.LOADING);
358 return this.client.call('logdog.Logs', 'Get', request);
359 },
360 this.doRetryIfMissing(op))
361 .then((resp: GetResponse) => {
362 let fr = FetchResult.make(resp, this.lastDesc);
363 return this.afterProcessResult(fr, op);
364 });
365 }
366
367 private doTail(op: Operation): Promise<LogDog.LogEntry[]> {
368 let request: {project: string; path: string; state: boolean;} = {
369 project: this.stream.project,
370 path: this.stream.path,
371 state: (this.terminalIndex < 0),
372 };
373
374 if (this.debug) {
375 console.log('logdog.Logs.Tail:', request);
376 }
377
378 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
379 return missingRetry
380 .do(
381 () => {
382 op.updateStatus(FetchStatus.LOADING);
383 return this.client.call('logdog.Logs', 'Tail', request);
384 },
385 this.doRetryIfMissing(op))
386 .then((resp: GetResponse) => {
387 let fr = FetchResult.make(resp, this.lastDesc);
388 return this.afterProcessResult(fr, op);
389 });
390 }
391
392 private afterProcessResult(fr: FetchResult, op: Operation):
393 LogDog.LogEntry[] {
394 if (this.debug) {
395 if (fr.logs.length) {
396 console.log(
397 'Request returned:', fr.logs[0].streamIndex, '..',
398 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state);
399 } else {
400 console.log('Request returned no logs:', fr.desc, fr.state);
401 }
402 }
403
404 op.updateStatus(FetchStatus.IDLE);
405 if (fr.desc) {
406 this.lastDesc = fr.desc;
407 }
408 if (fr.state) {
409 this.lastState = fr.state;
410 }
411 return fr.logs;
412 }
413
414 private doRetryIfMissing(op: Operation) {
415 return (err: Error, delay: number) => {
416 op.assert();
417
418 // Is this a gRPC Error?
419 let grpc = luci.GrpcError.convert(err);
420 if (grpc && grpc.code === luci.Code.NOT_FOUND) {
421 op.updateStatus(FetchStatus.MISSING);
422
423 console.warn(
424 this.stream, ': Is not found:', err,
425 `; retrying after ${delay}ms...`);
426 return;
427 }
428
429 op.updateStatus(FetchStatus.ERROR, err);
430 throw err;
431 };
432 }
433 }
434
435 /**
436 * The result of a log stream fetch, for internal usage.
437 *
438 * It will include zero or more log entries, and optionally (if requested)
439 * the log stream's descriptor and state.
440 */
441 class FetchResult {
442 constructor(
443 readonly logs: LogDog.LogEntry[],
444 readonly desc?: LogDog.LogStreamDescriptor,
445 readonly state?: LogDog.LogStreamState) {}
446
447 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
448 FetchResult {
449 let loadDesc: LogDog.LogStreamDescriptor|undefined;
450 if (resp.desc) {
451 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc);
452 }
453
454 let loadState: LogDog.LogStreamState|undefined;
455 if (resp.state) {
456 loadState = LogDog.LogStreamState.make(resp.state);
457 }
458
459 let logs = (resp.logs || []).map(le => LogDog.LogEntry.make(le, desc));
460 return new FetchResult(logs, loadDesc, loadState);
461 }
462 }
463 }
OLDNEW
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698