| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 import 'dart:collection'; | 6 import 'dart:collection'; |
| 7 | 7 |
| 8 import "result.dart"; | 8 import "result.dart"; |
| 9 import "subscription_stream.dart"; | 9 import "subscription_stream.dart"; |
| 10 import "stream_completer.dart"; | 10 import "stream_completer.dart"; |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 87 | 87 |
| 88 /// Queue of events not used by a request yet. | 88 /// Queue of events not used by a request yet. |
| 89 final Queue<Result> _eventQueue = new Queue(); | 89 final Queue<Result> _eventQueue = new Queue(); |
| 90 | 90 |
| 91 /// Queue of pending requests. | 91 /// Queue of pending requests. |
| 92 /// | 92 /// |
| 93 /// Access through methods below to ensure consistency. | 93 /// Access through methods below to ensure consistency. |
| 94 final Queue<_EventRequest> _requestQueue = new Queue(); | 94 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 95 | 95 |
| 96 /// Create a `StreamQueue` of the events of [source]. | 96 /// Create a `StreamQueue` of the events of [source]. |
| 97 factory StreamQueue(Stream source) = _StreamQueue<T>; | 97 factory StreamQueue(Stream<T> source) = _StreamQueue<T>; |
| 98 | 98 |
| 99 StreamQueue._(); | 99 StreamQueue._(); |
| 100 | 100 |
| 101 /// Asks if the stream has any more events. | 101 /// Asks if the stream has any more events. |
| 102 /// | 102 /// |
| 103 /// Returns a future that completes with `true` if the stream has any | 103 /// Returns a future that completes with `true` if the stream has any |
| 104 /// more events, whether data or error. | 104 /// more events, whether data or error. |
| 105 /// If the stream closes without producing any more events, the returned | 105 /// If the stream closes without producing any more events, the returned |
| 106 /// future completes with `false`. | 106 /// future completes with `false`. |
| 107 /// | 107 /// |
| (...skipping 159 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 267 _pause(); | 267 _pause(); |
| 268 } | 268 } |
| 269 } | 269 } |
| 270 | 270 |
| 271 /// Extracts a stream from the event source and makes this stream queue | 271 /// Extracts a stream from the event source and makes this stream queue |
| 272 /// unusable. | 272 /// unusable. |
| 273 /// | 273 /// |
| 274 /// Can only be used by the very last request (the stream queue must | 274 /// Can only be used by the very last request (the stream queue must |
| 275 /// be closed by that request). | 275 /// be closed by that request). |
| 276 /// Only used by [rest]. | 276 /// Only used by [rest]. |
| 277 Stream _extractStream(); | 277 Stream<T> _extractStream(); |
| 278 | 278 |
| 279 /// Requests that the event source pauses events. | 279 /// Requests that the event source pauses events. |
| 280 /// | 280 /// |
| 281 /// This is called automatically when the request queue is empty. | 281 /// This is called automatically when the request queue is empty. |
| 282 /// | 282 /// |
| 283 /// The event source is restarted by the next call to [_ensureListening]. | 283 /// The event source is restarted by the next call to [_ensureListening]. |
| 284 void _pause(); | 284 void _pause(); |
| 285 | 285 |
| 286 /// Ensures that we are listening on events from the event source. | 286 /// Ensures that we are listening on events from the event source. |
| 287 /// | 287 /// |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 335 } | 335 } |
| 336 } | 336 } |
| 337 | 337 |
| 338 | 338 |
| 339 /// The default implementation of [StreamQueue]. | 339 /// The default implementation of [StreamQueue]. |
| 340 /// | 340 /// |
| 341 /// This queue gets its events from a stream which is listened | 341 /// This queue gets its events from a stream which is listened |
| 342 /// to when a request needs events. | 342 /// to when a request needs events. |
| 343 class _StreamQueue<T> extends StreamQueue<T> { | 343 class _StreamQueue<T> extends StreamQueue<T> { |
| 344 /// Source of events. | 344 /// Source of events. |
| 345 final Stream _sourceStream; | 345 final Stream<T> _sourceStream; |
| 346 | 346 |
| 347 /// Subscription on [_sourceStream] while listening for events. | 347 /// Subscription on [_sourceStream] while listening for events. |
| 348 /// | 348 /// |
| 349 /// Set to subscription when listening, and set to `null` when the | 349 /// Set to subscription when listening, and set to `null` when the |
| 350 /// subscription is done (and [_isDone] is set to true). | 350 /// subscription is done (and [_isDone] is set to true). |
| 351 StreamSubscription _subscription; | 351 StreamSubscription<T> _subscription; |
| 352 | 352 |
| 353 _StreamQueue(this._sourceStream) : super._(); | 353 _StreamQueue(this._sourceStream) : super._(); |
| 354 | 354 |
| 355 Future _cancel() { | 355 Future _cancel() { |
| 356 if (_isDone) return null; | 356 if (_isDone) return null; |
| 357 if (_subscription == null) _subscription = _sourceStream.listen(null); | 357 if (_subscription == null) _subscription = _sourceStream.listen(null); |
| 358 var future = _subscription.cancel(); | 358 var future = _subscription.cancel(); |
| 359 _close(); | 359 _close(); |
| 360 return future; | 360 return future; |
| 361 } | 361 } |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 415 /// | 415 /// |
| 416 /// Events from the source stream are sent to the first request in the | 416 /// Events from the source stream are sent to the first request in the |
| 417 /// queue until it reports itself as [isComplete]. | 417 /// queue until it reports itself as [isComplete]. |
| 418 /// | 418 /// |
| 419 /// When the first request in the queue `isComplete`, either when becoming | 419 /// When the first request in the queue `isComplete`, either when becoming |
| 420 /// the first request or after receiving an event, its [close] methods is | 420 /// the first request or after receiving an event, its [close] methods is |
| 421 /// called. | 421 /// called. |
| 422 /// | 422 /// |
| 423 /// The [close] method is also called immediately when the source stream | 423 /// The [close] method is also called immediately when the source stream |
| 424 /// is done. | 424 /// is done. |
| 425 abstract class _EventRequest { | 425 abstract class _EventRequest<T> { |
| 426 /// Handle available events. | 426 /// Handle available events. |
| 427 /// | 427 /// |
| 428 /// The available events are provided as a queue. The `update` function | 428 /// The available events are provided as a queue. The `update` function |
| 429 /// should only remove events from the front of the event queue, e.g., | 429 /// should only remove events from the front of the event queue, e.g., |
| 430 /// using [removeFirst]. | 430 /// using [removeFirst]. |
| 431 /// | 431 /// |
| 432 /// Returns `true` if the request is completed, or `false` if it needs | 432 /// Returns `true` if the request is completed, or `false` if it needs |
| 433 /// more events. | 433 /// more events. |
| 434 /// The call may keep events in the queue until the requeust is complete, | 434 /// The call may keep events in the queue until the requeust is complete, |
| 435 /// or it may remove them immediately. | 435 /// or it may remove them immediately. |
| 436 /// | 436 /// |
| 437 /// If the method returns true, the request is considered fulfilled, and | 437 /// If the method returns true, the request is considered fulfilled, and |
| 438 /// will never be called again. | 438 /// will never be called again. |
| 439 /// | 439 /// |
| 440 /// This method is called when a request reaches the front of the request | 440 /// This method is called when a request reaches the front of the request |
| 441 /// queue, and if it returns `false`, it's called again every time a new event | 441 /// queue, and if it returns `false`, it's called again every time a new event |
| 442 /// becomes available, or when the stream closes. | 442 /// becomes available, or when the stream closes. |
| 443 /// If the function returns `false` when the stream has already closed | 443 /// If the function returns `false` when the stream has already closed |
| 444 /// ([isDone] is true), then the request must call | 444 /// ([isDone] is true), then the request must call |
| 445 /// [StreamQueue._updateRequests] itself when it's ready to continue. | 445 /// [StreamQueue._updateRequests] itself when it's ready to continue. |
| 446 bool update(Queue<Result> events, bool isDone); | 446 bool update(Queue<Result<T>> events, bool isDone); |
| 447 } | 447 } |
| 448 | 448 |
| 449 /// Request for a [StreamQueue.next] call. | 449 /// Request for a [StreamQueue.next] call. |
| 450 /// | 450 /// |
| 451 /// Completes the returned future when receiving the first event, | 451 /// Completes the returned future when receiving the first event, |
| 452 /// and is then complete. | 452 /// and is then complete. |
| 453 class _NextRequest<T> implements _EventRequest { | 453 class _NextRequest<T> implements _EventRequest<T> { |
| 454 /// Completer for the future returned by [StreamQueue.next]. | 454 /// Completer for the future returned by [StreamQueue.next]. |
| 455 final Completer _completer; | 455 final _completer = new Completer<T>(); |
| 456 | 456 |
| 457 _NextRequest() : _completer = new Completer<T>(); | 457 _NextRequest(); |
| 458 | 458 |
| 459 Future<T> get future => _completer.future; | 459 Future<T> get future => _completer.future; |
| 460 | 460 |
| 461 bool update(Queue<Result> events, bool isDone) { | 461 bool update(Queue<Result<T>> events, bool isDone) { |
| 462 if (events.isNotEmpty) { | 462 if (events.isNotEmpty) { |
| 463 events.removeFirst().complete(_completer); | 463 events.removeFirst().complete(_completer); |
| 464 return true; | 464 return true; |
| 465 } | 465 } |
| 466 if (isDone) { | 466 if (isDone) { |
| 467 var errorFuture = | 467 var errorFuture = |
| 468 new Future.sync(() => throw new StateError("No elements")); | 468 new Future.sync(() => throw new StateError("No elements")); |
| 469 _completer.complete(errorFuture); | 469 _completer.complete(errorFuture); |
| 470 return true; | 470 return true; |
| 471 } | 471 } |
| 472 return false; | 472 return false; |
| 473 } | 473 } |
| 474 } | 474 } |
| 475 | 475 |
| 476 /// Request for a [StreamQueue.skip] call. | 476 /// Request for a [StreamQueue.skip] call. |
| 477 class _SkipRequest implements _EventRequest { | 477 class _SkipRequest<T> implements _EventRequest<T> { |
| 478 /// Completer for the future returned by the skip call. | 478 /// Completer for the future returned by the skip call. |
| 479 final Completer _completer = new Completer<int>(); | 479 final _completer = new Completer<int>(); |
| 480 | 480 |
| 481 /// Number of remaining events to skip. | 481 /// Number of remaining events to skip. |
| 482 /// | 482 /// |
| 483 /// The request [isComplete] when the values reaches zero. | 483 /// The request [isComplete] when the values reaches zero. |
| 484 /// | 484 /// |
| 485 /// Decremented when an event is seen. | 485 /// Decremented when an event is seen. |
| 486 /// Set to zero when an error is seen since errors abort the skip request. | 486 /// Set to zero when an error is seen since errors abort the skip request. |
| 487 int _eventsToSkip; | 487 int _eventsToSkip; |
| 488 | 488 |
| 489 _SkipRequest(this._eventsToSkip); | 489 _SkipRequest(this._eventsToSkip); |
| 490 | 490 |
| 491 /// The future completed when the correct number of events have been skipped. | 491 /// The future completed when the correct number of events have been skipped. |
| 492 Future get future => _completer.future; | 492 Future<int> get future => _completer.future; |
| 493 | 493 |
| 494 bool update(Queue<Result> events, bool isDone) { | 494 bool update(Queue<Result<T>> events, bool isDone) { |
| 495 while (_eventsToSkip > 0) { | 495 while (_eventsToSkip > 0) { |
| 496 if (events.isEmpty) { | 496 if (events.isEmpty) { |
| 497 if (isDone) break; | 497 if (isDone) break; |
| 498 return false; | 498 return false; |
| 499 } | 499 } |
| 500 _eventsToSkip--; | 500 _eventsToSkip--; |
| 501 | 501 |
| 502 var event = events.removeFirst(); | 502 var event = events.removeFirst(); |
| 503 if (event.isError) { | 503 if (event.isError) { |
| 504 event.complete(_completer); | 504 _completer.completeError(event.asError.error, event.asError.stackTrace); |
| 505 return true; | 505 return true; |
| 506 } | 506 } |
| 507 } | 507 } |
| 508 _completer.complete(_eventsToSkip); | 508 _completer.complete(_eventsToSkip); |
| 509 return true; | 509 return true; |
| 510 } | 510 } |
| 511 } | 511 } |
| 512 | 512 |
| 513 /// Request for a [StreamQueue.take] call. | 513 /// Request for a [StreamQueue.take] call. |
| 514 class _TakeRequest<T> implements _EventRequest { | 514 class _TakeRequest<T> implements _EventRequest<T> { |
| 515 /// Completer for the future returned by the take call. | 515 /// Completer for the future returned by the take call. |
| 516 final Completer _completer; | 516 final _completer = new Completer<List<T>>(); |
| 517 | 517 |
| 518 /// List collecting events until enough have been seen. | 518 /// List collecting events until enough have been seen. |
| 519 final List _list = <T>[]; | 519 final _list = <T>[]; |
| 520 | 520 |
| 521 /// Number of events to capture. | 521 /// Number of events to capture. |
| 522 /// | 522 /// |
| 523 /// The request [isComplete] when the length of [_list] reaches | 523 /// The request [isComplete] when the length of [_list] reaches |
| 524 /// this value. | 524 /// this value. |
| 525 final int _eventsToTake; | 525 final int _eventsToTake; |
| 526 | 526 |
| 527 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 527 _TakeRequest(this._eventsToTake); |
| 528 | 528 |
| 529 /// The future completed when the correct number of events have been captured. | 529 /// The future completed when the correct number of events have been captured. |
| 530 Future get future => _completer.future; | 530 Future<List<T>> get future => _completer.future; |
| 531 | 531 |
| 532 bool update(Queue<Result> events, bool isDone) { | 532 bool update(Queue<Result<T>> events, bool isDone) { |
| 533 while (_list.length < _eventsToTake) { | 533 while (_list.length < _eventsToTake) { |
| 534 if (events.isEmpty) { | 534 if (events.isEmpty) { |
| 535 if (isDone) break; | 535 if (isDone) break; |
| 536 return false; | 536 return false; |
| 537 } | 537 } |
| 538 | 538 |
| 539 var result = events.removeFirst(); | 539 var event = events.removeFirst(); |
| 540 if (result.isError) { | 540 if (event.isError) { |
| 541 result.complete(_completer); | 541 _completer.completeError(event.asError.error, event.asError.stackTrace); |
| 542 return true; | 542 return true; |
| 543 } | 543 } |
| 544 _list.add(result.asValue.value); | 544 _list.add(event.asValue.value); |
| 545 } | 545 } |
| 546 _completer.complete(_list); | 546 _completer.complete(_list); |
| 547 return true; | 547 return true; |
| 548 } | 548 } |
| 549 } | 549 } |
| 550 | 550 |
| 551 /// Request for a [StreamQueue.cancel] call. | 551 /// Request for a [StreamQueue.cancel] call. |
| 552 /// | 552 /// |
| 553 /// The request needs no events, it just waits in the request queue | 553 /// The request needs no events, it just waits in the request queue |
| 554 /// until all previous events are fulfilled, then it cancels the stream queue | 554 /// until all previous events are fulfilled, then it cancels the stream queue |
| 555 /// source subscription. | 555 /// source subscription. |
| 556 class _CancelRequest implements _EventRequest { | 556 class _CancelRequest<T> implements _EventRequest<T> { |
| 557 /// Completer for the future returned by the `cancel` call. | 557 /// Completer for the future returned by the `cancel` call. |
| 558 final Completer _completer = new Completer(); | 558 final _completer = new Completer(); |
| 559 | 559 |
| 560 /// The [StreamQueue] object that has this request queued. | 560 /// The [StreamQueue] object that has this request queued. |
| 561 /// | 561 /// |
| 562 /// When the event is completed, it needs to cancel the active subscription | 562 /// When the event is completed, it needs to cancel the active subscription |
| 563 /// of the `StreamQueue` object, if any. | 563 /// of the `StreamQueue` object, if any. |
| 564 final StreamQueue _streamQueue; | 564 final StreamQueue _streamQueue; |
| 565 | 565 |
| 566 _CancelRequest(this._streamQueue); | 566 _CancelRequest(this._streamQueue); |
| 567 | 567 |
| 568 /// The future completed when the cancel request is completed. | 568 /// The future completed when the cancel request is completed. |
| 569 Future get future => _completer.future; | 569 Future get future => _completer.future; |
| 570 | 570 |
| 571 bool update(Queue<Result> events, bool isDone) { | 571 bool update(Queue<Result<T>> events, bool isDone) { |
| 572 if (_streamQueue._isDone) { | 572 if (_streamQueue._isDone) { |
| 573 _completer.complete(); | 573 _completer.complete(); |
| 574 } else { | 574 } else { |
| 575 _streamQueue._ensureListening(); | 575 _streamQueue._ensureListening(); |
| 576 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); | 576 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); |
| 577 } | 577 } |
| 578 return true; | 578 return true; |
| 579 } | 579 } |
| 580 } | 580 } |
| 581 | 581 |
| 582 /// Request for a [StreamQueue.rest] call. | 582 /// Request for a [StreamQueue.rest] call. |
| 583 /// | 583 /// |
| 584 /// The request is always complete, it just waits in the request queue | 584 /// The request is always complete, it just waits in the request queue |
| 585 /// until all previous events are fulfilled, then it takes over the | 585 /// until all previous events are fulfilled, then it takes over the |
| 586 /// stream events subscription and creates a stream from it. | 586 /// stream events subscription and creates a stream from it. |
| 587 class _RestRequest<T> implements _EventRequest { | 587 class _RestRequest<T> implements _EventRequest<T> { |
| 588 /// Completer for the stream returned by the `rest` call. | 588 /// Completer for the stream returned by the `rest` call. |
| 589 final StreamCompleter _completer = new StreamCompleter<T>(); | 589 final _completer = new StreamCompleter<T>(); |
| 590 | 590 |
| 591 /// The [StreamQueue] object that has this request queued. | 591 /// The [StreamQueue] object that has this request queued. |
| 592 /// | 592 /// |
| 593 /// When the event is completed, it needs to cancel the active subscription | 593 /// When the event is completed, it needs to cancel the active subscription |
| 594 /// of the `StreamQueue` object, if any. | 594 /// of the `StreamQueue` object, if any. |
| 595 final StreamQueue _streamQueue; | 595 final StreamQueue<T> _streamQueue; |
| 596 | 596 |
| 597 _RestRequest(this._streamQueue); | 597 _RestRequest(this._streamQueue); |
| 598 | 598 |
| 599 /// The stream which will contain the remaining events of [_streamQueue]. | 599 /// The stream which will contain the remaining events of [_streamQueue]. |
| 600 Stream<T> get stream => _completer.stream; | 600 Stream<T> get stream => _completer.stream; |
| 601 | 601 |
| 602 bool update(Queue<Result> events, bool isDone) { | 602 bool update(Queue<Result<T>> events, bool isDone) { |
| 603 if (events.isEmpty) { | 603 if (events.isEmpty) { |
| 604 if (_streamQueue._isDone) { | 604 if (_streamQueue._isDone) { |
| 605 _completer.setEmpty(); | 605 _completer.setEmpty(); |
| 606 } else { | 606 } else { |
| 607 _completer.setSourceStream(_streamQueue._extractStream()); | 607 _completer.setSourceStream(_streamQueue._extractStream()); |
| 608 } | 608 } |
| 609 } else { | 609 } else { |
| 610 // There are prefetched events which needs to be added before the | 610 // There are prefetched events which needs to be added before the |
| 611 // remaining stream. | 611 // remaining stream. |
| 612 var controller = new StreamController<T>(); | 612 var controller = new StreamController<T>(); |
| 613 for (var event in events) { | 613 for (var event in events) { |
| 614 event.addTo(controller); | 614 event.addTo(controller); |
| 615 } | 615 } |
| 616 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) | 616 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
| 617 .whenComplete(controller.close); | 617 .whenComplete(controller.close); |
| 618 _completer.setSourceStream(controller.stream); | 618 _completer.setSourceStream(controller.stream); |
| 619 } | 619 } |
| 620 return true; | 620 return true; |
| 621 } | 621 } |
| 622 } | 622 } |
| 623 | 623 |
| 624 /// Request for a [StreamQueue.hasNext] call. | 624 /// Request for a [StreamQueue.hasNext] call. |
| 625 /// | 625 /// |
| 626 /// Completes the [future] with `true` if it sees any event, | 626 /// Completes the [future] with `true` if it sees any event, |
| 627 /// but doesn't consume the event. | 627 /// but doesn't consume the event. |
| 628 /// If the request is closed without seeing an event, then | 628 /// If the request is closed without seeing an event, then |
| 629 /// the [future] is completed with `false`. | 629 /// the [future] is completed with `false`. |
| 630 class _HasNextRequest<T> implements _EventRequest { | 630 class _HasNextRequest<T> implements _EventRequest<T> { |
| 631 final Completer _completer = new Completer<bool>(); | 631 final _completer = new Completer<bool>(); |
| 632 | 632 |
| 633 Future<bool> get future => _completer.future; | 633 Future<bool> get future => _completer.future; |
| 634 | 634 |
| 635 bool update(Queue<Result> events, bool isDone) { | 635 bool update(Queue<Result<T>> events, bool isDone) { |
| 636 if (events.isNotEmpty) { | 636 if (events.isNotEmpty) { |
| 637 _completer.complete(true); | 637 _completer.complete(true); |
| 638 return true; | 638 return true; |
| 639 } | 639 } |
| 640 if (isDone) { | 640 if (isDone) { |
| 641 _completer.complete(false); | 641 _completer.complete(false); |
| 642 return true; | 642 return true; |
| 643 } | 643 } |
| 644 return false; | 644 return false; |
| 645 } | 645 } |
| 646 } | 646 } |
| OLD | NEW |