| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 import 'dart:collection'; | 6 import 'dart:collection'; |
| 7 | 7 |
| 8 import 'package:collection/collection.dart'; | 8 import 'package:collection/collection.dart'; |
| 9 | 9 |
| 10 import "cancelable_operation.dart"; | 10 import "cancelable_operation.dart"; |
| (...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 124 /// one events. | 124 /// one events. |
| 125 Future<bool> get hasNext { | 125 Future<bool> get hasNext { |
| 126 if (!_isClosed) { | 126 if (!_isClosed) { |
| 127 var hasNextRequest = new _HasNextRequest(); | 127 var hasNextRequest = new _HasNextRequest(); |
| 128 _addRequest(hasNextRequest); | 128 _addRequest(hasNextRequest); |
| 129 return hasNextRequest.future; | 129 return hasNextRequest.future; |
| 130 } | 130 } |
| 131 throw _failClosed(); | 131 throw _failClosed(); |
| 132 } | 132 } |
| 133 | 133 |
| 134 | |
| 135 /// Look at the next [count] data events without consuming them. | 134 /// Look at the next [count] data events without consuming them. |
| 136 /// | 135 /// |
| 137 /// Works like [take] except that the events are left in the queue. | 136 /// Works like [take] except that the events are left in the queue. |
| 138 /// If one of the next [count] events is an error, the returned future | 137 /// If one of the next [count] events is an error, the returned future |
| 139 /// completes with this error, and the error is still left in the queue. | 138 /// completes with this error, and the error is still left in the queue. |
| 140 Future<List<T>> lookAhead(int count) { | 139 Future<List<T>> lookAhead(int count) { |
| 141 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 140 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 142 if (!_isClosed) { | 141 if (!_isClosed) { |
| 143 var request = new _LookAheadRequest<T>(count); | 142 var request = new _LookAheadRequest<T>(count); |
| 144 _addRequest(request); | 143 _addRequest(request); |
| (...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 346 /// ```dart | 345 /// ```dart |
| 347 /// final _stdinQueue = new StreamQueue(stdin); | 346 /// final _stdinQueue = new StreamQueue(stdin); |
| 348 /// | 347 /// |
| 349 /// /// Returns an operation that completes when the user sends a line to | 348 /// /// Returns an operation that completes when the user sends a line to |
| 350 /// /// standard input. | 349 /// /// standard input. |
| 351 /// /// | 350 /// /// |
| 352 /// /// If the operation is canceled, stops waiting for user input. | 351 /// /// If the operation is canceled, stops waiting for user input. |
| 353 /// CancelableOperation<String> nextStdinLine() => | 352 /// CancelableOperation<String> nextStdinLine() => |
| 354 /// _stdinQueue.cancelable((queue) => queue.next); | 353 /// _stdinQueue.cancelable((queue) => queue.next); |
| 355 /// ``` | 354 /// ``` |
| 356 CancelableOperation/*<S>*/ cancelable/*<S>*/( | 355 CancelableOperation<S> cancelable<S>( |
| 357 Future/*<S>*/ callback(StreamQueue<T> queue)) { | 356 Future<S> callback(StreamQueue<T> queue)) { |
| 358 var transaction = startTransaction(); | 357 var transaction = startTransaction(); |
| 359 var completer = new CancelableCompleter/*<S>*/(onCancel: () { | 358 var completer = new CancelableCompleter<S>(onCancel: () { |
| 360 transaction.reject(); | 359 transaction.reject(); |
| 361 }); | 360 }); |
| 362 | 361 |
| 363 var queue = transaction.newQueue(); | 362 var queue = transaction.newQueue(); |
| 364 completer.complete(callback(queue).whenComplete(() { | 363 completer.complete(callback(queue).whenComplete(() { |
| 365 if (!completer.isCanceled) transaction.commit(queue); | 364 if (!completer.isCanceled) transaction.commit(queue); |
| 366 })); | 365 })); |
| 367 | 366 |
| 368 return completer.operation; | 367 return completer.operation; |
| 369 } | 368 } |
| (...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 487 /// immediately, it skips the queue. | 486 /// immediately, it skips the queue. |
| 488 void _addRequest(_EventRequest request) { | 487 void _addRequest(_EventRequest request) { |
| 489 if (_requestQueue.isEmpty) { | 488 if (_requestQueue.isEmpty) { |
| 490 if (request.update(_eventQueue, _isDone)) return; | 489 if (request.update(_eventQueue, _isDone)) return; |
| 491 _ensureListening(); | 490 _ensureListening(); |
| 492 } | 491 } |
| 493 _requestQueue.add(request); | 492 _requestQueue.add(request); |
| 494 } | 493 } |
| 495 } | 494 } |
| 496 | 495 |
| 497 | |
| 498 /// The default implementation of [StreamQueue]. | 496 /// The default implementation of [StreamQueue]. |
| 499 /// | 497 /// |
| 500 /// This queue gets its events from a stream which is listened | 498 /// This queue gets its events from a stream which is listened |
| 501 /// to when a request needs events. | 499 /// to when a request needs events. |
| 502 class _StreamQueue<T> extends StreamQueue<T> { | 500 class _StreamQueue<T> extends StreamQueue<T> { |
| 503 /// Source of events. | 501 /// Source of events. |
| 504 final Stream<T> _sourceStream; | 502 final Stream<T> _sourceStream; |
| 505 | 503 |
| 506 /// Subscription on [_sourceStream] while listening for events. | 504 /// Subscription on [_sourceStream] while listening for events. |
| 507 /// | 505 /// |
| 508 /// Set to subscription when listening, and set to `null` when the | 506 /// Set to subscription when listening, and set to `null` when the |
| 509 /// subscription is done (and [_isDone] is set to true). | 507 /// subscription is done (and [_isDone] is set to true). |
| 510 StreamSubscription<T> _subscription; | 508 StreamSubscription<T> _subscription; |
| 511 | 509 |
| 512 _StreamQueue(this._sourceStream) : super._(); | 510 _StreamQueue(this._sourceStream) : super._(); |
| 513 | 511 |
| 514 Future _cancel() { | 512 Future _cancel() { |
| 515 if (_isDone) return null; | 513 if (_isDone) return null; |
| 516 if (_subscription == null) _subscription = _sourceStream.listen(null); | 514 if (_subscription == null) _subscription = _sourceStream.listen(null); |
| 517 var future = _subscription.cancel(); | 515 var future = _subscription.cancel(); |
| 518 _close(); | 516 _close(); |
| 519 return future; | 517 return future; |
| 520 } | 518 } |
| 521 | 519 |
| 522 void _ensureListening() { | 520 void _ensureListening() { |
| 523 if (_isDone) return; | 521 if (_isDone) return; |
| 524 if (_subscription == null) { | 522 if (_subscription == null) { |
| 525 _subscription = | 523 _subscription = _sourceStream.listen((data) { |
| 526 _sourceStream.listen( | 524 _addResult(new Result.value(data)); |
| 527 (data) { | 525 }, onError: (error, StackTrace stackTrace) { |
| 528 _addResult(new Result.value(data)); | 526 _addResult(new Result.error(error, stackTrace)); |
| 529 }, | 527 }, onDone: () { |
| 530 onError: (error, StackTrace stackTrace) { | 528 _subscription = null; |
| 531 _addResult(new Result.error(error, stackTrace)); | 529 this._close(); |
| 532 }, | 530 }); |
| 533 onDone: () { | |
| 534 _subscription = null; | |
| 535 this._close(); | |
| 536 }); | |
| 537 } else { | 531 } else { |
| 538 _subscription.resume(); | 532 _subscription.resume(); |
| 539 } | 533 } |
| 540 } | 534 } |
| 541 | 535 |
| 542 void _pause() { | 536 void _pause() { |
| 543 _subscription.pause(); | 537 _subscription.pause(); |
| 544 } | 538 } |
| 545 | 539 |
| 546 Stream<T> _extractStream() { | 540 Stream<T> _extractStream() { |
| (...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 642 } | 636 } |
| 643 | 637 |
| 644 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s | 638 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s |
| 645 // request queue, and runs the next request. | 639 // request queue, and runs the next request. |
| 646 void _done() { | 640 void _done() { |
| 647 _splitter.close(); | 641 _splitter.close(); |
| 648 for (var queue in _queues) { | 642 for (var queue in _queues) { |
| 649 queue._cancel(); | 643 queue._cancel(); |
| 650 } | 644 } |
| 651 | 645 |
| 652 assert((_parent._requestQueue.first as _TransactionRequest) | 646 assert((_parent._requestQueue.first as _TransactionRequest).transaction == |
| 653 .transaction == this); | 647 this); |
| 654 _parent._requestQueue.removeFirst(); | 648 _parent._requestQueue.removeFirst(); |
| 655 _parent._updateRequests(); | 649 _parent._updateRequests(); |
| 656 } | 650 } |
| 657 | 651 |
| 658 /// Throws a [StateError] if [accept] or [reject] has already been called. | 652 /// Throws a [StateError] if [accept] or [reject] has already been called. |
| 659 void _assertActive() { | 653 void _assertActive() { |
| 660 if (_committed) { | 654 if (_committed) { |
| 661 throw new StateError("This transaction has already been accepted."); | 655 throw new StateError("This transaction has already been accepted."); |
| 662 } else if (_rejected) { | 656 } else if (_rejected) { |
| 663 throw new StateError("This transaction has already been rejected."); | 657 throw new StateError("This transaction has already been rejected."); |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 714 _NextRequest(); | 708 _NextRequest(); |
| 715 | 709 |
| 716 Future<T> get future => _completer.future; | 710 Future<T> get future => _completer.future; |
| 717 | 711 |
| 718 bool update(QueueList<Result<T>> events, bool isDone) { | 712 bool update(QueueList<Result<T>> events, bool isDone) { |
| 719 if (events.isNotEmpty) { | 713 if (events.isNotEmpty) { |
| 720 events.removeFirst().complete(_completer); | 714 events.removeFirst().complete(_completer); |
| 721 return true; | 715 return true; |
| 722 } | 716 } |
| 723 if (isDone) { | 717 if (isDone) { |
| 724 _completer.completeError(new StateError("No elements"), | 718 _completer.completeError( |
| 725 StackTrace.current); | 719 new StateError("No elements"), StackTrace.current); |
| 726 return true; | 720 return true; |
| 727 } | 721 } |
| 728 return false; | 722 return false; |
| 729 } | 723 } |
| 730 } | 724 } |
| 731 | 725 |
| 732 | |
| 733 /// Request for a [StreamQueue.peek] call. | 726 /// Request for a [StreamQueue.peek] call. |
| 734 /// | 727 /// |
| 735 /// Completes the returned future when receiving the first event, | 728 /// Completes the returned future when receiving the first event, |
| 736 /// and is then complete, but doesn't consume the event. | 729 /// and is then complete, but doesn't consume the event. |
| 737 class _PeekRequest<T> implements _EventRequest<T> { | 730 class _PeekRequest<T> implements _EventRequest<T> { |
| 738 /// Completer for the future returned by [StreamQueue.next]. | 731 /// Completer for the future returned by [StreamQueue.next]. |
| 739 final _completer = new Completer<T>(); | 732 final _completer = new Completer<T>(); |
| 740 | 733 |
| 741 _PeekRequest(); | 734 _PeekRequest(); |
| 742 | 735 |
| 743 Future<T> get future => _completer.future; | 736 Future<T> get future => _completer.future; |
| 744 | 737 |
| 745 bool update(QueueList<Result<T>> events, bool isDone) { | 738 bool update(QueueList<Result<T>> events, bool isDone) { |
| 746 if (events.isNotEmpty) { | 739 if (events.isNotEmpty) { |
| 747 events.first.complete(_completer); | 740 events.first.complete(_completer); |
| 748 return true; | 741 return true; |
| 749 } | 742 } |
| 750 if (isDone) { | 743 if (isDone) { |
| 751 _completer.completeError(new StateError("No elements"), | 744 _completer.completeError( |
| 752 StackTrace.current); | 745 new StateError("No elements"), StackTrace.current); |
| 753 return true; | 746 return true; |
| 754 } | 747 } |
| 755 return false; | 748 return false; |
| 756 } | 749 } |
| 757 } | 750 } |
| 758 | 751 |
| 759 | |
| 760 /// Request for a [StreamQueue.skip] call. | 752 /// Request for a [StreamQueue.skip] call. |
| 761 class _SkipRequest<T> implements _EventRequest<T> { | 753 class _SkipRequest<T> implements _EventRequest<T> { |
| 762 /// Completer for the future returned by the skip call. | 754 /// Completer for the future returned by the skip call. |
| 763 final _completer = new Completer<int>(); | 755 final _completer = new Completer<int>(); |
| 764 | 756 |
| 765 /// Number of remaining events to skip. | 757 /// Number of remaining events to skip. |
| 766 /// | 758 /// |
| 767 /// The request [isComplete] when the values reaches zero. | 759 /// The request [isComplete] when the values reaches zero. |
| 768 /// | 760 /// |
| 769 /// Decremented when an event is seen. | 761 /// Decremented when an event is seen. |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 807 /// The request [isComplete] when the length of [_list] reaches | 799 /// The request [isComplete] when the length of [_list] reaches |
| 808 /// this value. | 800 /// this value. |
| 809 final int _eventsToTake; | 801 final int _eventsToTake; |
| 810 | 802 |
| 811 _ListRequest(this._eventsToTake); | 803 _ListRequest(this._eventsToTake); |
| 812 | 804 |
| 813 /// The future completed when the correct number of events have been captured. | 805 /// The future completed when the correct number of events have been captured. |
| 814 Future<List<T>> get future => _completer.future; | 806 Future<List<T>> get future => _completer.future; |
| 815 } | 807 } |
| 816 | 808 |
| 817 | |
| 818 /// Request for a [StreamQueue.take] call. | 809 /// Request for a [StreamQueue.take] call. |
| 819 class _TakeRequest<T> extends _ListRequest<T> { | 810 class _TakeRequest<T> extends _ListRequest<T> { |
| 820 _TakeRequest(int eventsToTake) : super(eventsToTake); | 811 _TakeRequest(int eventsToTake) : super(eventsToTake); |
| 821 | 812 |
| 822 bool update(QueueList<Result<T>> events, bool isDone) { | 813 bool update(QueueList<Result<T>> events, bool isDone) { |
| 823 while (_list.length < _eventsToTake) { | 814 while (_list.length < _eventsToTake) { |
| 824 if (events.isEmpty) { | 815 if (events.isEmpty) { |
| 825 if (isDone) break; | 816 if (isDone) break; |
| 826 return false; | 817 return false; |
| 827 } | 818 } |
| 828 | 819 |
| 829 var event = events.removeFirst(); | 820 var event = events.removeFirst(); |
| 830 if (event.isError) { | 821 if (event.isError) { |
| 831 event.asError.complete(_completer); | 822 event.asError.complete(_completer); |
| 832 return true; | 823 return true; |
| 833 } | 824 } |
| 834 _list.add(event.asValue.value); | 825 _list.add(event.asValue.value); |
| 835 } | 826 } |
| 836 _completer.complete(_list); | 827 _completer.complete(_list); |
| 837 return true; | 828 return true; |
| 838 } | 829 } |
| 839 } | 830 } |
| 840 | 831 |
| 841 | |
| 842 /// Request for a [StreamQueue.lookAhead] call. | 832 /// Request for a [StreamQueue.lookAhead] call. |
| 843 class _LookAheadRequest<T> extends _ListRequest<T> { | 833 class _LookAheadRequest<T> extends _ListRequest<T> { |
| 844 _LookAheadRequest(int eventsToTake) : super(eventsToTake); | 834 _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
| 845 | 835 |
| 846 bool update(QueueList<Result<T>> events, bool isDone) { | 836 bool update(QueueList<Result<T>> events, bool isDone) { |
| 847 while (_list.length < _eventsToTake) { | 837 while (_list.length < _eventsToTake) { |
| 848 if (events.length == _list.length) { | 838 if (events.length == _list.length) { |
| 849 if (isDone) break; | 839 if (isDone) break; |
| 850 return false; | 840 return false; |
| 851 } | 841 } |
| 852 var event = events.elementAt(_list.length); | 842 var event = events.elementAt(_list.length); |
| 853 if (event.isError) { | 843 if (event.isError) { |
| 854 event.asError.complete(_completer); | 844 event.asError.complete(_completer); |
| 855 return true; | 845 return true; |
| 856 } | 846 } |
| 857 _list.add(event.asValue.value); | 847 _list.add(event.asValue.value); |
| 858 } | 848 } |
| 859 _completer.complete(_list); | 849 _completer.complete(_list); |
| 860 return true; | 850 return true; |
| 861 } | 851 } |
| 862 } | 852 } |
| 863 | 853 |
| 864 | |
| 865 /// Request for a [StreamQueue.cancel] call. | 854 /// Request for a [StreamQueue.cancel] call. |
| 866 /// | 855 /// |
| 867 /// The request needs no events, it just waits in the request queue | 856 /// The request needs no events, it just waits in the request queue |
| 868 /// until all previous events are fulfilled, then it cancels the stream queue | 857 /// until all previous events are fulfilled, then it cancels the stream queue |
| 869 /// source subscription. | 858 /// source subscription. |
| 870 class _CancelRequest<T> implements _EventRequest<T> { | 859 class _CancelRequest<T> implements _EventRequest<T> { |
| 871 /// Completer for the future returned by the `cancel` call. | 860 /// Completer for the future returned by the `cancel` call. |
| 872 final _completer = new Completer(); | 861 final _completer = new Completer(); |
| 862 |
| 873 /// | 863 /// |
| 874 /// When the event is completed, it needs to cancel the active subscription | 864 /// When the event is completed, it needs to cancel the active subscription |
| 875 /// of the `StreamQueue` object, if any. | 865 /// of the `StreamQueue` object, if any. |
| 876 final StreamQueue _streamQueue; | 866 final StreamQueue _streamQueue; |
| 877 | 867 |
| 878 _CancelRequest(this._streamQueue); | 868 _CancelRequest(this._streamQueue); |
| 879 | 869 |
| 880 /// The future completed when the cancel request is completed. | 870 /// The future completed when the cancel request is completed. |
| 881 Future get future => _completer.future; | 871 Future get future => _completer.future; |
| 882 | 872 |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 918 } else { | 908 } else { |
| 919 _completer.setSourceStream(_streamQueue._extractStream()); | 909 _completer.setSourceStream(_streamQueue._extractStream()); |
| 920 } | 910 } |
| 921 } else { | 911 } else { |
| 922 // There are prefetched events which needs to be added before the | 912 // There are prefetched events which needs to be added before the |
| 923 // remaining stream. | 913 // remaining stream. |
| 924 var controller = new StreamController<T>(); | 914 var controller = new StreamController<T>(); |
| 925 for (var event in events) { | 915 for (var event in events) { |
| 926 event.addTo(controller); | 916 event.addTo(controller); |
| 927 } | 917 } |
| 928 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) | 918 controller |
| 929 .whenComplete(controller.close); | 919 .addStream(_streamQueue._extractStream(), cancelOnError: false) |
| 920 .whenComplete(controller.close); |
| 930 _completer.setSourceStream(controller.stream); | 921 _completer.setSourceStream(controller.stream); |
| 931 } | 922 } |
| 932 return true; | 923 return true; |
| 933 } | 924 } |
| 934 } | 925 } |
| 935 | 926 |
| 936 /// Request for a [StreamQueue.hasNext] call. | 927 /// Request for a [StreamQueue.hasNext] call. |
| 937 /// | 928 /// |
| 938 /// Completes the [future] with `true` if it sees any event, | 929 /// Completes the [future] with `true` if it sees any event, |
| 939 /// but doesn't consume the event. | 930 /// but doesn't consume the event. |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 979 } | 970 } |
| 980 | 971 |
| 981 bool update(QueueList<Result<T>> events, bool isDone) { | 972 bool update(QueueList<Result<T>> events, bool isDone) { |
| 982 while (_eventsSent < events.length) { | 973 while (_eventsSent < events.length) { |
| 983 events[_eventsSent++].addTo(_controller); | 974 events[_eventsSent++].addTo(_controller); |
| 984 } | 975 } |
| 985 if (isDone && !_controller.isClosed) _controller.close(); | 976 if (isDone && !_controller.isClosed) _controller.close(); |
| 986 return false; | 977 return false; |
| 987 } | 978 } |
| 988 } | 979 } |
| OLD | NEW |