Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Unified Diff: web/inc/logdog-stream-view/fetcher.ts

Issue 2717043002: Add LogDog log stream fetcher code. (Closed)
Patch Set: comments, retry "do" Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | web/inc/rpc/client.ts » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: web/inc/logdog-stream-view/fetcher.ts
diff --git a/web/inc/logdog-stream-view/fetcher.ts b/web/inc/logdog-stream-view/fetcher.ts
new file mode 100644
index 0000000000000000000000000000000000000000..4e58048e3dbd79986f152ceb86621bf3df8fa332
--- /dev/null
+++ b/web/inc/logdog-stream-view/fetcher.ts
@@ -0,0 +1,391 @@
+/*
+ Copyright 2016 The LUCI Authors. All rights reserved.
+ Use of this source code is governed under the Apache License, Version 2.0
+ that can be found in the LICENSE file.
+*/
+
+///<reference path="../logdog-stream/logdog.ts" />
+///<reference path="../rpc/client.ts" />
+
+namespace LogDog {
+
+ export type FetcherOptions = {
+ byteCount?: number; logCount?: number; sparse?: boolean;
+ };
+
+ // Type of a "Get" or "Tail" response (protobuf).
+ type GetResponse = {state: any; desc: any; logs: any[];};
+
+ /** The Fetcher's current status. */
+ export enum FetcherStatus {
+ // Not doing anything.
+ IDLE,
+ // Attempting to load log data.
+ LOADING,
+ // We're waiting for the log stream to emit more logs.
+ STREAMING,
+ // The log stream is missing.
+ MISSING,
+ // The log stream encountered an error.
+ ERROR
+ }
+
+ /**
+ * Fetcher is responsible for fetching LogDog log stream entries from the
+ * remote service via an RPC client.
+ *
+ * Fetcher is responsible for wrapping the raw RPC calls and their results,
+ * and retrying calls due to:
+ *
+ * - Transient failures (via RPC client).
+ * - Missing stream (assumption is that the stream is still being ingested and
+ * registered, and therefore a repeated retry is appropriate).
+ * - Streaming stream (log stream is not terminated, but more records are not
+ * yet available).
+ *
+ * The interface that Fetcher presents to its caller is a simple Promise-based
+ * method to retrieve log stream data.
+ *
+ * Fetcher offers fetching via "get", "getAll", and "getLatest".
+ */
+ export class Fetcher {
+ private debug = false;
+ private static maxLogsPerGet = 0;
+
+ private lastDesc: LogDog.LogStreamDescriptor;
+ private lastState: LogDog.LogStreamState;
+
+ private lastStatus: FetcherStatus = FetcherStatus.IDLE;
+ private lastErrorValue: Error|null;
+ private statusChangedCallback: (() => void);
+
+ private static missingRetry: luci.Retry = {delay: 5000, maxDelay: 15000};
+ private static streamingRetry: luci.Retry = {delay: 1000, maxDelay: 5000};
+
+ constructor(
+ private client: luci.Client, readonly stream: LogDog.StreamPath) {}
+
+ get desc() {
+ return this.lastDesc;
+ }
+ get state() {
+ return this.lastState;
+ }
+
+ get status() {
+ return this.lastStatus;
+ }
+
+ /**
+ * Returns the log stream's terminal index.
+ *
+ * If no terminal index is known (the log is still streaming) this will
+ * return -1.
+ */
+ get terminalIndex(): number {
+ return ((this.lastState) ? this.lastState.terminalIndex : -1);
+ }
+
+ /** Archived returns true if this log stream is known to be archived. */
+ get archived(): boolean {
+ return (!!(this.lastState && this.lastState.archive));
+ }
+
+ get lastError(): Error|null {
+ return this.lastErrorValue;
+ }
+
+ private updateStatus(st: FetcherStatus, err?: Error) {
+ if (st !== this.lastStatus || err !== this.lastErrorValue) {
+ this.lastStatus = st;
+ this.lastErrorValue = (err || null);
+
+ if (this.statusChangedCallback) {
+ this.statusChangedCallback();
+ };
+ }
+ }
+
+ /**
+ * Sets the status changed callback, which will be invoked whenever the
+ * Fetcher's status has changed.
+ */
+ setStatusChangedCallback(fn: () => void) {
+ this.statusChangedCallback = fn;
+ }
+
+ /**
+ * Returns a Promise that will resolve to the next block of logs in the
+ * stream.
+ *
+ * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the
+ * next block of logs in the stream.
+ */
+ get(index: number, opts: FetcherOptions): Promise<LogDog.LogEntry[]> {
+ return this.getIndex(index, opts);
+ }
+
+ /**
+ * Returns a Promise that will resolve to "count" log entries starting at
+ * "startIndex".
+ *
+ * If multiple RPC calls are required to retrieve "count" entries, these
+ * will be scheduled, and the Promise will block until the full set of
+ * requested stream entries is retrieved.
+ */
+ getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> {
+ // Request the tail walkback logs. Since our request for N logs may return
+ // <N logs, we will repeat the request until all requested logs have been
+ // obtained.
+ let allLogs: LogDog.LogEntry[] = [];
+
+ let getIter = (): Promise<LogDog.LogEntry[]> => {
+ if (count <= 0) {
+ return Promise.resolve(allLogs);
+ }
+
+ // Perform Gets until we have the requested number of logs. We don't
+ // have to constrain the "logCount" parameter b/c we automatically do
+ // that in getIndex.
+ let opts: FetcherOptions = {
+ logCount: count,
+ sparse: true,
+ };
+ return this.getIndex(startIndex, opts).then((logs) => {
+ if (logs) {
+ allLogs.push.apply(allLogs, logs);
nodir 2017/03/08 07:41:28 nit, in my experience this is usually written as A
dnj 2017/03/08 08:50:14 TypeScript complains: Property 'push' does not exi
nodir 2017/03/13 19:48:21 :(
+ startIndex += logs.length;
+ count -= logs.length;
+ }
+ return getIter();
nodir 2017/03/08 07:41:28 return resolved promise instead of recursive if co
dnj 2017/03/08 08:50:14 Done.
+ });
nodir 2017/03/08 07:41:28 then return allLogs? currently allLogs is not used
dnj 2017/03/08 08:50:14 It's used above (see line #144).
nodir 2017/03/13 19:48:21 Acknowledged.
+ };
+ return getIter();
+ }
+
+ /**
+ * Fetches the latest log entry.
+ */
+ getLatest(): Promise<LogDog.LogEntry[]> {
+ let errNoLogs = new Error('no logs, streaming');
+ let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
+ return streamingRetry.do(
+ () => {
+ return this.doTail().then((logs) => {
+ if (!(logs && logs.length)) {
+ throw errNoLogs;
+ }
+ return logs;
+ });
+ },
+ (err: Error, delay: number) => {
+ if (err !== errNoLogs) {
+ throw err;
+ }
+
+ // No logs were returned, and we expect logs, so we're streaming.
+ // Try again after a delay.
+ this.updateStatus(FetcherStatus.STREAMING);
+ console.warn(
+ this.stream,
+ `: No logs returned; retrying after ${delay}ms...`);
+ });
+ }
+
+ private getIndex(index: number, opts: FetcherOptions):
+ Promise<LogDog.LogEntry[]> {
+ // (Testing) Constrain our max logs, if set.
+ if (Fetcher.maxLogsPerGet > 0) {
+ if (!opts) {
nodir 2017/03/08 07:41:28 doGet assumes opts is not null. doGet is called be
dnj 2017/03/08 08:50:14 Actually now that I do strict null checks, I can d
+ opts = {};
+ }
+ if ((!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet) {
+ opts.logCount = Fetcher.maxLogsPerGet;
+ }
+ }
+
+ // We will retry continuously until we get a log (streaming).
+ let streamingRetry = new luci.RetryIterator(Fetcher.streamingRetry);
+ let errNoLogs = new Error('no logs, streaming');
+ return streamingRetry
+ .do(
+ () => {
+ // If we're asking for a log beyond our stream, don't bother.
+ if (this.terminalIndex >= 0 && index > this.terminalIndex) {
+ return Promise.resolve([]);
+ }
+
+ return this.doGet(index, opts).then((logs) => {
nodir 2017/03/08 07:41:28 nit: parens around parameters of a lambda expressi
dnj 2017/03/08 08:50:15 Done.
+ if (!(logs && !logs.length)) {
nodir 2017/03/08 07:41:28 this is hard to read. This is equivalent to if (!
dnj 2017/03/08 08:50:15 Actually that inner "!" was a bug, it should be: !
+ // (Retry)
+ throw errNoLogs;
+ }
+
+ return logs;
+ });
+ },
+ (err: Error, delay: number) => {
+ if (err !== errNoLogs) {
+ throw err;
+ }
+
+ // No logs were returned, and we expect logs, so we're
+ // streaming. Try again after a delay.
+ this.updateStatus(FetcherStatus.STREAMING);
nodir 2017/03/08 07:41:28 all mutations (like updateStatus calls) in callbac
dnj 2017/03/08 08:50:14 Originally I didn't really care, but I think you'r
nodir 2017/03/13 19:48:21 great
+ console.warn(
+ this.stream,
+ `: No logs returned; retrying after ${delay}ms...`);
+ })
+ .then((logs) => {
nodir 2017/03/08 07:41:28 nit: unnecessary parens
dnj 2017/03/08 08:50:15 Done.
+ // Since we allow non-contiguous Get, we may get back more logs than
+ // we actually expected. Prune any such additional.
+ if (opts.logCount && opts.logCount > 0) {
nodir 2017/03/08 07:41:28 shouldn't this check opts.nonContiguous? currently
dnj 2017/03/08 08:50:14 Not in this function. Probably should, Done.
+ let maxStreamIndex = index + opts.logCount - 1;
nodir 2017/03/08 07:41:28 i don't fully understand this. Can you document lo
dnj 2017/03/08 08:50:14 Done.
+ logs = logs.filter((le) => {
nodir 2017/03/08 07:41:28 logs = logs.filter(le => le.streamIndex <= maxStre
dnj 2017/03/08 08:50:14 Oh nice, done.
+ return le.streamIndex <= maxStreamIndex;
+ });
+ }
+ return logs;
+ });
+ }
+
+ private doGet(index: number, opts: FetcherOptions):
+ Promise<LogDog.LogEntry[]> {
+ let request: {
+ project: string; path: string; state: boolean; index: number;
+
+ nonContiguous?: boolean;
+ byteCount?: number;
+ logCount?: number;
+ } = {
+ project: this.stream.project,
+ path: this.stream.path,
+ state: (this.terminalIndex < 0),
+ index: index,
+ };
+ if (opts.sparse || this.archived) {
+ // This log stream is archived. We will relax the contiguous requirement
+ // so we can render sparse log streams.
+ request.nonContiguous = true;
+ }
+ if (opts) {
nodir 2017/03/08 07:41:28 opts is not falsy since L266 didn't explode
dnj 2017/03/08 08:50:14 Ah yeah, it used to be allowed to be null, but not
+ if (opts.byteCount && opts.byteCount > 0) {
+ request.byteCount = opts.byteCount;
+ }
+ if (opts.logCount && opts.logCount > 0) {
+ request.logCount = opts.logCount;
+ }
+ }
+
+ if (this.debug) {
+ console.log('logdog.Logs.Get:', request);
+ }
+
+ // Perform our Get, waiting until the stream actually exists.
+ let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
+ return missingRetry
+ .do(
+ () => {
+ this.updateStatus(FetcherStatus.LOADING);
+ return this.client.call('logdog.Logs', 'Get', request);
+ },
+ this.doRetryIfMissing)
+ .then((resp: GetResponse) => {
+ let fr = FetchResult.make(resp, this.lastDesc);
+ return this.afterProcessResult(fr);
+ });
+ }
+
+ private doTail(): Promise<LogDog.LogEntry[]> {
+ let request: {project: string; path: string; state: boolean;} = {
+ project: this.stream.project,
+ path: this.stream.path,
+ state: (this.terminalIndex < 0),
+ };
+
+ if (this.debug) {
+ console.log('logdog.Logs.Tail:', request);
+ }
+
+ let missingRetry = new luci.RetryIterator(Fetcher.missingRetry);
+ return missingRetry
+ .do(
+ () => {
+ this.updateStatus(FetcherStatus.LOADING);
+ return this.client.call('logdog.Logs', 'Tail', request);
+ },
+ this.doRetryIfMissing)
+ .then((resp: GetResponse) => {
+ let fr = FetchResult.make(resp, this.lastDesc);
+ return this.afterProcessResult(fr);
+ });
+ }
+
+ private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] {
+ if (this.debug) {
+ if (fr.logs.length) {
+ console.log(
+ 'Request returned:', fr.logs[0].streamIndex, '..',
+ fr.logs[fr.logs.length - 1].streamIndex, fr.desc, fr.state);
+ } else {
+ console.log('Request returned no logs:', fr.desc, fr.state);
+ }
+ }
+
+ this.updateStatus(FetcherStatus.IDLE);
nodir 2017/03/08 07:41:28 probably a race
dnj 2017/03/08 08:50:15 Done.
+ if (fr.desc) {
+ this.lastDesc = fr.desc;
nodir 2017/03/08 07:41:28 here too
dnj 2017/03/08 08:50:14 Done.
+ }
+ if (fr.state) {
+ this.lastState = fr.state;
+ }
+ return fr.logs;
+ }
+
+ private doRetryIfMissing(err: Error, delay: number) {
+ // Is this a gRPC Error?
+ let grpc = luci.GrpcError.convert(err);
+ if (grpc && grpc.code === luci.Code.NOT_FOUND) {
+ this.updateStatus(FetcherStatus.MISSING);
+
+ console.warn(
+ this.stream, ': Is not found:', err,
+ `; retrying after ${delay}ms...`);
+ }
nodir 2017/03/08 07:41:28 return
dnj 2017/03/08 08:50:15 Done.
+
+ this.updateStatus(FetcherStatus.ERROR, err);
+ throw err;
+ }
+ }
+
+ /**
+ * The result of a log stream fetch, for internal usage.
+ *
+ * It will include zero or more log entries, and optionally (if requested)
+ * the log stream's descriptor and state.
+ */
+ class FetchResult {
+ constructor(
+ readonly logs: LogDog.LogEntry[],
+ readonly desc?: LogDog.LogStreamDescriptor,
+ readonly state?: LogDog.LogStreamState) {}
+
+ static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor):
+ FetchResult {
+ let loadDesc: LogDog.LogStreamDescriptor|undefined;
+ if (resp.desc) {
+ desc = loadDesc = LogDog.LogStreamDescriptor.make(resp.desc);
+ }
+
+ let loadState: LogDog.LogStreamState|undefined;
+ if (resp.state) {
+ loadState = LogDog.LogStreamState.make(resp.state);
+ }
+
+ let logs = (resp.logs || []).map((le) => {
nodir 2017/03/08 07:41:28 nit let logs = (resp.logs || []).map(le =>
dnj 2017/03/08 08:50:15 Done.
+ return LogDog.LogEntry.make(le, desc);
+ });
+ return new FetchResult(logs, loadDesc, loadState);
+ }
+ }
+}
« no previous file with comments | « no previous file | web/inc/logdog-stream-view/logdog-stream-fetcher.html » ('j') | web/inc/rpc/client.ts » ('J')

Powered by Google App Engine
This is Rietveld 408576698