Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2717043002: Add LogDog log stream fetcher code. (Closed)
Patch Set: use operation token Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use)) of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file.
5 */
6
7 ///<reference path="../logdog-stream/logdog.ts" />
8 ///<reference path="../logdog-stream/client.ts" />
9 ///<reference path="../luci-operation/operation.ts" />
10
11 namespace LogDog {
12
13 /** Options that can be passed to fetch operations. */
14 export type FetcherOptions = {
15 /**
16 * The maximum number of bytes to fetch. If undefined, no maximum will be
17 * specified, and the service will constrain the results.
18 */
19 byteCount?: number;
20 /**
21 * The maximum number of logs to fetch. If undefined, no maximum will be
22 * specified, and the service will constrain the results.
23 */
24 logCount?: number;
25 /** If defined and true, allow a fetch to return non-continuous entries. */
26 sparse?: boolean;
27 };
28
29 /** The Fetcher's current status. */
30 export enum FetchStatus {
31 // Not doing anything.
32 IDLE,
33 // Attempting to load log data.
34 LOADING,
35 // We're waiting for the log stream to emit more logs.
36 STREAMING,
37 // The log stream is missing.
38 MISSING,
39 // The log stream encountered an error.
40 ERROR,
41 // The operaiton has been cancelled.
42 CANCELLED,
43 }
44
45 /** Fetch represents a single fetch operation. */
46 export class Fetch {
47 readonly op: luci.Operation;
48 private stateChangedCallbacks = new Array<(f: Fetch) => void>();
49
50 constructor(
51 private ctx: FetchContext, readonly p: Promise<LogDog.LogEntry[]>) {
52 this.op = this.ctx.op;
53
54 // We will now get notifications if our Context's state changes.
55 this.ctx.stateChangedCallback = this.onStateChanged.bind(this);
56 }
57
58 get lastStatus(): FetchStatus {
59 return this.ctx.lastStatus;
60 }
61
62 get lastError(): Error|undefined {
63 return this.ctx.lastError;
64 }
65
66 addStateChangedCallback(cb: (f: Fetch) => void) {
nodir 2017/03/13 19:48:22 add removeStateChangedCallback?
dnj 2017/03/14 00:14:41 Not necessary, won't ever be called.
67 this.stateChangedCallbacks.push(cb);
68 }
69
70 private onStateChanged() {
71 this.stateChangedCallbacks.forEach(cb => cb(this));
72 }
73 }
74
75 /**
76 * FetchContext is an internal context used to pair all of the common
77 * parameters involved in a fetch operation.
78 */
79 class FetchContext {
80 private lastStatusValue = FetchStatus.IDLE;
81 private lastErrorValue?: Error;
82
83 stateChangedCallback: () => void;
84
85 constructor(readonly op: luci.Operation) {
86 // If our operation is cancelled, update our status to note this.
87 op.addCancelCallback(() => {
88 this.lastStatusValue = FetchStatus.CANCELLED;
89 this.lastErrorValue = undefined;
90 });
91 }
92
93 get lastStatus(): FetchStatus {
94 return this.lastStatusValue;
95 }
96
97 get lastError(): Error|undefined {
98 return this.lastErrorValue;
99 }
100
101 updateStatus(st: FetchStatus, err?: Error) {
102 if (this.op.cancelled) {
103 // No more status updates, force cancelled.
104 st = FetchStatus.CANCELLED;
105 err = undefined;
106 }
107
108 if (!(st === this.lastStatusValue && err !== this.lastErrorValue)) {
nodir 2017/03/13 19:48:22 err === this.lastErrorValue ?
dnj 2017/03/14 00:14:41 good catch, fixed.
109 this.lastStatusValue = st;
110 this.lastErrorValue = err;
111
112 this.notifyStateChanged();
113 }
114 }
115
116 notifyStateChanged() {
117 // If our Fetch has assigned our callback, notify it.
118 if (this.stateChangedCallback) {
119 this.stateChangedCallback();
120 }
121 }
122 }
123
124 /**
125 * Fetcher is responsible for fetching LogDog log stream entries from the
126 * remote service via an RPC client.
127 *
128 * Fetcher is responsible for wrapping the raw RPC calls and their results,
129 * and retrying calls due to:
130 *
131 * - Transient failures (via RPC client).
132 * - Missing stream (assumption is that the stream is still being ingested and
133 * registered, and therefore a repeated retry is appropriate).
134 * - Streaming stream (log stream is not terminated, but more records are not
135 * yet available).
136 *
137 * The interface that Fetcher presents to its caller is a simple Promise-based
138 * method to retrieve log stream data.
139 *
140 * Fetcher offers fetching via "get", "getAll", and "getLatest".
141 */
142 export class Fetcher {
143 private debug = false;
144 private static maxLogsPerGet = 0;
145
146 private lastDesc: LogDog.LogStreamDescriptor;
147 private lastState: LogDog.LogStreamState;
148
149 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000};
150 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000};
151
152 constructor(
153 private client: LogDog.Client, readonly stream: LogDog.StreamPath) {}
154
155 get desc() {
156 return this.lastDesc;
157 }
158 get state() {
159 return this.lastState;
160 }
161
162 /**
163 * Returns the log stream's terminal index.
164 *
165 * If no terminal index is known (the log is still streaming) this will
166 * return -1.
167 */
168 get terminalIndex(): number {
169 return ((this.lastState) ? this.lastState.terminalIndex : -1);
170 }
171
172 /** Archived returns true if this log stream is known to be archived. */
173 get archived(): boolean {
174 return (!!(this.lastState && this.lastState.archive));
175 }
176
177 /**
178 * Returns a Promise that will resolve to the next block of logs in the
179 * stream.
180 *
181 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
182 * next block of logs in the stream.
183 */
184 get(op: luci.Operation, index: number, opts: FetcherOptions): Fetch {
185 let ctx = new FetchContext(op);
186 return new Fetch(ctx, this.getIndex(ctx, index, opts));
187 }
188
189 /**
190 * Returns a Promise that will resolve to "count" log entries starting at
191 * "startIndex".
192 *
193 * If multiple RPC calls are required to retrieve "count" entries, these
194 * will be scheduled, and the Promise will block until the full set of
195 * requested stream entries is retrieved.
196 */
197 getAll(op: luci.Operation, startIndex: number, count: number): Fetch {
198 // Request the tail walkback logs. Since our request for N logs may return
199 // <N logs, we will repeat the request until all requested logs have been
200 // obtained.
201 let allLogs: LogDog.LogEntry[] = [];
202
203 let ctx = new FetchContext(op);
204 let getIter = (): Promise<LogDog.LogEntry[]> => {
205 op.assert();
206
207 if (count <= 0) {
208 return Promise.resolve(allLogs);
209 }
210
211 // Perform Gets until we have the requested number of logs. We don't
212 // have to constrain the "logCount" parameter b/c we automatically do
213 // that in getIndex.
214 let opts: FetcherOptions = {
215 logCount: count,
216 sparse: true,
217 };
218 return this.getIndex(ctx, startIndex, opts).then(logs => {
219 op.assert();
220
221 if (logs && logs.length) {
222 allLogs.push.apply(allLogs, logs);
223 startIndex += logs.length;
224 count -= logs.length;
225 }
226 if (count > 0) {
227 // Recurse.
228 }
229 return Promise.resolve(allLogs);
230 });
231 };
232 return new Fetch(ctx, getIter());
233 }
234
235 /**
236 * Fetches the latest log entry.
237 */
238 getLatest(op: luci.Operation): Fetch {
239 let errNoLogs = new Error('no logs, streaming');
240 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
241 let ctx = new FetchContext(op);
242 return new Fetch(
243 ctx,
244 streamingRetry.do(
245 () => {
246 return this.doTail(ctx).then(logs => {
247 if (!(logs && logs.length)) {
248 throw errNoLogs;
249 }
250 return logs;
251 });
252 },
253 (err: Error, delay: number) => {
254 if (err !== errNoLogs) {
255 throw err;
256 }
257
258 // No logs were returned, and we expect logs, so we're
259 // streaming. Try again after a delay.
260 ctx.updateStatus(FetchStatus.STREAMING);
261 console.warn(
262 this.stream,
263 `: No logs returned; retrying after ${delay}ms...`);
264 }));
265 }
266
267 private getIndex(ctx: FetchContext, index: number, opts: FetcherOptions):
268 Promise<LogDog.LogEntry[]> {
269 // (Testing) Constrain our max logs, if set.
270 if (Fetcher.maxLogsPerGet > 0) {
271 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) {
272 opts.logCount = Fetcher.maxLogsPerGet;
273 }
274 }
275
276 // We will retry continuously until we get a log (streaming).
277 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
278 let errNoLogs = new Error('no logs, streaming');
279 return streamingRetry
280 .do(
281 () => {
282 // If we're asking for a log beyond our stream, don't bother.
283 if (this.terminalIndex >= 0 && index > this.terminalIndex) {
284 return Promise.resolve([]);
285 }
286
287 return this.doGet(ctx, index, opts).then(logs => {
288 ctx.op.assert();
289
290 if (!(logs && logs.length)) {
291 // (Retry)
292 throw errNoLogs;
293 }
294
295 return logs;
296 });
297 },
298 (err: Error, delay: number) => {
299 ctx.op.assert();
300
301 if (err !== errNoLogs) {
302 throw err;
303 }
304
305 // No logs were returned, and we expect logs, so we're
306 // streaming. Try again after a delay.
307 ctx.updateStatus(FetchStatus.STREAMING);
308 console.warn(
309 this.stream,
310 `: No logs returned; retrying after ${delay}ms...`);
311 })
312 .then(logs => {
313 ctx.op.assert();
314
315 // Since we allow non-contiguous Get, we may get back more logs than
316 // we actually expected. Prune any such additional.
317 if (opts.sparse && opts.logCount && opts.logCount > 0) {
318 let maxStreamIndex = index + opts.logCount - 1;
319 logs = logs.filter(le => le.streamIndex <= maxStreamIndex);
320 }
321 return logs;
322 });
323 }
324
325 private doGet(ctx: FetchContext, index: number, opts: FetcherOptions):
326 Promise<LogDog.LogEntry[]> {
327 let request: LogDog.GetRequest = {
328 project: this.stream.project,
329 path: this.stream.path,
330 state: (this.terminalIndex < 0),
331 index: index,
332 };
333 if (opts.sparse || this.archived) {
334 // This log stream is archived. We will relax the contiguous requirement
335 // so we can render sparse log streams.
336 request.nonContiguous = true;
337 }
338 if (opts.byteCount && opts.byteCount > 0) {
339 request.byteCount = opts.byteCount;
340 }
341 if (opts.logCount && opts.logCount > 0) {
342 request.logCount = opts.logCount;
343 }
344
345 if (this.debug) {
346 console.log('logdog.Logs.Get:', request);
347 }
348
349 // Perform our Get, waiting until the stream actually exists.
350 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
351 return missingRetry
352 .do(
353 () => {
354 ctx.updateStatus(FetchStatus.LOADING);
355 return this.client.get(request);
356 },
357 this.doRetryIfMissing(ctx))
358 .then((resp: GetResponse) => {
359 let fr = FetchResult.make(resp, this.lastDesc);
360 return this.afterProcessResult(ctx, fr);
361 });
362 }
363
364 private doTail(ctx: FetchContext): Promise<LogDog.LogEntry[]> {
365 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
366 return missingRetry
367 .do(
368 () => {
369 ctx.updateStatus(FetchStatus.LOADING);
370 let needsState = (this.terminalIndex < 0);
371 return this.client.tail(this.stream, needsState);
372 },
373 this.doRetryIfMissing(ctx))
374 .then((resp: GetResponse) => {
375 let fr = FetchResult.make(resp, this.lastDesc);
376 return this.afterProcessResult(ctx, fr);
377 });
378 }
379
380 private afterProcessResult(ctx: FetchContext, fr: FetchResult):
381 LogDog.LogEntry[] {
382 if (this.debug) {
383 if (fr.logs.length) {
384 console.log(
385 'Request returned:', fr.logs[0].streamIndex, '..',
386 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state);
387 } else {
388 console.log('Request returned no logs:', fr.desc, fr.state);
389 }
390 }
391
392 ctx.updateStatus(FetchStatus.IDLE);
393 if (fr.desc) {
394 this.lastDesc = fr.desc;
nodir 2017/03/13 19:48:22 AFAIU no race here because desc is going to be the
dnj 2017/03/14 00:14:41 Correct.
395 }
396 if (fr.state) {
397 this.lastState = fr.state;
398 }
399 return fr.logs;
400 }
401
402 private doRetryIfMissing(ctx: FetchContext) {
403 return (err: Error, delay: number) => {
404 ctx.op.assert();
405
406 // Is this a gRPC Error?
407 let grpc = luci.GrpcError.convert(err);
408 if (grpc && grpc.code === luci.Code.NOT_FOUND) {
409 ctx.updateStatus(FetchStatus.MISSING);
410
411 console.warn(
412 this.stream, ': Is not found:', err,
413 `; retrying after ${delay}ms...`);
414 return;
415 }
416
417 ctx.updateStatus(FetchStatus.ERROR, err);
418 throw err;
419 };
420 }
421 }
422
423 /**
424 * The result of a log stream fetch, for internal usage.
425 *
426 * It will include zero or more log entries, and optionally (if requested)
427 * the log stream's descriptor and state.
428 */
429 class FetchResult {
430 constructor(
431 readonly logs: LogDog.LogEntry[],
432 readonly desc?: LogDog.LogStreamDescriptor,
433 readonly state?: LogDog.LogStreamState) {}
434
435 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
436 FetchResult {
437 let loadDesc: LogDog.LogStreamDescriptor|undefined;
438 if (resp.desc) {
439 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc);
440 }
441
442 let loadState: LogDog.LogStreamState|undefined;
443 if (resp.state) {
444 loadState = LogDog.LogStreamState.make(resp.state);
445 }
446
447 let logs = (resp.logs || []).map(le => LogDog.LogEntry.make(le, desc));
448 return new FetchResult(logs, loadDesc, loadState);
449 }
450 }
451 }
OLDNEW
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698