| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 import 'dart:collection'; | 6 import 'dart:collection'; |
| 7 | 7 |
| 8 import 'package:collection/collection.dart'; | 8 import 'package:collection/collection.dart'; |
| 9 | 9 |
| 10 import "cancelable_operation.dart"; | 10 import "cancelable_operation.dart"; |
| (...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 124 /// one events. | 124 /// one events. |
| 125 Future<bool> get hasNext { | 125 Future<bool> get hasNext { |
| 126 if (!_isClosed) { | 126 if (!_isClosed) { |
| 127 var hasNextRequest = new _HasNextRequest(); | 127 var hasNextRequest = new _HasNextRequest(); |
| 128 _addRequest(hasNextRequest); | 128 _addRequest(hasNextRequest); |
| 129 return hasNextRequest.future; | 129 return hasNextRequest.future; |
| 130 } | 130 } |
| 131 throw _failClosed(); | 131 throw _failClosed(); |
| 132 } | 132 } |
| 133 | 133 |
| 134 |
| 135 /// Returns the next [count] data events without consuming them. |
| 136 /// |
| 137 /// Works like [take] except that the events are left in the queue. |
| 138 /// If one of the next [count] events is an error, the returned future |
| 139 /// completes with this error, and the error is still left in the queue. |
| 140 Future<List<T>> lookAhead(int count) { |
| 141 RangeError.checkNotNegative(count, "count"); |
| 142 if (!_isClosed) { |
| 143 var request = new _LookAheadRequest<T>(count); |
| 144 _addRequest(request); |
| 145 return request.future; |
| 146 } |
| 147 throw _failClosed(); |
| 148 } |
| 149 |
| 134 /// Requests the next (yet unrequested) event from the stream. | 150 /// Requests the next (yet unrequested) event from the stream. |
| 135 /// | 151 /// |
| 136 /// When the requested event arrives, the returned future is completed with | 152 /// When the requested event arrives, the returned future is completed with |
| 137 /// the event. | 153 /// the event. |
| 138 /// If the event is a data event, the returned future completes | 154 /// If the event is a data event, the returned future completes |
| 139 /// with its value. | 155 /// with its value. |
| 140 /// If the event is an error event, the returned future completes with | 156 /// If the event is an error event, the returned future completes with |
| 141 /// its error and stack trace. | 157 /// its error and stack trace. |
| 142 /// If the stream closes before an event arrives, the returned future | 158 /// If the stream closes before an event arrives, the returned future |
| 143 /// completes with a [StateError]. | 159 /// completes with a [StateError]. |
| 144 /// | 160 /// |
| 145 /// It's possible to have several pending [next] calls (or other requests), | 161 /// It's possible to have several pending [next] calls (or other requests), |
| 146 /// and they will be completed in the order they were requested, by the | 162 /// and they will be completed in the order they were requested, by the |
| 147 /// first events that were not consumed by previous requeusts. | 163 /// first events that were not consumed by previous requeusts. |
| 148 Future<T> get next { | 164 Future<T> get next { |
| 149 if (!_isClosed) { | 165 if (!_isClosed) { |
| 150 var nextRequest = new _NextRequest<T>(); | 166 var nextRequest = new _NextRequest<T>(); |
| 151 _addRequest(nextRequest); | 167 _addRequest(nextRequest); |
| 152 return nextRequest.future; | 168 return nextRequest.future; |
| 153 } | 169 } |
| 154 throw _failClosed(); | 170 throw _failClosed(); |
| 155 } | 171 } |
| 156 | 172 |
| 173 /// Returns the next (yet unrequested) event from the stream. |
| 174 /// |
| 175 /// Like [next] except that the event is not consumed. |
| 176 /// If the next event is an error event, it also stays in the queue. |
| 177 Future<T> get peek { |
| 178 if (!_isClosed) { |
| 179 var nextRequest = new _PeekRequest<T>(); |
| 180 _addRequest(nextRequest); |
| 181 return nextRequest.future; |
| 182 } |
| 183 throw _failClosed(); |
| 184 } |
| 185 |
| 157 /// Returns a stream of all the remaning events of the source stream. | 186 /// Returns a stream of all the remaning events of the source stream. |
| 158 /// | 187 /// |
| 159 /// All requested [next], [skip] or [take] operations are completed | 188 /// All requested [next], [skip] or [take] operations are completed |
| 160 /// first, and then any remaining events are provided as events of | 189 /// first, and then any remaining events are provided as events of |
| 161 /// the returned stream. | 190 /// the returned stream. |
| 162 /// | 191 /// |
| 163 /// Using `rest` closes this stream queue. After getting the | 192 /// Using `rest` closes this stream queue. After getting the |
| 164 /// `rest` the caller may no longer request other events, like | 193 /// `rest` the caller may no longer request other events, like |
| 165 /// after calling [cancel]. | 194 /// after calling [cancel]. |
| 166 Stream<T> get rest { | 195 Stream<T> get rest { |
| (...skipping 15 matching lines...) Expand all Loading... |
| 182 /// | 211 /// |
| 183 /// If an error occurs before `count` data events have been skipped, | 212 /// If an error occurs before `count` data events have been skipped, |
| 184 /// the returned future completes with that error instead. | 213 /// the returned future completes with that error instead. |
| 185 /// | 214 /// |
| 186 /// If the stream closes before `count` data events, | 215 /// If the stream closes before `count` data events, |
| 187 /// the remaining unskipped event count is returned. | 216 /// the remaining unskipped event count is returned. |
| 188 /// If the returned future completes with the integer `0`, | 217 /// If the returned future completes with the integer `0`, |
| 189 /// then all events were succssfully skipped. If the value | 218 /// then all events were succssfully skipped. If the value |
| 190 /// is greater than zero then the stream ended early. | 219 /// is greater than zero then the stream ended early. |
| 191 Future<int> skip(int count) { | 220 Future<int> skip(int count) { |
| 192 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 221 RangeError.checkNotNegative(count, "count"); |
| 193 if (!_isClosed) { | 222 if (!_isClosed) { |
| 194 var request = new _SkipRequest(count); | 223 var request = new _SkipRequest(count); |
| 195 _addRequest(request); | 224 _addRequest(request); |
| 196 return request.future; | 225 return request.future; |
| 197 } | 226 } |
| 198 throw _failClosed(); | 227 throw _failClosed(); |
| 199 } | 228 } |
| 200 | 229 |
| 201 /// Requests the next [count] data events as a list. | 230 /// Requests the next [count] data events as a list. |
| 202 /// | 231 /// |
| 203 /// The [count] must be non-negative. | 232 /// The [count] must be non-negative. |
| 204 /// | 233 /// |
| 205 /// Equivalent to calling [next] `count` times and | 234 /// Equivalent to calling [next] `count` times and |
| 206 /// storing the data values in a list. | 235 /// storing the data values in a list. |
| 207 /// | 236 /// |
| 208 /// If an error occurs before `count` data events has | 237 /// If an error occurs before `count` data events has |
| 209 /// been collected, the returned future completes with | 238 /// been collected, the returned future completes with |
| 210 /// that error instead. | 239 /// that error instead. |
| 211 /// | 240 /// |
| 212 /// If the stream closes before `count` data events, | 241 /// If the stream closes before `count` data events, |
| 213 /// the returned future completes with the list | 242 /// the returned future completes with the list |
| 214 /// of data collected so far. That is, the returned | 243 /// of data collected so far. That is, the returned |
| 215 /// list may have fewer than [count] elements. | 244 /// list may have fewer than [count] elements. |
| 216 Future<List<T>> take(int count) { | 245 Future<List<T>> take(int count) { |
| 217 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | 246 RangeError.checkNotNegative(count, "count"); |
| 218 if (!_isClosed) { | 247 if (!_isClosed) { |
| 219 var request = new _TakeRequest<T>(count); | 248 var request = new _TakeRequest<T>(count); |
| 220 _addRequest(request); | 249 _addRequest(request); |
| 221 return request.future; | 250 return request.future; |
| 222 } | 251 } |
| 223 throw _failClosed(); | 252 throw _failClosed(); |
| 224 } | 253 } |
| 225 | 254 |
| 226 /// Requests a transaction that can conditionally consume events. | 255 /// Requests a transaction that can conditionally consume events. |
| 227 /// | 256 /// |
| (...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 345 /// all previously requested events have been processed, then it cancels the | 374 /// all previously requested events have been processed, then it cancels the |
| 346 /// subscription providing the events. | 375 /// subscription providing the events. |
| 347 /// | 376 /// |
| 348 /// If [immediate] is `true`, the source is instead canceled | 377 /// If [immediate] is `true`, the source is instead canceled |
| 349 /// immediately. Any pending events are completed as though the underlying | 378 /// immediately. Any pending events are completed as though the underlying |
| 350 /// stream had closed. | 379 /// stream had closed. |
| 351 /// | 380 /// |
| 352 /// The returned future completes with the result of calling | 381 /// The returned future completes with the result of calling |
| 353 /// `cancel`. | 382 /// `cancel`. |
| 354 /// | 383 /// |
| 355 /// After calling `cancel`, no further events can be requested. | 384 /// After calling `cancel`, no further events can be requested, |
| 356 /// None of [next], [rest], [skip], [take] or [cancel] may be | 385 /// so methods like [next] or [peek] may not be called again. |
| 357 /// called again. | |
| 358 Future cancel({bool immediate: false}) { | 386 Future cancel({bool immediate: false}) { |
| 359 if (_isClosed) throw _failClosed(); | 387 if (_isClosed) throw _failClosed(); |
| 360 _isClosed = true; | 388 _isClosed = true; |
| 361 | 389 |
| 362 if (!immediate) { | 390 if (!immediate) { |
| 363 var request = new _CancelRequest(this); | 391 var request = new _CancelRequest(this); |
| 364 _addRequest(request); | 392 _addRequest(request); |
| 365 return request.future; | 393 return request.future; |
| 366 } | 394 } |
| 367 | 395 |
| (...skipping 317 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 685 _NextRequest(); | 713 _NextRequest(); |
| 686 | 714 |
| 687 Future<T> get future => _completer.future; | 715 Future<T> get future => _completer.future; |
| 688 | 716 |
| 689 bool update(QueueList<Result<T>> events, bool isDone) { | 717 bool update(QueueList<Result<T>> events, bool isDone) { |
| 690 if (events.isNotEmpty) { | 718 if (events.isNotEmpty) { |
| 691 events.removeFirst().complete(_completer); | 719 events.removeFirst().complete(_completer); |
| 692 return true; | 720 return true; |
| 693 } | 721 } |
| 694 if (isDone) { | 722 if (isDone) { |
| 695 var errorFuture = | 723 _completer.completeError(new StateError("No elements"), |
| 696 new Future.sync(() => throw new StateError("No elements")); | 724 StackTrace.current); |
| 697 _completer.complete(errorFuture); | |
| 698 return true; | 725 return true; |
| 699 } | 726 } |
| 700 return false; | 727 return false; |
| 701 } | 728 } |
| 702 } | 729 } |
| 703 | 730 |
| 731 |
| 732 /// Request for a [StreamQueue.peek] call. |
| 733 /// |
| 734 /// Completes the returned future when receiving the first event, |
| 735 /// and is then complete, but doesn't consume the event. |
| 736 class _PeekRequest<T> implements _EventRequest<T> { |
| 737 /// Completer for the future returned by [StreamQueue.next]. |
| 738 final _completer = new Completer<T>(); |
| 739 |
| 740 _PeekRequest(); |
| 741 |
| 742 Future<T> get future => _completer.future; |
| 743 |
| 744 bool update(QueueList<Result<T>> events, bool isDone) { |
| 745 if (events.isNotEmpty) { |
| 746 events.first.complete(_completer); |
| 747 return true; |
| 748 } |
| 749 if (isDone) { |
| 750 _completer.completeError(new StateError("No elements"), |
| 751 StackTrace.current); |
| 752 return true; |
| 753 } |
| 754 return false; |
| 755 } |
| 756 } |
| 757 |
| 758 |
| 704 /// Request for a [StreamQueue.skip] call. | 759 /// Request for a [StreamQueue.skip] call. |
| 705 class _SkipRequest<T> implements _EventRequest<T> { | 760 class _SkipRequest<T> implements _EventRequest<T> { |
| 706 /// Completer for the future returned by the skip call. | 761 /// Completer for the future returned by the skip call. |
| 707 final _completer = new Completer<int>(); | 762 final _completer = new Completer<int>(); |
| 708 | 763 |
| 709 /// Number of remaining events to skip. | 764 /// Number of remaining events to skip. |
| 710 /// | 765 /// |
| 711 /// The request [isComplete] when the values reaches zero. | 766 /// The request [isComplete] when the values reaches zero. |
| 712 /// | 767 /// |
| 713 /// Decremented when an event is seen. | 768 /// Decremented when an event is seen. |
| (...skipping 17 matching lines...) Expand all Loading... |
| 731 if (event.isError) { | 786 if (event.isError) { |
| 732 _completer.completeError(event.asError.error, event.asError.stackTrace); | 787 _completer.completeError(event.asError.error, event.asError.stackTrace); |
| 733 return true; | 788 return true; |
| 734 } | 789 } |
| 735 } | 790 } |
| 736 _completer.complete(_eventsToSkip); | 791 _completer.complete(_eventsToSkip); |
| 737 return true; | 792 return true; |
| 738 } | 793 } |
| 739 } | 794 } |
| 740 | 795 |
| 741 /// Request for a [StreamQueue.take] call. | 796 /// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
| 742 class _TakeRequest<T> implements _EventRequest<T> { | 797 abstract class _ListRequest<T> implements _EventRequest<T> { |
| 743 /// Completer for the future returned by the take call. | 798 /// Completer for the future returned by the take call. |
| 744 final _completer = new Completer<List<T>>(); | 799 final _completer = new Completer<List<T>>(); |
| 745 | 800 |
| 746 /// List collecting events until enough have been seen. | 801 /// List collecting events until enough have been seen. |
| 747 final _list = <T>[]; | 802 final _list = <T>[]; |
| 748 | 803 |
| 749 /// Number of events to capture. | 804 /// Number of events to capture. |
| 750 /// | 805 /// |
| 751 /// The request [isComplete] when the length of [_list] reaches | 806 /// The request [isComplete] when the length of [_list] reaches |
| 752 /// this value. | 807 /// this value. |
| 753 final int _eventsToTake; | 808 final int _eventsToTake; |
| 754 | 809 |
| 755 _TakeRequest(this._eventsToTake); | 810 _ListRequest(this._eventsToTake); |
| 756 | 811 |
| 757 /// The future completed when the correct number of events have been captured. | 812 /// The future completed when the correct number of events have been captured. |
| 758 Future<List<T>> get future => _completer.future; | 813 Future<List<T>> get future => _completer.future; |
| 814 } |
| 815 |
| 816 |
| 817 /// Request for a [StreamQueue.take] call. |
| 818 class _TakeRequest<T> extends _ListRequest<T> { |
| 819 _TakeRequest(int eventsToTake) : super(eventsToTake); |
| 759 | 820 |
| 760 bool update(QueueList<Result<T>> events, bool isDone) { | 821 bool update(QueueList<Result<T>> events, bool isDone) { |
| 761 while (_list.length < _eventsToTake) { | 822 while (_list.length < _eventsToTake) { |
| 762 if (events.isEmpty) { | 823 if (events.isEmpty) { |
| 763 if (isDone) break; | 824 if (isDone) break; |
| 764 return false; | 825 return false; |
| 765 } | 826 } |
| 766 | 827 |
| 767 var event = events.removeFirst(); | 828 var event = events.removeFirst(); |
| 768 if (event.isError) { | 829 if (event.isError) { |
| 769 _completer.completeError(event.asError.error, event.asError.stackTrace); | 830 event.asError.complete(_completer); |
| 770 return true; | 831 return true; |
| 771 } | 832 } |
| 772 _list.add(event.asValue.value); | 833 _list.add(event.asValue.value); |
| 773 } | 834 } |
| 774 _completer.complete(_list); | 835 _completer.complete(_list); |
| 775 return true; | 836 return true; |
| 776 } | 837 } |
| 777 } | 838 } |
| 778 | 839 |
| 840 |
| 841 /// Request for a [StreamQueue.lookAhead] call. |
| 842 class _LookAheadRequest<T> extends _ListRequest<T> { |
| 843 _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
| 844 |
| 845 bool update(QueueList<Result<T>> events, bool isDone) { |
| 846 while (_list.length < _eventsToTake) { |
| 847 if (events.length == _list.length) { |
| 848 if (isDone) break; |
| 849 return false; |
| 850 } |
| 851 var event = events.elementAt(_list.length); |
| 852 if (event.isError) { |
| 853 event.asError.complete(_completer); |
| 854 return true; |
| 855 } |
| 856 _list.add(event.asValue.value); |
| 857 } |
| 858 _completer.complete(_list); |
| 859 return true; |
| 860 } |
| 861 } |
| 862 |
| 863 |
| 779 /// Request for a [StreamQueue.cancel] call. | 864 /// Request for a [StreamQueue.cancel] call. |
| 780 /// | 865 /// |
| 781 /// The request needs no events, it just waits in the request queue | 866 /// The request needs no events, it just waits in the request queue |
| 782 /// until all previous events are fulfilled, then it cancels the stream queue | 867 /// until all previous events are fulfilled, then it cancels the stream queue |
| 783 /// source subscription. | 868 /// source subscription. |
| 784 class _CancelRequest<T> implements _EventRequest<T> { | 869 class _CancelRequest<T> implements _EventRequest<T> { |
| 785 /// Completer for the future returned by the `cancel` call. | 870 /// Completer for the future returned by the `cancel` call. |
| 786 final _completer = new Completer(); | 871 final _completer = new Completer(); |
| 787 /// | 872 /// |
| 788 /// When the event is completed, it needs to cancel the active subscription | 873 /// When the event is completed, it needs to cancel the active subscription |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 893 } | 978 } |
| 894 | 979 |
| 895 bool update(QueueList<Result<T>> events, bool isDone) { | 980 bool update(QueueList<Result<T>> events, bool isDone) { |
| 896 while (_eventsSent < events.length) { | 981 while (_eventsSent < events.length) { |
| 897 events[_eventsSent++].addTo(_controller); | 982 events[_eventsSent++].addTo(_controller); |
| 898 } | 983 } |
| 899 if (isDone && !_controller.isClosed) _controller.close(); | 984 if (isDone && !_controller.isClosed) _controller.close(); |
| 900 return false; | 985 return false; |
| 901 } | 986 } |
| 902 } | 987 } |
| OLD | NEW |