OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:collection'; | 6 import 'dart:collection'; |
7 | 7 |
8 import 'package:collection/collection.dart'; | 8 import 'package:collection/collection.dart'; |
9 | 9 |
10 import "cancelable_operation.dart"; | 10 import "cancelable_operation.dart"; |
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
124 /// one events. | 124 /// one events. |
125 Future<bool> get hasNext { | 125 Future<bool> get hasNext { |
126 if (!_isClosed) { | 126 if (!_isClosed) { |
127 var hasNextRequest = new _HasNextRequest(); | 127 var hasNextRequest = new _HasNextRequest(); |
128 _addRequest(hasNextRequest); | 128 _addRequest(hasNextRequest); |
129 return hasNextRequest.future; | 129 return hasNextRequest.future; |
130 } | 130 } |
131 throw _failClosed(); | 131 throw _failClosed(); |
132 } | 132 } |
133 | 133 |
| 134 |
| 135 /// Returns the next [count] data events without consuming them. |
| 136 /// |
| 137 /// Works like [take] except that the events are left in the queue. |
| 138 /// If one of the next [count] events is an error, the returned future |
| 139 /// completes with this error, and the error is still left in the queue. |
| 140 Future<List<T>> lookAhead(int count) { |
| 141 RangeError.checkNotNegative(count, "count"); |
| 142 if (!_isClosed) { |
| 143 var request = new _LookAheadRequest<T>(count); |
| 144 _addRequest(request); |
| 145 return request.future; |
| 146 } |
| 147 throw _failClosed(); |
| 148 } |
| 149 |
134 /// Requests the next (yet unrequested) event from the stream. | 150 /// Requests the next (yet unrequested) event from the stream. |
135 /// | 151 /// |
136 /// When the requested event arrives, the returned future is completed with | 152 /// When the requested event arrives, the returned future is completed with |
137 /// the event. | 153 /// the event. |
138 /// If the event is a data event, the returned future completes | 154 /// If the event is a data event, the returned future completes |
139 /// with its value. | 155 /// with its value. |
140 /// If the event is an error event, the returned future completes with | 156 /// If the event is an error event, the returned future completes with |
141 /// its error and stack trace. | 157 /// its error and stack trace. |
142 /// If the stream closes before an event arrives, the returned future | 158 /// If the stream closes before an event arrives, the returned future |
143 /// completes with a [StateError]. | 159 /// completes with a [StateError]. |
144 /// | 160 /// |
145 /// It's possible to have several pending [next] calls (or other requests), | 161 /// It's possible to have several pending [next] calls (or other requests), |
146 /// and they will be completed in the order they were requested, by the | 162 /// and they will be completed in the order they were requested, by the |
147 /// first events that were not consumed by previous requeusts. | 163 /// first events that were not consumed by previous requeusts. |
148 Future<T> get next { | 164 Future<T> get next { |
149 if (!_isClosed) { | 165 if (!_isClosed) { |
150 var nextRequest = new _NextRequest<T>(); | 166 var nextRequest = new _NextRequest<T>(); |
151 _addRequest(nextRequest); | 167 _addRequest(nextRequest); |
152 return nextRequest.future; | 168 return nextRequest.future; |
153 } | 169 } |
154 throw _failClosed(); | 170 throw _failClosed(); |
155 } | 171 } |
156 | 172 |
| 173 /// Returns the next (yet unrequested) event from the stream. |
| 174 /// |
| 175 /// Like [next] except that the event is not consumed. |
| 176 /// If the next event is an error event, it also stays in the queue. |
| 177 Future<T> get peek { |
| 178 if (!_isClosed) { |
| 179 var nextRequest = new _PeekRequest<T>(); |
| 180 _addRequest(nextRequest); |
| 181 return nextRequest.future; |
| 182 } |
| 183 throw _failClosed(); |
| 184 } |
| 185 |
157 /// Returns a stream of all the remaning events of the source stream. | 186 /// Returns a stream of all the remaning events of the source stream. |
158 /// | 187 /// |
159 /// All requested [next], [skip] or [take] operations are completed | 188 /// All requested [next], [skip] or [take] operations are completed |
160 /// first, and then any remaining events are provided as events of | 189 /// first, and then any remaining events are provided as events of |
161 /// the returned stream. | 190 /// the returned stream. |
162 /// | 191 /// |
163 /// Using `rest` closes this stream queue. After getting the | 192 /// Using `rest` closes this stream queue. After getting the |
164 /// `rest` the caller may no longer request other events, like | 193 /// `rest` the caller may no longer request other events, like |
165 /// after calling [cancel]. | 194 /// after calling [cancel]. |
166 Stream<T> get rest { | 195 Stream<T> get rest { |
(...skipping 15 matching lines...) Expand all Loading... |
182 /// | 211 /// |
183 /// If an error occurs before `count` data events have been skipped, | 212 /// If an error occurs before `count` data events have been skipped, |
184 /// the returned future completes with that error instead. | 213 /// the returned future completes with that error instead. |
185 /// | 214 /// |
186 /// If the stream closes before `count` data events, | 215 /// If the stream closes before `count` data events, |
187 /// the remaining unskipped event count is returned. | 216 /// the remaining unskipped event count is returned. |
188 /// If the returned future completes with the integer `0`, | 217 /// If the returned future completes with the integer `0`, |
189 /// then all events were succssfully skipped. If the value | 218 /// then all events were succssfully skipped. If the value |
190 /// is greater than zero then the stream ended early. | 219 /// is greater than zero then the stream ended early. |
191 Future<int> skip(int count) { | 220 Future<int> skip(int count) { |
192 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 221 RangeError.checkNotNegative(count, "count"); |
193 if (!_isClosed) { | 222 if (!_isClosed) { |
194 var request = new _SkipRequest(count); | 223 var request = new _SkipRequest(count); |
195 _addRequest(request); | 224 _addRequest(request); |
196 return request.future; | 225 return request.future; |
197 } | 226 } |
198 throw _failClosed(); | 227 throw _failClosed(); |
199 } | 228 } |
200 | 229 |
201 /// Requests the next [count] data events as a list. | 230 /// Requests the next [count] data events as a list. |
202 /// | 231 /// |
203 /// The [count] must be non-negative. | 232 /// The [count] must be non-negative. |
204 /// | 233 /// |
205 /// Equivalent to calling [next] `count` times and | 234 /// Equivalent to calling [next] `count` times and |
206 /// storing the data values in a list. | 235 /// storing the data values in a list. |
207 /// | 236 /// |
208 /// If an error occurs before `count` data events has | 237 /// If an error occurs before `count` data events has |
209 /// been collected, the returned future completes with | 238 /// been collected, the returned future completes with |
210 /// that error instead. | 239 /// that error instead. |
211 /// | 240 /// |
212 /// If the stream closes before `count` data events, | 241 /// If the stream closes before `count` data events, |
213 /// the returned future completes with the list | 242 /// the returned future completes with the list |
214 /// of data collected so far. That is, the returned | 243 /// of data collected so far. That is, the returned |
215 /// list may have fewer than [count] elements. | 244 /// list may have fewer than [count] elements. |
216 Future<List<T>> take(int count) { | 245 Future<List<T>> take(int count) { |
217 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 246 RangeError.checkNotNegative(count, "count"); |
218 if (!_isClosed) { | 247 if (!_isClosed) { |
219 var request = new _TakeRequest<T>(count); | 248 var request = new _TakeRequest<T>(count); |
220 _addRequest(request); | 249 _addRequest(request); |
221 return request.future; | 250 return request.future; |
222 } | 251 } |
223 throw _failClosed(); | 252 throw _failClosed(); |
224 } | 253 } |
225 | 254 |
226 /// Requests a transaction that can conditionally consume events. | 255 /// Requests a transaction that can conditionally consume events. |
227 /// | 256 /// |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
345 /// all previously requested events have been processed, then it cancels the | 374 /// all previously requested events have been processed, then it cancels the |
346 /// subscription providing the events. | 375 /// subscription providing the events. |
347 /// | 376 /// |
348 /// If [immediate] is `true`, the source is instead canceled | 377 /// If [immediate] is `true`, the source is instead canceled |
349 /// immediately. Any pending events are completed as though the underlying | 378 /// immediately. Any pending events are completed as though the underlying |
350 /// stream had closed. | 379 /// stream had closed. |
351 /// | 380 /// |
352 /// The returned future completes with the result of calling | 381 /// The returned future completes with the result of calling |
353 /// `cancel`. | 382 /// `cancel`. |
354 /// | 383 /// |
355 /// After calling `cancel`, no further events can be requested. | 384 /// After calling `cancel`, no further events can be requested, |
356 /// None of [next], [rest], [skip], [take] or [cancel] may be | 385 /// so methods like [next] or [peek] may not be called again. |
357 /// called again. | |
358 Future cancel({bool immediate: false}) { | 386 Future cancel({bool immediate: false}) { |
359 if (_isClosed) throw _failClosed(); | 387 if (_isClosed) throw _failClosed(); |
360 _isClosed = true; | 388 _isClosed = true; |
361 | 389 |
362 if (!immediate) { | 390 if (!immediate) { |
363 var request = new _CancelRequest(this); | 391 var request = new _CancelRequest(this); |
364 _addRequest(request); | 392 _addRequest(request); |
365 return request.future; | 393 return request.future; |
366 } | 394 } |
367 | 395 |
(...skipping 317 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
685 _NextRequest(); | 713 _NextRequest(); |
686 | 714 |
687 Future<T> get future => _completer.future; | 715 Future<T> get future => _completer.future; |
688 | 716 |
689 bool update(QueueList<Result<T>> events, bool isDone) { | 717 bool update(QueueList<Result<T>> events, bool isDone) { |
690 if (events.isNotEmpty) { | 718 if (events.isNotEmpty) { |
691 events.removeFirst().complete(_completer); | 719 events.removeFirst().complete(_completer); |
692 return true; | 720 return true; |
693 } | 721 } |
694 if (isDone) { | 722 if (isDone) { |
695 var errorFuture = | 723 _completer.completeError(new StateError("No elements"), |
696 new Future.sync(() => throw new StateError("No elements")); | 724 StackTrace.current); |
697 _completer.complete(errorFuture); | |
698 return true; | 725 return true; |
699 } | 726 } |
700 return false; | 727 return false; |
701 } | 728 } |
702 } | 729 } |
703 | 730 |
| 731 |
| 732 /// Request for a [StreamQueue.peek] call. |
| 733 /// |
| 734 /// Completes the returned future when receiving the first event, |
| 735 /// and is then complete, but doesn't consume the event. |
| 736 class _PeekRequest<T> implements _EventRequest<T> { |
| 737 /// Completer for the future returned by [StreamQueue.next]. |
| 738 final _completer = new Completer<T>(); |
| 739 |
| 740 _PeekRequest(); |
| 741 |
| 742 Future<T> get future => _completer.future; |
| 743 |
| 744 bool update(QueueList<Result<T>> events, bool isDone) { |
| 745 if (events.isNotEmpty) { |
| 746 events.first.complete(_completer); |
| 747 return true; |
| 748 } |
| 749 if (isDone) { |
| 750 _completer.completeError(new StateError("No elements"), |
| 751 StackTrace.current); |
| 752 return true; |
| 753 } |
| 754 return false; |
| 755 } |
| 756 } |
| 757 |
| 758 |
704 /// Request for a [StreamQueue.skip] call. | 759 /// Request for a [StreamQueue.skip] call. |
705 class _SkipRequest<T> implements _EventRequest<T> { | 760 class _SkipRequest<T> implements _EventRequest<T> { |
706 /// Completer for the future returned by the skip call. | 761 /// Completer for the future returned by the skip call. |
707 final _completer = new Completer<int>(); | 762 final _completer = new Completer<int>(); |
708 | 763 |
709 /// Number of remaining events to skip. | 764 /// Number of remaining events to skip. |
710 /// | 765 /// |
711 /// The request [isComplete] when the values reaches zero. | 766 /// The request [isComplete] when the values reaches zero. |
712 /// | 767 /// |
713 /// Decremented when an event is seen. | 768 /// Decremented when an event is seen. |
(...skipping 17 matching lines...) Expand all Loading... |
731 if (event.isError) { | 786 if (event.isError) { |
732 _completer.completeError(event.asError.error, event.asError.stackTrace); | 787 _completer.completeError(event.asError.error, event.asError.stackTrace); |
733 return true; | 788 return true; |
734 } | 789 } |
735 } | 790 } |
736 _completer.complete(_eventsToSkip); | 791 _completer.complete(_eventsToSkip); |
737 return true; | 792 return true; |
738 } | 793 } |
739 } | 794 } |
740 | 795 |
741 /// Request for a [StreamQueue.take] call. | 796 /// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
742 class _TakeRequest<T> implements _EventRequest<T> { | 797 abstract class _ListRequest<T> implements _EventRequest<T> { |
743 /// Completer for the future returned by the take call. | 798 /// Completer for the future returned by the take call. |
744 final _completer = new Completer<List<T>>(); | 799 final _completer = new Completer<List<T>>(); |
745 | 800 |
746 /// List collecting events until enough have been seen. | 801 /// List collecting events until enough have been seen. |
747 final _list = <T>[]; | 802 final _list = <T>[]; |
748 | 803 |
749 /// Number of events to capture. | 804 /// Number of events to capture. |
750 /// | 805 /// |
751 /// The request [isComplete] when the length of [_list] reaches | 806 /// The request [isComplete] when the length of [_list] reaches |
752 /// this value. | 807 /// this value. |
753 final int _eventsToTake; | 808 final int _eventsToTake; |
754 | 809 |
755 _TakeRequest(this._eventsToTake); | 810 _ListRequest(this._eventsToTake); |
756 | 811 |
757 /// The future completed when the correct number of events have been captured. | 812 /// The future completed when the correct number of events have been captured. |
758 Future<List<T>> get future => _completer.future; | 813 Future<List<T>> get future => _completer.future; |
| 814 } |
| 815 |
| 816 |
| 817 /// Request for a [StreamQueue.take] call. |
| 818 class _TakeRequest<T> extends _ListRequest<T> { |
| 819 _TakeRequest(int eventsToTake) : super(eventsToTake); |
759 | 820 |
760 bool update(QueueList<Result<T>> events, bool isDone) { | 821 bool update(QueueList<Result<T>> events, bool isDone) { |
761 while (_list.length < _eventsToTake) { | 822 while (_list.length < _eventsToTake) { |
762 if (events.isEmpty) { | 823 if (events.isEmpty) { |
763 if (isDone) break; | 824 if (isDone) break; |
764 return false; | 825 return false; |
765 } | 826 } |
766 | 827 |
767 var event = events.removeFirst(); | 828 var event = events.removeFirst(); |
768 if (event.isError) { | 829 if (event.isError) { |
769 _completer.completeError(event.asError.error, event.asError.stackTrace); | 830 event.asError.complete(_completer); |
770 return true; | 831 return true; |
771 } | 832 } |
772 _list.add(event.asValue.value); | 833 _list.add(event.asValue.value); |
773 } | 834 } |
774 _completer.complete(_list); | 835 _completer.complete(_list); |
775 return true; | 836 return true; |
776 } | 837 } |
777 } | 838 } |
778 | 839 |
| 840 |
| 841 /// Request for a [StreamQueue.lookAhead] call. |
| 842 class _LookAheadRequest<T> extends _ListRequest<T> { |
| 843 _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
| 844 |
| 845 bool update(QueueList<Result<T>> events, bool isDone) { |
| 846 while (_list.length < _eventsToTake) { |
| 847 if (events.length == _list.length) { |
| 848 if (isDone) break; |
| 849 return false; |
| 850 } |
| 851 var event = events.elementAt(_list.length); |
| 852 if (event.isError) { |
| 853 event.asError.complete(_completer); |
| 854 return true; |
| 855 } |
| 856 _list.add(event.asValue.value); |
| 857 } |
| 858 _completer.complete(_list); |
| 859 return true; |
| 860 } |
| 861 } |
| 862 |
| 863 |
779 /// Request for a [StreamQueue.cancel] call. | 864 /// Request for a [StreamQueue.cancel] call. |
780 /// | 865 /// |
781 /// The request needs no events, it just waits in the request queue | 866 /// The request needs no events, it just waits in the request queue |
782 /// until all previous events are fulfilled, then it cancels the stream queue | 867 /// until all previous events are fulfilled, then it cancels the stream queue |
783 /// source subscription. | 868 /// source subscription. |
784 class _CancelRequest<T> implements _EventRequest<T> { | 869 class _CancelRequest<T> implements _EventRequest<T> { |
785 /// Completer for the future returned by the `cancel` call. | 870 /// Completer for the future returned by the `cancel` call. |
786 final _completer = new Completer(); | 871 final _completer = new Completer(); |
787 /// | 872 /// |
788 /// When the event is completed, it needs to cancel the active subscription | 873 /// When the event is completed, it needs to cancel the active subscription |
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
893 } | 978 } |
894 | 979 |
895 bool update(QueueList<Result<T>> events, bool isDone) { | 980 bool update(QueueList<Result<T>> events, bool isDone) { |
896 while (_eventsSent < events.length) { | 981 while (_eventsSent < events.length) { |
897 events[_eventsSent++].addTo(_controller); | 982 events[_eventsSent++].addTo(_controller); |
898 } | 983 } |
899 if (isDone && !_controller.isClosed) _controller.close(); | 984 if (isDone && !_controller.isClosed) _controller.close(); |
900 return false; | 985 return false; |
901 } | 986 } |
902 } | 987 } |
OLD | NEW |