| OLD | NEW |
| (Empty) |
| 1 /* | |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 that can be found in the LICENSE file. | |
| 5 */ | |
| 6 | |
| 7 import {luci_rpc} from "rpc/client"; | |
| 8 import * as luci_sleep_promise from "luci-sleep-promise/promise"; | |
| 9 import {LogDog} from "logdog-stream/logdog"; | |
| 10 | |
| 11 export type FetcherOptions = { | |
| 12 byteCount?: number; | |
| 13 logCount?: number; | |
| 14 sparse?: boolean; | |
| 15 } | |
| 16 | |
| 17 // Type of a "Get" or "Tail" response (protobuf). | |
| 18 type GetResponse = { | |
| 19 state: any; | |
| 20 desc: any; | |
| 21 logs: any[]; | |
| 22 }; | |
| 23 | |
| 24 /** The Fetcher's current status. */ | |
| 25 export enum FetcherStatus { | |
| 26 // Not doing anything. | |
| 27 IDLE, | |
| 28 // Attempting to load log data. | |
| 29 LOADING, | |
| 30 // We're waiting for the log stream to emit more logs. | |
| 31 STREAMING, | |
| 32 // The log stream is missing. | |
| 33 MISSING, | |
| 34 // The log stream encountered an error. | |
| 35 ERROR | |
| 36 } | |
| 37 | |
| 38 export class Fetcher { | |
| 39 private client: luci_rpc.Client; | |
| 40 | |
| 41 private debug: boolean = false; | |
| 42 private static maxLogsPerGet = 0; | |
| 43 | |
| 44 private lastDesc: LogDog.LogStreamDescriptor; | |
| 45 private lastState: LogDog.LogStreamState; | |
| 46 private activePromise: Promise<LogDog.LogEntry[]>; | |
| 47 | |
| 48 private currentStatus: FetcherStatus = FetcherStatus.IDLE; | |
| 49 private _lastError: Error; | |
| 50 private statusChangedCallback: (() => void); | |
| 51 | |
| 52 private static missingRetry: luci_rpc.Retry = new luci_rpc.Retry( | |
| 53 new luci_rpc.RetryIterator(null, 5000, 15000)); | |
| 54 | |
| 55 private static streamingRetry = new luci_rpc.Retry( | |
| 56 new luci_rpc.RetryIterator(null, 1000, 5000)); | |
| 57 | |
| 58 constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream) { | |
| 59 this.client = client; | |
| 60 } | |
| 61 | |
| 62 get desc() { return this.lastDesc; } | |
| 63 get state() { return this.lastState; } | |
| 64 | |
| 65 get status() { return this.currentStatus; } | |
| 66 | |
| 67 /** | |
| 68 * Returns the log stream's terminal index. | |
| 69 * | |
| 70 * If no terminal index is known, or if the log stream is still streaming, | |
| 71 * this will return -1. | |
| 72 */ | |
| 73 get terminalIndex(): number { | |
| 74 return ( ( this.lastState ) ? this.lastState.terminalIndex : -1 ); | |
| 75 } | |
| 76 | |
| 77 /** Archived returns true if this log stream is known to be archived. */ | |
| 78 get archived(): boolean { | |
| 79 return ( !! (this.lastState && this.lastState.archive) ); | |
| 80 } | |
| 81 | |
| 82 get lastError(): Error { return this._lastError; } | |
| 83 | |
| 84 private setCurrentStatus(st: FetcherStatus, err?: Error) { | |
| 85 if ( st !== this.currentStatus || err !== this._lastError ) { | |
| 86 this.currentStatus = st; | |
| 87 this._lastError = err; | |
| 88 | |
| 89 if ( this.statusChangedCallback ) { | |
| 90 this.statusChangedCallback(); | |
| 91 }; | |
| 92 } | |
| 93 } | |
| 94 | |
| 95 setStatusChangedCallback(fn: () => void) { | |
| 96 this.statusChangedCallback = fn; | |
| 97 } | |
| 98 | |
| 99 /** | |
| 100 * Returns a Promise that resolves to the next block of logs in the stream. | |
| 101 * | |
| 102 * @return {Promise[LogDog.LogEntry[]]} A Promise that will resolve to the | |
| 103 * next block of logs in the stream, or null if there are no logs to | |
| 104 * return. | |
| 105 */ | |
| 106 get(index: number, opts?: FetcherOptions): Promise<LogDog.LogEntry[]> { | |
| 107 return this.getIndex(index, opts); | |
| 108 } | |
| 109 | |
| 110 getAll(startIndex: number, count: number): Promise<LogDog.LogEntry[]> { | |
| 111 // Request the tail walkback logs. Since our request for N logs may return | |
| 112 // <N logs, we will repeat the request until all requested logs have been | |
| 113 // obtained. | |
| 114 let allLogs = new Array<LogDog.LogEntry>(); | |
| 115 | |
| 116 let getIter = (): Promise<LogDog.LogEntry[]> => { | |
| 117 if ( count <= 0 ) { | |
| 118 return Promise.resolve(allLogs); | |
| 119 } | |
| 120 | |
| 121 // Perform Gets until we have the requested number of logs. We don't have | |
| 122 // to constrain the "logCount" parameter b/c we automatically do that in | |
| 123 // getIndex. | |
| 124 let opts: FetcherOptions = { | |
| 125 logCount: count, | |
| 126 sparse: true, | |
| 127 }; | |
| 128 return this.getIndex(startIndex, opts).then( (logs) => { | |
| 129 if ( logs ) { | |
| 130 allLogs.push.apply(allLogs, logs); | |
| 131 startIndex += logs.length; | |
| 132 count -= logs.length; | |
| 133 } | |
| 134 return getIter(); | |
| 135 }); | |
| 136 }; | |
| 137 return getIter(); | |
| 138 } | |
| 139 | |
| 140 tail(): Promise<LogDog.LogEntry[]> { | |
| 141 let streamingRetry = Fetcher.streamingRetry.iterator(); | |
| 142 let tryTail = (): Promise<LogDog.LogEntry[]> => { | |
| 143 return this.doTail().then( (logs): Promise<LogDog.LogEntry[]> => { | |
| 144 if ( logs && logs.length ) { | |
| 145 return Promise.resolve(logs); | |
| 146 } | |
| 147 | |
| 148 // No logs were returned, and we expect logs, so we're streaming. Try | |
| 149 // again after a delay. | |
| 150 this.setCurrentStatus(FetcherStatus.STREAMING); | |
| 151 let delay = streamingRetry.next(); | |
| 152 console.warn(this.stream, | |
| 153 `: No logs returned; retrying after ${delay}ms...`); | |
| 154 return luci_sleep_promise.sleep(delay).then( () => { | |
| 155 return tryTail(); | |
| 156 }); | |
| 157 }); | |
| 158 }; | |
| 159 return tryTail(); | |
| 160 } | |
| 161 | |
| 162 | |
| 163 private getIndex(index: number, opts: FetcherOptions): | |
| 164 Promise<LogDog.LogEntry[]> { | |
| 165 | |
| 166 // (Testing) Constrain our max logs, if set. | |
| 167 if ( Fetcher.maxLogsPerGet > 0 ) { | |
| 168 if ( ! opts ) { | |
| 169 opts = {}; | |
| 170 } | |
| 171 if ( (!opts.logCount) || opts.logCount > Fetcher.maxLogsPerGet ) { | |
| 172 opts.logCount = Fetcher.maxLogsPerGet; | |
| 173 } | |
| 174 } | |
| 175 | |
| 176 // We will retry continuously until we get a log (streaming). | |
| 177 let streamingRetry = Fetcher.streamingRetry.iterator(); | |
| 178 let tryGet = (): Promise<LogDog.LogEntry[]> => { | |
| 179 // If we're asking for a log beyond our stream, don't bother. | |
| 180 if ( this.terminalIndex >= 0 && index > this.terminalIndex ) { | |
| 181 return Promise.resolve(null); | |
| 182 } | |
| 183 | |
| 184 return this.doGet(index, opts). | |
| 185 then( (logs) => { | |
| 186 if ( logs && logs.length ) { | |
| 187 // Since we allow non-contiguous Get, we may get back more logs than | |
| 188 // we actually expected. Prune any such additional. | |
| 189 if ( opts.logCount > 0 ) { | |
| 190 let maxStreamIndex = index + opts.logCount - 1; | |
| 191 logs = logs.filter( (le) => { | |
| 192 return le.streamIndex <= maxStreamIndex; | |
| 193 } ); | |
| 194 } | |
| 195 | |
| 196 return Promise.resolve(logs); | |
| 197 } | |
| 198 | |
| 199 // No logs were returned, and we expect logs, so we're streaming. Try | |
| 200 // again after a delay. | |
| 201 this.setCurrentStatus(FetcherStatus.STREAMING); | |
| 202 let delay = streamingRetry.next(); | |
| 203 console.warn(this.stream, | |
| 204 `: No logs returned; retrying after ${delay}ms...`); | |
| 205 return luci_sleep_promise.sleep(delay).then( () => { | |
| 206 return tryGet(); | |
| 207 }); | |
| 208 }); | |
| 209 }; | |
| 210 return tryGet(); | |
| 211 } | |
| 212 | |
| 213 private doGet(index: number, opts: FetcherOptions): | |
| 214 Promise<LogDog.LogEntry[]> { | |
| 215 | |
| 216 let request: { | |
| 217 project: string; | |
| 218 path: string; | |
| 219 state: boolean; | |
| 220 index: number; | |
| 221 | |
| 222 nonContiguous?: boolean; | |
| 223 byteCount?: number; | |
| 224 logCount?: number; | |
| 225 } = { | |
| 226 project: this.stream.project, | |
| 227 path: this.stream.path, | |
| 228 state: (this.terminalIndex < 0), | |
| 229 index: index, | |
| 230 }; | |
| 231 if ( opts.sparse || this.archived ) { | |
| 232 // This log stream is archived. We will relax the contiguous requirement | |
| 233 // so we can render sparse log streams. | |
| 234 request.nonContiguous = true; | |
| 235 } | |
| 236 if ( opts ) { | |
| 237 if ( opts.byteCount > 0 ) { | |
| 238 request.byteCount = opts.byteCount; | |
| 239 } | |
| 240 if ( opts.logCount > 0 ) { | |
| 241 request.logCount = opts.logCount; | |
| 242 } | |
| 243 } | |
| 244 | |
| 245 if ( this.debug ) { | |
| 246 console.log("logdog.Logs.Get:", request); | |
| 247 } | |
| 248 | |
| 249 // Perform our Get, waiting until the stream actually exists. | |
| 250 return this.doRetryIfMissing( (): Promise<FetchResult> => { | |
| 251 return this.client.call("logdog.Logs", "Get", request). | |
| 252 then( (resp: GetResponse): FetchResult => { | |
| 253 return FetchResult.make(resp, this.lastDesc); | |
| 254 }); | |
| 255 }).then( (fr) => { | |
| 256 return this.afterProcessResult(fr); | |
| 257 }); | |
| 258 } | |
| 259 | |
| 260 private doTail(): Promise<LogDog.LogEntry[]> { | |
| 261 let request: { | |
| 262 project: string; | |
| 263 path: string; | |
| 264 state: boolean; | |
| 265 } = { | |
| 266 project: this.stream.project, | |
| 267 path: this.stream.path, | |
| 268 state: (this.terminalIndex < 0), | |
| 269 }; | |
| 270 | |
| 271 if ( this.debug ) { | |
| 272 console.log("logdog.Logs.Tail:", request); | |
| 273 } | |
| 274 | |
| 275 return this.doRetryIfMissing( (): Promise<FetchResult> => { | |
| 276 return this.client.call("logdog.Logs", "Tail", request). | |
| 277 then( (resp: GetResponse): FetchResult => { | |
| 278 return FetchResult.make(resp, this.lastDesc); | |
| 279 }); | |
| 280 }).then( (fr) => { | |
| 281 return this.afterProcessResult(fr); | |
| 282 }); | |
| 283 } | |
| 284 | |
| 285 private afterProcessResult(fr: FetchResult): LogDog.LogEntry[] { | |
| 286 if ( this.debug ) { | |
| 287 if ( fr.logs.length ) { | |
| 288 console.log("Request returned:", fr.logs[0].streamIndex, "..", | |
| 289 fr.logs[fr.logs.length-1].streamIndex, fr.desc, fr.state); | |
| 290 } else { | |
| 291 console.log("Request returned no logs:", fr.desc, fr.state); | |
| 292 } | |
| 293 } | |
| 294 | |
| 295 this.setCurrentStatus(FetcherStatus.IDLE); | |
| 296 if ( fr.desc ) { | |
| 297 this.lastDesc = fr.desc; | |
| 298 } | |
| 299 if ( fr.state ) { | |
| 300 this.lastState = fr.state; | |
| 301 } | |
| 302 return fr.logs; | |
| 303 } | |
| 304 | |
| 305 private doRetryIfMissing(fn: () => Promise<FetchResult>): | |
| 306 Promise<FetchResult> { | |
| 307 | |
| 308 let missingRetry = Fetcher.missingRetry.iterator(); | |
| 309 | |
| 310 let doIt = (): Promise<FetchResult> => { | |
| 311 this.setCurrentStatus(FetcherStatus.LOADING); | |
| 312 | |
| 313 return fn().catch( (err: Error) => { | |
| 314 // Is this a gRPC Error? | |
| 315 let grpc = luci_rpc.GrpcError.convert(err); | |
| 316 if ( grpc && grpc.code === luci_rpc.Code.NOT_FOUND ) { | |
| 317 this.setCurrentStatus(FetcherStatus.MISSING); | |
| 318 | |
| 319 let delay = missingRetry.next(); | |
| 320 console.warn(this.stream, ": Is not found:", err, | |
| 321 `; retrying after ${delay}ms...`); | |
| 322 return luci_sleep_promise.sleep(delay).then( () => { | |
| 323 return doIt(); | |
| 324 }); | |
| 325 } | |
| 326 | |
| 327 this.setCurrentStatus(FetcherStatus.ERROR, err); | |
| 328 throw err; | |
| 329 }); | |
| 330 }; | |
| 331 return doIt(); | |
| 332 } | |
| 333 } | |
| 334 | |
| 335 export class FetchResult { | |
| 336 constructor(readonly logs: LogDog.LogEntry[], | |
| 337 readonly desc?: LogDog.LogStreamDescriptor, | |
| 338 readonly state?: LogDog.LogStreamState) {} | |
| 339 | |
| 340 static make(resp: GetResponse, desc: LogDog.LogStreamDescriptor): | |
| 341 FetchResult { | |
| 342 | |
| 343 let loadDesc: LogDog.LogStreamDescriptor; | |
| 344 if ( resp.desc ) { | |
| 345 desc = loadDesc = LogDog.makeLogStreamDescriptor(resp.desc); | |
| 346 } | |
| 347 | |
| 348 let loadState: LogDog.LogStreamState; | |
| 349 if ( resp.state ) { | |
| 350 loadState = LogDog.makeLogStreamState( resp.state ); | |
| 351 } | |
| 352 | |
| 353 let logs = (resp.logs || []).map( (le) => { | |
| 354 return LogDog.makeLogEntry(le, desc); | |
| 355 }); | |
| 356 return new FetchResult(logs, loadDesc, loadState); | |
| 357 } | |
| 358 | |
| 359 } | |
| OLD | NEW |