| OLD | NEW |
| 1 /* | 1 /* |
| 2 Copyright 2016 The LUCI Authors. All rights reserved. | 2 Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 Use of this source code is governed under the Apache License, Version 2.0 | 3 Use of this source code is governed under the Apache License, Version 2.0 |
| 4 that can be found in the LICENSE file. | 4 that can be found in the LICENSE file. |
| 5 */ | 5 */ |
| 6 | 6 |
| 7 ///<reference path="../logdog-stream/logdog.ts" /> | 7 ///<reference path="../logdog-stream/logdog.ts" /> |
| 8 ///<reference path="../luci-operation/operation.ts" /> | 8 ///<reference path="../luci-operation/operation.ts" /> |
| 9 ///<reference path="../luci-sleep-promise/promise.ts" /> | 9 ///<reference path="../luci-sleep-promise/promise.ts" /> |
| 10 ///<reference path="../rpc/client.ts" /> | 10 ///<reference path="../rpc/client.ts" /> |
| (...skipping 312 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 323 this.clearCurrentOperation(); | 323 this.clearCurrentOperation(); |
| 324 this.provider = this.nullProvider(); | 324 this.provider = this.nullProvider(); |
| 325 | 325 |
| 326 this.updateControls(); | 326 this.updateControls(); |
| 327 } | 327 } |
| 328 | 328 |
| 329 private nullProvider(): LogProvider { | 329 private nullProvider(): LogProvider { |
| 330 return new AggregateLogStream([]); | 330 return new AggregateLogStream([]); |
| 331 } | 331 } |
| 332 | 332 |
| 333 private clearCurrentOperation() { | 333 /** |
| 334 * Clears the current operation, cancelling it if set. If the operation is |
| 335 * cleared, the current fetch and rendering states will be reset. |
| 336 * |
| 337 * @param op if provided, only cancel the current operation if it equals |
| 338 * the supplied "op". If "op" does not match the current operation, it |
| 339 * will be cancelled, but the current operation will be left in-tact. |
| 340 * If "op" is undefined, cancel the current operation regardless. |
| 341 */ |
| 342 private clearCurrentOperation(op?: luci.Operation) { |
| 334 if (this.currentOperation) { | 343 if (this.currentOperation) { |
| 344 if (op && op !== this.currentOperation) { |
| 345 // Conditional clear, and we are not the current operation, so do |
| 346 // nothing. |
| 347 op.cancel(); |
| 348 return; |
| 349 } |
| 335 this.currentOperation.cancel(); | 350 this.currentOperation.cancel(); |
| 336 this.currentOperation = this.currentFetchPromise = null; | 351 this.currentOperation = this.currentFetchPromise = null; |
| 337 } | 352 } |
| 338 this.rendering = false; | 353 this.rendering = false; |
| 339 } | 354 } |
| 340 | 355 |
| 341 private get loadingState(): LoadingState { | 356 private get loadingState(): LoadingState { |
| 342 return this.loadingStateValue; | 357 return this.loadingStateValue; |
| 343 } | 358 } |
| 344 private set loadingState(v: LoadingState) { | 359 private set loadingState(v: LoadingState) { |
| (...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 436 // We're not split. If we haven't reached end of stream, fetch logs from | 451 // We're not split. If we haven't reached end of stream, fetch logs from |
| 437 // HEAD. | 452 // HEAD. |
| 438 return this.fetchLocation(Location.HEAD, cancel); | 453 return this.fetchLocation(Location.HEAD, cancel); |
| 439 } | 454 } |
| 440 | 455 |
| 441 /** Fetch logs from an explicit location. */ | 456 /** Fetch logs from an explicit location. */ |
| 442 async fetchLocation(l: Location, cancel: boolean): Promise<void> { | 457 async fetchLocation(l: Location, cancel: boolean): Promise<void> { |
| 443 if (this.currentFetchPromise && (!cancel)) { | 458 if (this.currentFetchPromise && (!cancel)) { |
| 444 return this.currentFetchPromise; | 459 return this.currentFetchPromise; |
| 445 } | 460 } |
| 461 this.clearCurrentOperation(); |
| 446 | 462 |
| 447 // If our provider is finished, then do nothing. | 463 // If our provider is finished, then do nothing. |
| 448 if (this.fetchedFullStream) { | 464 if (this.fetchedFullStream) { |
| 449 // There are no more logs. | 465 // There are no more logs. |
| 450 return undefined; | 466 return undefined; |
| 451 } | 467 } |
| 452 | 468 |
| 453 // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD | 469 // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD |
| 454 // instead. | 470 // instead. |
| 455 if (l === Location.BOTTOM && !this.isSplit) { | 471 if (l === Location.BOTTOM && !this.isSplit) { |
| 456 l = Location.HEAD; | 472 l = Location.HEAD; |
| 457 } | 473 } |
| 458 | 474 |
| 459 // If we're not split, always fetch from BOTTOM. | |
| 460 this.loadingState = LoadingState.LOADING; | |
| 461 let loadingWhileTimer = | |
| 462 new luci.Timer(Model.LOADING_WHILE_THRESHOLD_MS, () => { | |
| 463 if (this.loadingState === LoadingState.LOADING) { | |
| 464 this.loadingState = LoadingState.LOADING_BEEN_A_WHILE; | |
| 465 } | |
| 466 }); | |
| 467 | |
| 468 // Rotate our fetch ID. This will effectively cancel any pending fetches. | 475 // Rotate our fetch ID. This will effectively cancel any pending fetches. |
| 469 this.currentOperation = new luci.Operation(); | 476 this.currentOperation = new luci.Operation(); |
| 470 this.currentFetchPromise = | 477 this.currentFetchPromise = |
| 471 this.fetchLocationImpl(l, this.currentOperation); | 478 this.fetchLocationImpl(l, this.currentOperation); |
| 472 try { | |
| 473 await this.currentFetchPromise; | |
| 474 } catch (err) { | |
| 475 loadingWhileTimer.cancel(); | |
| 476 | |
| 477 // If we've been canceled, discard this result. | |
| 478 if (err === luci.Operation.CANCELLED) { | |
| 479 return; | |
| 480 } | |
| 481 | |
| 482 this.clearCurrentOperation(); | |
| 483 if (err === NOT_AUTHENTICATED) { | |
| 484 this.loadingState = LoadingState.NEEDS_AUTH; | |
| 485 | |
| 486 // We failed because we were not authenticated. Mark this | |
| 487 // so we can retry if that state changes. | |
| 488 await this.authChangedPromise; | |
| 489 | |
| 490 // Our authentication state changed during the fetch! | |
| 491 // Retry automatically. | |
| 492 return this.fetchLocation(l, false); | |
| 493 } | |
| 494 | |
| 495 console.error('Failed to load log streams:', err); | |
| 496 }; | |
| 497 | |
| 498 loadingWhileTimer.cancel(); | |
| 499 return this.currentFetchPromise; | 479 return this.currentFetchPromise; |
| 500 } | 480 } |
| 501 | 481 |
| 502 private async fetchLocationImpl(l: Location, op: luci.Operation) { | 482 private async fetchLocationImpl(l: Location, op: luci.Operation) { |
| 483 for (let continueFetching = true; continueFetching;) { |
| 484 this.loadingState = LoadingState.LOADING; |
| 485 |
| 486 let loadingWhileTimer = |
| 487 new luci.Timer(Model.LOADING_WHILE_THRESHOLD_MS, () => { |
| 488 if (this.loadingState === LoadingState.LOADING) { |
| 489 this.loadingState = LoadingState.LOADING_BEEN_A_WHILE; |
| 490 } |
| 491 }); |
| 492 |
| 493 let hasLogs = false; |
| 494 try { |
| 495 hasLogs = await this.fetchLocationRound(l, op); |
| 496 } catch (err) { |
| 497 // Cancel the timer here, since we may enter other states in this |
| 498 // "catch" block and we don't want to have the timer override them. |
| 499 loadingWhileTimer.cancel(); |
| 500 |
| 501 // If we've been canceled, discard this result. |
| 502 if (err === luci.Operation.CANCELLED) { |
| 503 return; |
| 504 } |
| 505 |
| 506 this.clearCurrentOperation(op); |
| 507 if (err === NOT_AUTHENTICATED) { |
| 508 this.loadingState = LoadingState.NEEDS_AUTH; |
| 509 |
| 510 // We failed because we were not authenticated. Mark this |
| 511 // so we can retry if that state changes. |
| 512 await this.authChangedPromise; |
| 513 |
| 514 // Our authentication state changed during the fetch! |
| 515 // Retry automatically. |
| 516 continueFetching = true; |
| 517 continue; |
| 518 } |
| 519 |
| 520 console.error('Failed to load log streams:', err); |
| 521 return; |
| 522 } finally { |
| 523 loadingWhileTimer.cancel(); |
| 524 } |
| 525 |
| 526 continueFetching = (this.automatic && hasLogs); |
| 527 if (continueFetching) { |
| 528 console.log('Automatic: starting next fetch.'); |
| 529 } |
| 530 } |
| 531 |
| 532 // Post-fetch cleanup. |
| 533 this.clearCurrentOperation(op); |
| 534 } |
| 535 |
| 536 private async fetchLocationRound(l: Location, op: luci.Operation) { |
| 503 let buf = await this.provider.fetch(op, l); | 537 let buf = await this.provider.fetch(op, l); |
| 504 | 538 |
| 505 // Clear our fetching status. | 539 // Clear our fetching status. |
| 506 this.rendering = true; | 540 this.rendering = true; |
| 507 this.loadingState = LoadingState.RENDERING; | 541 this.loadingState = LoadingState.RENDERING; |
| 508 let hasLogs = (buf && buf.peek()); | 542 let hasLogs = !!(buf && buf.peek()); |
| 509 | 543 |
| 510 // Resolve any previous rendering Promise that we have. This | 544 // Resolve any previous rendering Promise that we have. This |
| 511 // makes sure our rendering and fetching don't get more than | 545 // makes sure our rendering and fetching don't get more than |
| 512 // one round out of sync. | 546 // one round out of sync. |
| 513 if (this.renderPromise) { | 547 if (this.renderPromise) { |
| 514 await this.renderPromise; | 548 await this.renderPromise; |
| 515 } | 549 } |
| 516 // Post-fetch cleanup. | |
| 517 this.clearCurrentOperation(); | |
| 518 | 550 |
| 519 // Clear our loading state (updates controls automatically). | 551 // Clear our loading state (updates controls automatically). |
| 520 this.loadingState = LoadingState.RENDERING; | 552 this.loadingState = LoadingState.RENDERING; |
| 521 | 553 |
| 522 // Initiate the next render. This will happen in the | 554 // Initiate the next render. This will happen in the |
| 523 // background while we enqueue our next fetch. | 555 // background while we enqueue our next fetch. |
| 524 let doRender = async () => { | 556 let doRender = async () => { |
| 525 await this.renderLogs(buf, l); | 557 await this.renderLogs(buf, l); |
| 526 if (this.loadingState === LoadingState.RENDERING) { | 558 if (this.loadingState === LoadingState.RENDERING) { |
| 527 this.loadingState = LoadingState.NONE; | 559 this.loadingState = LoadingState.NONE; |
| 528 } | 560 } |
| 529 }; | 561 }; |
| 530 this.renderPromise = doRender(); | 562 this.renderPromise = doRender(); |
| 531 | 563 |
| 532 if (this.fetchedFullStream) { | 564 if (this.fetchedFullStream) { |
| 533 return; | 565 return false; |
| 534 } | 566 } |
| 535 | 567 return hasLogs; |
| 536 // The fetch is finished. If we're automatic, and we got logs, start the | |
| 537 // next fetch. | |
| 538 if (this.automatic && hasLogs) { | |
| 539 console.log('Automatic: starting next fetch.'); | |
| 540 return this.fetch(false); | |
| 541 } | |
| 542 } | 568 } |
| 543 | 569 |
| 544 private async renderLogs(buf: BufferedLogs, l: Location) { | 570 private async renderLogs(buf: BufferedLogs, l: Location) { |
| 545 if (!(buf && buf.peek())) { | 571 if (!(buf && buf.peek())) { |
| 546 return; | 572 return; |
| 547 } | 573 } |
| 548 | 574 |
| 549 let logBlock: LogDog.LogEntry[] = []; | 575 let logBlock: LogDog.LogEntry[] = []; |
| 550 | 576 |
| 551 // Create a promise loop to push logs at intervals. | 577 // Create a promise loop to push logs at intervals. |
| (...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 713 | 739 |
| 714 /** The size of the tail walkback region. */ | 740 /** The size of the tail walkback region. */ |
| 715 private static TAIL_WALKBACK = 500; | 741 private static TAIL_WALKBACK = 500; |
| 716 | 742 |
| 717 constructor( | 743 constructor( |
| 718 client: LogDog.Client, readonly stream: LogDog.StreamPath, | 744 client: LogDog.Client, readonly stream: LogDog.StreamPath, |
| 719 readonly initialFetchSize: number, readonly fetchSize: number) { | 745 readonly initialFetchSize: number, readonly fetchSize: number) { |
| 720 this.fetcher = new LogDog.Fetcher(client, stream); | 746 this.fetcher = new LogDog.Fetcher(client, stream); |
| 721 } | 747 } |
| 722 | 748 |
| 723 private clearActiveFetch() { | |
| 724 if (this.activeFetch) { | |
| 725 this.activeFetch.op.cancel(); | |
| 726 } | |
| 727 } | |
| 728 | |
| 729 private setActiveFetch(fetch: LogDog.Fetch): LogDog.Fetch { | 749 private setActiveFetch(fetch: LogDog.Fetch): LogDog.Fetch { |
| 730 this.clearActiveFetch(); | |
| 731 this.activeFetch = fetch; | 750 this.activeFetch = fetch; |
| 732 this.activeFetch.addStateChangedCallback((_: LogDog.Fetch) => { | 751 this.activeFetch.addStateChangedCallback((_: LogDog.Fetch) => { |
| 733 this.statusChanged(); | 752 this.statusChanged(); |
| 734 }); | 753 }); |
| 735 return fetch; | 754 return fetch; |
| 736 } | 755 } |
| 737 | 756 |
| 738 get fetchStatus(): LogDog.FetchStatus { | 757 get fetchStatus(): LogDog.FetchStatus { |
| 739 if (this.activeFetch) { | 758 if (this.activeFetch) { |
| 740 return this.activeFetch.lastStatus; | 759 return this.activeFetch.lastStatus; |
| 741 } | 760 } |
| 742 return LogDog.FetchStatus.IDLE; | 761 return LogDog.FetchStatus.IDLE; |
| 743 } | 762 } |
| 744 | 763 |
| 745 get fetchError(): Error|undefined { | 764 get fetchError(): Error|undefined { |
| 746 if (this.activeFetch) { | 765 if (this.activeFetch) { |
| 747 return this.activeFetch.lastError; | 766 return this.activeFetch.lastError; |
| 748 } | 767 } |
| 749 return undefined; | 768 return undefined; |
| 750 } | 769 } |
| 751 | 770 |
| 752 async fetch(op: luci.Operation, l: Location) { | 771 async fetch(op: luci.Operation, l: Location) { |
| 753 this.clearActiveFetch(); | |
| 754 | |
| 755 // Determine which method to use based on the insertion point and current | 772 // Determine which method to use based on the insertion point and current |
| 756 // log stream fetch state. | 773 // log stream fetch state. |
| 757 let getLogs: Promise<LogDog.LogEntry[]>; | 774 let getLogs: Promise<LogDog.LogEntry[]>; |
| 758 switch (l) { | 775 switch (l) { |
| 759 case Location.HEAD: | 776 case Location.HEAD: |
| 760 getLogs = this.getHead(op); | 777 getLogs = this.getHead(op); |
| 761 break; | 778 break; |
| 762 | 779 |
| 763 case Location.TAIL: | 780 case Location.TAIL: |
| 764 getLogs = this.getTail(op); | 781 getLogs = this.getTail(op); |
| (...skipping 662 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1427 | 1444 |
| 1428 // Get the next log and increment our index. | 1445 // Get the next log and increment our index. |
| 1429 let log = this.logs[this.index++]; | 1446 let log = this.logs[this.index++]; |
| 1430 if (this.index >= this.logs.length) { | 1447 if (this.index >= this.logs.length) { |
| 1431 this.logs = null; | 1448 this.logs = null; |
| 1432 } | 1449 } |
| 1433 return log; | 1450 return log; |
| 1434 } | 1451 } |
| 1435 } | 1452 } |
| 1436 } | 1453 } |
| OLD | NEW |