Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(19)

Side by Side Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2717043002: Add LogDog log stream fetcher code. (Closed)
Patch Set: comments, retry "do" Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file.
5 */
6
7 ///<reference path="../logdog-stream/logdog.ts" />
8 ///<reference path="../rpc/client.ts" />
9
10 namespace LogDog {
11
12 export type FetcherOptions = {
13 byteCount?: number; logCount?: number; sparse?: boolean;
14 };
15
16 // Type of a "Get" or "Tail" response (protobuf).
17 type GetResponse = {state: any; desc: any; logs: any[];};
18
19 /** The Fetcher's current status. */
20 export enum FetcherStatus {
21 // Not doing anything.
22 IDLE,
23 // Attempting to load log data.
24 LOADING,
25 // We're waiting for the log stream to emit more logs.
26 STREAMING,
27 // The log stream is missing.
28 MISSING,
29 // The log stream encountered an error.
30 ERROR
31 }
32
33 /**
34 * Fetcher is responsible for fetching LogDog log stream entries from the
35 * remote service via an RPC client.
36 *
37 * Fetcher is responsible for wrapping the raw RPC calls and their results,
38 * and retrying calls due to:
39 *
40 * - Transient failures (via RPC client).
41 * - Missing stream (assumption is that the stream is still being ingested and
42 * registered, and therefore a repeated retry is appropriate).
43 * - Streaming stream (log stream is not terminated, but more records are not
44 * yet available).
45 *
46 * The interface that Fetcher presents to its caller is a simple Promise-based
47 * method to retrieve log stream data.
48 *
49 * Fetcher offers fetching via "get", "getAll", and "getLatest".
50 */
51 export class Fetcher {
52 private debug = false;
53 private static maxLogsPerGet = 0;
54
55 private lastDesc: LogDog.LogStreamDescriptor;
56 private lastState: LogDog.LogStreamState;
57
58 private lastStatus: FetcherStatus = FetcherStatus.IDLE;
59 private lastErrorValue: Error|null;
60 private statusChangedCallback: (() => void);
61
62 private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000};
63 private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000};
64
65 constructor(
66 private client: luci.Client, readonly stream: LogDog.StreamPath) {}
67
68 get desc() {
69 return this.lastDesc;
70 }
71 get state() {
72 return this.lastState;
73 }
74
75 get status() {
76 return this.lastStatus;
77 }
78
79 /**
80 * Returns the log stream's terminal index.
81 *
82 * If no terminal index is known (the log is still streaming) this will
83 * return -1.
84 */
85 get terminalIndex(): number {
86 return ((this.lastState) ? this.lastState.terminalIndex : -1);
87 }
88
89 /** Archived returns true if this log stream is known to be archived. */
90 get archived(): boolean {
91 return (!!(this.lastState && this.lastState.archive));
92 }
93
94 get lastError(): Error|null {
95 return this.lastErrorValue;
96 }
97
98 private updateStatus(st: FetcherStatus, err?: Error) {
99 if (st !== this.lastStatus || err !== this.lastErrorValue) {
100 this.lastStatus = st;
101 this.lastErrorValue = (err || null);
102
103 if (this.statusChangedCallback) {
104 this.statusChangedCallback();
105 };
106 }
107 }
108
109 /**
110 * Sets the status changed callback, which will be invoked whenever the
111 * Fetcher's status has changed.
112 */
113 setStatusChangedCallback(fn: () => void) {
114 this.statusChangedCallback = fn;
115 }
116
117 /**
118 * Returns a Promise that will resolve to the next block of logs in the
119 * stream.
120 *
121 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
122 * next block of logs in the stream.
123 */
124 get(index: number, opts: FetcherOptions): Promise<LogDog.LogEntry[]> {
125 return this.getIndex(index, opts);
126 }
127
128 /**
129 * Returns a Promise that will resolve to "count" log entries starting at
130 * "startIndex".
131 *
132 * If multiple RPC calls are required to retrieve "count" entries, these
133 * will be scheduled, and the Promise will block until the full set of
134 * requested stream entries is retrieved.
135 */
136 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> {
137 // Request the tail walkback logs. Since our request for N logs may return
138 // <N logs, we will repeat the request until all requested logs have been
139 // obtained.
140 let allLogs: LogDog.LogEntry[] = [];
141
142 let getIter = (): Promise<LogDog.LogEntry[]> => {
143 if (count <= 0) {
144 return Promise.resolve(allLogs);
145 }
146
147 // Perform Gets until we have the requested number of logs. We don't
148 // have to constrain the "logCount" parameter b/c we automatically do
149 // that in getIndex.
150 let opts: FetcherOptions = {
151 logCount: count,
152 sparse: true,
153 };
154 return this.getIndex(startIndex, opts).then((logs) => {
155 if (logs) {
156 allLogs.push.apply(allLogs, logs);
nodir 2017/03/08 07:41:28 nit, in my experience this is usually written as A
dnj 2017/03/08 08:50:14 TypeScript complains: Property 'push' does not exi
nodir 2017/03/13 19:48:21 :(
157 startIndex += logs.length;
158 count -= logs.length;
159 }
160 return getIter();
nodir 2017/03/08 07:41:28 return resolved promise instead of recursive if co
dnj 2017/03/08 08:50:14 Done.
161 });
nodir 2017/03/08 07:41:28 then return allLogs? currently allLogs is not used
dnj 2017/03/08 08:50:14 It's used above (see line #144).
nodir 2017/03/13 19:48:21 Acknowledged.
162 };
163 return getIter();
164 }
165
166 /**
167 * Fetches the latest log entry.
168 */
169 getLatest(): Promise<LogDog.LogEntry[]> {
170 let errNoLogs = new Error('no logs, streaming');
171 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
172 return streamingRetry.do(
173 () => {
174 return this.doTail().then((logs) => {
175 if (!(logs && logs.length)) {
176 throw errNoLogs;
177 }
178 return logs;
179 });
180 },
181 (err: Error, delay: number) => {
182 if (err !== errNoLogs) {
183 throw err;
184 }
185
186 // No logs were returned, and we expect logs, so we're streaming.
187 // Try again after a delay.
188 this.updateStatus(FetcherStatus.STREAMING);
189 console.warn(
190 this.stream,
191 `: No logs returned; retrying after ${delay}ms...`);
192 });
193 }
194
195 private getIndex(index: number, opts: FetcherOptions):
196 Promise<LogDog.LogEntry[]> {
197 // (Testing) Constrain our max logs, if set.
198 if (Fetcher.maxLogsPerGet > 0) {
199 if (!opts) {
nodir 2017/03/08 07:41:28 doGet assumes opts is not null. doGet is called be
dnj 2017/03/08 08:50:14 Actually now that I do strict null checks, I can d
200 opts = {};
201 }
202 if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) {
203 opts.logCount = Fetcher.maxLogsPerGet;
204 }
205 }
206
207 // We will retry continuously until we get a log (streaming).
208 let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
209 let errNoLogs = new Error('no logs, streaming');
210 return streamingRetry
211 .do(
212 () => {
213 // If we're asking for a log beyond our stream, don't bother.
214 if (this.terminalIndex >= 0 && index > this.terminalIndex) {
215 return Promise.resolve([]);
216 }
217
218 return this.doGet(index, opts).then((logs) => {
nodir 2017/03/08 07:41:28 nit: parens around parameters of a lambda expressi
dnj 2017/03/08 08:50:15 Done.
219 if (!(logs && !logs.length)) {
nodir 2017/03/08 07:41:28 this is hard to read. This is equivalent to if (!
dnj 2017/03/08 08:50:15 Actually that inner "!" was a bug, it should be: !
220 // (Retry)
221 throw errNoLogs;
222 }
223
224 return logs;
225 });
226 },
227 (err: Error, delay: number) => {
228 if (err !== errNoLogs) {
229 throw err;
230 }
231
232 // No logs were returned, and we expect logs, so we're
233 // streaming. Try again after a delay.
234 this.updateStatus(FetcherStatus.STREAMING);
nodir 2017/03/08 07:41:28 all mutations (like updateStatus calls) in callbac
dnj 2017/03/08 08:50:14 Originally I didn't really care, but I think you'r
nodir 2017/03/13 19:48:21 great
235 console.warn(
236 this.stream,
237 `: No logs returned; retrying after ${delay}ms...`);
238 })
239 .then((logs) => {
nodir 2017/03/08 07:41:28 nit: unnecessary parens
dnj 2017/03/08 08:50:15 Done.
240 // Since we allow non-contiguous Get, we may get back more logs than
241 // we actually expected. Prune any such additional.
242 if (opts.logCount && opts.logCount > 0) {
nodir 2017/03/08 07:41:28 shouldn't this check opts.nonContiguous? currently
dnj 2017/03/08 08:50:14 Not in this function. Probably should, Done.
243 let maxStreamIndex = index + opts.logCount - 1;
nodir 2017/03/08 07:41:28 i don't fully understand this. Can you document lo
dnj 2017/03/08 08:50:14 Done.
244 logs = logs.filter((le) => {
nodir 2017/03/08 07:41:28 logs = logs.filter(le => le.streamIndex <= maxStre
dnj 2017/03/08 08:50:14 Oh nice, done.
245 return le.streamIndex <= maxStreamIndex;
246 });
247 }
248 return logs;
249 });
250 }
251
252 private doGet(index: number, opts: FetcherOptions):
253 Promise<LogDog.LogEntry[]> {
254 let request: {
255 project: string; path: string; state: boolean; index: number;
256
257 nonContiguous?: boolean;
258 byteCount?: number;
259 logCount?: number;
260 } = {
261 project: this.stream.project,
262 path: this.stream.path,
263 state: (this.terminalIndex < 0),
264 index: index,
265 };
266 if (opts.sparse || this.archived) {
267 // This log stream is archived. We will relax the contiguous requirement
268 // so we can render sparse log streams.
269 request.nonContiguous = true;
270 }
271 if (opts) {
nodir 2017/03/08 07:41:28 opts is not falsy since L266 didn't explode
dnj 2017/03/08 08:50:14 Ah yeah, it used to be allowed to be null, but not
272 if (opts.byteCount && opts.byteCount > 0) {
273 request.byteCount = opts.byteCount;
274 }
275 if (opts.logCount && opts.logCount > 0) {
276 request.logCount = opts.logCount;
277 }
278 }
279
280 if (this.debug) {
281 console.log('logdog.Logs.Get:', request);
282 }
283
284 // Perform our Get, waiting until the stream actually exists.
285 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
286 return missingRetry
287 .do(
288 () => {
289 this.updateStatus(FetcherStatus.LOADING);
290 return this.client.call('logdog.Logs', 'Get', request);
291 },
292 this.doRetryIfMissing)
293 .then((resp: GetResponse) => {
294 let fr = FetchResult.make(resp, this.lastDesc);
295 return this.afterProcessResult(fr);
296 });
297 }
298
299 private doTail(): Promise<LogDog.LogEntry[]> {
300 let request: {project: string; path: string; state: boolean;} = {
301 project: this.stream.project,
302 path: this.stream.path,
303 state: (this.terminalIndex < 0),
304 };
305
306 if (this.debug) {
307 console.log('logdog.Logs.Tail:', request);
308 }
309
310 let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
311 return missingRetry
312 .do(
313 () => {
314 this.updateStatus(FetcherStatus.LOADING);
315 return this.client.call('logdog.Logs', 'Tail', request);
316 },
317 this.doRetryIfMissing)
318 .then((resp: GetResponse) => {
319 let fr = FetchResult.make(resp, this.lastDesc);
320 return this.afterProcessResult(fr);
321 });
322 }
323
324 private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] {
325 if (this.debug) {
326 if (fr.logs.length) {
327 console.log(
328 'Request returned:', fr.logs[0].streamIndex, '..',
329 fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state);
330 } else {
331 console.log('Request returned no logs:', fr.desc, fr.state);
332 }
333 }
334
335 this.updateStatus(FetcherStatus.IDLE);
nodir 2017/03/08 07:41:28 probably a race
dnj 2017/03/08 08:50:15 Done.
336 if (fr.desc) {
337 this.lastDesc = fr.desc;
nodir 2017/03/08 07:41:28 here too
dnj 2017/03/08 08:50:14 Done.
338 }
339 if (fr.state) {
340 this.lastState = fr.state;
341 }
342 return fr.logs;
343 }
344
345 private doRetryIfMissing(err: Error, delay: number) {
346 // Is this a gRPC Error?
347 let grpc = luci.GrpcError.convert(err);
348 if (grpc && grpc.code === luci.Code.NOT_FOUND) {
349 this.updateStatus(FetcherStatus.MISSING);
350
351 console.warn(
352 this.stream, ': Is not found:', err,
353 `; retrying after ${delay}ms...`);
354 }
nodir 2017/03/08 07:41:28 return
dnj 2017/03/08 08:50:15 Done.
355
356 this.updateStatus(FetcherStatus.ERROR, err);
357 throw err;
358 }
359 }
360
361 /**
362 * The result of a log stream fetch, for internal usage.
363 *
364 * It will include zero or more log entries, and optionally (if requested)
365 * the log stream's descriptor and state.
366 */
367 class FetchResult {
368 constructor(
369 readonly logs: LogDog.LogEntry[],
370 readonly desc?: LogDog.LogStreamDescriptor,
371 readonly state?: LogDog.LogStreamState) {}
372
373 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
374 FetchResult {
375 let loadDesc: LogDog.LogStreamDescriptor|undefined;
376 if (resp.desc) {
377 desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc);
378 }
379
380 let loadState: LogDog.LogStreamState|undefined;
381 if (resp.state) {
382 loadState = LogDog.LogStreamState.make(resp.state);
383 }
384
385 let logs = (resp.logs || []).map((le) => {
nodir 2017/03/08 07:41:28 nit let logs = (resp.logs || []).map(le =>
dnj 2017/03/08 08:50:15 Done.
386 return LogDog.LogEntry.make(le, desc);
387 });
388 return new FetchResult(logs, loadDesc, loadState);
389 }
390 }
391 }
OLDNEW
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | web/inc/rpc/client.ts » ('J')

Powered by Google App Engine
This is Rietveld 408576698