OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:collection'; | 6 import 'dart:collection'; |
7 | 7 |
8 import 'package:collection/collection.dart'; | 8 import 'package:collection/collection.dart'; |
9 | 9 |
10 import "cancelable_operation.dart"; | 10 import "cancelable_operation.dart"; |
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
124 /// one events. | 124 /// one events. |
125 Future<bool> get hasNext { | 125 Future<bool> get hasNext { |
126 if (!_isClosed) { | 126 if (!_isClosed) { |
127 var hasNextRequest = new _HasNextRequest(); | 127 var hasNextRequest = new _HasNextRequest(); |
128 _addRequest(hasNextRequest); | 128 _addRequest(hasNextRequest); |
129 return hasNextRequest.future; | 129 return hasNextRequest.future; |
130 } | 130 } |
131 throw _failClosed(); | 131 throw _failClosed(); |
132 } | 132 } |
133 | 133 |
134 | |
135 /// Look at the next [count] data events without consuming them. | 134 /// Look at the next [count] data events without consuming them. |
136 /// | 135 /// |
137 /// Works like [take] except that the events are left in the queue. | 136 /// Works like [take] except that the events are left in the queue. |
138 /// If one of the next [count] events is an error, the returned future | 137 /// If one of the next [count] events is an error, the returned future |
139 /// completes with this error, and the error is still left in the queue. | 138 /// completes with this error, and the error is still left in the queue. |
140 Future<List<T>> lookAhead(int count) { | 139 Future<List<T>> lookAhead(int count) { |
141 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 140 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
142 if (!_isClosed) { | 141 if (!_isClosed) { |
143 var request = new _LookAheadRequest<T>(count); | 142 var request = new _LookAheadRequest<T>(count); |
144 _addRequest(request); | 143 _addRequest(request); |
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
346 /// ```dart | 345 /// ```dart |
347 /// final _stdinQueue = new StreamQueue(stdin); | 346 /// final _stdinQueue = new StreamQueue(stdin); |
348 /// | 347 /// |
349 /// /// Returns an operation that completes when the user sends a line to | 348 /// /// Returns an operation that completes when the user sends a line to |
350 /// /// standard input. | 349 /// /// standard input. |
351 /// /// | 350 /// /// |
352 /// /// If the operation is canceled, stops waiting for user input. | 351 /// /// If the operation is canceled, stops waiting for user input. |
353 /// CancelableOperation<String> nextStdinLine() => | 352 /// CancelableOperation<String> nextStdinLine() => |
354 /// _stdinQueue.cancelable((queue) => queue.next); | 353 /// _stdinQueue.cancelable((queue) => queue.next); |
355 /// ``` | 354 /// ``` |
356 CancelableOperation/*<S>*/ cancelable/*<S>*/( | 355 CancelableOperation<S> cancelable<S>( |
357 Future/*<S>*/ callback(StreamQueue<T> queue)) { | 356 Future<S> callback(StreamQueue<T> queue)) { |
358 var transaction = startTransaction(); | 357 var transaction = startTransaction(); |
359 var completer = new CancelableCompleter/*<S>*/(onCancel: () { | 358 var completer = new CancelableCompleter<S>(onCancel: () { |
360 transaction.reject(); | 359 transaction.reject(); |
361 }); | 360 }); |
362 | 361 |
363 var queue = transaction.newQueue(); | 362 var queue = transaction.newQueue(); |
364 completer.complete(callback(queue).whenComplete(() { | 363 completer.complete(callback(queue).whenComplete(() { |
365 if (!completer.isCanceled) transaction.commit(queue); | 364 if (!completer.isCanceled) transaction.commit(queue); |
366 })); | 365 })); |
367 | 366 |
368 return completer.operation; | 367 return completer.operation; |
369 } | 368 } |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
487 /// immediately, it skips the queue. | 486 /// immediately, it skips the queue. |
488 void _addRequest(_EventRequest request) { | 487 void _addRequest(_EventRequest request) { |
489 if (_requestQueue.isEmpty) { | 488 if (_requestQueue.isEmpty) { |
490 if (request.update(_eventQueue, _isDone)) return; | 489 if (request.update(_eventQueue, _isDone)) return; |
491 _ensureListening(); | 490 _ensureListening(); |
492 } | 491 } |
493 _requestQueue.add(request); | 492 _requestQueue.add(request); |
494 } | 493 } |
495 } | 494 } |
496 | 495 |
497 | |
498 /// The default implementation of [StreamQueue]. | 496 /// The default implementation of [StreamQueue]. |
499 /// | 497 /// |
500 /// This queue gets its events from a stream which is listened | 498 /// This queue gets its events from a stream which is listened |
501 /// to when a request needs events. | 499 /// to when a request needs events. |
502 class _StreamQueue<T> extends StreamQueue<T> { | 500 class _StreamQueue<T> extends StreamQueue<T> { |
503 /// Source of events. | 501 /// Source of events. |
504 final Stream<T> _sourceStream; | 502 final Stream<T> _sourceStream; |
505 | 503 |
506 /// Subscription on [_sourceStream] while listening for events. | 504 /// Subscription on [_sourceStream] while listening for events. |
507 /// | 505 /// |
508 /// Set to subscription when listening, and set to `null` when the | 506 /// Set to subscription when listening, and set to `null` when the |
509 /// subscription is done (and [_isDone] is set to true). | 507 /// subscription is done (and [_isDone] is set to true). |
510 StreamSubscription<T> _subscription; | 508 StreamSubscription<T> _subscription; |
511 | 509 |
512 _StreamQueue(this._sourceStream) : super._(); | 510 _StreamQueue(this._sourceStream) : super._(); |
513 | 511 |
514 Future _cancel() { | 512 Future _cancel() { |
515 if (_isDone) return null; | 513 if (_isDone) return null; |
516 if (_subscription == null) _subscription = _sourceStream.listen(null); | 514 if (_subscription == null) _subscription = _sourceStream.listen(null); |
517 var future = _subscription.cancel(); | 515 var future = _subscription.cancel(); |
518 _close(); | 516 _close(); |
519 return future; | 517 return future; |
520 } | 518 } |
521 | 519 |
522 void _ensureListening() { | 520 void _ensureListening() { |
523 if (_isDone) return; | 521 if (_isDone) return; |
524 if (_subscription == null) { | 522 if (_subscription == null) { |
525 _subscription = | 523 _subscription = _sourceStream.listen((data) { |
526 _sourceStream.listen( | 524 _addResult(new Result.value(data)); |
527 (data) { | 525 }, onError: (error, StackTrace stackTrace) { |
528 _addResult(new Result.value(data)); | 526 _addResult(new Result.error(error, stackTrace)); |
529 }, | 527 }, onDone: () { |
530 onError: (error, StackTrace stackTrace) { | 528 _subscription = null; |
531 _addResult(new Result.error(error, stackTrace)); | 529 this._close(); |
532 }, | 530 }); |
533 onDone: () { | |
534 _subscription = null; | |
535 this._close(); | |
536 }); | |
537 } else { | 531 } else { |
538 _subscription.resume(); | 532 _subscription.resume(); |
539 } | 533 } |
540 } | 534 } |
541 | 535 |
542 void _pause() { | 536 void _pause() { |
543 _subscription.pause(); | 537 _subscription.pause(); |
544 } | 538 } |
545 | 539 |
546 Stream<T> _extractStream() { | 540 Stream<T> _extractStream() { |
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
642 } | 636 } |
643 | 637 |
644 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s | 638 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s |
645 // request queue, and runs the next request. | 639 // request queue, and runs the next request. |
646 void _done() { | 640 void _done() { |
647 _splitter.close(); | 641 _splitter.close(); |
648 for (var queue in _queues) { | 642 for (var queue in _queues) { |
649 queue._cancel(); | 643 queue._cancel(); |
650 } | 644 } |
651 | 645 |
652 assert((_parent._requestQueue.first as _TransactionRequest) | 646 assert((_parent._requestQueue.first as _TransactionRequest).transaction == |
653 .transaction == this); | 647 this); |
654 _parent._requestQueue.removeFirst(); | 648 _parent._requestQueue.removeFirst(); |
655 _parent._updateRequests(); | 649 _parent._updateRequests(); |
656 } | 650 } |
657 | 651 |
658 /// Throws a [StateError] if [accept] or [reject] has already been called. | 652 /// Throws a [StateError] if [accept] or [reject] has already been called. |
659 void _assertActive() { | 653 void _assertActive() { |
660 if (_committed) { | 654 if (_committed) { |
661 throw new StateError("This transaction has already been accepted."); | 655 throw new StateError("This transaction has already been accepted."); |
662 } else if (_rejected) { | 656 } else if (_rejected) { |
663 throw new StateError("This transaction has already been rejected."); | 657 throw new StateError("This transaction has already been rejected."); |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
714 _NextRequest(); | 708 _NextRequest(); |
715 | 709 |
716 Future<T> get future => _completer.future; | 710 Future<T> get future => _completer.future; |
717 | 711 |
718 bool update(QueueList<Result<T>> events, bool isDone) { | 712 bool update(QueueList<Result<T>> events, bool isDone) { |
719 if (events.isNotEmpty) { | 713 if (events.isNotEmpty) { |
720 events.removeFirst().complete(_completer); | 714 events.removeFirst().complete(_completer); |
721 return true; | 715 return true; |
722 } | 716 } |
723 if (isDone) { | 717 if (isDone) { |
724 _completer.completeError(new StateError("No elements"), | 718 _completer.completeError( |
725 StackTrace.current); | 719 new StateError("No elements"), StackTrace.current); |
726 return true; | 720 return true; |
727 } | 721 } |
728 return false; | 722 return false; |
729 } | 723 } |
730 } | 724 } |
731 | 725 |
732 | |
733 /// Request for a [StreamQueue.peek] call. | 726 /// Request for a [StreamQueue.peek] call. |
734 /// | 727 /// |
735 /// Completes the returned future when receiving the first event, | 728 /// Completes the returned future when receiving the first event, |
736 /// and is then complete, but doesn't consume the event. | 729 /// and is then complete, but doesn't consume the event. |
737 class _PeekRequest<T> implements _EventRequest<T> { | 730 class _PeekRequest<T> implements _EventRequest<T> { |
738 /// Completer for the future returned by [StreamQueue.next]. | 731 /// Completer for the future returned by [StreamQueue.next]. |
739 final _completer = new Completer<T>(); | 732 final _completer = new Completer<T>(); |
740 | 733 |
741 _PeekRequest(); | 734 _PeekRequest(); |
742 | 735 |
743 Future<T> get future => _completer.future; | 736 Future<T> get future => _completer.future; |
744 | 737 |
745 bool update(QueueList<Result<T>> events, bool isDone) { | 738 bool update(QueueList<Result<T>> events, bool isDone) { |
746 if (events.isNotEmpty) { | 739 if (events.isNotEmpty) { |
747 events.first.complete(_completer); | 740 events.first.complete(_completer); |
748 return true; | 741 return true; |
749 } | 742 } |
750 if (isDone) { | 743 if (isDone) { |
751 _completer.completeError(new StateError("No elements"), | 744 _completer.completeError( |
752 StackTrace.current); | 745 new StateError("No elements"), StackTrace.current); |
753 return true; | 746 return true; |
754 } | 747 } |
755 return false; | 748 return false; |
756 } | 749 } |
757 } | 750 } |
758 | 751 |
759 | |
760 /// Request for a [StreamQueue.skip] call. | 752 /// Request for a [StreamQueue.skip] call. |
761 class _SkipRequest<T> implements _EventRequest<T> { | 753 class _SkipRequest<T> implements _EventRequest<T> { |
762 /// Completer for the future returned by the skip call. | 754 /// Completer for the future returned by the skip call. |
763 final _completer = new Completer<int>(); | 755 final _completer = new Completer<int>(); |
764 | 756 |
765 /// Number of remaining events to skip. | 757 /// Number of remaining events to skip. |
766 /// | 758 /// |
767 /// The request [isComplete] when the values reaches zero. | 759 /// The request [isComplete] when the values reaches zero. |
768 /// | 760 /// |
769 /// Decremented when an event is seen. | 761 /// Decremented when an event is seen. |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
807 /// The request [isComplete] when the length of [_list] reaches | 799 /// The request [isComplete] when the length of [_list] reaches |
808 /// this value. | 800 /// this value. |
809 final int _eventsToTake; | 801 final int _eventsToTake; |
810 | 802 |
811 _ListRequest(this._eventsToTake); | 803 _ListRequest(this._eventsToTake); |
812 | 804 |
813 /// The future completed when the correct number of events have been captured. | 805 /// The future completed when the correct number of events have been captured. |
814 Future<List<T>> get future => _completer.future; | 806 Future<List<T>> get future => _completer.future; |
815 } | 807 } |
816 | 808 |
817 | |
818 /// Request for a [StreamQueue.take] call. | 809 /// Request for a [StreamQueue.take] call. |
819 class _TakeRequest<T> extends _ListRequest<T> { | 810 class _TakeRequest<T> extends _ListRequest<T> { |
820 _TakeRequest(int eventsToTake) : super(eventsToTake); | 811 _TakeRequest(int eventsToTake) : super(eventsToTake); |
821 | 812 |
822 bool update(QueueList<Result<T>> events, bool isDone) { | 813 bool update(QueueList<Result<T>> events, bool isDone) { |
823 while (_list.length < _eventsToTake) { | 814 while (_list.length < _eventsToTake) { |
824 if (events.isEmpty) { | 815 if (events.isEmpty) { |
825 if (isDone) break; | 816 if (isDone) break; |
826 return false; | 817 return false; |
827 } | 818 } |
828 | 819 |
829 var event = events.removeFirst(); | 820 var event = events.removeFirst(); |
830 if (event.isError) { | 821 if (event.isError) { |
831 event.asError.complete(_completer); | 822 event.asError.complete(_completer); |
832 return true; | 823 return true; |
833 } | 824 } |
834 _list.add(event.asValue.value); | 825 _list.add(event.asValue.value); |
835 } | 826 } |
836 _completer.complete(_list); | 827 _completer.complete(_list); |
837 return true; | 828 return true; |
838 } | 829 } |
839 } | 830 } |
840 | 831 |
841 | |
842 /// Request for a [StreamQueue.lookAhead] call. | 832 /// Request for a [StreamQueue.lookAhead] call. |
843 class _LookAheadRequest<T> extends _ListRequest<T> { | 833 class _LookAheadRequest<T> extends _ListRequest<T> { |
844 _LookAheadRequest(int eventsToTake) : super(eventsToTake); | 834 _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
845 | 835 |
846 bool update(QueueList<Result<T>> events, bool isDone) { | 836 bool update(QueueList<Result<T>> events, bool isDone) { |
847 while (_list.length < _eventsToTake) { | 837 while (_list.length < _eventsToTake) { |
848 if (events.length == _list.length) { | 838 if (events.length == _list.length) { |
849 if (isDone) break; | 839 if (isDone) break; |
850 return false; | 840 return false; |
851 } | 841 } |
852 var event = events.elementAt(_list.length); | 842 var event = events.elementAt(_list.length); |
853 if (event.isError) { | 843 if (event.isError) { |
854 event.asError.complete(_completer); | 844 event.asError.complete(_completer); |
855 return true; | 845 return true; |
856 } | 846 } |
857 _list.add(event.asValue.value); | 847 _list.add(event.asValue.value); |
858 } | 848 } |
859 _completer.complete(_list); | 849 _completer.complete(_list); |
860 return true; | 850 return true; |
861 } | 851 } |
862 } | 852 } |
863 | 853 |
864 | |
865 /// Request for a [StreamQueue.cancel] call. | 854 /// Request for a [StreamQueue.cancel] call. |
866 /// | 855 /// |
867 /// The request needs no events, it just waits in the request queue | 856 /// The request needs no events, it just waits in the request queue |
868 /// until all previous events are fulfilled, then it cancels the stream queue | 857 /// until all previous events are fulfilled, then it cancels the stream queue |
869 /// source subscription. | 858 /// source subscription. |
870 class _CancelRequest<T> implements _EventRequest<T> { | 859 class _CancelRequest<T> implements _EventRequest<T> { |
871 /// Completer for the future returned by the `cancel` call. | 860 /// Completer for the future returned by the `cancel` call. |
872 final _completer = new Completer(); | 861 final _completer = new Completer(); |
| 862 |
873 /// | 863 /// |
874 /// When the event is completed, it needs to cancel the active subscription | 864 /// When the event is completed, it needs to cancel the active subscription |
875 /// of the `StreamQueue` object, if any. | 865 /// of the `StreamQueue` object, if any. |
876 final StreamQueue _streamQueue; | 866 final StreamQueue _streamQueue; |
877 | 867 |
878 _CancelRequest(this._streamQueue); | 868 _CancelRequest(this._streamQueue); |
879 | 869 |
880 /// The future completed when the cancel request is completed. | 870 /// The future completed when the cancel request is completed. |
881 Future get future => _completer.future; | 871 Future get future => _completer.future; |
882 | 872 |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
918 } else { | 908 } else { |
919 _completer.setSourceStream(_streamQueue._extractStream()); | 909 _completer.setSourceStream(_streamQueue._extractStream()); |
920 } | 910 } |
921 } else { | 911 } else { |
922 // There are prefetched events which needs to be added before the | 912 // There are prefetched events which needs to be added before the |
923 // remaining stream. | 913 // remaining stream. |
924 var controller = new StreamController<T>(); | 914 var controller = new StreamController<T>(); |
925 for (var event in events) { | 915 for (var event in events) { |
926 event.addTo(controller); | 916 event.addTo(controller); |
927 } | 917 } |
928 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) | 918 controller |
929 .whenComplete(controller.close); | 919 .addStream(_streamQueue._extractStream(), cancelOnError: false) |
| 920 .whenComplete(controller.close); |
930 _completer.setSourceStream(controller.stream); | 921 _completer.setSourceStream(controller.stream); |
931 } | 922 } |
932 return true; | 923 return true; |
933 } | 924 } |
934 } | 925 } |
935 | 926 |
936 /// Request for a [StreamQueue.hasNext] call. | 927 /// Request for a [StreamQueue.hasNext] call. |
937 /// | 928 /// |
938 /// Completes the [future] with `true` if it sees any event, | 929 /// Completes the [future] with `true` if it sees any event, |
939 /// but doesn't consume the event. | 930 /// but doesn't consume the event. |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
979 } | 970 } |
980 | 971 |
981 bool update(QueueList<Result<T>> events, bool isDone) { | 972 bool update(QueueList<Result<T>> events, bool isDone) { |
982 while (_eventsSent < events.length) { | 973 while (_eventsSent < events.length) { |
983 events[_eventsSent++].addTo(_controller); | 974 events[_eventsSent++].addTo(_controller); |
984 } | 975 } |
985 if (isDone && !_controller.isClosed) _controller.close(); | 976 if (isDone && !_controller.isClosed) _controller.close(); |
986 return false; | 977 return false; |
987 } | 978 } |
988 } | 979 } |
OLD | NEW |