Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(151)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 2149893002: Make StreamIterator not delay pausing between requests. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Address comment. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/language/async_star_pause_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 * * an error is sent, and [cancelOnError] is true, or 79 * * an error is sent, and [cancelOnError] is true, or
80 * * a done event is sent. 80 * * a done event is sent.
81 * 81 *
82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
83 * state is unset, and no furher events must be delivered. 83 * state is unset, and no furher events must be delivered.
84 */ 84 */
85 static const int _STATE_WAIT_FOR_CANCEL = 16; 85 static const int _STATE_WAIT_FOR_CANCEL = 16;
86 static const int _STATE_IN_CALLBACK = 32; 86 static const int _STATE_IN_CALLBACK = 32;
87 static const int _STATE_HAS_PENDING = 64; 87 static const int _STATE_HAS_PENDING = 64;
88 static const int _STATE_PAUSE_COUNT = 128; 88 static const int _STATE_PAUSE_COUNT = 128;
89 static const int _STATE_PAUSE_COUNT_SHIFT = 7;
90 89
91 /* Event handlers provided in constructor. */ 90 /* Event handlers provided in constructor. */
92 _DataHandler<T> _onData; 91 _DataHandler<T> _onData;
93 Function _onError; 92 Function _onError;
94 _DoneHandler _onDone; 93 _DoneHandler _onDone;
95 final Zone _zone = Zone.current; 94 final Zone _zone = Zone.current;
96 95
97 /** Bit vector based on state-constants above. */ 96 /** Bit vector based on state-constants above. */
98 int _state; 97 int _state;
99 98
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 void _cancel() { 231 void _cancel() {
233 _state |= _STATE_CANCELED; 232 _state |= _STATE_CANCELED;
234 if (_hasPending) { 233 if (_hasPending) {
235 _pending.cancelSchedule(); 234 _pending.cancelSchedule();
236 } 235 }
237 if (!_inCallback) _pending = null; 236 if (!_inCallback) _pending = null;
238 _cancelFuture = _onCancel(); 237 _cancelFuture = _onCancel();
239 } 238 }
240 239
241 /** 240 /**
242 * Increment the pause count.
243 *
244 * Also marks input as paused.
245 */
246 void _incrementPauseCount() {
247 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
248 }
249
250 /**
251 * Decrements the pause count. 241 * Decrements the pause count.
252 * 242 *
253 * Does not automatically unpause the input (call [_onResume]) when 243 * Does not automatically unpause the input (call [_onResume]) when
254 * the pause count reaches zero. This is handled elsewhere, and only 244 * the pause count reaches zero. This is handled elsewhere, and only
255 * if there are no pending events buffered. 245 * if there are no pending events buffered.
256 */ 246 */
257 void _decrementPauseCount() { 247 void _decrementPauseCount() {
258 assert(_isPaused); 248 assert(_isPaused);
259 _state -= _STATE_PAUSE_COUNT; 249 _state -= _STATE_PAUSE_COUNT;
260 } 250 }
(...skipping 454 matching lines...) Expand 10 before | Expand all | Expand 10 after
715 } 705 }
716 event.perform(dispatch); 706 event.perform(dispatch);
717 } 707 }
718 708
719 void clear() { 709 void clear() {
720 if (isScheduled) cancelSchedule(); 710 if (isScheduled) cancelSchedule();
721 firstPendingEvent = lastPendingEvent = null; 711 firstPendingEvent = lastPendingEvent = null;
722 } 712 }
723 } 713 }
724 714
725 class _BroadcastLinkedList {
726 _BroadcastLinkedList _next;
727 _BroadcastLinkedList _previous;
728
729 void _unlink() {
730 _previous._next = _next;
731 _next._previous = _previous;
732 _next = _previous = this;
733 }
734
735 void _insertBefore(_BroadcastLinkedList newNext) {
736 _BroadcastLinkedList newPrevious = newNext._previous;
737 newPrevious._next = this;
738 newNext._previous = _previous;
739 _previous._next = newNext;
740 _previous = newPrevious;
741 }
742 }
743
744 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); 715 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
745 716
746 /** 717 /**
747 * Done subscription that will send one done event as soon as possible. 718 * Done subscription that will send one done event as soon as possible.
748 */ 719 */
749 class _DoneStreamSubscription<T> implements StreamSubscription<T> { 720 class _DoneStreamSubscription<T> implements StreamSubscription<T> {
750 static const int _DONE_SENT = 1; 721 static const int _DONE_SENT = 1;
751 static const int _SCHEDULED = 2; 722 static const int _SCHEDULED = 2;
752 static const int _PAUSED = 4; 723 static const int _PAUSED = 4;
753 724
(...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after
934 905
935 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 906 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
936 throw new UnsupportedError( 907 throw new UnsupportedError(
937 "Cannot change handlers of asBroadcastStream source subscription."); 908 "Cannot change handlers of asBroadcastStream source subscription.");
938 } 909 }
939 } 910 }
940 911
941 912
942 /** 913 /**
943 * Simple implementation of [StreamIterator]. 914 * Simple implementation of [StreamIterator].
915 *
916 * Pauses the stream between calls to [moveNext].
944 */ 917 */
945 class _StreamIteratorImpl<T> implements StreamIterator<T> { 918 class _StreamIterator<T> implements StreamIterator<T> {
946 // Internal state of the stream iterator. 919 // The stream iterator is always in one of four states.
947 // At any time, it is in one of these states. 920 // The value of the [_stateData] field depends on the state.
948 // The interpretation of the [_futureOrPrefecth] field depends on the state. 921 //
949 // In _STATE_MOVING, the _data field holds the most recently returned 922 // When `_subscription == null` and `_stateData != null`:
950 // future. 923 // The stream iterator has been created, but [moveNext] has not been called
951 // When in one of the _STATE_EXTRA_* states, the it may hold the 924 // yet. The [_stateData] field contains the stream to listen to on the first
952 // next data/error object, and the subscription is paused. 925 // call to [moveNext] and [current] returns `null`.
953 926 //
954 /// The simple state where [_data] holds the data to return, and [moveNext] 927 // When `_subscription != null` and `!_isPaused`:
955 /// is allowed. The subscription is actively listening. 928 // The user has called [moveNext] and the iterator is waiting for the next
956 static const int _STATE_FOUND = 0; 929 // event. The [_stateData] field contains the [_Future] returned by the
957 /// State set after [moveNext] has returned false or an error, 930 // [_moveNext] call and [current] returns `null.`
958 /// or after calling [cancel]. The subscription is always canceled. 931 //
959 static const int _STATE_DONE = 1; 932 // When `_subscription != null` and `_isPaused`:
960 /// State set after calling [moveNext], but before its returned future has 933 // The most recent call to [moveNext] has completed with a `true` value
961 /// completed. Calling [moveNext] again is not allowed in this state. 934 // and [current] provides the value of the data event.
962 /// The subscription is actively listening. 935 // The [_stateData] field contains the [current] value.
963 static const int _STATE_MOVING = 2; 936 //
964 /// States set when another event occurs while in _STATE_FOUND. 937 // When `_subscription == null` and `_stateData == null`:
965 /// This extra overflow event is cached until the next call to [moveNext], 938 // The stream has completed or been canceled using [cancel].
966 /// which will complete as if it received the event normally. 939 // The stream completes on either a done event or an error event.
967 /// The subscription is paused in these states, so we only ever get one 940 // The last call to [moveNext] has completed with `false` and [current]
968 /// event too many. 941 // returns `null`.
969 static const int _STATE_EXTRA_DATA = 3;
970 static const int _STATE_EXTRA_ERROR = 4;
971 static const int _STATE_EXTRA_DONE = 5;
972 942
973 /// Subscription being listened to. 943 /// Subscription being listened to.
944 ///
945 /// Set to `null` when the stream subscription is done or canceled.
974 StreamSubscription _subscription; 946 StreamSubscription _subscription;
975 947
976 /// The current element represented by the most recent call to moveNext. 948 /// Data value depending on the current state.
977 /// 949 ///
978 /// Is null between the time moveNext is called and its future completes. 950 /// Before first call to [moveNext]: The stream to listen to.
979 T _current = null; 951 ///
952 /// After calling [moveNext] but before the returned future completes:
953 /// The returned future.
954 ///
955 /// After calling [moveNext] and the returned future has completed
956 /// with `true`: The value of [current].
957 ///
958 /// After calling [moveNext] and the returned future has completed
959 /// with `false`, or after calling [cancel]: `null`.
960 Object _stateData;
980 961
981 /// The future returned by the most recent call to [moveNext]. 962 /// Whether the iterator is between calls to `moveNext`.
982 /// 963 /// This will usually cause the [_subscription] to be paused, but as an
983 /// Also used to store the next value/error in case the stream provides an 964 /// optimization, we only pause after the [moveNext] future has been
984 /// event before [moveNext] is called again. In that case, the stream will 965 /// completed.
985 /// be paused to prevent further events. 966 bool _isPaused = false;
986 var/*Future<bool> or T*/ _futureOrPrefetch = null;
987 967
988 /// The current state. 968 _StreamIterator(final Stream<T> stream) : _stateData = stream;
989 int _state = _STATE_FOUND;
990 969
991 _StreamIteratorImpl(final Stream<T> stream) { 970 T get current {
992 _subscription = stream.listen(_onData, 971 if (_subscription != null && _isPaused) {
993 onError: _onError, 972 return _stateData as Object /*=T*/;
994 onDone: _onDone, 973 }
995 cancelOnError: true); 974 return null;
996 } 975 }
997 976
998 T get current => _current;
999
1000 Future<bool> moveNext() { 977 Future<bool> moveNext() {
1001 if (_state == _STATE_DONE) { 978 if (_subscription != null) {
1002 return new _Future<bool>.immediate(false); 979 if (_isPaused) {
1003 } 980 var future = new _Future<bool>();
1004 if (_state == _STATE_MOVING) { 981 _stateData = future;
982 _isPaused = false;
983 _subscription.resume();
984 return future;
985 }
1005 throw new StateError("Already waiting for next."); 986 throw new StateError("Already waiting for next.");
1006 } 987 }
1007 if (_state == _STATE_FOUND) { 988 return _initializeOrDone();
1008 _state = _STATE_MOVING;
1009 _current = null;
1010 var result = new _Future<bool>();
1011 _futureOrPrefetch = result;
1012 return result;
1013 } else {
1014 assert(_state >= _STATE_EXTRA_DATA);
1015 switch (_state) {
1016 case _STATE_EXTRA_DATA:
1017 _state = _STATE_FOUND;
1018 _current = _futureOrPrefetch as Object /*=T*/;
1019 _futureOrPrefetch = null;
1020 _subscription.resume();
1021 return new _Future<bool>.immediate(true);
1022 case _STATE_EXTRA_ERROR:
1023 AsyncError prefetch = _futureOrPrefetch;
1024 _clear();
1025 return new _Future<bool>.immediateError(prefetch.error,
1026 prefetch.stackTrace);
1027 case _STATE_EXTRA_DONE:
1028 _clear();
1029 return new _Future<bool>.immediate(false);
1030 }
1031 }
1032 } 989 }
1033 990
1034 /** Clears up the internal state when the iterator ends. */ 991 /// Called if there is no active subscription when [moveNext] is called.
1035 void _clear() { 992 ///
1036 _subscription = null; 993 /// Either starts listening on the stream if this is the first call to
1037 _futureOrPrefetch = null; 994 /// [moveNext], or returns a `false` future because the stream has already
1038 _current = null; 995 /// ended.
1039 _state = _STATE_DONE; 996 Future<bool> _initializeOrDone() {
997 assert(_subscription == null);
998 var stateData = _stateData;
999 if (stateData != null) {
1000 Stream<T> stream = stateData as Object /*=Stream<T>*/;
1001 _subscription = stream.listen(
1002 _onData, onError: _onError, onDone: _onDone, cancelOnError: true);
1003 var future = new _Future<bool>();
1004 _stateData = future;
1005 return future;
1006 }
1007 return new _Future<bool>.immediate(false);
1040 } 1008 }
1041 1009
1042 Future cancel() { 1010 Future cancel() {
1043 StreamSubscription subscription = _subscription; 1011 StreamSubscription<T> subscription = _subscription;
1044 if (subscription == null) return Future._nullFuture; 1012 Object stateData = _stateData;
1045 if (_state == _STATE_MOVING) { 1013 _stateData = null;
1046 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1014 if (subscription != null) {
1047 _clear(); 1015 _subscription = null;
1048 hasNext._complete(false); 1016 if (!_isPaused) {
1049 } else { 1017 _Future<bool> future = stateData as Object /*=_Future<bool>*/;
1050 _clear(); 1018 future._asyncComplete(false);
1019 }
1020 return subscription.cancel();
1051 } 1021 }
1052 return subscription.cancel(); 1022 return Future._nullFuture;
1053 } 1023 }
1054 1024
1055 void _onData(T data) { 1025 void _onData(T data) {
1056 if (_state == _STATE_MOVING) { 1026 assert(_subscription != null && !_isPaused);
1057 _current = data; 1027 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
1058 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1028 _stateData = data;
1059 _futureOrPrefetch = null; 1029 _isPaused = true;
1060 _state = _STATE_FOUND; 1030 moveNextFuture._complete(true);
1061 hasNext._complete(true); 1031 if (_subscription != null && _isPaused) _subscription.pause();
1062 return;
1063 }
1064 _subscription.pause();
1065 assert(_futureOrPrefetch == null);
1066 _futureOrPrefetch = data;
1067 _state = _STATE_EXTRA_DATA;
1068 } 1032 }
1069 1033
1070 void _onError(Object error, [StackTrace stackTrace]) { 1034 void _onError(Object error, [StackTrace stackTrace]) {
1071 if (_state == _STATE_MOVING) { 1035 assert(_subscription != null && !_isPaused);
1072 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1036 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
1073 // We have cancelOnError: true, so the subscription is canceled. 1037 _subscription = null;
1074 _clear(); 1038 _stateData = null;
1075 hasNext._completeError(error, stackTrace); 1039 moveNextFuture._completeError(error, stackTrace);
1076 return;
1077 }
1078 _subscription.pause();
1079 assert(_futureOrPrefetch == null);
1080 _futureOrPrefetch = new AsyncError(error, stackTrace);
1081 _state = _STATE_EXTRA_ERROR;
1082 } 1040 }
1083 1041
1084 void _onDone() { 1042 void _onDone() {
1085 if (_state == _STATE_MOVING) { 1043 assert(_subscription != null && !_isPaused);
1086 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1044 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
1087 _clear(); 1045 _subscription = null;
1088 hasNext._complete(false); 1046 _stateData = null;
1089 return; 1047 moveNextFuture._complete(false);
1090 }
1091 _subscription.pause();
1092 _futureOrPrefetch = null;
1093 _state = _STATE_EXTRA_DONE;
1094 } 1048 }
1095 } 1049 }
1096 1050
1097 /** An empty broadcast stream, sending a done event as soon as possible. */ 1051 /** An empty broadcast stream, sending a done event as soon as possible. */
1098 class _EmptyStream<T> extends Stream<T> { 1052 class _EmptyStream<T> extends Stream<T> {
1099 const _EmptyStream() : super._internal(); 1053 const _EmptyStream() : super._internal();
1100 bool get isBroadcast => true; 1054 bool get isBroadcast => true;
1101 StreamSubscription<T> listen(void onData(T data), 1055 StreamSubscription<T> listen(void onData(T data),
1102 {Function onError, 1056 {Function onError,
1103 void onDone(), 1057 void onDone(),
1104 bool cancelOnError}) { 1058 bool cancelOnError}) {
1105 return new _DoneStreamSubscription<T>(onDone); 1059 return new _DoneStreamSubscription<T>(onDone);
1106 } 1060 }
1107 } 1061 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/language/async_star_pause_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698