| OLD | NEW |
| (Empty) | |
| 1 /* |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. |
| 5 */ |
| 6 |
| 7 import {luci_rpc} from "rpc/client"; |
| 8 import * as luci_sleep_promise from "luci-sleep-promise/promise"; |
| 9 import {LogDog} from "logdog-stream/logdog"; |
| 10 |
| 11 export type FetcherOptions = { |
| 12 byteCount?: number; |
| 13 logCount?: number; |
| 14 sparse?: boolean; |
| 15 } |
| 16 |
| 17 // Type of a "Get" or "Tail" response (protobuf). |
| 18 type GetResponse = { |
| 19 state: any; |
| 20 desc: any; |
| 21 logs: any[]; |
| 22 }; |
| 23 |
| 24 /** The Fetcher's current status. */ |
| 25 export enum FetcherStatus { |
| 26 // Not doing anything. |
| 27 IDLE, |
| 28 // Attempting to load log data. |
| 29 LOADING, |
| 30 // We're waiting for the log stream to emit more logs. |
| 31 STREAMING, |
| 32 // The log stream is missing. |
| 33 MISSING, |
| 34 // The log stream encountered an error. |
| 35 ERROR |
| 36 } |
| 37 |
| 38 export class Fetcher { |
| 39 private client: luci_rpc.Client; |
| 40 |
| 41 private debug: boolean = false; |
| 42 private static maxLogsPerGet = 0; |
| 43 |
| 44 private lastDesc: LogDog.LogStreamDescriptor; |
| 45 private lastState: LogDog.LogStreamState; |
| 46 private activePromise: Promise<LogDog.LogEntry[]>; |
| 47 |
| 48 private currentStatus: FetcherStatus = FetcherStatus.IDLE; |
| 49 private _lastError: Error; |
| 50 private statusChangedCallback: (() => void); |
| 51 |
| 52 private static missingRetry: luci_rpc.Retry = new luci_rpc.Retry( |
| 53 new luci_rpc.RetryIterator(null, 5000, 15000)); |
| 54 |
| 55 private static streamingRetry = new luci_rpc.Retry( |
| 56 new luci_rpc.RetryIterator(null, 1000, 5000)); |
| 57 |
| 58 constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream) { |
| 59 this.client = client; |
| 60 } |
| 61 |
| 62 get desc() { return this.lastDesc; } |
| 63 get state() { return this.lastState; } |
| 64 |
| 65 get status() { return this.currentStatus; } |
| 66 |
| 67 /** |
| 68 * Returns the log stream's terminal index. |
| 69 * |
| 70 * If no terminal index is known, or if the log stream is still streaming, |
| 71 * this will return -1. |
| 72 */ |
| 73 get terminalIndex(): number { |
| 74 return ( ( this.lastState ) ? this.lastState.terminalIndex : -1 ); |
| 75 } |
| 76 |
| 77 /** Archived returns true if this log stream is known to be archived. */ |
| 78 get archived(): boolean { |
| 79 return ( !! (this.lastState && this.lastState.archive) ); |
| 80 } |
| 81 |
| 82 get lastError(): Error { return this._lastError; } |
| 83 |
| 84 private setCurrentStatus(st: FetcherStatus, err?: Error) { |
| 85 if ( st !== this.currentStatus || err !== this._lastError ) { |
| 86 this.currentStatus = st; |
| 87 this._lastError = err; |
| 88 |
| 89 if ( this.statusChangedCallback ) { |
| 90 this.statusChangedCallback(); |
| 91 }; |
| 92 } |
| 93 } |
| 94 |
| 95 setStatusChangedCallback(fn: () => void) { |
| 96 this.statusChangedCallback = fn; |
| 97 } |
| 98 |
| 99 /** |
| 100 * Returns a Promise that resolves to the next block of logs in the stream. |
| 101 * |
| 102 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the |
| 103 * next block of logs in the stream, or null if there are no logs to |
| 104 * return. |
| 105 */ |
| 106 get(index: number, opts?: FetcherOptions): Promise<LogDog.LogEntry[]> { |
| 107 return this.getIndex(index, opts); |
| 108 } |
| 109 |
| 110 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> { |
| 111 // Request the tail walkback logs. Since our request for N logs may return |
| 112 // <N logs, we will repeat the request until all requested logs have been |
| 113 // obtained. |
| 114 let allLogs = new Array<LogDog.LogEntry>(); |
| 115 |
| 116 let getIter = (): Promise<LogDog.LogEntry[]> => { |
| 117 if ( count <= 0 ) { |
| 118 return Promise.resolve(allLogs); |
| 119 } |
| 120 |
| 121 // Perform Gets until we have the requested number of logs. We don't have |
| 122 // to constrain the "logCount" parameter b/c we automatically do that in |
| 123 // getIndex. |
| 124 let opts: FetcherOptions = { |
| 125 logCount: count, |
| 126 sparse: true, |
| 127 }; |
| 128 return this.getIndex(startIndex, opts).then( (logs) => { |
| 129 if ( logs ) { |
| 130 allLogs.push.apply(allLogs, logs); |
| 131 startIndex += logs.length; |
| 132 count -= logs.length; |
| 133 } |
| 134 return getIter(); |
| 135 }); |
| 136 }; |
| 137 return getIter(); |
| 138 } |
| 139 |
| 140 tail(): Promise<LogDog.LogEntry[]> { |
| 141 let streamingRetry = Fetcher.streamingRetry.iterator(); |
| 142 let tryTail = (): Promise<LogDog.LogEntry[]> => { |
| 143 return this.doTail().then( (logs): Promise<LogDog.LogEntry[]> => { |
| 144 if ( logs && logs.length ) { |
| 145 return Promise.resolve(logs); |
| 146 } |
| 147 |
| 148 // No logs were returned, and we expect logs, so we're streaming. Try |
| 149 // again after a delay. |
| 150 this.setCurrentStatus(FetcherStatus.STREAMING); |
| 151 let delay = streamingRetry.next(); |
| 152 console.warn(this.stream, |
| 153 `: No logs returned; retrying after ${delay}ms...`); |
| 154 return luci_sleep_promise.sleep(delay).then( () => { |
| 155 return tryTail(); |
| 156 }); |
| 157 }); |
| 158 }; |
| 159 return tryTail(); |
| 160 } |
| 161 |
| 162 |
| 163 private getIndex(index: number, opts: FetcherOptions): |
| 164 Promise<LogDog.LogEntry[]> { |
| 165 |
| 166 // (Testing) Constrain our max logs, if set. |
| 167 if ( Fetcher.maxLogsPerGet > 0 ) { |
| 168 if ( ! opts ) { |
| 169 opts = {}; |
| 170 } |
| 171 if ( (!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet ) { |
| 172 opts.logCount = Fetcher.maxLogsPerGet; |
| 173 } |
| 174 } |
| 175 |
| 176 // We will retry continuously until we get a log (streaming). |
| 177 let streamingRetry = Fetcher.streamingRetry.iterator(); |
| 178 let tryGet = (): Promise<LogDog.LogEntry[]> => { |
| 179 // If we're asking for a log beyond our stream, don't bother. |
| 180 if ( this.terminalIndex >= 0 && index > this.terminalIndex ) { |
| 181 return Promise.resolve(null); |
| 182 } |
| 183 |
| 184 return this.doGet(index, opts). |
| 185 then( (logs) => { |
| 186 if ( logs && logs.length ) { |
| 187 // Since we allow non-contiguous Get, we may get back more logs than |
| 188 // we actually expected. Prune any such additional. |
| 189 if ( opts.logCount > 0 ) { |
| 190 let maxStreamIndex = index + opts.logCount - 1; |
| 191 logs = logs.filter( (le) => { |
| 192 return le.streamIndex <= maxStreamIndex; |
| 193 } ); |
| 194 } |
| 195 |
| 196 return Promise.resolve(logs); |
| 197 } |
| 198 |
| 199 // No logs were returned, and we expect logs, so we're streaming. Try |
| 200 // again after a delay. |
| 201 this.setCurrentStatus(FetcherStatus.STREAMING); |
| 202 let delay = streamingRetry.next(); |
| 203 console.warn(this.stream, |
| 204 `: No logs returned; retrying after ${delay}ms...`); |
| 205 return luci_sleep_promise.sleep(delay).then( () => { |
| 206 return tryGet(); |
| 207 }); |
| 208 }); |
| 209 }; |
| 210 return tryGet(); |
| 211 } |
| 212 |
| 213 private doGet(index: number, opts: FetcherOptions): |
| 214 Promise<LogDog.LogEntry[]> { |
| 215 |
| 216 let request: { |
| 217 project: string; |
| 218 path: string; |
| 219 state: boolean; |
| 220 index: number; |
| 221 |
| 222 nonContiguous?: boolean; |
| 223 byteCount?: number; |
| 224 logCount?: number; |
| 225 } = { |
| 226 project: this.stream.project, |
| 227 path: this.stream.path, |
| 228 state: (this.terminalIndex < 0), |
| 229 index: index, |
| 230 }; |
| 231 if ( opts.sparse || this.archived ) { |
| 232 // This log stream is archived. We will relax the contiguous requirement |
| 233 // so we can render sparse log streams. |
| 234 request.nonContiguous = true; |
| 235 } |
| 236 if ( opts ) { |
| 237 if ( opts.byteCount > 0 ) { |
| 238 request.byteCount = opts.byteCount; |
| 239 } |
| 240 if ( opts.logCount > 0 ) { |
| 241 request.logCount = opts.logCount; |
| 242 } |
| 243 } |
| 244 |
| 245 if ( this.debug ) { |
| 246 console.log("logdog.Logs.Get:", request); |
| 247 } |
| 248 |
| 249 // Perform our Get, waiting until the stream actually exists. |
| 250 return this.doRetryIfMissing( (): Promise<FetchResult> => { |
| 251 return this.client.call("logdog.Logs", "Get", request). |
| 252 then( (resp: GetResponse): FetchResult => { |
| 253 return FetchResult.make(resp, this.lastDesc); |
| 254 }); |
| 255 }).then( (fr) => { |
| 256 return this.afterProcessResult(fr); |
| 257 }); |
| 258 } |
| 259 |
| 260 private doTail(): Promise<LogDog.LogEntry[]> { |
| 261 let request: { |
| 262 project: string; |
| 263 path: string; |
| 264 state: boolean; |
| 265 } = { |
| 266 project: this.stream.project, |
| 267 path: this.stream.path, |
| 268 state: (this.terminalIndex < 0), |
| 269 }; |
| 270 |
| 271 if ( this.debug ) { |
| 272 console.log("logdog.Logs.Tail:", request); |
| 273 } |
| 274 |
| 275 return this.doRetryIfMissing( (): Promise<FetchResult> => { |
| 276 return this.client.call("logdog.Logs", "Tail", request). |
| 277 then( (resp: GetResponse): FetchResult => { |
| 278 return FetchResult.make(resp, this.lastDesc); |
| 279 }); |
| 280 }).then( (fr) => { |
| 281 return this.afterProcessResult(fr); |
| 282 }); |
| 283 } |
| 284 |
| 285 private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] { |
| 286 if ( this.debug ) { |
| 287 if ( fr.logs.length ) { |
| 288 console.log("Request returned:", fr.logs[0].streamIndex, "..", |
| 289 fr.logs[fr.logs.length-1].streamIndex, fr.desc, fr.state); |
| 290 } else { |
| 291 console.log("Request returned no logs:", fr.desc, fr.state); |
| 292 } |
| 293 } |
| 294 |
| 295 this.setCurrentStatus(FetcherStatus.IDLE); |
| 296 if ( fr.desc ) { |
| 297 this.lastDesc = fr.desc; |
| 298 } |
| 299 if ( fr.state ) { |
| 300 this.lastState = fr.state; |
| 301 } |
| 302 return fr.logs; |
| 303 } |
| 304 |
| 305 private doRetryIfMissing(fn: () => Promise<FetchResult>): |
| 306 Promise<FetchResult> { |
| 307 |
| 308 let missingRetry = Fetcher.missingRetry.iterator(); |
| 309 |
| 310 let doIt = (): Promise<FetchResult> => { |
| 311 this.setCurrentStatus(FetcherStatus.LOADING); |
| 312 |
| 313 return fn().catch( (err: Error) => { |
| 314 // Is this a gRPC Error? |
| 315 let grpc = luci_rpc.GrpcError.convert(err); |
| 316 if ( grpc && grpc.code === luci_rpc.Code.NOT_FOUND ) { |
| 317 this.setCurrentStatus(FetcherStatus.MISSING); |
| 318 |
| 319 let delay = missingRetry.next(); |
| 320 console.warn(this.stream, ": Is not found:", err, |
| 321 `; retrying after ${delay}ms...`); |
| 322 return luci_sleep_promise.sleep(delay).then( () => { |
| 323 return doIt(); |
| 324 }); |
| 325 } |
| 326 |
| 327 this.setCurrentStatus(FetcherStatus.ERROR, err); |
| 328 throw err; |
| 329 }); |
| 330 }; |
| 331 return doIt(); |
| 332 } |
| 333 } |
| 334 |
| 335 export class FetchResult { |
| 336 constructor(readonly logs: LogDog.LogEntry[], |
| 337 readonly desc?: LogDog.LogStreamDescriptor, |
| 338 readonly state?: LogDog.LogStreamState) {} |
| 339 |
| 340 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): |
| 341 FetchResult { |
| 342 |
| 343 let loadDesc: LogDog.LogStreamDescriptor; |
| 344 if ( resp.desc ) { |
| 345 desc = loadDesc = LogDog.makeLogStreamDescriptor(resp.desc); |
| 346 } |
| 347 |
| 348 let loadState: LogDog.LogStreamState; |
| 349 if ( resp.state ) { |
| 350 loadState = LogDog.makeLogStreamState( resp.state ); |
| 351 } |
| 352 |
| 353 let logs = (resp.logs || []).map( (le) => { |
| 354 return LogDog.makeLogEntry(le, desc); |
| 355 }); |
| 356 return new FetchResult(logs, loadDesc, loadState); |
| 357 } |
| 358 |
| 359 } |
| OLD | NEW |