Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(720)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 2149893002: Make StreamIterator not delay pausing between requests. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 * * an error is sent, and [cancelOnError] is true, or 79 * * an error is sent, and [cancelOnError] is true, or
80 * * a done event is sent. 80 * * a done event is sent.
81 * 81 *
82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
83 * state is unset, and no furher events must be delivered. 83 * state is unset, and no furher events must be delivered.
84 */ 84 */
85 static const int _STATE_WAIT_FOR_CANCEL = 16; 85 static const int _STATE_WAIT_FOR_CANCEL = 16;
86 static const int _STATE_IN_CALLBACK = 32; 86 static const int _STATE_IN_CALLBACK = 32;
87 static const int _STATE_HAS_PENDING = 64; 87 static const int _STATE_HAS_PENDING = 64;
88 static const int _STATE_PAUSE_COUNT = 128; 88 static const int _STATE_PAUSE_COUNT = 128;
89 static const int _STATE_PAUSE_COUNT_SHIFT = 7;
90 89
91 /* Event handlers provided in constructor. */ 90 /* Event handlers provided in constructor. */
92 _DataHandler<T> _onData; 91 _DataHandler<T> _onData;
93 Function _onError; 92 Function _onError;
94 _DoneHandler _onDone; 93 _DoneHandler _onDone;
95 final Zone _zone = Zone.current; 94 final Zone _zone = Zone.current;
96 95
97 /** Bit vector based on state-constants above. */ 96 /** Bit vector based on state-constants above. */
98 int _state; 97 int _state;
99 98
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
225 void _cancel() { 224 void _cancel() {
226 _state |= _STATE_CANCELED; 225 _state |= _STATE_CANCELED;
227 if (_hasPending) { 226 if (_hasPending) {
228 _pending.cancelSchedule(); 227 _pending.cancelSchedule();
229 } 228 }
230 if (!_inCallback) _pending = null; 229 if (!_inCallback) _pending = null;
231 _cancelFuture = _onCancel(); 230 _cancelFuture = _onCancel();
232 } 231 }
233 232
234 /** 233 /**
235 * Increment the pause count.
236 *
237 * Also marks input as paused.
238 */
239 void _incrementPauseCount() {
240 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
241 }
242
243 /**
244 * Decrements the pause count. 234 * Decrements the pause count.
245 * 235 *
246 * Does not automatically unpause the input (call [_onResume]) when 236 * Does not automatically unpause the input (call [_onResume]) when
247 * the pause count reaches zero. This is handled elsewhere, and only 237 * the pause count reaches zero. This is handled elsewhere, and only
248 * if there are no pending events buffered. 238 * if there are no pending events buffered.
249 */ 239 */
250 void _decrementPauseCount() { 240 void _decrementPauseCount() {
251 assert(_isPaused); 241 assert(_isPaused);
252 _state -= _STATE_PAUSE_COUNT; 242 _state -= _STATE_PAUSE_COUNT;
253 } 243 }
(...skipping 452 matching lines...) Expand 10 before | Expand all | Expand 10 after
706 } 696 }
707 event.perform(dispatch); 697 event.perform(dispatch);
708 } 698 }
709 699
710 void clear() { 700 void clear() {
711 if (isScheduled) cancelSchedule(); 701 if (isScheduled) cancelSchedule();
712 firstPendingEvent = lastPendingEvent = null; 702 firstPendingEvent = lastPendingEvent = null;
713 } 703 }
714 } 704 }
715 705
716 class _BroadcastLinkedList {
717 _BroadcastLinkedList _next;
718 _BroadcastLinkedList _previous;
719
720 void _unlink() {
721 _previous._next = _next;
722 _next._previous = _previous;
723 _next = _previous = this;
724 }
725
726 void _insertBefore(_BroadcastLinkedList newNext) {
727 _BroadcastLinkedList newPrevious = newNext._previous;
728 newPrevious._next = this;
729 newNext._previous = _previous;
730 _previous._next = newNext;
731 _previous = newPrevious;
732 }
733 }
734
735 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); 706 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
736 707
737 /** 708 /**
738 * Done subscription that will send one done event as soon as possible. 709 * Done subscription that will send one done event as soon as possible.
739 */ 710 */
740 class _DoneStreamSubscription<T> implements StreamSubscription<T> { 711 class _DoneStreamSubscription<T> implements StreamSubscription<T> {
741 static const int _DONE_SENT = 1; 712 static const int _DONE_SENT = 1;
742 static const int _SCHEDULED = 2; 713 static const int _SCHEDULED = 2;
743 static const int _PAUSED = 4; 714 static const int _PAUSED = 4;
744 715
(...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after
925 896
926 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 897 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
927 throw new UnsupportedError( 898 throw new UnsupportedError(
928 "Cannot change handlers of asBroadcastStream source subscription."); 899 "Cannot change handlers of asBroadcastStream source subscription.");
929 } 900 }
930 } 901 }
931 902
932 903
933 /** 904 /**
934 * Simple implementation of [StreamIterator]. 905 * Simple implementation of [StreamIterator].
906 *
907 * Pauses the stream between calles to [moveNext].
935 */ 908 */
936 class _StreamIteratorImpl<T> implements StreamIterator<T> { 909 class _StreamIterator<T> implements StreamIterator<T> {
937 // Internal state of the stream iterator. 910 // The stream iterator is always in one of four states.
938 // At any time, it is in one of these states. 911 // The value of the [_stateData] field depends on the state.
939 // The interpretation of the [_futureOrPrefecth] field depends on the state. 912 //
940 // In _STATE_MOVING, the _data field holds the most recently returned 913 // When `_subscription == null` and `_stateData != null`:
941 // future. 914 // The stream iterator has been created, but [moveNext] has not been called
942 // When in one of the _STATE_EXTRA_* states, the it may hold the 915 // yet. The [_stateData] field contains the stream to listen to on the first
943 // next data/error object, and the subscription is paused. 916 // call to [moveNext] and [current] returns `null`.
944 917 //
945 /// The simple state where [_data] holds the data to return, and [moveNext] 918 // When `_subscription != null` and `!_subscription.isPaused`:
946 /// is allowed. The subscription is actively listening. 919 // The user has called [moveNext] and the iterator is waiting for the next
947 static const int _STATE_FOUND = 0; 920 // event. The [_stateData] field contains the [_Future] returned by the
948 /// State set after [moveNext] has returned false or an error, 921 // [_moveNext] call and [current] returns `null.`
949 /// or after calling [cancel]. The subscription is always canceled. 922 //
950 static const int _STATE_DONE = 1; 923 // When `_subscription != null` and `_subscription.isPaused`:
951 /// State set after calling [moveNext], but before its returned future has 924 // The most recent call to [moveNext] has completed with a `true` value
952 /// completed. Calling [moveNext] again is not allowed in this state. 925 // and [current] provides the value of the data event.
953 /// The subscription is actively listening. 926 // The [_stateData] field contains the [current] value.
954 static const int _STATE_MOVING = 2; 927 //
955 /// States set when another event occurs while in _STATE_FOUND. 928 // When `_subscription == null` and `_stateData == null`:
956 /// This extra overflow event is cached until the next call to [moveNext], 929 // The stream has completed or been canceled using [cancel].
957 /// which will complete as if it received the event normally. 930 // The stream completes on either a done event or an error event.
958 /// The subscription is paused in these states, so we only ever get one 931 // The last call to [moveNext] has completed with `false` and [current]
959 /// event too many. 932 // returns `null`.
960 static const int _STATE_EXTRA_DATA = 3;
961 static const int _STATE_EXTRA_ERROR = 4;
962 static const int _STATE_EXTRA_DONE = 5;
963 933
964 /// Subscription being listened to. 934 /// Subscription being listened to.
935 ///
936 /// Set to `null` when the stream subscription is done or canceled.
965 StreamSubscription _subscription; 937 StreamSubscription _subscription;
966 938
967 /// The current element represented by the most recent call to moveNext. 939 /// Data value depending on the current state.
968 /// 940 ///
969 /// Is null between the time moveNext is called and its future completes. 941 /// Before first call to [moveNext]: The stream to listen to.
970 T _current = null; 942 ///
943 /// After calling [moveNext] but before the returned future completes:
944 /// The returned future.
945 ///
946 /// After calling [moveNext] and the returned future has completed
947 /// with `true`: The value of [current].
948 ///
949 /// After calling [moveNext] and the returned future has completed
950 /// with `false`, or after calling [cancel]: `null`.
951 Object _stateData;
971 952
972 /// The future returned by the most recent call to [moveNext]. 953 _StreamIterator(final Stream<T> stream) : _stateData = stream;
973 ///
974 /// Also used to store the next value/error in case the stream provides an
975 /// event before [moveNext] is called again. In that case, the stream will
976 /// be paused to prevent further events.
977 var/*Future<bool> or T*/ _futureOrPrefetch = null;
978 954
979 /// The current state. 955 T get current {
980 int _state = _STATE_FOUND; 956 if (_subscription != null && _subscription.isPaused) {
981 957 return _stateData;
floitsch 2016/07/14 13:29:25 Please make sure that this is still strong-mode cl
982 _StreamIteratorImpl(final Stream<T> stream) { 958 }
983 _subscription = stream.listen(_onData, 959 return null;
984 onError: _onError,
985 onDone: _onDone,
986 cancelOnError: true);
987 } 960 }
988 961
989 T get current => _current;
990
991 Future<bool> moveNext() { 962 Future<bool> moveNext() {
992 if (_state == _STATE_DONE) { 963 if (_subscription != null) {
993 return new _Future<bool>.immediate(false); 964 if (_subscription.isPaused) {
994 } 965 var future = new _Future<bool>();
995 if (_state == _STATE_MOVING) { 966 _stateData = future;
967 _subscription.resume();
968 return future;
969 }
996 throw new StateError("Already waiting for next."); 970 throw new StateError("Already waiting for next.");
997 } 971 }
998 if (_state == _STATE_FOUND) { 972 return _initializeOrDone();
999 _state = _STATE_MOVING;
1000 _current = null;
1001 var result = new _Future<bool>();
1002 _futureOrPrefetch = result;
1003 return result;
1004 } else {
1005 assert(_state >= _STATE_EXTRA_DATA);
1006 switch (_state) {
1007 case _STATE_EXTRA_DATA:
1008 _state = _STATE_FOUND;
1009 _current = _futureOrPrefetch as Object /*=T*/;
1010 _futureOrPrefetch = null;
1011 _subscription.resume();
1012 return new _Future<bool>.immediate(true);
1013 case _STATE_EXTRA_ERROR:
1014 AsyncError prefetch = _futureOrPrefetch;
1015 _clear();
1016 return new _Future<bool>.immediateError(prefetch.error,
1017 prefetch.stackTrace);
1018 case _STATE_EXTRA_DONE:
1019 _clear();
1020 return new _Future<bool>.immediate(false);
1021 }
1022 }
1023 } 973 }
1024 974
1025 /** Clears up the internal state when the iterator ends. */ 975 /// Called if there is no active subscription when [moveNext] is called.
1026 void _clear() { 976 ///
1027 _subscription = null; 977 /// Either starts listening on the stream if this is the first call to
1028 _futureOrPrefetch = null; 978 /// [moveNext], or returns a `false` future because the stream has already
1029 _current = null; 979 /// ended.
1030 _state = _STATE_DONE; 980 Future<bool> _initializeOrDone() {
981 assert(_subscription == null);
982 var stateData = _stateData;
983 if (stateData != null) {
984 Stream<T> stream = stateData;
985 _subscription = stream.listen(
986 _onData, onError: _onError, onDone: _onDone, cancelOnError: true);
987 var future = new _Future<bool>();
988 _stateData = future;
989 return future;
990 }
991 return new _Future.immediate(false);
1031 } 992 }
1032 993
1033 Future cancel() { 994 Future cancel() {
1034 StreamSubscription subscription = _subscription; 995 StreamSubscription<T> subscription = _subscription;
1035 if (subscription == null) return null; 996 Object stateData = _stateData;
1036 if (_state == _STATE_MOVING) { 997 _stateData = null;
1037 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 998 if (subscription != null) {
1038 _clear(); 999 _subscription = null;
1039 hasNext._complete(false); 1000 if (!subscription.isPaused) {
1040 } else { 1001 _Future<bool> future = stateData;
1041 _clear(); 1002 future._asyncComplete(false);
1003 }
1004 return subscription.cancel();
1042 } 1005 }
1043 return subscription.cancel(); 1006 return null;
1044 } 1007 }
1045 1008
1046 void _onData(T data) { 1009 void _onData(T data) {
1047 if (_state == _STATE_MOVING) { 1010 assert(_subscription != null && !_subscription.isPaused);
1048 _current = data; 1011 _Future<bool> moveNextFuture = _stateData;
1049 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1012 _stateData = data;
1050 _futureOrPrefetch = null;
1051 _state = _STATE_FOUND;
1052 hasNext._complete(true);
1053 return;
1054 }
1055 _subscription.pause(); 1013 _subscription.pause();
1056 assert(_futureOrPrefetch == null); 1014 moveNextFuture._complete(true);
1057 _futureOrPrefetch = data;
1058 _state = _STATE_EXTRA_DATA;
1059 } 1015 }
1060 1016
1061 void _onError(Object error, [StackTrace stackTrace]) { 1017 void _onError(Object error, [StackTrace stackTrace]) {
1062 if (_state == _STATE_MOVING) { 1018 assert(_subscription != null && !_subscription.isPaused);
1063 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1019 _Future<bool> moveNextFuture = _stateData;
1064 // We have cancelOnError: true, so the subscription is canceled. 1020 _subscription = null;
1065 _clear(); 1021 _stateData = null;
1066 hasNext._completeError(error, stackTrace); 1022 moveNextFuture._completeError(error, stackTrace);
1067 return;
1068 }
1069 _subscription.pause();
1070 assert(_futureOrPrefetch == null);
1071 _futureOrPrefetch = new AsyncError(error, stackTrace);
1072 _state = _STATE_EXTRA_ERROR;
1073 } 1023 }
1074 1024
1075 void _onDone() { 1025 void _onDone() {
1076 if (_state == _STATE_MOVING) { 1026 assert(_subscription != null && !_subscription.isPaused);
1077 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1027 _Future<bool> moveNextFuture = _stateData;
1078 _clear(); 1028 _subscription = null;
1079 hasNext._complete(false); 1029 _stateData = null;
1080 return; 1030 moveNextFuture._complete(false);
1081 }
1082 _subscription.pause();
1083 _futureOrPrefetch = null;
1084 _state = _STATE_EXTRA_DONE;
1085 } 1031 }
1086 } 1032 }
1087 1033
1088 /** An empty broadcast stream, sending a done event as soon as possible. */ 1034 /** An empty broadcast stream, sending a done event as soon as possible. */
1089 class _EmptyStream<T> extends Stream<T> { 1035 class _EmptyStream<T> extends Stream<T> {
1090 const _EmptyStream() : super._internal(); 1036 const _EmptyStream() : super._internal();
1091 bool get isBroadcast => true; 1037 bool get isBroadcast => true;
1092 StreamSubscription<T> listen(void onData(T data), 1038 StreamSubscription<T> listen(void onData(T data),
1093 {Function onError, 1039 {Function onError,
1094 void onDone(), 1040 void onDone(),
1095 bool cancelOnError}) { 1041 bool cancelOnError}) {
1096 return new _DoneStreamSubscription<T>(onDone); 1042 return new _DoneStreamSubscription<T>(onDone);
1097 } 1043 }
1098 } 1044 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698