OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:collection'; | 6 import 'dart:collection'; |
7 | 7 |
8 import 'package:collection/collection.dart'; | 8 import 'package:collection/collection.dart'; |
9 | 9 |
10 import "cancelable_operation.dart"; | 10 import "cancelable_operation.dart"; |
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
115 /// one events. | 115 /// one events. |
116 Future<bool> get hasNext { | 116 Future<bool> get hasNext { |
117 if (!_isClosed) { | 117 if (!_isClosed) { |
118 var hasNextRequest = new _HasNextRequest(); | 118 var hasNextRequest = new _HasNextRequest(); |
119 _addRequest(hasNextRequest); | 119 _addRequest(hasNextRequest); |
120 return hasNextRequest.future; | 120 return hasNextRequest.future; |
121 } | 121 } |
122 throw _failClosed(); | 122 throw _failClosed(); |
123 } | 123 } |
124 | 124 |
125 | |
126 /// Look at the next [count] data events without consuming them. | |
nweiz
2017/01/24 22:37:59
"Look at" -> "Returns"
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
| |
127 /// | |
128 /// Works like [take] except that the events are left in the queue. | |
129 /// If one of the next [count] events is an error, the returned future | |
130 /// completes with this error, and the error is still left in the queue. | |
131 Future<List<T>> lookAhead(int count) { | |
132 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
nweiz
2017/01/24 22:37:59
`RangeError.checkNotNegative()`?
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
| |
133 if (!_isClosed) { | |
134 var request = new _LookAheadRequest<T>(count); | |
135 _addRequest(request); | |
136 return request.future; | |
137 } | |
138 throw _failClosed(); | |
139 } | |
nweiz
2017/01/24 22:37:59
Why not just implement these in terms of transacti
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Because it would also be a lot more expensive.
I p
nweiz
2017/01/25 22:41:18
Do you have benchmarks where the extra processing
Lasse Reichstein Nielsen
2017/01/26 12:13:43
I've added a benchmark below. It shows that `retur
| |
140 | |
125 /// Requests the next (yet unrequested) event from the stream. | 141 /// Requests the next (yet unrequested) event from the stream. |
126 /// | 142 /// |
127 /// When the requested event arrives, the returned future is completed with | 143 /// When the requested event arrives, the returned future is completed with |
128 /// the event. | 144 /// the event. |
129 /// If the event is a data event, the returned future completes | 145 /// If the event is a data event, the returned future completes |
130 /// with its value. | 146 /// with its value. |
131 /// If the event is an error event, the returned future completes with | 147 /// If the event is an error event, the returned future completes with |
132 /// its error and stack trace. | 148 /// its error and stack trace. |
133 /// If the stream closes before an event arrives, the returned future | 149 /// If the stream closes before an event arrives, the returned future |
134 /// completes with a [StateError]. | 150 /// completes with a [StateError]. |
135 /// | 151 /// |
136 /// It's possible to have several pending [next] calls (or other requests), | 152 /// It's possible to have several pending [next] calls (or other requests), |
137 /// and they will be completed in the order they were requested, by the | 153 /// and they will be completed in the order they were requested, by the |
138 /// first events that were not consumed by previous requeusts. | 154 /// first events that were not consumed by previous requeusts. |
139 Future<T> get next { | 155 Future<T> get next { |
140 if (!_isClosed) { | 156 if (!_isClosed) { |
141 var nextRequest = new _NextRequest<T>(); | 157 var nextRequest = new _NextRequest<T>(); |
142 _addRequest(nextRequest); | 158 _addRequest(nextRequest); |
143 return nextRequest.future; | 159 return nextRequest.future; |
144 } | 160 } |
145 throw _failClosed(); | 161 throw _failClosed(); |
146 } | 162 } |
147 | 163 |
164 /// Looks at the next (yet unrequested) event from the stream. | |
nweiz
2017/01/24 22:37:59
"Looks at" -> "Returns"
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
| |
165 /// | |
166 /// Like [next] except that the event is not consumed. | |
167 /// If the next event is an error event, it stays in the queue. | |
168 Future<T> get peek { | |
169 if (!_isClosed) { | |
170 var nextRequest = new _PeekRequest<T>(); | |
171 _addRequest(nextRequest); | |
172 return nextRequest.future; | |
173 } | |
174 throw _failClosed(); | |
175 } | |
176 | |
148 /// Returns a stream of all the remaning events of the source stream. | 177 /// Returns a stream of all the remaning events of the source stream. |
149 /// | 178 /// |
150 /// All requested [next], [skip] or [take] operations are completed | 179 /// All requested [next], [skip] or [take] operations are completed |
151 /// first, and then any remaining events are provided as events of | 180 /// first, and then any remaining events are provided as events of |
152 /// the returned stream. | 181 /// the returned stream. |
153 /// | 182 /// |
154 /// Using `rest` closes this stream queue. After getting the | 183 /// Using `rest` closes this stream queue. After getting the |
155 /// `rest` the caller may no longer request other events, like | 184 /// `rest` the caller may no longer request other events, like |
156 /// after calling [cancel]. | 185 /// after calling [cancel]. |
157 Stream<T> get rest { | 186 Stream<T> get rest { |
(...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
337 /// subscription providing the events. | 366 /// subscription providing the events. |
338 /// | 367 /// |
339 /// If [immediate] is `true`, the source is instead canceled | 368 /// If [immediate] is `true`, the source is instead canceled |
340 /// immediately. Any pending events are completed as though the underlying | 369 /// immediately. Any pending events are completed as though the underlying |
341 /// stream had closed. | 370 /// stream had closed. |
342 /// | 371 /// |
343 /// The returned future completes with the result of calling | 372 /// The returned future completes with the result of calling |
344 /// `cancel`. | 373 /// `cancel`. |
345 /// | 374 /// |
346 /// After calling `cancel`, no further events can be requested. | 375 /// After calling `cancel`, no further events can be requested. |
347 /// None of [next], [rest], [skip], [take] or [cancel] may be | 376 /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] |
348 /// called again. | 377 /// may be called again. |
nweiz
2017/01/24 22:37:59
Is it really necessary to explicitly list all the
Lasse Reichstein Nielsen
2017/01/25 07:58:03
It seemed like a great idea at the time ... but it
| |
349 Future cancel({bool immediate: false}) { | 378 Future cancel({bool immediate: false}) { |
350 if (_isClosed) throw _failClosed(); | 379 if (_isClosed) throw _failClosed(); |
351 _isClosed = true; | 380 _isClosed = true; |
352 | 381 |
353 if (!immediate) { | 382 if (!immediate) { |
354 var request = new _CancelRequest(this); | 383 var request = new _CancelRequest(this); |
355 _addRequest(request); | 384 _addRequest(request); |
356 return request.future; | 385 return request.future; |
357 } | 386 } |
358 | 387 |
(...skipping 337 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
696 _NextRequest(); | 725 _NextRequest(); |
697 | 726 |
698 Future<T> get future => _completer.future; | 727 Future<T> get future => _completer.future; |
699 | 728 |
700 bool update(QueueList<Result<T>> events, bool isDone) { | 729 bool update(QueueList<Result<T>> events, bool isDone) { |
701 if (events.isNotEmpty) { | 730 if (events.isNotEmpty) { |
702 events.removeFirst().complete(_completer); | 731 events.removeFirst().complete(_completer); |
703 return true; | 732 return true; |
704 } | 733 } |
705 if (isDone) { | 734 if (isDone) { |
706 var errorFuture = | 735 _completer.completeError(new StateError("No elements"), |
707 new Future.sync(() => throw new StateError("No elements")); | 736 StackTrace.current); |
708 _completer.complete(errorFuture); | |
709 return true; | 737 return true; |
710 } | 738 } |
711 return false; | 739 return false; |
712 } | 740 } |
713 } | 741 } |
714 | 742 |
743 | |
744 /// Request for a [StreamQueue.peek] call. | |
745 /// | |
746 /// Completes the returned future when receiving the first event, | |
747 /// and is then complete, but doesn't consume the event. | |
748 class _PeekRequest<T> implements _EventRequest<T> { | |
749 /// Completer for the future returned by [StreamQueue.next]. | |
750 final _completer = new Completer<T>(); | |
751 | |
752 _PeekRequest(); | |
753 | |
754 Future<T> get future => _completer.future; | |
755 | |
756 bool update(QueueList<Result<T>> events, bool isDone) { | |
757 if (events.isNotEmpty) { | |
758 events.first.complete(_completer); | |
759 return true; | |
760 } | |
761 if (isDone) { | |
762 _completer.completeError(new StateError("No elements"), | |
763 StackTrace.current); | |
764 return true; | |
765 } | |
766 return false; | |
767 } | |
768 } | |
769 | |
770 | |
715 /// Request for a [StreamQueue.skip] call. | 771 /// Request for a [StreamQueue.skip] call. |
716 class _SkipRequest<T> implements _EventRequest<T> { | 772 class _SkipRequest<T> implements _EventRequest<T> { |
717 /// Completer for the future returned by the skip call. | 773 /// Completer for the future returned by the skip call. |
718 final _completer = new Completer<int>(); | 774 final _completer = new Completer<int>(); |
719 | 775 |
720 /// Number of remaining events to skip. | 776 /// Number of remaining events to skip. |
721 /// | 777 /// |
722 /// The request [isComplete] when the values reaches zero. | 778 /// The request [isComplete] when the values reaches zero. |
723 /// | 779 /// |
724 /// Decremented when an event is seen. | 780 /// Decremented when an event is seen. |
(...skipping 17 matching lines...) Expand all Loading... | |
742 if (event.isError) { | 798 if (event.isError) { |
743 _completer.completeError(event.asError.error, event.asError.stackTrace); | 799 _completer.completeError(event.asError.error, event.asError.stackTrace); |
744 return true; | 800 return true; |
745 } | 801 } |
746 } | 802 } |
747 _completer.complete(_eventsToSkip); | 803 _completer.complete(_eventsToSkip); |
748 return true; | 804 return true; |
749 } | 805 } |
750 } | 806 } |
751 | 807 |
752 /// Request for a [StreamQueue.take] call. | 808 /// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
753 class _TakeRequest<T> implements _EventRequest<T> { | 809 abstract class _ListRequest<T> implements _EventRequest<T> { |
754 /// Completer for the future returned by the take call. | 810 /// Completer for the future returned by the take call. |
755 final _completer = new Completer<List<T>>(); | 811 final _completer = new Completer<List<T>>(); |
756 | 812 |
757 /// List collecting events until enough have been seen. | 813 /// List collecting events until enough have been seen. |
758 final _list = <T>[]; | 814 final _list = <T>[]; |
759 | 815 |
760 /// Number of events to capture. | 816 /// Number of events to capture. |
761 /// | 817 /// |
762 /// The request [isComplete] when the length of [_list] reaches | 818 /// The request [isComplete] when the length of [_list] reaches |
763 /// this value. | 819 /// this value. |
764 final int _eventsToTake; | 820 final int _eventsToTake; |
765 | 821 |
766 _TakeRequest(this._eventsToTake); | 822 _ListRequest(this._eventsToTake); |
767 | 823 |
768 /// The future completed when the correct number of events have been captured. | 824 /// The future completed when the correct number of events have been captured. |
769 Future<List<T>> get future => _completer.future; | 825 Future<List<T>> get future => _completer.future; |
826 } | |
827 | |
828 | |
829 /// Request for a [StreamQueue.take] call. | |
830 class _TakeRequest<T> extends _ListRequest<T> { | |
831 _TakeRequest(int eventsToTake) : super(eventsToTake); | |
770 | 832 |
771 bool update(QueueList<Result<T>> events, bool isDone) { | 833 bool update(QueueList<Result<T>> events, bool isDone) { |
772 while (_list.length < _eventsToTake) { | 834 while (_list.length < _eventsToTake) { |
773 if (events.isEmpty) { | 835 if (events.isEmpty) { |
774 if (isDone) break; | 836 if (isDone) break; |
775 return false; | 837 return false; |
776 } | 838 } |
777 | 839 |
778 var event = events.removeFirst(); | 840 var event = events.removeFirst(); |
779 if (event.isError) { | 841 if (event.isError) { |
780 _completer.completeError(event.asError.error, event.asError.stackTrace); | 842 event.asError.complete(_completer); |
781 return true; | 843 return true; |
782 } | 844 } |
783 _list.add(event.asValue.value); | 845 _list.add(event.asValue.value); |
784 } | 846 } |
785 _completer.complete(_list); | 847 _completer.complete(_list); |
786 return true; | 848 return true; |
787 } | 849 } |
788 } | 850 } |
789 | 851 |
852 | |
853 /// Request for a [StreamQueue.lookAhead] call. | |
854 class _LookAheadRequest<T> extends _ListRequest<T> { | |
855 _LookAheadRequest(int eventsToTake) : super(eventsToTake); | |
856 | |
857 bool update(QueueList<Result<T>> events, bool isDone) { | |
858 while (_list.length < _eventsToTake) { | |
859 if (events.length == _list.length) { | |
860 if (isDone) break; | |
861 return false; | |
862 } | |
863 var event = events.elementAt(_list.length); | |
864 if (event.isError) { | |
865 event.asError.complete(_completer); | |
866 return true; | |
867 } | |
868 _list.add(event.asValue.value); | |
869 } | |
870 _completer.complete(_list); | |
871 return true; | |
872 } | |
873 } | |
874 | |
875 | |
790 /// Request for a [StreamQueue.cancel] call. | 876 /// Request for a [StreamQueue.cancel] call. |
791 /// | 877 /// |
792 /// The request needs no events, it just waits in the request queue | 878 /// The request needs no events, it just waits in the request queue |
793 /// until all previous events are fulfilled, then it cancels the stream queue | 879 /// until all previous events are fulfilled, then it cancels the stream queue |
794 /// source subscription. | 880 /// source subscription. |
795 class _CancelRequest<T> implements _EventRequest<T> { | 881 class _CancelRequest<T> implements _EventRequest<T> { |
796 /// Completer for the future returned by the `cancel` call. | 882 /// Completer for the future returned by the `cancel` call. |
797 final _completer = new Completer(); | 883 final _completer = new Completer(); |
798 /// | 884 /// |
799 /// When the event is completed, it needs to cancel the active subscription | 885 /// When the event is completed, it needs to cancel the active subscription |
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
904 } | 990 } |
905 | 991 |
906 bool update(QueueList<Result<T>> events, bool isDone) { | 992 bool update(QueueList<Result<T>> events, bool isDone) { |
907 while (_eventsSent < events.length) { | 993 while (_eventsSent < events.length) { |
908 events[_eventsSent++].addTo(_controller); | 994 events[_eventsSent++].addTo(_controller); |
909 } | 995 } |
910 if (isDone && !_controller.isClosed) _controller.close(); | 996 if (isDone && !_controller.isClosed) _controller.close(); |
911 return false; | 997 return false; |
912 } | 998 } |
913 } | 999 } |
OLD | NEW |