| OLD | NEW |
| (Empty) | |
| 1 /* |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. |
| 5 */ |
| 6 |
| 7 import {Fetcher, FetcherOptions, FetcherStatus} |
| 8 from "logdog-stream-view/fetcher"; |
| 9 import {LogDogQuery, QueryParams, StreamType} from "logdog-stream-view/query"; |
| 10 import {LogDog} from "logdog-stream/logdog"; |
| 11 import * as luci_sleep_promise from "luci-sleep-promise/promise"; |
| 12 import {luci_rpc} from "rpc/client"; |
| 13 |
| 14 /** Sentinel error: not authenticated. */ |
| 15 let NotAuthenticatedError = new Error("Not Authenticated"); |
| 16 |
| 17 function resolveErr(err: Error) { |
| 18 let grpc = luci_rpc.GrpcError.convert(err); |
| 19 if ( grpc && grpc.code == luci_rpc.Code.UNAUTHENTICATED ) { |
| 20 return NotAuthenticatedError; |
| 21 } |
| 22 return err; |
| 23 } |
| 24 |
| 25 /** Stream status entry, as rendered by the view. */ |
| 26 type StreamStatusEntry = { |
| 27 name: string; |
| 28 desc: string; |
| 29 }; |
| 30 |
| 31 /** An individual log stream's status. */ |
| 32 type LogStreamStatus = { |
| 33 stream: LogDog.Stream; |
| 34 state: string; |
| 35 fetchStatus: FetcherStatus; |
| 36 finished: boolean; |
| 37 needsAuth: boolean; |
| 38 } |
| 39 |
| 40 type StreamStatusCallback = (v: LogStreamStatus[]) => void; |
| 41 |
| 42 export enum Location { |
| 43 /** |
| 44 * Represents the upper half of a split view. Logs start at 0 and go through |
| 45 * the HEAD point. |
| 46 */ |
| 47 HEAD, |
| 48 /** |
| 49 * Represents the lower half of the split view. Logs start at the TAIL point |
| 50 * and go through the BOTTOM anchor point. |
| 51 */ |
| 52 TAIL, |
| 53 /** |
| 54 * Represents an anchor point where the split occurred, obtained through a |
| 55 * single "Tail()" RPC call. If the terminal index is known when the split |
| 56 * occurs, this should be the terminal index. |
| 57 */ |
| 58 BOTTOM, |
| 59 } |
| 60 |
| 61 export enum LoadingState { |
| 62 NONE, |
| 63 RESOLVING, |
| 64 LOADING, |
| 65 RENDERING, |
| 66 NEEDS_AUTH, |
| 67 ERROR, |
| 68 } |
| 69 |
| 70 /** Represents control visibility in the view. */ |
| 71 type Controls = { |
| 72 /** Are we completely finished loading stream data? */ |
| 73 canSplit: boolean; |
| 74 /** Are we currently split? */ |
| 75 split: boolean; |
| 76 /** Show the bottom bar? */ |
| 77 bottom: boolean; |
| 78 /** Is the content fully loaded? */ |
| 79 fullyLoaded: boolean; |
| 80 |
| 81 /** Text in the status bar. */ |
| 82 loadingState: LoadingState; |
| 83 /** Stream status entries, or null for no status window. */ |
| 84 streamStatus: StreamStatusEntry[]; |
| 85 } |
| 86 |
| 87 /** Registered callbacks from the LogDog stream view. */ |
| 88 type ViewBinding = { |
| 89 client: luci_rpc.Client; |
| 90 mobile: boolean; |
| 91 |
| 92 pushLogEntries: (entries: LogDog.LogEntry[], l: Location) => void; |
| 93 clearLogEntries: () => void; |
| 94 |
| 95 updateControls: (c: Controls) => void; |
| 96 locationIsVisible: (l: Location) => boolean; |
| 97 }; |
| 98 |
| 99 /** Interface of the specific Model functions used by the view. */ |
| 100 interface ModelInterface { |
| 101 fetch(cancel: boolean): Promise<void>; |
| 102 split(): Promise<void>; |
| 103 |
| 104 reset(): void; |
| 105 setAutomatic(v: boolean): void; |
| 106 setTailing(v: boolean): void; |
| 107 notifyAuthenticationChanged(): void; |
| 108 } |
| 109 |
| 110 export class Model implements ModelInterface { |
| 111 /** If performing a small initial fetch, this is the size of the fetch. */ |
| 112 private static SMALL_INITIAL_FETCH_SIZE = (1024 * 4); |
| 113 /** If performing a large initial fetch, this is the size of the fetch. */ |
| 114 private static LARGE_INITIAL_FETCH_SIZE = (1024 * 24); |
| 115 /** If fetching on a mobile device, fetch in this chunk size. */ |
| 116 private static MOBILE_FETCH_SIZE = (1024 * 256); |
| 117 /** For standard fetching, fetch with this size. */ |
| 118 private static STANDARD_FETCH_SIZE = (4 * 1024 * 1024); |
| 119 |
| 120 /** |
| 121 * If >0, the maximum number of log lines to push at a time. We will sleep |
| 122 * in between these entries to allow the rest of the app to be responsive |
| 123 * during log dumping. |
| 124 */ |
| 125 private static logAppendInterval = 4000; |
| 126 /** Amount of time to sleep in between log append chunks. */ |
| 127 private static logAppendDelay = 0; |
| 128 |
| 129 /** Our log provider. */ |
| 130 private provider: LogProvider = this.nullProvider(); |
| 131 |
| 132 /** |
| 133 * Promise that is resolved when authentication state changes. When this |
| 134 * happens, a new Promise is installed, and future authentication changes |
| 135 * will resolve the new Promise. |
| 136 */ |
| 137 private authChangedPromise: Promise<void> = null; |
| 138 /** |
| 139 * Retained callback (Promise resolve) to invoke when authentication state |
| 140 * changes. |
| 141 */ |
| 142 private authChangedCallback: (() => void) = null; |
| 143 |
| 144 /** The current fetch Promise. */ |
| 145 private currentFetch: Promise<void>; |
| 146 /** The current fetch token. */ |
| 147 private currentFetchToken: FetchToken; |
| 148 |
| 149 /** Are we in automatic mode? */ |
| 150 private automatic = false; |
| 151 /** Are we tailing? */ |
| 152 private tailing = false; |
| 153 /** Are we in the middle of rendering logs? */ |
| 154 private rendering = true; |
| 155 |
| 156 private _loadingState: LoadingState = LoadingState.NONE; |
| 157 private _streamStatus: StreamStatusEntry[]; |
| 158 |
| 159 /** |
| 160 * When rendering a Promise that will resolve when the render completes. We |
| 161 * use this to pipeline parallel data fetching and rendering. |
| 162 */ |
| 163 private renderPromise: Promise<void>; |
| 164 |
| 165 constructor(readonly view: ViewBinding) { |
| 166 this.resetAuthChanged(); |
| 167 } |
| 168 |
| 169 resolve(paths: string[]): Promise<void> { |
| 170 this.reset(); |
| 171 |
| 172 // For any path that is a query, execute that query. |
| 173 this.loadingState = LoadingState.RESOLVING; |
| 174 return Promise.all( paths.map( (path): Promise<LogDog.Stream[]> => { |
| 175 let stream = LogDog.Stream.splitProject(path); |
| 176 if ( ! LogDogQuery.isQuery(stream.path) ) { |
| 177 return Promise.resolve([stream]); |
| 178 } |
| 179 |
| 180 // This "path" is really a query. Construct and execute. |
| 181 let query = new LogDogQuery(this.view.client); |
| 182 let doQuery = (): Promise<LogDog.Stream[]> => { |
| 183 return query.getAll({ |
| 184 project: stream.project, |
| 185 path: stream.path, |
| 186 streamType: StreamType.TEXT, |
| 187 }, 100).then( (result): LogDog.Stream[] => { |
| 188 return result.map( (qr): LogDog.Stream => { |
| 189 return qr.stream; |
| 190 } ); |
| 191 }).catch( (err: Error) => { |
| 192 err = resolveErr(err); |
| 193 if ( err == NotAuthenticatedError ) { |
| 194 return this.authChangedPromise.then( () => { |
| 195 return doQuery(); |
| 196 } ); |
| 197 } |
| 198 |
| 199 throw err; |
| 200 }); |
| 201 } |
| 202 return doQuery(); |
| 203 } ) ).then( (streamBlocks) => { |
| 204 let streams = new Array<LogDog.Stream>(); |
| 205 (streamBlocks || []).forEach( (streamBlock) => { |
| 206 streams.push.apply(streams, streamBlock); |
| 207 } ); |
| 208 |
| 209 |
| 210 let initialFetchSize = ( (streams.length === 1) ? |
| 211 Model.LARGE_INITIAL_FETCH_SIZE : Model.SMALL_INITIAL_FETCH_SIZE ); |
| 212 |
| 213 // Determine our fetch size. |
| 214 let maxFetchSize = ( (this.view.mobile) ? |
| 215 Model.MOBILE_FETCH_SIZE : Model.STANDARD_FETCH_SIZE ); |
| 216 |
| 217 // Generate a LogStream client entry for each composite stream. |
| 218 let logStreams = streams.map( (stream) => { |
| 219 console.log("Resolved log stream:", stream); |
| 220 return new LogStream( |
| 221 this.view.client, stream, initialFetchSize, maxFetchSize); |
| 222 }); |
| 223 |
| 224 // Reset any existing state. |
| 225 this.reset(); |
| 226 |
| 227 // If we have exactly one stream, then use it directly. This allows it to |
| 228 // split. |
| 229 let provider: LogProvider; |
| 230 switch( logStreams.length ) { |
| 231 case 0: |
| 232 provider = this.nullProvider(); |
| 233 break; |
| 234 case 1: |
| 235 provider = logStreams[0]; |
| 236 break; |
| 237 default: |
| 238 provider = new AggregateLogStream(logStreams); |
| 239 break |
| 240 } |
| 241 provider.setStreamStatusCallback((st: LogStreamStatus[]) => { |
| 242 if ( this.provider === provider ) { |
| 243 this.streamStatus = this.buildStreamStatus(st); |
| 244 } |
| 245 }); |
| 246 this.provider = provider; |
| 247 this.loadingState = LoadingState.NONE; |
| 248 } ).catch( (err: Error) => { |
| 249 this.loadingState = LoadingState.ERROR; |
| 250 console.error("Failed to resolve log streams:", err); |
| 251 }); |
| 252 } |
| 253 |
| 254 reset() { |
| 255 this.view.clearLogEntries(); |
| 256 this.clearCurrentFetch(); |
| 257 this.provider = this.nullProvider(); |
| 258 |
| 259 this.updateControls(); |
| 260 } |
| 261 |
| 262 private nullProvider(): LogProvider { return new AggregateLogStream([]); } |
| 263 |
| 264 private mintFetchToken(): FetchToken { |
| 265 return (this.currentFetchToken = new FetchToken( (tok: FetchToken) => { |
| 266 return (tok === this.currentFetchToken); |
| 267 } )); |
| 268 } |
| 269 |
| 270 private clearCurrentFetch() { |
| 271 this.currentFetch = this.currentFetchToken = null; |
| 272 this.rendering = false; |
| 273 } |
| 274 |
| 275 private get loadingState(): LoadingState { return this._loadingState; } |
| 276 private set loadingState(v: LoadingState) { |
| 277 if( v != this._loadingState ) { |
| 278 this._loadingState = v; |
| 279 this.updateControls(); |
| 280 } |
| 281 } |
| 282 |
| 283 private get streamStatus(): StreamStatusEntry[] { return this._streamStatus; } |
| 284 private set streamStatus(st: StreamStatusEntry[]) { |
| 285 this._streamStatus = st; |
| 286 this.updateControls(); |
| 287 } |
| 288 |
| 289 private updateControls() { |
| 290 this.view.updateControls({ |
| 291 canSplit: this.providerCanSplit, |
| 292 split: this.isSplit, |
| 293 bottom: !this.fetchedEndOfStream, |
| 294 fullyLoaded: (this.fetchedFullStream && (! this.rendering)), |
| 295 loadingState: this.loadingState, |
| 296 streamStatus: this.streamStatus, |
| 297 }); |
| 298 } |
| 299 |
| 300 /** |
| 301 * Note that the authentication state for the client has changed. This will |
| 302 * trigger an automatic fetch retry if our previous fetch failed due to |
| 303 * lack of authentication. |
| 304 */ |
| 305 notifyAuthenticationChanged() { |
| 306 // Resolve our current "auth changed" Promise. |
| 307 this.authChangedCallback(); |
| 308 } |
| 309 |
| 310 private resetAuthChanged() { |
| 311 // Resolve our previous function, if it's not already resolved. |
| 312 if ( this.authChangedCallback ) { |
| 313 this.authChangedCallback(); |
| 314 } |
| 315 |
| 316 // Create a new Promise and install it. |
| 317 this.authChangedPromise = new Promise<void>((resolve, reject) => { |
| 318 this.authChangedCallback = resolve; |
| 319 }); |
| 320 } |
| 321 |
| 322 split(): Promise<void> { |
| 323 // If we haven't already split, and our provider lets us split, then go |
| 324 // ahead and do so. |
| 325 if ( this.providerCanSplit ) { |
| 326 return this.fetchLocation(Location.TAIL, true); |
| 327 } |
| 328 return this.fetch(false); |
| 329 } |
| 330 |
| 331 fetch(cancel: boolean): Promise<void> { |
| 332 if ( this.isSplit ) { |
| 333 if ( this.tailing ) { |
| 334 // Next fetch grabs logs from the bottom (continue tailing). |
| 335 if ( ! this.fetchedEndOfStream ) { |
| 336 return this.fetchLocation(Location.BOTTOM, false); |
| 337 } else { |
| 338 return this.fetchLocation(Location.TAIL, false); |
| 339 } |
| 340 } |
| 341 |
| 342 // We're split, but not tailing, so fetch logs from HEAD. |
| 343 return this.fetchLocation(Location.HEAD, false); |
| 344 } |
| 345 |
| 346 // We're not split. If we haven't reached end of stream, fetch logs from |
| 347 // HEAD. |
| 348 return this.fetchLocation(Location.HEAD, false); |
| 349 } |
| 350 |
| 351 /** Fetch logs from an explicit location. */ |
| 352 fetchLocation(l: Location, cancel: boolean) { |
| 353 if ( this.currentFetch && (!cancel) ) { |
| 354 return this.currentFetch; |
| 355 } |
| 356 |
| 357 // If our provider is finished, then do nothing. |
| 358 if ( this.fetchedFullStream ) { |
| 359 // There are no more logs. |
| 360 return Promise.resolve(null); |
| 361 } |
| 362 |
| 363 // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD instead. |
| 364 if ( l === Location.BOTTOM && ! this.isSplit ) { |
| 365 l = Location.HEAD; |
| 366 } |
| 367 |
| 368 // If we're not split, always fetch from BOTTOM. |
| 369 this.loadingState = LoadingState.LOADING; |
| 370 |
| 371 // Rotate our fetch ID. This will effectively cancel any pending fetches. |
| 372 let token = this.mintFetchToken(); |
| 373 return (this.currentFetch = this.provider.fetch(l, token).then( (buf) => { |
| 374 // Clear our fetching status. |
| 375 this.rendering = true; |
| 376 this.loadingState = LoadingState.RENDERING; |
| 377 let pushLogsPromise: Promise<void>; |
| 378 let hasLogs = (buf && buf.peek()); |
| 379 |
| 380 // Resolve any previous rendering Promise that we have. This makes sure |
| 381 // our rendering and fetching don't get more than one round out of sync. |
| 382 return (this.renderPromise || Promise.resolve(null)).then( () => { |
| 383 // Post-fetch cleanup. |
| 384 this.clearCurrentFetch(); |
| 385 |
| 386 // Clear our loading state (updates controls automatically). |
| 387 this.loadingState = LoadingState.RENDERING; |
| 388 |
| 389 // Initiate the next render. This will happen in the background while |
| 390 // we enqueue our next fetch. |
| 391 this.renderPromise = this.renderLogs(buf, l).then( () => { |
| 392 this.renderPromise = null; |
| 393 if ( this.loadingState === LoadingState.RENDERING ) { |
| 394 this.loadingState = LoadingState.NONE; |
| 395 } |
| 396 }); |
| 397 |
| 398 if ( this.fetchedFullStream ) { |
| 399 // If we're finished now, perform our finished cleanup. |
| 400 return; |
| 401 } |
| 402 |
| 403 // The fetch is finished. If we're automatic, and we got logs, start the |
| 404 // next. |
| 405 if ( this.automatic && hasLogs ) { |
| 406 console.log("Automatic: starting next fetch.") |
| 407 return this.fetch(false); |
| 408 } |
| 409 }); |
| 410 }).catch( (err: Error) => { |
| 411 // If we've been canceled, discard this result. |
| 412 if ( ! token.valid ) { |
| 413 return |
| 414 } |
| 415 |
| 416 this.clearCurrentFetch(); |
| 417 if ( err === NotAuthenticatedError ) { |
| 418 this.loadingState = LoadingState.NEEDS_AUTH; |
| 419 |
| 420 // We failed because we were not authenticated. Mark this so we can |
| 421 // retry if that state changes. |
| 422 return this.authChangedPromise.then( () => { |
| 423 // Our authentication state changed during the fetch! Retry |
| 424 // automatically. |
| 425 this.fetchLocation(l, false); |
| 426 }); |
| 427 } |
| 428 |
| 429 console.error("Failed to load log streams:", err); |
| 430 })); |
| 431 } |
| 432 |
| 433 private renderLogs(buf: BufferedLogs, l: Location): Promise<void> { |
| 434 if ( ! (buf && buf.peek()) ) { |
| 435 return Promise.resolve(null); |
| 436 } |
| 437 |
| 438 let lines = 0; |
| 439 let logBlock = new Array<LogDog.LogEntry>(); |
| 440 let appendBlock = () => { |
| 441 if ( logBlock.length ) { |
| 442 console.log("Rendering", logBlock.length, "logs..."); |
| 443 this.view.pushLogEntries(logBlock, l); |
| 444 logBlock.length = 0; |
| 445 lines = 0; |
| 446 |
| 447 // Update our status and controls. |
| 448 this.updateControls(); |
| 449 } |
| 450 }; |
| 451 |
| 452 // Create a promise loop to push logs at intervals. |
| 453 let pushLogs = (): Promise<void> => { |
| 454 return Promise.resolve().then( () => { |
| 455 // Add logs until we reach our interval lines. |
| 456 for ( let nextLog = buf.next(); (nextLog); nextLog = buf.next() ) { |
| 457 // If we've exceeded our burst, then interleave a sleep (yield). |
| 458 if (Model.logAppendInterval > 0 && |
| 459 lines >= Model.logAppendInterval ) { |
| 460 appendBlock(); |
| 461 |
| 462 return luci_sleep_promise.sleep(Model.logAppendDelay).then( |
| 463 () => { |
| 464 // Enqueue the next push round. |
| 465 return pushLogs(); |
| 466 } ); |
| 467 } |
| 468 |
| 469 // Add the next log to the append block. |
| 470 logBlock.push(nextLog); |
| 471 if ( nextLog.text && nextLog.text.lines ) { |
| 472 lines += nextLog.text.lines.length; |
| 473 } |
| 474 } |
| 475 |
| 476 // If there are any buffered logs, append that block. |
| 477 appendBlock(); |
| 478 }); |
| 479 } |
| 480 return pushLogs(); |
| 481 } |
| 482 |
| 483 setTailing(v: boolean) { |
| 484 this.tailing = v; |
| 485 } |
| 486 |
| 487 setAutomatic(v: boolean) { |
| 488 this.automatic = v; |
| 489 if ( v ) { |
| 490 // Passively kick off a new fetch. |
| 491 this.fetch(false); |
| 492 } |
| 493 } |
| 494 |
| 495 private buildStreamStatus(v: LogStreamStatus[]): StreamStatusEntry[] { |
| 496 let maxStatus = FetcherStatus.IDLE; |
| 497 let maxStatusCount = 0; |
| 498 let needsAuth = false; |
| 499 |
| 500 // Prune any finished entries and accumulate them for status bar change. |
| 501 v = (v || []).filter( (st) => { |
| 502 needsAuth = (needsAuth || st.needsAuth); |
| 503 |
| 504 if ( st.fetchStatus > maxStatus ) { |
| 505 maxStatus = st.fetchStatus; |
| 506 maxStatusCount = 1; |
| 507 } else if ( st.fetchStatus === maxStatus ) { |
| 508 maxStatusCount++; |
| 509 } |
| 510 |
| 511 return (! st.finished); |
| 512 }); |
| 513 |
| 514 return v.map( (st): StreamStatusEntry => { |
| 515 return { |
| 516 name: ".../+/" + st.stream.name, |
| 517 desc: st.state, |
| 518 }; |
| 519 } ); |
| 520 } |
| 521 |
| 522 private get providerCanSplit(): boolean { |
| 523 let split = this.provider.split(); |
| 524 return (!! (split && split.canSplit())); |
| 525 } |
| 526 |
| 527 private get isSplit(): boolean { |
| 528 let split = this.provider.split(); |
| 529 return ( !! (split && split.isSplit()) ); |
| 530 } |
| 531 |
| 532 private get fetchedEndOfStream(): boolean { |
| 533 return (this.provider.fetchedEndOfStream()); |
| 534 } |
| 535 |
| 536 private get fetchedFullStream(): boolean { |
| 537 return (this.fetchedEndOfStream && (! this.isSplit)); |
| 538 } |
| 539 } |
| 540 |
| 541 /** |
| 542 * A token used to repesent an individual fetch. A token can assert whether its |
| 543 * fetch has been invalidated. |
| 544 */ |
| 545 class FetchToken { |
| 546 private validate: (tok: FetchToken) => boolean; |
| 547 |
| 548 constructor(validate: (tok: FetchToken) => boolean) { |
| 549 this.validate = validate; |
| 550 } |
| 551 |
| 552 get valid(): boolean { |
| 553 return this.validate(this); |
| 554 } |
| 555 |
| 556 do<T>(p: Promise<T>): Promise<T> { |
| 557 return p.then( (v): T => { |
| 558 if ( ! this.valid ) { |
| 559 throw new Error("Token has been invalidated, discarding fetch."); |
| 560 } |
| 561 return v |
| 562 } ); |
| 563 } |
| 564 } |
| 565 |
| 566 interface LogProvider { |
| 567 setStreamStatusCallback(cb: StreamStatusCallback): void; |
| 568 fetch(l: Location, token: FetchToken): Promise<BufferedLogs>; |
| 569 |
| 570 /** Will return null if this LogProvider doesn't support splitting. */ |
| 571 split(): SplitLogProvider; |
| 572 fetchedEndOfStream(): boolean; |
| 573 } |
| 574 |
| 575 /** Additional methods for log stream splitting, if supported. */ |
| 576 interface SplitLogProvider { |
| 577 canSplit(): boolean |
| 578 isSplit(): boolean; |
| 579 } |
| 580 |
| 581 /** A LogStream is a LogProvider manages a single log stream. */ |
| 582 class LogStream implements LogProvider { |
| 583 /** |
| 584 * Always begin with a small fetch. We'll disable this afterward the first |
| 585 * finishes. |
| 586 */ |
| 587 private initialFetch = true; |
| 588 |
| 589 private fetcher: Fetcher; |
| 590 |
| 591 /** The log stream index of the next head() log. */ |
| 592 private nextHeadIndex = 0; |
| 593 /** |
| 594 * The lowest log stream index of all of the tail logs. If this is <0, then |
| 595 * it is uninitialized. |
| 596 */ |
| 597 private firstTailIndex = -1; |
| 598 /** |
| 599 * The next log stream index to fetch to continue pulling logs from the |
| 600 * bottom. If this is <0, it is uninitialized. |
| 601 */ |
| 602 private nextBottomIndex = -1; |
| 603 |
| 604 private streamStatusCallback: StreamStatusCallback; |
| 605 |
| 606 /** The size of the tail walkback region. */ |
| 607 private static TAIL_WALKBACK = 500; |
| 608 |
| 609 constructor(client: luci_rpc.Client, readonly stream: LogDog.Stream, |
| 610 readonly initialFetchSize: number, |
| 611 readonly maxFetchSize: number ) { |
| 612 this.fetcher = new Fetcher(client, stream); |
| 613 this.fetcher.setStatusChangedCallback( () => { |
| 614 this.statusChanged(); |
| 615 }); |
| 616 } |
| 617 |
| 618 get fetcherStatus(): FetcherStatus { return this.fetcher.status; } |
| 619 |
| 620 fetch(l: Location, token: FetchToken): Promise<BufferedLogs> { |
| 621 // Determine which method to use based on the insertion point and current |
| 622 // log stream fetch state. |
| 623 let getLogs: Promise<LogDog.LogEntry[]>; |
| 624 switch( l ) { |
| 625 case Location.HEAD: |
| 626 getLogs = this.getHead(token); |
| 627 break; |
| 628 |
| 629 case Location.TAIL: |
| 630 getLogs = this.getTail(token); |
| 631 break; |
| 632 |
| 633 case Location.BOTTOM: |
| 634 getLogs = this.getBottom(token); |
| 635 break; |
| 636 } |
| 637 |
| 638 return getLogs.then( (logs: LogDog.LogEntry[]) => { |
| 639 this.initialFetch = false; |
| 640 this.statusChanged(); |
| 641 return new BufferedLogs(logs); |
| 642 }).catch( (err: Error) => { |
| 643 err = resolveErr(err); |
| 644 throw err; |
| 645 }); |
| 646 } |
| 647 |
| 648 setStreamStatusCallback(cb: StreamStatusCallback) { |
| 649 this.streamStatusCallback = cb; |
| 650 } |
| 651 |
| 652 private statusChanged() { |
| 653 if ( this.streamStatusCallback ) { |
| 654 this.streamStatusCallback([this.getStreamStatus()]); |
| 655 } |
| 656 } |
| 657 |
| 658 getStreamStatus(): LogStreamStatus { |
| 659 let pieces = new Array<string>(); |
| 660 let tidx = this.fetcher.terminalIndex; |
| 661 if ( this.nextHeadIndex > 0 ) { |
| 662 pieces.push("1.." + this.nextHeadIndex); |
| 663 } else { |
| 664 pieces.push("0"); |
| 665 } |
| 666 if ( this.isSplit() ) { |
| 667 if ( tidx >= 0 ) { |
| 668 pieces.push("| " + this.firstTailIndex + " / " + tidx); |
| 669 tidx = -1; |
| 670 } else { |
| 671 pieces.push("| " + this.firstTailIndex + ".." + this.nextBottomIndex + |
| 672 " ..."); |
| 673 } |
| 674 } else if (tidx >= 0) { |
| 675 pieces.push("/ " + tidx); |
| 676 } else { |
| 677 pieces.push("..."); |
| 678 } |
| 679 |
| 680 let needsAuth = false; |
| 681 let finished = this.finished; |
| 682 if ( finished ) { |
| 683 pieces.push("(Finished)"); |
| 684 } else { |
| 685 switch ( this.fetcher.status ) { |
| 686 case FetcherStatus.IDLE: |
| 687 case FetcherStatus.LOADING: |
| 688 pieces.push("(Loading)"); |
| 689 break; |
| 690 |
| 691 case FetcherStatus.STREAMING: |
| 692 pieces.push("(Streaming)"); |
| 693 break; |
| 694 |
| 695 case FetcherStatus.MISSING: |
| 696 pieces.push("(Missing)"); |
| 697 break; |
| 698 |
| 699 case FetcherStatus.ERROR: |
| 700 let err = resolveErr(this.fetcher.lastError); |
| 701 if (err === NotAuthenticatedError ) { |
| 702 pieces.push("(Auth Error)"); |
| 703 needsAuth = true; |
| 704 } else { |
| 705 pieces.push("(Error)"); |
| 706 } |
| 707 break; |
| 708 } |
| 709 } |
| 710 |
| 711 return { |
| 712 stream: this.stream, |
| 713 state: pieces.join(" "), |
| 714 finished: finished, |
| 715 fetchStatus: this.fetcher.status, |
| 716 needsAuth: needsAuth, |
| 717 }; |
| 718 } |
| 719 |
| 720 split(): SplitLogProvider { |
| 721 return this; |
| 722 } |
| 723 |
| 724 isSplit(): boolean { |
| 725 // We're split if we have a bottom and we're not finished tailing. |
| 726 return ( this.firstTailIndex >= 0 && |
| 727 (this.nextHeadIndex < this.firstTailIndex) ); |
| 728 } |
| 729 |
| 730 canSplit(): boolean { |
| 731 return ( ! (this.isSplit() || this.caughtUp) ); |
| 732 } |
| 733 |
| 734 private get caughtUp(): boolean { |
| 735 // We're caught up if we have both a head and bottom index, and the head |
| 736 // is at or past the bottom. |
| 737 return ( this.nextHeadIndex >= 0 && this.nextBottomIndex >= 0 && |
| 738 this.nextHeadIndex >= this.nextBottomIndex ); |
| 739 } |
| 740 |
| 741 fetchedEndOfStream(): boolean { |
| 742 let tidx = this.fetcher.terminalIndex; |
| 743 return ( tidx >= 0 && ( |
| 744 (this.nextHeadIndex > tidx) || (this.nextBottomIndex > tidx) ) ); |
| 745 } |
| 746 |
| 747 private get finished(): boolean { |
| 748 return ( (! this.isSplit()) && this.fetchedEndOfStream() ); |
| 749 } |
| 750 |
| 751 private updateIndexes() { |
| 752 if ( this.firstTailIndex >= 0 ) { |
| 753 if ( this.nextBottomIndex < this.firstTailIndex ) { |
| 754 this.nextBottomIndex = this.firstTailIndex + 1; |
| 755 } |
| 756 |
| 757 if ( this.nextHeadIndex >= this.firstTailIndex && |
| 758 this.nextBottomIndex >= 0) { |
| 759 // Synchronize our head and bottom pointers. |
| 760 this.nextHeadIndex = this.nextBottomIndex = |
| 761 Math.max(this.nextHeadIndex, this.nextBottomIndex); |
| 762 } |
| 763 } |
| 764 } |
| 765 |
| 766 private nextFetcherOptions(): FetcherOptions { |
| 767 let opts: FetcherOptions = {}; |
| 768 if ( this.initialFetch ) { |
| 769 opts.byteCount = this.initialFetchSize; |
| 770 } else if ( this.maxFetchSize > 0 ) { |
| 771 opts.byteCount = this.maxFetchSize; |
| 772 } |
| 773 return opts; |
| 774 } |
| 775 |
| 776 private getHead(token: FetchToken): Promise<LogDog.LogEntry[]> { |
| 777 this.updateIndexes(); |
| 778 |
| 779 if ( this.finished ) { |
| 780 // Our HEAD region has met/surpassed our TAIL region, so there are no |
| 781 // HEAD logs to return. Only bottom. |
| 782 return Promise.resolve(); |
| 783 } |
| 784 |
| 785 // If we have a tail pointer, only fetch HEAD up to that point. |
| 786 let opts = this.nextFetcherOptions(); |
| 787 if ( this.firstTailIndex >= 0 ) { |
| 788 opts.logCount = (this.firstTailIndex - this.nextHeadIndex); |
| 789 } |
| 790 |
| 791 return token.do( this.fetcher.get(this.nextHeadIndex, opts) ).then( |
| 792 (logs) => { |
| 793 if ( logs && logs.length ) { |
| 794 this.nextHeadIndex = (logs[logs.length - 1].streamIndex + 1); |
| 795 this.updateIndexes(); |
| 796 } |
| 797 return logs; |
| 798 } ); |
| 799 } |
| 800 |
| 801 private getTail(token: FetchToken): Promise<LogDog.LogEntry[]> { |
| 802 // If we haven't performed a Tail before, start with one. |
| 803 if ( this.firstTailIndex < 0 ) { |
| 804 let tidx = this.fetcher.terminalIndex; |
| 805 if ( tidx < 0 ) { |
| 806 return token.do( this.fetcher.tail() ).then( (logs) => { |
| 807 // Mark our initial "tail" position. |
| 808 if ( logs && logs.length ) { |
| 809 this.firstTailIndex = logs[0].streamIndex; |
| 810 this.updateIndexes(); |
| 811 } |
| 812 return logs; |
| 813 } ); |
| 814 } |
| 815 |
| 816 this.firstTailIndex = (tidx+1); |
| 817 this.updateIndexes(); |
| 818 } |
| 819 |
| 820 // We're doing incremental reverse fetches. If we're finished tailing, |
| 821 // return no logs. |
| 822 if ( ! this.isSplit() ) { |
| 823 return Promise.resolve(null); |
| 824 } |
| 825 |
| 826 // Determine our walkback region. |
| 827 let startIndex = this.firstTailIndex - LogStream.TAIL_WALKBACK; |
| 828 if ( this.nextHeadIndex >= 0 ) { |
| 829 if ( startIndex < this.nextHeadIndex ) { |
| 830 startIndex = this.nextHeadIndex; |
| 831 } |
| 832 } else if ( startIndex < 0 ) { |
| 833 startIndex = 0; |
| 834 } |
| 835 let count = (this.firstTailIndex - startIndex); |
| 836 |
| 837 // Fetch the full walkback region. |
| 838 return token.do( this.fetcher.getAll(startIndex, count) ).then( (logs) => { |
| 839 this.firstTailIndex = startIndex; |
| 840 this.updateIndexes(); |
| 841 return logs; |
| 842 }); |
| 843 } |
| 844 |
| 845 private getBottom(token: FetchToken): Promise<LogDog.LogEntry[]> { |
| 846 this.updateIndexes(); |
| 847 |
| 848 // If there are no more logs in the stream, return no logs. |
| 849 if ( this.fetchedEndOfStream() ) { |
| 850 return Promise.resolve(null); |
| 851 } |
| 852 |
| 853 // If our bottom index isn't initialized, initialize it via tail. |
| 854 if ( this.nextBottomIndex < 0 ) { |
| 855 return this.getTail(token); |
| 856 } |
| 857 |
| 858 let opts = this.nextFetcherOptions(); |
| 859 return token.do( this.fetcher.get(this.nextBottomIndex, opts) ).then( |
| 860 (logs) => { |
| 861 if ( logs && logs.length ) { |
| 862 this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); |
| 863 } |
| 864 return logs; |
| 865 } ); |
| 866 } |
| 867 } |
| 868 |
| 869 /** |
| 870 * An aggregate log stream. It presents a single-stream view, but is really |
| 871 * composed of several log streams interleaved based on their prefix indices |
| 872 * (if they share a prefix) or timestamps (if they don't). |
| 873 * |
| 874 * At least one log entry from each stream must be buffered before any log |
| 875 * entries can be yielded, since we don't know what ordering to apply otherwise. |
| 876 * To make this fast, we will make the first request for each stream small so |
| 877 * it finishes quickly and we can start rendering. Subsequent entries will be |
| 878 * larger for efficiency. |
| 879 * |
| 880 * @param {LogStream} streams the composite streams. |
| 881 */ |
| 882 class AggregateLogStream implements LogProvider { |
| 883 |
| 884 private streams: AggregateLogStream.Entry[]; |
| 885 private active: AggregateLogStream.Entry[]; |
| 886 private currentNextPromise: Promise<BufferedLogs>; |
| 887 private compareLogs: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; |
| 888 |
| 889 private streamStatusCallback: StreamStatusCallback; |
| 890 |
| 891 constructor(streams: LogStream[]) { |
| 892 // Input streams, ordered by input order. |
| 893 this.streams = streams.map<AggregateLogStream.Entry>( (ls, i) => { |
| 894 ls.setStreamStatusCallback( (st: LogStreamStatus[]) => { |
| 895 if ( st ) { |
| 896 this.streams[i].status = st[0]; |
| 897 this.statusChanged(); |
| 898 } |
| 899 }); |
| 900 |
| 901 return { |
| 902 ls: ls, |
| 903 buffer: null, |
| 904 needsAuth: false, |
| 905 status: ls.getStreamStatus(), |
| 906 }; |
| 907 } ); |
| 908 |
| 909 // Subset of input streams that are still active (not finished). |
| 910 this.active = this.streams; |
| 911 |
| 912 // The currently-active "next" promise. |
| 913 this.currentNextPromise = null; |
| 914 |
| 915 // Determine our log comparison function. If all of our logs share a prefix, |
| 916 // we will use the prefix index. Otherwise, we will use the timestamp. |
| 917 let template: LogDog.Stream = null; |
| 918 let sharedPrefix = this.streams.every( (entry) => { |
| 919 if ( ! template ) { |
| 920 template = entry.ls.stream; |
| 921 return true; |
| 922 } |
| 923 return template.samePrefixAs(entry.ls.stream); |
| 924 }); |
| 925 |
| 926 this.compareLogs = (( sharedPrefix ) ? |
| 927 (a, b) => { |
| 928 return (a.prefixIndex - b.prefixIndex); |
| 929 } : |
| 930 (a, b) => { |
| 931 return a.timestamp.getTime() - b.timestamp.getTime(); |
| 932 }); |
| 933 } |
| 934 |
| 935 split(): SplitLogProvider { return null; } |
| 936 fetchedEndOfStream(): boolean { return ( ! this.active.length ); } |
| 937 |
| 938 setStreamStatusCallback(cb: StreamStatusCallback) { |
| 939 this.streamStatusCallback = cb; |
| 940 } |
| 941 |
| 942 private statusChanged() { |
| 943 if ( this.streamStatusCallback ) { |
| 944 // Iterate through our composite stream statuses and pick the one that we |
| 945 // want to report. |
| 946 this.streamStatusCallback( this.streams.map( (entry): LogStreamStatus => { |
| 947 return entry.status; |
| 948 } )); |
| 949 } |
| 950 } |
| 951 |
| 952 /** |
| 953 * Implements LogProvider.next |
| 954 */ |
| 955 fetch(l: Location, token: FetchToken): Promise<BufferedLogs> { |
| 956 // If we're already are fetching the next buffer, this is an error. |
| 957 if (this.currentNextPromise) { |
| 958 throw new Error("In-progress next(), cannot start another."); |
| 959 } |
| 960 |
| 961 // Filter out any finished streams from our active list. A stream is |
| 962 // finished if it is finished streaming and we don't have a retained buffer |
| 963 // from it. |
| 964 // |
| 965 // This updates our "finished" property, since it's derived from the length |
| 966 // of our active array. |
| 967 this.active = this.active.filter( (entry) => { |
| 968 return ( (! entry.buffer) || entry.buffer.peek() || |
| 969 (! entry.ls.fetchedEndOfStream()) ); |
| 970 }); |
| 971 |
| 972 if ( ! this.active.length ) { |
| 973 // No active streams, so we're finished. Permanently set our promise to |
| 974 // the finished state. |
| 975 return Promise.resolve(); |
| 976 } |
| 977 |
| 978 // Fill all buffers for all active streams. This may result in an RPC to |
| 979 // load new buffer content for streams whose buffers are empty. |
| 980 // |
| 981 // If any stream doesn't currently have buffered logs, we will call their |
| 982 // "next()" methods to pull the next set of logs. This will result in one of |
| 983 // three possibilities: |
| 984 // - A BufferedLogs will be returned containing the next logs for this strea
m. |
| 985 // The log stream may also be finished. |
| 986 // - null will be returned, and this log stream must now be finished. |
| 987 // - An error will be returned. |
| 988 // |
| 989 // The error is interesting, since we must present a common error view to ou
r |
| 990 // caller. If all returned errors are "NotAuthenticatedError", we will retur
n |
| 991 // a NotAuthenticatedError. Otherwise, we will return a generic "streams |
| 992 // failed" error. |
| 993 // |
| 994 // The outer Promise will pull logs for any streams that don't have any. |
| 995 // On success, the "buffer" for the entry will be populated. On failure, an |
| 996 // error will be returned. Because Promise.all fails fast, we will catch inn
er |
| 997 // errors and return them as values (null if no error). |
| 998 this.currentNextPromise = Promise.all( this.active.map( (entry) => { |
| 999 // If the entry's buffer still has data, use it immediately. |
| 1000 if (entry.buffer && entry.buffer.peek()) { |
| 1001 return null; |
| 1002 } |
| 1003 |
| 1004 // No buffered logs. Call the stream's "next()" method to get some. |
| 1005 return entry.ls.fetch(Location.HEAD, token).then( |
| 1006 (buffer): Error => { |
| 1007 // Retain this buffer. |
| 1008 entry.buffer = buffer; |
| 1009 return null; |
| 1010 } |
| 1011 ).catch( (error: Error) => { |
| 1012 // Log stream source of error. Raise a generic "failed to buffer" |
| 1013 // error. This will become a permanent failure. |
| 1014 console.error("Error loading buffer for", entry.ls.stream.fullName(), |
| 1015 "(", entry.ls, "): ", error); |
| 1016 return error; |
| 1017 }); |
| 1018 })).then( (results: Error[]): BufferedLogs => { |
| 1019 // Identify any errors that we hit. |
| 1020 let buffers = new Array<BufferedLogs>(this.active.length); |
| 1021 let errors: Error[] = []; |
| 1022 results.forEach( (err, idx) => { |
| 1023 buffers[idx] = this.active[idx].buffer; |
| 1024 if ( err ) { errors[idx] = err; } |
| 1025 }); |
| 1026 |
| 1027 // We are done, and will return a value. |
| 1028 this.currentNextPromise = null; |
| 1029 if ( errors.length ) { |
| 1030 throw this._aggregateErrors(errors); |
| 1031 } |
| 1032 return this._aggregateBuffers(buffers); |
| 1033 }); |
| 1034 |
| 1035 return this.currentNextPromise; |
| 1036 } |
| 1037 |
| 1038 private _aggregateErrors(errors: Error[]): Error { |
| 1039 let isNotAuthenticated = false; |
| 1040 errors.every( (err) => { |
| 1041 if ( ! err ) { return true; } |
| 1042 if ( err === NotAuthenticatedError ) { |
| 1043 isNotAuthenticated = true; |
| 1044 return true; |
| 1045 } |
| 1046 isNotAuthenticated = false; |
| 1047 return false; |
| 1048 }); |
| 1049 return (( isNotAuthenticated ) ? |
| 1050 (NotAuthenticatedError) : new Error("Stream Error")); |
| 1051 } |
| 1052 |
| 1053 private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { |
| 1054 switch ( buffers.length ) { |
| 1055 case 0: |
| 1056 // No buffers, so no logs. |
| 1057 return new BufferedLogs(null); |
| 1058 case 1: |
| 1059 // As a special case, if we only have one buffer, and we assume that its |
| 1060 // entries are sorted, then that buffer is a return value. |
| 1061 return new BufferedLogs(buffers[0].getAll()); |
| 1062 } |
| 1063 |
| 1064 // Preload our peek array. |
| 1065 let incomplete = false; |
| 1066 let peek = buffers.map(function(buf) { |
| 1067 var le = buf.peek(); |
| 1068 if (! le) { |
| 1069 incomplete = true; |
| 1070 } |
| 1071 return le; |
| 1072 }); |
| 1073 if (incomplete) { |
| 1074 // One of our input buffers had no log entries. |
| 1075 return new BufferedLogs(null); |
| 1076 } |
| 1077 |
| 1078 // Assemble our aggregate buffer array. |
| 1079 let entries: LogDog.LogEntry[] = []; |
| 1080 while (true) { |
| 1081 // Choose the next stream. |
| 1082 var earliest = 0; |
| 1083 for (var i = 1; i < buffers.length; i++) { |
| 1084 if (this.compareLogs(peek[i], peek[earliest]) < 0) { |
| 1085 earliest = i; |
| 1086 } |
| 1087 } |
| 1088 |
| 1089 // Get the next log from the earliest stream. |
| 1090 entries.push(buffers[earliest].next()); |
| 1091 |
| 1092 // Repopulate that buffer's "peek" value. If the buffer has no more |
| 1093 // entries, then we're done. |
| 1094 var next = buffers[earliest].peek(); |
| 1095 if (!next) { |
| 1096 return new BufferedLogs(entries); |
| 1097 } |
| 1098 peek[earliest] = next; |
| 1099 } |
| 1100 } |
| 1101 } |
| 1102 |
| 1103 module AggregateLogStream { |
| 1104 export type Entry = { |
| 1105 ls: LogStream; |
| 1106 buffer: BufferedLogs; |
| 1107 needsAuth: boolean; |
| 1108 status: LogStreamStatus; |
| 1109 } |
| 1110 } |
| 1111 |
| 1112 /** |
| 1113 * A buffer of ordered log entries. |
| 1114 * |
| 1115 * Assumes total ownership of the input log buffer, which can be null to |
| 1116 * indicate no logs. |
| 1117 */ |
| 1118 class BufferedLogs { |
| 1119 private logs: LogDog.LogEntry[] | null; |
| 1120 private index: number; |
| 1121 |
| 1122 constructor(logs: LogDog.LogEntry[] | null) { |
| 1123 this.logs = logs; |
| 1124 this.index = 0; |
| 1125 } |
| 1126 |
| 1127 peek(): LogDog.LogEntry | null { |
| 1128 return (this.logs) ? (this.logs[this.index]) : (null); |
| 1129 } |
| 1130 |
| 1131 getAll(): LogDog.LogEntry[] { |
| 1132 // Pop all logs. |
| 1133 var logs = this.logs; |
| 1134 this.logs = null; |
| 1135 return logs; |
| 1136 } |
| 1137 |
| 1138 next() : LogDog.LogEntry | null { |
| 1139 if (! (this.logs && this.logs.length)) { |
| 1140 return null; |
| 1141 } |
| 1142 |
| 1143 // Get the next log and increment our index. |
| 1144 var log = this.logs[this.index++]; |
| 1145 if (this.index >= this.logs.length) { |
| 1146 this.logs = null; |
| 1147 } |
| 1148 return log; |
| 1149 } |
| 1150 } |
| OLD | NEW |