| OLD | NEW |
| (Empty) |
| 1 /* | |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 that can be found in the LICENSE file. | |
| 5 */ | |
| 6 | |
| 7 import {Fetcher, FetcherOptions, FetcherStatus} | |
| 8 from "logdog-stream-view/fetcher"; | |
| 9 import {LogDogQuery, QueryParams, StreamType} from "logdog-stream-view/query"; | |
| 10 import {LogDog} from "logdog-stream/logdog"; | |
| 11 import * as luci_sleep_promise from "luci-sleep-promise/promise"; | |
| 12 import {luci_rpc} from "rpc/client"; | |
| 13 | |
| 14 /** Sentinel error: not authenticated. */ | |
| 15 let NotAuthenticatedError = new Error("Not Authenticated"); | |
| 16 | |
| 17 function resolveErr(err: Error) { | |
| 18 let grpc = luci_rpc.GrpcError.convert(err); | |
| 19 if ( grpc && grpc.code == luci_rpc.Code.UNAUTHENTICATED ) { | |
| 20 return NotAuthenticatedError; | |
| 21 } | |
| 22 return err; | |
| 23 } | |
| 24 | |
| 25 /** Stream status entry, as rendered by the view. */ | |
| 26 type StreamStatusEntry = { | |
| 27 name: string; | |
| 28 desc: string; | |
| 29 }; | |
| 30 | |
| 31 /** An individual log stream's status. */ | |
| 32 type LogStreamStatus = { | |
| 33 stream: LogDog.Stream; | |
| 34 state: string; | |
| 35 fetchStatus: FetcherStatus; | |
| 36 finished: boolean; | |
| 37 needsAuth: boolean; | |
| 38 } | |
| 39 | |
| 40 type StreamStatusCallback = (v: LogStreamStatus[]) => void; | |
| 41 | |
| 42 export enum Location { | |
| 43 /** | |
| 44 * Represents the upper half of a split view. Logs start at 0 and go through | |
| 45 * the HEAD point. | |
| 46 */ | |
| 47 HEAD, | |
| 48 /** | |
| 49 * Represents the lower half of the split view. Logs start at the TAIL point | |
| 50 * and go through the BOTTOM anchor point. | |
| 51 */ | |
| 52 TAIL, | |
| 53 /** | |
| 54 * Represents an anchor point where the split occurred, obtained through a | |
| 55 * single "Tail()" RPC call. If the terminal index is known when the split | |
| 56 * occurs, this should be the terminal index. | |
| 57 */ | |
| 58 BOTTOM, | |
| 59 } | |
| 60 | |
| 61 export enum LoadingState { | |
| 62 NONE, | |
| 63 RESOLVING, | |
| 64 LOADING, | |
| 65 RENDERING, | |
| 66 NEEDS_AUTH, | |
| 67 ERROR, | |
| 68 } | |
| 69 | |
| 70 /** Represents control visibility in the view. */ | |
| 71 type Controls = { | |
| 72 /** Are we completely finished loading stream data? */ | |
| 73 canSplit: boolean; | |
| 74 /** Are we currently split? */ | |
| 75 split: boolean; | |
| 76 /** Show the bottom bar? */ | |
| 77 bottom: boolean; | |
| 78 /** Is the content fully loaded? */ | |
| 79 fullyLoaded: boolean; | |
| 80 | |
| 81 /** Text in the status bar. */ | |
| 82 loadingState: LoadingState; | |
| 83 /** Stream status entries, or null for no status window. */ | |
| 84 streamStatus: StreamStatusEntry[]; | |
| 85 } | |
| 86 | |
| 87 /** Registered callbacks from the LogDog stream view. */ | |
| 88 type ViewBinding = { | |
| 89 client: luci_rpc.Client; | |
| 90 mobile: boolean; | |
| 91 | |
| 92 pushLogEntries: (entries: LogDog.LogEntry[], l: Location) => void; | |
| 93 clearLogEntries: () => void; | |
| 94 | |
| 95 updateControls: (c: Controls) => void; | |
| 96 locationIsVisible: (l: Location) => boolean; | |
| 97 }; | |
| 98 | |
| 99 /** Interface of the specific Model functions used by the view. */ | |
| 100 interface ModelInterface { | |
| 101 fetch(cancel: boolean): Promise<void>; | |
| 102 split(): Promise<void>; | |
| 103 | |
| 104 reset(): void; | |
| 105 setAutomatic(v: boolean): void; | |
| 106 setTailing(v: boolean): void; | |
| 107 notifyAuthenticationChanged(): void; | |
| 108 } | |
| 109 | |
| 110 export class Model implements ModelInterface { | |
| 111 /** If performing a small initial fetch, this is the size of the fetch. */ | |
| 112 private static SMALL_INITIAL_FETCH_SIZE = (1024 * 4); | |
| 113 /** If performing a large initial fetch, this is the size of the fetch. */ | |
| 114 private static LARGE_INITIAL_FETCH_SIZE = (1024 * 24); | |
| 115 /** If fetching on a mobile device, fetch in this chunk size. */ | |
| 116 private static MOBILE_FETCH_SIZE = (1024 * 256); | |
| 117 /** For standard fetching, fetch with this size. */ | |
| 118 private static STANDARD_FETCH_SIZE = (4 * 1024 * 1024); | |
| 119 | |
| 120 /** | |
| 121 * If >0, the maximum number of log lines to push at a time. We will sleep | |
| 122 * in between these entries to allow the rest of the app to be responsive | |
| 123 * during log dumping. | |
| 124 */ | |
| 125 private static logAppendInterval = 4000; | |
| 126 /** Amount of time to sleep in between log append chunks. */ | |
| 127 private static logAppendDelay = 0; | |
| 128 | |
| 129 /** Our log provider. */ | |
| 130 private provider: LogProvider = this.nullProvider(); | |
| 131 | |
| 132 /** | |
| 133 * Promise that is resolved when authentication state changes. When this | |
| 134 * happens, a new Promise is installed, and future authentication changes | |
| 135 * will resolve the new Promise. | |
| 136 */ | |
| 137 private authChangedPromise: Promise<void> = null; | |
| 138 /** | |
| 139 * Retained callback (Promise resolve) to invoke when authentication state | |
| 140 * changes. | |
| 141 */ | |
| 142 private authChangedCallback: (() => void) = null; | |
| 143 | |
| 144 /** The current fetch Promise. */ | |
| 145 private currentFetch: Promise<void>; | |
| 146 /** The current fetch token. */ | |
| 147 private currentFetchToken: FetchToken; | |
| 148 | |
| 149 /** Are we in automatic mode? */ | |
| 150 private automatic = false; | |
| 151 /** Are we tailing? */ | |
| 152 private tailing = false; | |
| 153 /** Are we in the middle of rendering logs? */ | |
| 154 private rendering = true; | |
| 155 | |
| 156 private _loadingState: LoadingState = LoadingState.NONE; | |
| 157 private _streamStatus: StreamStatusEntry[]; | |
| 158 | |
| 159 /** | |
| 160 * When rendering a Promise that will resolve when the render completes. We | |
| 161 * use this to pipeline parallel data fetching and rendering. | |
| 162 */ | |
| 163 private renderPromise: Promise<void>; | |
| 164 | |
| 165 constructor(readonly view: ViewBinding) { | |
| 166 this.resetAuthChanged(); | |
| 167 } | |
| 168 | |
| 169 resolve(paths: string[]): Promise<void> { | |
| 170 this.reset(); | |
| 171 | |
| 172 // For any path that is a query, execute that query. | |
| 173 this.loadingState = LoadingState.RESOLVING; | |
| 174 return Promise.all( paths.map( (path): Promise<LogDog.Stream[]> => { | |
| 175 let stream = LogDog.Stream.splitProject(path); | |
| 176 if ( ! LogDogQuery.isQuery(stream.path) ) { | |
| 177 return Promise.resolve([stream]); | |
| 178 } | |
| 179 | |
| 180 // This "path" is really a query. Construct and execute. | |
| 181 let query = new LogDogQuery(this.view.client); | |
| 182 let doQuery = (): Promise<LogDog.Stream[]> => { | |
| 183 return query.getAll({ | |
| 184 project: stream.project, | |
| 185 path: stream.path, | |
| 186 streamType: StreamType.TEXT, | |
| 187 }, 100).then( (result): LogDog.Stream[] => { | |
| 188 return result.map( (qr): LogDog.Stream => { | |
| 189 return qr.stream; | |
| 190 } ); | |
| 191 }).catch( (err: Error) => { | |
| 192 err = resolveErr(err); | |
| 193 if ( err == NotAuthenticatedError ) { | |
| 194 return this.authChangedPromise.then( () => { | |
| 195 return doQuery(); | |
| 196 } ); | |
| 197 } | |
| 198 | |
| 199 throw err; | |
| 200 }); | |
| 201 } | |
| 202 return doQuery(); | |
| 203 } ) ).then( (streamBlocks) => { | |
| 204 let streams = new Array<LogDog.Stream>(); | |
| 205 (streamBlocks || []).forEach( (streamBlock) => { | |
| 206 streams.push.apply(streams, streamBlock); | |
| 207 } ); | |
| 208 | |
| 209 | |
| 210 let initialFetchSize = ( (streams.length === 1) ? | |
| 211 Model.LARGE_INITIAL_FETCH_SIZE : Model.SMALL_INITIAL_FETCH_SIZE ); | |
| 212 | |
| 213 // Determine our fetch size. | |
| 214 let maxFetchSize = ( (this.view.mobile) ? | |
| 215 Model.MOBILE_FETCH_SIZE : Model.STANDARD_FETCH_SIZE ); | |
| 216 | |
| 217 // Generate a LogStream client entry for each composite stream. | |
| 218 let logStreams = streams.map( (stream) => { | |
| 219 console.log("Resolved log stream:", stream); | |
| 220 return new LogStream( | |
| 221 this.view.client, stream, initialFetchSize, maxFetchSize); | |
| 222 }); | |
| 223 | |
| 224 // Reset any existing state. | |
| 225 this.reset(); | |
| 226 | |
| 227 // If we have exactly one stream, then use it directly. This allows it to | |
| 228 // split. | |
| 229 let provider: LogProvider; | |
| 230 switch( logStreams.length ) { | |
| 231 case 0: | |
| 232 provider = this.nullProvider(); | |
| 233 break; | |
| 234 case 1: | |
| 235 provider = logStreams[0]; | |
| 236 break; | |
| 237 default: | |
| 238 provider = new AggregateLogStream(logStreams); | |
| 239 break | |
| 240 } | |
| 241 provider.setStreamStatusCallback((st: LogStreamStatus[]) => { | |
| 242 if ( this.provider === provider ) { | |
| 243 this.streamStatus = this.buildStreamStatus(st); | |
| 244 } | |
| 245 }); | |
| 246 this.provider = provider; | |
| 247 this.loadingState = LoadingState.NONE; | |
| 248 } ).catch( (err: Error) => { | |
| 249 this.loadingState = LoadingState.ERROR; | |
| 250 console.error("Failed to resolve log streams:", err); | |
| 251 }); | |
| 252 } | |
| 253 | |
| 254 reset() { | |
| 255 this.view.clearLogEntries(); | |
| 256 this.clearCurrentFetch(); | |
| 257 this.provider = this.nullProvider(); | |
| 258 | |
| 259 this.updateControls(); | |
| 260 } | |
| 261 | |
| 262 private nullProvider(): LogProvider { return new AggregateLogStream([]); } | |
| 263 | |
| 264 private mintFetchToken(): FetchToken { | |
| 265 return (this.currentFetchToken = new FetchToken( (tok: FetchToken) => { | |
| 266 return (tok === this.currentFetchToken); | |
| 267 } )); | |
| 268 } | |
| 269 | |
| 270 private clearCurrentFetch() { | |
| 271 this.currentFetch = this.currentFetchToken = null; | |
| 272 this.rendering = false; | |
| 273 } | |
| 274 | |
| 275 private get loadingState(): LoadingState { return this._loadingState; } | |
| 276 private set loadingState(v: LoadingState) { | |
| 277 if( v != this._loadingState ) { | |
| 278 this._loadingState = v; | |
| 279 this.updateControls(); | |
| 280 } | |
| 281 } | |
| 282 | |
| 283 private get streamStatus(): StreamStatusEntry[] { return this._streamStatus; } | |
| 284 private set streamStatus(st: StreamStatusEntry[]) { | |
| 285 this._streamStatus = st; | |
| 286 this.updateControls(); | |
| 287 } | |
| 288 | |
| 289 private updateControls() { | |
| 290 this.view.updateControls({ | |
| 291 canSplit: this.providerCanSplit, | |
| 292 split: this.isSplit, | |
| 293 bottom: !this.fetchedEndOfStream, | |
| 294 fullyLoaded: (this.fetchedFullStream && (! this.rendering)), | |
| 295 loadingState: this.loadingState, | |
| 296 streamStatus: this.streamStatus, | |
| 297 }); | |
| 298 } | |
| 299 | |
| 300 /** | |
| 301 * Note that the authentication state for the client has changed. This will | |
| 302 * trigger an automatic fetch retry if our previous fetch failed due to | |
| 303 * lack of authentication. | |
| 304 */ | |
| 305 notifyAuthenticationChanged() { | |
| 306 // Resolve our current "auth changed" Promise. | |
| 307 this.authChangedCallback(); | |
| 308 } | |
| 309 | |
| 310 private resetAuthChanged() { | |
| 311 // Resolve our previous function, if it's not already resolved. | |
| 312 if ( this.authChangedCallback ) { | |
| 313 this.authChangedCallback(); | |
| 314 } | |
| 315 | |
| 316 // Create a new Promise and install it. | |
| 317 this.authChangedPromise = new Promise<void>((resolve, reject) => { | |
| 318 this.authChangedCallback = resolve; | |
| 319 }); | |
| 320 } | |
| 321 | |
| 322 split(): Promise<void> { | |
| 323 // If we haven't already split, and our provider lets us split, then go | |
| 324 // ahead and do so. | |
| 325 if ( this.providerCanSplit ) { | |
| 326 return this.fetchLocation(Location.TAIL, true); | |
| 327 } | |
| 328 return this.fetch(false); | |
| 329 } | |
| 330 | |
| 331 fetch(cancel: boolean): Promise<void> { | |
| 332 if ( this.isSplit ) { | |
| 333 if ( this.tailing ) { | |
| 334 // Next fetch grabs logs from the bottom (continue tailing). | |
| 335 if ( ! this.fetchedEndOfStream ) { | |
| 336 return this.fetchLocation(Location.BOTTOM, false); | |
| 337 } else { | |
| 338 return this.fetchLocation(Location.TAIL, false); | |
| 339 } | |
| 340 } | |
| 341 | |
| 342 // We're split, but not tailing, so fetch logs from HEAD. | |
| 343 return this.fetchLocation(Location.HEAD, false); | |
| 344 } | |
| 345 | |
| 346 // We're not split. If we haven't reached end of stream, fetch logs from | |
| 347 // HEAD. | |
| 348 return this.fetchLocation(Location.HEAD, false); | |
| 349 } | |
| 350 | |
| 351 /** Fetch logs from an explicit location. */ | |
| 352 fetchLocation(l: Location, cancel: boolean) { | |
| 353 if ( this.currentFetch && (!cancel) ) { | |
| 354 return this.currentFetch; | |
| 355 } | |
| 356 | |
| 357 // If our provider is finished, then do nothing. | |
| 358 if ( this.fetchedFullStream ) { | |
| 359 // There are no more logs. | |
| 360 return Promise.resolve(null); | |
| 361 } | |
| 362 | |
| 363 // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD instead. | |
| 364 if ( l === Location.BOTTOM && ! this.isSplit ) { | |
| 365 l = Location.HEAD; | |
| 366 } | |
| 367 | |
| 368 // If we're not split, always fetch from BOTTOM. | |
| 369 this.loadingState = LoadingState.LOADING; | |
| 370 | |
| 371 // Rotate our fetch ID. This will effectively cancel any pending fetches. | |
| 372 let token = this.mintFetchToken(); | |
| 373 return (this.currentFetch = this.provider.fetch(l, token).then( (buf) => { | |
| 374 // Clear our fetching status. | |
| 375 this.rendering = true; | |
| 376 this.loadingState = LoadingState.RENDERING; | |
| 377 let pushLogsPromise: Promise<void>; | |
| 378 let hasLogs = (buf && buf.peek()); | |
| 379 | |
| 380 // Resolve any previous rendering Promise that we have. This makes sure | |
| 381 // our rendering and fetching don't get more than one round out of sync. | |
| 382 return (this.renderPromise || Promise.resolve(null)).then( () => { | |
| 383 // Post-fetch cleanup. | |
| 384 this.clearCurrentFetch(); | |
| 385 | |
| 386 // Clear our loading state (updates controls automatically). | |
| 387 this.loadingState = LoadingState.RENDERING; | |
| 388 | |
| 389 // Initiate the next render. This will happen in the background while | |
| 390 // we enqueue our next fetch. | |
| 391 this.renderPromise = this.renderLogs(buf, l).then( () => { | |
| 392 this.renderPromise = null; | |
| 393 if ( this.loadingState === LoadingState.RENDERING ) { | |
| 394 this.loadingState = LoadingState.NONE; | |
| 395 } | |
| 396 }); | |
| 397 | |
| 398 if ( this.fetchedFullStream ) { | |
| 399 // If we're finished now, perform our finished cleanup. | |
| 400 return; | |
| 401 } | |
| 402 | |
| 403 // The fetch is finished. If we're automatic, and we got logs, start the | |
| 404 // next. | |
| 405 if ( this.automatic && hasLogs ) { | |
| 406 console.log("Automatic: starting next fetch.") | |
| 407 return this.fetch(false); | |
| 408 } | |
| 409 }); | |
| 410 }).catch( (err: Error) => { | |
| 411 // If we've been canceled, discard this result. | |
| 412 if ( ! token.valid ) { | |
| 413 return | |
| 414 } | |
| 415 | |
| 416 this.clearCurrentFetch(); | |
| 417 if ( err === NotAuthenticatedError ) { | |
| 418 this.loadingState = LoadingState.NEEDS_AUTH; | |
| 419 | |
| 420 // We failed because we were not authenticated. Mark this so we can | |
| 421 // retry if that state changes. | |
| 422 return this.authChangedPromise.then( () => { | |
| 423 // Our authentication state changed during the fetch! Retry | |
| 424 // automatically. | |
| 425 this.fetchLocation(l, false); | |
| 426 }); | |
| 427 } | |
| 428 | |
| 429 console.error("Failed to load log streams:", err); | |
| 430 })); | |
| 431 } | |
| 432 | |
| 433 private renderLogs(buf: BufferedLogs, l: Location): Promise<void> { | |
| 434 if ( ! (buf && buf.peek()) ) { | |
| 435 return Promise.resolve(null); | |
| 436 } | |
| 437 | |
| 438 let lines = 0; | |
| 439 let logBlock = new Array<LogDog.LogEntry>(); | |
| 440 let appendBlock = () => { | |
| 441 if ( logBlock.length ) { | |
| 442 console.log("Rendering", logBlock.length, "logs..."); | |
| 443 this.view.pushLogEntries(logBlock, l); | |
| 444 logBlock.length = 0; | |
| 445 lines = 0; | |
| 446 | |
| 447 // Update our status and controls. | |
| 448 this.updateControls(); | |
| 449 } | |
| 450 }; | |
| 451 | |
| 452 // Create a promise loop to push logs at intervals. | |
| 453 let pushLogs = (): Promise<void> => { | |
| 454 return Promise.resolve().then( () => { | |
| 455 // Add logs until we reach our interval lines. | |
| 456 for ( let nextLog = buf.next(); (nextLog); nextLog = buf.next() ) { | |
| 457 // If we've exceeded our burst, then interleave a sleep (yield). | |
| 458 if (Model.logAppendInterval > 0 && | |
| 459 lines >= Model.logAppendInterval ) { | |
| 460 appendBlock(); | |
| 461 | |
| 462 return luci_sleep_promise.sleep(Model.logAppendDelay).then( | |
| 463 () => { | |
| 464 // Enqueue the next push round. | |
| 465 return pushLogs(); | |
| 466 } ); | |
| 467 } | |
| 468 | |
| 469 // Add the next log to the append block. | |
| 470 logBlock.push(nextLog); | |
| 471 if ( nextLog.text && nextLog.text.lines ) { | |
| 472 lines += nextLog.text.lines.length; | |
| 473 } | |
| 474 } | |
| 475 | |
| 476 // If there are any buffered logs, append that block. | |
| 477 appendBlock(); | |
| 478 }); | |
| 479 } | |
| 480 return pushLogs(); | |
| 481 } | |
| 482 | |
| 483 setTailing(v: boolean) { | |
| 484 this.tailing = v; | |
| 485 } | |
| 486 | |
| 487 setAutomatic(v: boolean) { | |
| 488 this.automatic = v; | |
| 489 if ( v ) { | |
| 490 // Passively kick off a new fetch. | |
| 491 this.fetch(false); | |
| 492 } | |
| 493 } | |
| 494 | |
| 495 private buildStreamStatus(v: LogStreamStatus[]): StreamStatusEntry[] { | |
| 496 let maxStatus = FetcherStatus.IDLE; | |
| 497 let maxStatusCount = 0; | |
| 498 let needsAuth = false; | |
| 499 | |
| 500 // Prune any finished entries and accumulate them for status bar change. | |
| 501 v = (v || []).filter( (st) => { | |
| 502 needsAuth = (needsAuth || st.needsAuth); | |
| 503 | |
| 504 if ( st.fetchStatus > maxStatus ) { | |
| 505 maxStatus = st.fetchStatus; | |
| 506 maxStatusCount = 1; | |
| 507 } else if ( st.fetchStatus === maxStatus ) { | |
| 508 maxStatusCount++; | |
| 509 } | |
| 510 | |
| 511 return (! st.finished); | |
| 512 }); | |
| 513 | |
| 514 return v.map( (st): StreamStatusEntry => { | |
| 515 return { | |
| 516 name: ".../+/" + st.stream.name, | |
| 517 desc: st.state, | |
| 518 }; | |
| 519 } ); | |
| 520 } | |
| 521 | |
| 522 private get providerCanSplit(): boolean { | |
| 523 let split = this.provider.split(); | |
| 524 return (!! (split && split.canSplit())); | |
| 525 } | |
| 526 | |
| 527 private get isSplit(): boolean { | |
| 528 let split = this.provider.split(); | |
| 529 return ( !! (split && split.isSplit()) ); | |
| 530 } | |
| 531 | |
| 532 private get fetchedEndOfStream(): boolean { | |
| 533 return (this.provider.fetchedEndOfStream()); | |
| 534 } | |
| 535 | |
| 536 private get fetchedFullStream(): boolean { | |
| 537 return (this.fetchedEndOfStream && (! this.isSplit)); | |
| 538 } | |
| 539 } | |
| 540 | |
| 541 /** | |
| 542 * A token used to repesent an individual fetch. A token can assert whether its | |
| 543 * fetch has been invalidated. | |
| 544 */ | |
| 545 class FetchToken { | |
| 546 private validate: (tok: FetchToken) => boolean; | |
| 547 | |
| 548 constructor(validate: (tok: FetchToken) => boolean) { | |
| 549 this.validate = validate; | |
| 550 } | |
| 551 | |
| 552 get valid(): boolean { | |
| 553 return this.validate(this); | |
| 554 } | |
| 555 | |
| 556 do<T>(p: Promise<T>): Promise<T> { | |
| 557 return p.then( (v): T => { | |
| 558 if ( ! this.valid ) { | |
| 559 throw new Error("Token has been invalidated, discarding fetch."); | |
| 560 } | |
| 561 return v | |
| 562 } ); | |
| 563 } | |
| 564 } | |
| 565 | |
| 566 interface LogProvider { | |
| 567 setStreamStatusCallback(cb: StreamStatusCallback): void; | |
| 568 fetch(l: Location, token: FetchToken): Promise<BufferedLogs>; | |
| 569 | |
| 570 /** Will return null if this LogProvider doesn't support splitting. */ | |
| 571 split(): SplitLogProvider; | |
| 572 fetchedEndOfStream(): boolean; | |
| 573 } | |
| 574 | |
| 575 /** Additional methods for log stream splitting, if supported. */ | |
| 576 interface SplitLogProvider { | |
| 577 canSplit(): boolean | |
| 578 isSplit(): boolean; | |
| 579 } | |
| 580 | |
| 581 /** A LogStream is a LogProvider manages a single log stream. */ | |
| 582 class LogStream implements LogProvider { | |
| 583 /** | |
| 584 * Always begin with a small fetch. We'll disable this afterward the first | |
| 585 * finishes. | |
| 586 */ | |
| 587 private initialFetch = true; | |
| 588 | |
| 589 private fetcher: Fetcher; | |
| 590 | |
| 591 /** The log stream index of the next head() log. */ | |
| 592 private nextHeadIndex = 0; | |
| 593 /** | |
| 594 * The lowest log stream index of all of the tail logs. If this is <0, then | |
| 595 * it is uninitialized. | |
| 596 */ | |
| 597 private firstTailIndex = -1; | |
| 598 /** | |
| 599 * The next log stream index to fetch to continue pulling logs from the | |
| 600 * bottom. If this is <0, it is uninitialized. | |
| 601 */ | |
| 602 private nextBottomIndex = -1; | |
| 603 | |
| 604 private streamStatusCallback: StreamStatusCallback; | |
| 605 | |
| 606 /** The size of the tail walkback region. */ | |
| 607 private static TAIL_WALKBACK = 500; | |
| 608 | |
| 609 constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream, | |
| 610 readonly initialFetchSize: number, | |
| 611 readonly maxFetchSize: number ) { | |
| 612 this.fetcher = new Fetcher(client, stream); | |
| 613 this.fetcher.setStatusChangedCallback( () => { | |
| 614 this.statusChanged(); | |
| 615 }); | |
| 616 } | |
| 617 | |
| 618 get fetcherStatus(): FetcherStatus { return this.fetcher.status; } | |
| 619 | |
| 620 fetch(l: Location, token: FetchToken): Promise<BufferedLogs> { | |
| 621 // Determine which method to use based on the insertion point and current | |
| 622 // log stream fetch state. | |
| 623 let getLogs: Promise<LogDog.LogEntry[]>; | |
| 624 switch( l ) { | |
| 625 case Location.HEAD: | |
| 626 getLogs = this.getHead(token); | |
| 627 break; | |
| 628 | |
| 629 case Location.TAIL: | |
| 630 getLogs = this.getTail(token); | |
| 631 break; | |
| 632 | |
| 633 case Location.BOTTOM: | |
| 634 getLogs = this.getBottom(token); | |
| 635 break; | |
| 636 } | |
| 637 | |
| 638 return getLogs.then( (logs: LogDog.LogEntry[]) => { | |
| 639 this.initialFetch = false; | |
| 640 this.statusChanged(); | |
| 641 return new BufferedLogs(logs); | |
| 642 }).catch( (err: Error) => { | |
| 643 err = resolveErr(err); | |
| 644 throw err; | |
| 645 }); | |
| 646 } | |
| 647 | |
| 648 setStreamStatusCallback(cb: StreamStatusCallback) { | |
| 649 this.streamStatusCallback = cb; | |
| 650 } | |
| 651 | |
| 652 private statusChanged() { | |
| 653 if ( this.streamStatusCallback ) { | |
| 654 this.streamStatusCallback([this.getStreamStatus()]); | |
| 655 } | |
| 656 } | |
| 657 | |
| 658 getStreamStatus(): LogStreamStatus { | |
| 659 let pieces = new Array<string>(); | |
| 660 let tidx = this.fetcher.terminalIndex; | |
| 661 if ( this.nextHeadIndex > 0 ) { | |
| 662 pieces.push("1.." + this.nextHeadIndex); | |
| 663 } else { | |
| 664 pieces.push("0"); | |
| 665 } | |
| 666 if ( this.isSplit() ) { | |
| 667 if ( tidx >= 0 ) { | |
| 668 pieces.push("| " + this.firstTailIndex + " / " + tidx); | |
| 669 tidx = -1; | |
| 670 } else { | |
| 671 pieces.push("| " + this.firstTailIndex + ".." + this.nextBottomIndex + | |
| 672 " ..."); | |
| 673 } | |
| 674 } else if (tidx >= 0) { | |
| 675 pieces.push("/ " + tidx); | |
| 676 } else { | |
| 677 pieces.push("..."); | |
| 678 } | |
| 679 | |
| 680 let needsAuth = false; | |
| 681 let finished = this.finished; | |
| 682 if ( finished ) { | |
| 683 pieces.push("(Finished)"); | |
| 684 } else { | |
| 685 switch ( this.fetcher.status ) { | |
| 686 case FetcherStatus.IDLE: | |
| 687 case FetcherStatus.LOADING: | |
| 688 pieces.push("(Loading)"); | |
| 689 break; | |
| 690 | |
| 691 case FetcherStatus.STREAMING: | |
| 692 pieces.push("(Streaming)"); | |
| 693 break; | |
| 694 | |
| 695 case FetcherStatus.MISSING: | |
| 696 pieces.push("(Missing)"); | |
| 697 break; | |
| 698 | |
| 699 case FetcherStatus.ERROR: | |
| 700 let err = resolveErr(this.fetcher.lastError); | |
| 701 if (err === NotAuthenticatedError ) { | |
| 702 pieces.push("(Auth Error)"); | |
| 703 needsAuth = true; | |
| 704 } else { | |
| 705 pieces.push("(Error)"); | |
| 706 } | |
| 707 break; | |
| 708 } | |
| 709 } | |
| 710 | |
| 711 return { | |
| 712 stream: this.stream, | |
| 713 state: pieces.join(" "), | |
| 714 finished: finished, | |
| 715 fetchStatus: this.fetcher.status, | |
| 716 needsAuth: needsAuth, | |
| 717 }; | |
| 718 } | |
| 719 | |
| 720 split(): SplitLogProvider { | |
| 721 return this; | |
| 722 } | |
| 723 | |
| 724 isSplit(): boolean { | |
| 725 // We're split if we have a bottom and we're not finished tailing. | |
| 726 return ( this.firstTailIndex >= 0 && | |
| 727 (this.nextHeadIndex < this.firstTailIndex) ); | |
| 728 } | |
| 729 | |
| 730 canSplit(): boolean { | |
| 731 return ( ! (this.isSplit() || this.caughtUp) ); | |
| 732 } | |
| 733 | |
| 734 private get caughtUp(): boolean { | |
| 735 // We're caught up if we have both a head and bottom index, and the head | |
| 736 // is at or past the bottom. | |
| 737 return ( this.nextHeadIndex >= 0 && this.nextBottomIndex >= 0 && | |
| 738 this.nextHeadIndex >= this.nextBottomIndex ); | |
| 739 } | |
| 740 | |
| 741 fetchedEndOfStream(): boolean { | |
| 742 let tidx = this.fetcher.terminalIndex; | |
| 743 return ( tidx >= 0 && ( | |
| 744 (this.nextHeadIndex > tidx) || (this.nextBottomIndex > tidx) ) ); | |
| 745 } | |
| 746 | |
| 747 private get finished(): boolean { | |
| 748 return ( (! this.isSplit()) && this.fetchedEndOfStream() ); | |
| 749 } | |
| 750 | |
| 751 private updateIndexes() { | |
| 752 if ( this.firstTailIndex >= 0 ) { | |
| 753 if ( this.nextBottomIndex < this.firstTailIndex ) { | |
| 754 this.nextBottomIndex = this.firstTailIndex + 1; | |
| 755 } | |
| 756 | |
| 757 if ( this.nextHeadIndex >= this.firstTailIndex && | |
| 758 this.nextBottomIndex >= 0) { | |
| 759 // Synchronize our head and bottom pointers. | |
| 760 this.nextHeadIndex = this.nextBottomIndex = | |
| 761 Math.max(this.nextHeadIndex, this.nextBottomIndex); | |
| 762 } | |
| 763 } | |
| 764 } | |
| 765 | |
| 766 private nextFetcherOptions(): FetcherOptions { | |
| 767 let opts: FetcherOptions = {}; | |
| 768 if ( this.initialFetch ) { | |
| 769 opts.byteCount = this.initialFetchSize; | |
| 770 } else if ( this.maxFetchSize > 0 ) { | |
| 771 opts.byteCount = this.maxFetchSize; | |
| 772 } | |
| 773 return opts; | |
| 774 } | |
| 775 | |
| 776 private getHead(token: FetchToken): Promise<LogDog.LogEntry[]> { | |
| 777 this.updateIndexes(); | |
| 778 | |
| 779 if ( this.finished ) { | |
| 780 // Our HEAD region has met/surpassed our TAIL region, so there are no | |
| 781 // HEAD logs to return. Only bottom. | |
| 782 return Promise.resolve(); | |
| 783 } | |
| 784 | |
| 785 // If we have a tail pointer, only fetch HEAD up to that point. | |
| 786 let opts = this.nextFetcherOptions(); | |
| 787 if ( this.firstTailIndex >= 0 ) { | |
| 788 opts.logCount = (this.firstTailIndex - this.nextHeadIndex); | |
| 789 } | |
| 790 | |
| 791 return token.do( this.fetcher.get(this.nextHeadIndex, opts) ).then( | |
| 792 (logs) => { | |
| 793 if ( logs && logs.length ) { | |
| 794 this.nextHeadIndex = (logs[logs.length - 1].streamIndex + 1); | |
| 795 this.updateIndexes(); | |
| 796 } | |
| 797 return logs; | |
| 798 } ); | |
| 799 } | |
| 800 | |
| 801 private getTail(token: FetchToken): Promise<LogDog.LogEntry[]> { | |
| 802 // If we haven't performed a Tail before, start with one. | |
| 803 if ( this.firstTailIndex < 0 ) { | |
| 804 let tidx = this.fetcher.terminalIndex; | |
| 805 if ( tidx < 0 ) { | |
| 806 return token.do( this.fetcher.tail() ).then( (logs) => { | |
| 807 // Mark our initial "tail" position. | |
| 808 if ( logs && logs.length ) { | |
| 809 this.firstTailIndex = logs[0].streamIndex; | |
| 810 this.updateIndexes(); | |
| 811 } | |
| 812 return logs; | |
| 813 } ); | |
| 814 } | |
| 815 | |
| 816 this.firstTailIndex = (tidx+1); | |
| 817 this.updateIndexes(); | |
| 818 } | |
| 819 | |
| 820 // We're doing incremental reverse fetches. If we're finished tailing, | |
| 821 // return no logs. | |
| 822 if ( ! this.isSplit() ) { | |
| 823 return Promise.resolve(null); | |
| 824 } | |
| 825 | |
| 826 // Determine our walkback region. | |
| 827 let startIndex = this.firstTailIndex - LogStream.TAIL_WALKBACK; | |
| 828 if ( this.nextHeadIndex >= 0 ) { | |
| 829 if ( startIndex < this.nextHeadIndex ) { | |
| 830 startIndex = this.nextHeadIndex; | |
| 831 } | |
| 832 } else if ( startIndex < 0 ) { | |
| 833 startIndex = 0; | |
| 834 } | |
| 835 let count = (this.firstTailIndex - startIndex); | |
| 836 | |
| 837 // Fetch the full walkback region. | |
| 838 return token.do( this.fetcher.getAll(startIndex, count) ).then( (logs) => { | |
| 839 this.firstTailIndex = startIndex; | |
| 840 this.updateIndexes(); | |
| 841 return logs; | |
| 842 }); | |
| 843 } | |
| 844 | |
| 845 private getBottom(token: FetchToken): Promise<LogDog.LogEntry[]> { | |
| 846 this.updateIndexes(); | |
| 847 | |
| 848 // If there are no more logs in the stream, return no logs. | |
| 849 if ( this.fetchedEndOfStream() ) { | |
| 850 return Promise.resolve(null); | |
| 851 } | |
| 852 | |
| 853 // If our bottom index isn't initialized, initialize it via tail. | |
| 854 if ( this.nextBottomIndex < 0 ) { | |
| 855 return this.getTail(token); | |
| 856 } | |
| 857 | |
| 858 let opts = this.nextFetcherOptions(); | |
| 859 return token.do( this.fetcher.get(this.nextBottomIndex, opts) ).then( | |
| 860 (logs) => { | |
| 861 if ( logs && logs.length ) { | |
| 862 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); | |
| 863 } | |
| 864 return logs; | |
| 865 } ); | |
| 866 } | |
| 867 } | |
| 868 | |
| 869 /** | |
| 870 * An aggregate log stream. It presents a single-stream view, but is really | |
| 871 * composed of several log streams interleaved based on their prefix indices | |
| 872 * (if they share a prefix) or timestamps (if they don't). | |
| 873 * | |
| 874 * At least one log entry from each stream must be buffered before any log | |
| 875 * entries can be yielded, since we don't know what ordering to apply otherwise. | |
| 876 * To make this fast, we will make the first request for each stream small so | |
| 877 * it finishes quickly and we can start rendering. Subsequent entries will be | |
| 878 * larger for efficiency. | |
| 879 * | |
| 880 * @param {LogStream} streams the composite streams. | |
| 881 */ | |
| 882 class AggregateLogStream implements LogProvider { | |
| 883 | |
| 884 private streams: AggregateLogStream.Entry[]; | |
| 885 private active: AggregateLogStream.Entry[]; | |
| 886 private currentNextPromise: Promise<BufferedLogs>; | |
| 887 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; | |
| 888 | |
| 889 private streamStatusCallback: StreamStatusCallback; | |
| 890 | |
| 891 constructor(streams: LogStream[]) { | |
| 892 // Input streams, ordered by input order. | |
| 893 this.streams = streams.map<AggregateLogStream.Entry>( (ls, i) => { | |
| 894 ls.setStreamStatusCallback( (st: LogStreamStatus[]) => { | |
| 895 if ( st ) { | |
| 896 this.streams[i].status = st[0]; | |
| 897 this.statusChanged(); | |
| 898 } | |
| 899 }); | |
| 900 | |
| 901 return { | |
| 902 ls: ls, | |
| 903 buffer: null, | |
| 904 needsAuth: false, | |
| 905 status: ls.getStreamStatus(), | |
| 906 }; | |
| 907 } ); | |
| 908 | |
| 909 // Subset of input streams that are still active (not finished). | |
| 910 this.active = this.streams; | |
| 911 | |
| 912 // The currently-active "next" promise. | |
| 913 this.currentNextPromise = null; | |
| 914 | |
| 915 // Determine our log comparison function. If all of our logs share a prefix, | |
| 916 // we will use the prefix index. Otherwise, we will use the timestamp. | |
| 917 let template: LogDog.Stream = null; | |
| 918 let sharedPrefix = this.streams.every( (entry) => { | |
| 919 if ( ! template ) { | |
| 920 template = entry.ls.stream; | |
| 921 return true; | |
| 922 } | |
| 923 return template.samePrefixAs(entry.ls.stream); | |
| 924 }); | |
| 925 | |
| 926 this.compareLogs = (( sharedPrefix ) ? | |
| 927 (a, b) => { | |
| 928 return (a.prefixIndex - b.prefixIndex); | |
| 929 } : | |
| 930 (a, b) => { | |
| 931 return a.timestamp.getTime() - b.timestamp.getTime(); | |
| 932 }); | |
| 933 } | |
| 934 | |
| 935 split(): SplitLogProvider { return null; } | |
| 936 fetchedEndOfStream(): boolean { return ( ! this.active.length ); } | |
| 937 | |
| 938 setStreamStatusCallback(cb: StreamStatusCallback) { | |
| 939 this.streamStatusCallback = cb; | |
| 940 } | |
| 941 | |
| 942 private statusChanged() { | |
| 943 if ( this.streamStatusCallback ) { | |
| 944 // Iterate through our composite stream statuses and pick the one that we | |
| 945 // want to report. | |
| 946 this.streamStatusCallback( this.streams.map( (entry): LogStreamStatus => { | |
| 947 return entry.status; | |
| 948 } )); | |
| 949 } | |
| 950 } | |
| 951 | |
| 952 /** | |
| 953 * Implements LogProvider.next | |
| 954 */ | |
| 955 fetch(l: Location, token: FetchToken): Promise<BufferedLogs> { | |
| 956 // If we're already are fetching the next buffer, this is an error. | |
| 957 if (this.currentNextPromise) { | |
| 958 throw new Error("In-progress next(), cannot start another."); | |
| 959 } | |
| 960 | |
| 961 // Filter out any finished streams from our active list. A stream is | |
| 962 // finished if it is finished streaming and we don't have a retained buffer | |
| 963 // from it. | |
| 964 // | |
| 965 // This updates our "finished" property, since it's derived from the length | |
| 966 // of our active array. | |
| 967 this.active = this.active.filter( (entry) => { | |
| 968 return ( (! entry.buffer) || entry.buffer.peek() || | |
| 969 (! entry.ls.fetchedEndOfStream()) ); | |
| 970 }); | |
| 971 | |
| 972 if ( ! this.active.length ) { | |
| 973 // No active streams, so we're finished. Permanently set our promise to | |
| 974 // the finished state. | |
| 975 return Promise.resolve(); | |
| 976 } | |
| 977 | |
| 978 // Fill all buffers for all active streams. This may result in an RPC to | |
| 979 // load new buffer content for streams whose buffers are empty. | |
| 980 // | |
| 981 // If any stream doesn't currently have buffered logs, we will call their | |
| 982 // "next()" methods to pull the next set of logs. This will result in one of | |
| 983 // three possibilities: | |
| 984 // - A BufferedLogs will be returned containing the next logs for this strea
m. | |
| 985 // The log stream may also be finished. | |
| 986 // - null will be returned, and this log stream must now be finished. | |
| 987 // - An error will be returned. | |
| 988 // | |
| 989 // The error is interesting, since we must present a common error view to ou
r | |
| 990 // caller. If all returned errors are "NotAuthenticatedError", we will retur
n | |
| 991 // a NotAuthenticatedError. Otherwise, we will return a generic "streams | |
| 992 // failed" error. | |
| 993 // | |
| 994 // The outer Promise will pull logs for any streams that don't have any. | |
| 995 // On success, the "buffer" for the entry will be populated. On failure, an | |
| 996 // error will be returned. Because Promise.all fails fast, we will catch inn
er | |
| 997 // errors and return them as values (null if no error). | |
| 998 this.currentNextPromise = Promise.all( this.active.map( (entry) => { | |
| 999 // If the entry's buffer still has data, use it immediately. | |
| 1000 if (entry.buffer && entry.buffer.peek()) { | |
| 1001 return null; | |
| 1002 } | |
| 1003 | |
| 1004 // No buffered logs. Call the stream's "next()" method to get some. | |
| 1005 return entry.ls.fetch(Location.HEAD, token).then( | |
| 1006 (buffer): Error => { | |
| 1007 // Retain this buffer. | |
| 1008 entry.buffer = buffer; | |
| 1009 return null; | |
| 1010 } | |
| 1011 ).catch( (error: Error) => { | |
| 1012 // Log stream source of error. Raise a generic "failed to buffer" | |
| 1013 // error. This will become a permanent failure. | |
| 1014 console.error("Error loading buffer for", entry.ls.stream.fullName(), | |
| 1015 "(", entry.ls, "): ", error); | |
| 1016 return error; | |
| 1017 }); | |
| 1018 })).then( (results: Error[]): BufferedLogs => { | |
| 1019 // Identify any errors that we hit. | |
| 1020 let buffers = new Array<BufferedLogs>(this.active.length); | |
| 1021 let errors: Error[] = []; | |
| 1022 results.forEach( (err, idx) => { | |
| 1023 buffers[idx] = this.active[idx].buffer; | |
| 1024 if ( err ) { errors[idx] = err; } | |
| 1025 }); | |
| 1026 | |
| 1027 // We are done, and will return a value. | |
| 1028 this.currentNextPromise = null; | |
| 1029 if ( errors.length ) { | |
| 1030 throw this._aggregateErrors(errors); | |
| 1031 } | |
| 1032 return this._aggregateBuffers(buffers); | |
| 1033 }); | |
| 1034 | |
| 1035 return this.currentNextPromise; | |
| 1036 } | |
| 1037 | |
| 1038 private _aggregateErrors(errors: Error[]): Error { | |
| 1039 let isNotAuthenticated = false; | |
| 1040 errors.every( (err) => { | |
| 1041 if ( ! err ) { return true; } | |
| 1042 if ( err === NotAuthenticatedError ) { | |
| 1043 isNotAuthenticated = true; | |
| 1044 return true; | |
| 1045 } | |
| 1046 isNotAuthenticated = false; | |
| 1047 return false; | |
| 1048 }); | |
| 1049 return (( isNotAuthenticated ) ? | |
| 1050 (NotAuthenticatedError) : new Error("Stream Error")); | |
| 1051 } | |
| 1052 | |
| 1053 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { | |
| 1054 switch ( buffers.length ) { | |
| 1055 case 0: | |
| 1056 // No buffers, so no logs. | |
| 1057 return new BufferedLogs(null); | |
| 1058 case 1: | |
| 1059 // As a special case, if we only have one buffer, and we assume that its | |
| 1060 // entries are sorted, then that buffer is a return value. | |
| 1061 return new BufferedLogs(buffers[0].getAll()); | |
| 1062 } | |
| 1063 | |
| 1064 // Preload our peek array. | |
| 1065 let incomplete = false; | |
| 1066 let peek = buffers.map(function(buf) { | |
| 1067 var le = buf.peek(); | |
| 1068 if (! le) { | |
| 1069 incomplete = true; | |
| 1070 } | |
| 1071 return le; | |
| 1072 }); | |
| 1073 if (incomplete) { | |
| 1074 // One of our input buffers had no log entries. | |
| 1075 return new BufferedLogs(null); | |
| 1076 } | |
| 1077 | |
| 1078 // Assemble our aggregate buffer array. | |
| 1079 let entries: LogDog.LogEntry[] = []; | |
| 1080 while (true) { | |
| 1081 // Choose the next stream. | |
| 1082 var earliest = 0; | |
| 1083 for (var i = 1; i < buffers.length; i++) { | |
| 1084 if (this.compareLogs(peek[i], peek[earliest]) < 0) { | |
| 1085 earliest = i; | |
| 1086 } | |
| 1087 } | |
| 1088 | |
| 1089 // Get the next log from the earliest stream. | |
| 1090 entries.push(buffers[earliest].next()); | |
| 1091 | |
| 1092 // Repopulate that buffer's "peek" value. If the buffer has no more | |
| 1093 // entries, then we're done. | |
| 1094 var next = buffers[earliest].peek(); | |
| 1095 if (!next) { | |
| 1096 return new BufferedLogs(entries); | |
| 1097 } | |
| 1098 peek[earliest] = next; | |
| 1099 } | |
| 1100 } | |
| 1101 } | |
| 1102 | |
| 1103 module AggregateLogStream { | |
| 1104 export type Entry = { | |
| 1105 ls: LogStream; | |
| 1106 buffer: BufferedLogs; | |
| 1107 needsAuth: boolean; | |
| 1108 status: LogStreamStatus; | |
| 1109 } | |
| 1110 } | |
| 1111 | |
| 1112 /** | |
| 1113 * A buffer of ordered log entries. | |
| 1114 * | |
| 1115 * Assumes total ownership of the input log buffer, which can be null to | |
| 1116 * indicate no logs. | |
| 1117 */ | |
| 1118 class BufferedLogs { | |
| 1119 private logs: LogDog.LogEntry[] | null; | |
| 1120 private index: number; | |
| 1121 | |
| 1122 constructor(logs: LogDog.LogEntry[] | null) { | |
| 1123 this.logs = logs; | |
| 1124 this.index = 0; | |
| 1125 } | |
| 1126 | |
| 1127 peek(): LogDog.LogEntry | null { | |
| 1128 return (this.logs) ? (this.logs[this.index]) : (null); | |
| 1129 } | |
| 1130 | |
| 1131 getAll(): LogDog.LogEntry[] { | |
| 1132 // Pop all logs. | |
| 1133 var logs = this.logs; | |
| 1134 this.logs = null; | |
| 1135 return logs; | |
| 1136 } | |
| 1137 | |
| 1138 next() : LogDog.LogEntry | null { | |
| 1139 if (! (this.logs && this.logs.length)) { | |
| 1140 return null; | |
| 1141 } | |
| 1142 | |
| 1143 // Get the next log and increment our index. | |
| 1144 var log = this.logs[this.index++]; | |
| 1145 if (this.index >= this.logs.length) { | |
| 1146 this.logs = null; | |
| 1147 } | |
| 1148 return log; | |
| 1149 } | |
| 1150 } | |
| OLD | NEW |