Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(216)

Side by Side Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2570963003: Revert of Rewrite LogDog log viewer app. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 Copyright 2016 The LUCI Authors. All rights reserved.
3 Use of this source code is governed under the Apache License, Version 2.0
4 that can be found in the LICENSE file.
5 */
6
7 import {luci_rpc} from "rpc/client";
8 import * as luci_sleep_promise from "luci-sleep-promise/promise";
9 import {LogDog} from "logdog-stream/logdog";
10
11 export type FetcherOptions = {
12 byteCount?: number;
13 logCount?: number;
14 sparse?: boolean;
15 }
16
17 // Type of a "Get" or "Tail" response (protobuf).
18 type GetResponse = {
19 state: any;
20 desc: any;
21 logs: any[];
22 };
23
24 /** The Fetcher's current status. */
25 export enum FetcherStatus {
26 // Not doing anything.
27 IDLE,
28 // Attempting to load log data.
29 LOADING,
30 // We're waiting for the log stream to emit more logs.
31 STREAMING,
32 // The log stream is missing.
33 MISSING,
34 // The log stream encountered an error.
35 ERROR
36 }
37
38 export class Fetcher {
39 private client: luci_rpc.Client;
40
41 private debug: boolean = false;
42 private static maxLogsPerGet = 0;
43
44 private lastDesc: LogDog.LogStreamDescriptor;
45 private lastState: LogDog.LogStreamState;
46 private activePromise: Promise<LogDog.LogEntry[]>;
47
48 private currentStatus: FetcherStatus = FetcherStatus.IDLE;
49 private _lastError: Error;
50 private statusChangedCallback: (() => void);
51
52 private static missingRetry: luci_rpc.Retry = new luci_rpc.Retry(
53 new luci_rpc.RetryIterator(null, 5000, 15000));
54
55 private static streamingRetry = new luci_rpc.Retry(
56 new luci_rpc.RetryIterator(null, 1000, 5000));
57
58 constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream) {
59 this.client = client;
60 }
61
62 get desc() { return this.lastDesc; }
63 get state() { return this.lastState; }
64
65 get status() { return this.currentStatus; }
66
67 /**
68 * Returns the log stream's terminal index.
69 *
70 * If no terminal index is known, or if the log stream is still streaming,
71 * this will return -1.
72 */
73 get terminalIndex(): number {
74 return ( ( this.lastState ) ? this.lastState.terminalIndex : -1 );
75 }
76
77 /** Archived returns true if this log stream is known to be archived. */
78 get archived(): boolean {
79 return ( !! (this.lastState && this.lastState.archive) );
80 }
81
82 get lastError(): Error { return this._lastError; }
83
84 private setCurrentStatus(st: FetcherStatus, err?: Error) {
85 if ( st !== this.currentStatus || err !== this._lastError ) {
86 this.currentStatus = st;
87 this._lastError = err;
88
89 if ( this.statusChangedCallback ) {
90 this.statusChangedCallback();
91 };
92 }
93 }
94
95 setStatusChangedCallback(fn: () => void) {
96 this.statusChangedCallback = fn;
97 }
98
99 /**
100 * Returns a Promise that resolves to the next block of logs in the stream.
101 *
102 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
103 * next block of logs in the stream, or null if there are no logs to
104 * return.
105 */
106 get(index: number, opts?: FetcherOptions): Promise<LogDog.LogEntry[]> {
107 return this.getIndex(index, opts);
108 }
109
110 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> {
111 // Request the tail walkback logs. Since our request for N logs may return
112 // <N logs, we will repeat the request until all requested logs have been
113 // obtained.
114 let allLogs = new Array<LogDog.LogEntry>();
115
116 let getIter = (): Promise<LogDog.LogEntry[]> => {
117 if ( count <= 0 ) {
118 return Promise.resolve(allLogs);
119 }
120
121 // Perform Gets until we have the requested number of logs. We don't have
122 // to constrain the "logCount" parameter b/c we automatically do that in
123 // getIndex.
124 let opts: FetcherOptions = {
125 logCount: count,
126 sparse: true,
127 };
128 return this.getIndex(startIndex, opts).then( (logs) => {
129 if ( logs ) {
130 allLogs.push.apply(allLogs, logs);
131 startIndex += logs.length;
132 count -= logs.length;
133 }
134 return getIter();
135 });
136 };
137 return getIter();
138 }
139
140 tail(): Promise<LogDog.LogEntry[]> {
141 let streamingRetry = Fetcher.streamingRetry.iterator();
142 let tryTail = (): Promise<LogDog.LogEntry[]> => {
143 return this.doTail().then( (logs): Promise<LogDog.LogEntry[]> => {
144 if ( logs && logs.length ) {
145 return Promise.resolve(logs);
146 }
147
148 // No logs were returned, and we expect logs, so we're streaming. Try
149 // again after a delay.
150 this.setCurrentStatus(FetcherStatus.STREAMING);
151 let delay = streamingRetry.next();
152 console.warn(this.stream,
153 `: No logs returned; retrying after ${delay}ms...`);
154 return luci_sleep_promise.sleep(delay).then( () => {
155 return tryTail();
156 });
157 });
158 };
159 return tryTail();
160 }
161
162
163 private getIndex(index: number, opts: FetcherOptions):
164 Promise<LogDog.LogEntry[]> {
165
166 // (Testing) Constrain our max logs, if set.
167 if ( Fetcher.maxLogsPerGet > 0 ) {
168 if ( ! opts ) {
169 opts = {};
170 }
171 if ( (!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet ) {
172 opts.logCount = Fetcher.maxLogsPerGet;
173 }
174 }
175
176 // We will retry continuously until we get a log (streaming).
177 let streamingRetry = Fetcher.streamingRetry.iterator();
178 let tryGet = (): Promise<LogDog.LogEntry[]> => {
179 // If we're asking for a log beyond our stream, don't bother.
180 if ( this.terminalIndex >= 0 && index > this.terminalIndex ) {
181 return Promise.resolve(null);
182 }
183
184 return this.doGet(index, opts).
185 then( (logs) => {
186 if ( logs && logs.length ) {
187 // Since we allow non-contiguous Get, we may get back more logs than
188 // we actually expected. Prune any such additional.
189 if ( opts.logCount > 0 ) {
190 let maxStreamIndex = index + opts.logCount - 1;
191 logs = logs.filter( (le) => {
192 return le.streamIndex <= maxStreamIndex;
193 } );
194 }
195
196 return Promise.resolve(logs);
197 }
198
199 // No logs were returned, and we expect logs, so we're streaming. Try
200 // again after a delay.
201 this.setCurrentStatus(FetcherStatus.STREAMING);
202 let delay = streamingRetry.next();
203 console.warn(this.stream,
204 `: No logs returned; retrying after ${delay}ms...`);
205 return luci_sleep_promise.sleep(delay).then( () => {
206 return tryGet();
207 });
208 });
209 };
210 return tryGet();
211 }
212
213 private doGet(index: number, opts: FetcherOptions):
214 Promise<LogDog.LogEntry[]> {
215
216 let request: {
217 project: string;
218 path: string;
219 state: boolean;
220 index: number;
221
222 nonContiguous?: boolean;
223 byteCount?: number;
224 logCount?: number;
225 } = {
226 project: this.stream.project,
227 path: this.stream.path,
228 state: (this.terminalIndex < 0),
229 index: index,
230 };
231 if ( opts.sparse || this.archived ) {
232 // This log stream is archived. We will relax the contiguous requirement
233 // so we can render sparse log streams.
234 request.nonContiguous = true;
235 }
236 if ( opts ) {
237 if ( opts.byteCount > 0 ) {
238 request.byteCount = opts.byteCount;
239 }
240 if ( opts.logCount > 0 ) {
241 request.logCount = opts.logCount;
242 }
243 }
244
245 if ( this.debug ) {
246 console.log("logdog.Logs.Get:", request);
247 }
248
249 // Perform our Get, waiting until the stream actually exists.
250 return this.doRetryIfMissing( (): Promise<FetchResult> => {
251 return this.client.call("logdog.Logs", "Get", request).
252 then( (resp: GetResponse): FetchResult => {
253 return FetchResult.make(resp, this.lastDesc);
254 });
255 }).then( (fr) => {
256 return this.afterProcessResult(fr);
257 });
258 }
259
260 private doTail(): Promise<LogDog.LogEntry[]> {
261 let request: {
262 project: string;
263 path: string;
264 state: boolean;
265 } = {
266 project: this.stream.project,
267 path: this.stream.path,
268 state: (this.terminalIndex < 0),
269 };
270
271 if ( this.debug ) {
272 console.log("logdog.Logs.Tail:", request);
273 }
274
275 return this.doRetryIfMissing( (): Promise<FetchResult> => {
276 return this.client.call("logdog.Logs", "Tail", request).
277 then( (resp: GetResponse): FetchResult => {
278 return FetchResult.make(resp, this.lastDesc);
279 });
280 }).then( (fr) => {
281 return this.afterProcessResult(fr);
282 });
283 }
284
285 private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] {
286 if ( this.debug ) {
287 if ( fr.logs.length ) {
288 console.log("Request returned:", fr.logs[0].streamIndex, "..",
289 fr.logs[fr.logs.length-1].streamIndex, fr.desc, fr.state);
290 } else {
291 console.log("Request returned no logs:", fr.desc, fr.state);
292 }
293 }
294
295 this.setCurrentStatus(FetcherStatus.IDLE);
296 if ( fr.desc ) {
297 this.lastDesc = fr.desc;
298 }
299 if ( fr.state ) {
300 this.lastState = fr.state;
301 }
302 return fr.logs;
303 }
304
305 private doRetryIfMissing(fn: () => Promise<FetchResult>):
306 Promise<FetchResult> {
307
308 let missingRetry = Fetcher.missingRetry.iterator();
309
310 let doIt = (): Promise<FetchResult> => {
311 this.setCurrentStatus(FetcherStatus.LOADING);
312
313 return fn().catch( (err: Error) => {
314 // Is this a gRPC Error?
315 let grpc = luci_rpc.GrpcError.convert(err);
316 if ( grpc && grpc.code === luci_rpc.Code.NOT_FOUND ) {
317 this.setCurrentStatus(FetcherStatus.MISSING);
318
319 let delay = missingRetry.next();
320 console.warn(this.stream, ": Is not found:", err,
321 `; retrying after ${delay}ms...`);
322 return luci_sleep_promise.sleep(delay).then( () => {
323 return doIt();
324 });
325 }
326
327 this.setCurrentStatus(FetcherStatus.ERROR, err);
328 throw err;
329 });
330 };
331 return doIt();
332 }
333 }
334
335 export class FetchResult {
336 constructor(readonly logs: LogDog.LogEntry[],
337 readonly desc?: LogDog.LogStreamDescriptor,
338 readonly state?: LogDog.LogStreamState) {}
339
340 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
341 FetchResult {
342
343 let loadDesc: LogDog.LogStreamDescriptor;
344 if ( resp.desc ) {
345 desc = loadDesc = LogDog.makeLogStreamDescriptor(resp.desc);
346 }
347
348 let loadState: LogDog.LogStreamState;
349 if ( resp.state ) {
350 loadState = LogDog.makeLogStreamState( resp.state );
351 }
352
353 let logs = (resp.logs || []).map( (le) => {
354 return LogDog.makeLogEntry(le, desc);
355 });
356 return new FetchResult(logs, loadDesc, loadState);
357 }
358
359 }
OLDNEW
« no previous file with comments | « web/inc/logdog-app-base/logdog-app-base.html ('k') | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698