OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
79 * * an error is sent, and [cancelOnError] is true, or | 79 * * an error is sent, and [cancelOnError] is true, or |
80 * * a done event is sent. | 80 * * a done event is sent. |
81 * | 81 * |
82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the | 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
83 * state is unset, and no furher events must be delivered. | 83 * state is unset, and no furher events must be delivered. |
84 */ | 84 */ |
85 static const int _STATE_WAIT_FOR_CANCEL = 16; | 85 static const int _STATE_WAIT_FOR_CANCEL = 16; |
86 static const int _STATE_IN_CALLBACK = 32; | 86 static const int _STATE_IN_CALLBACK = 32; |
87 static const int _STATE_HAS_PENDING = 64; | 87 static const int _STATE_HAS_PENDING = 64; |
88 static const int _STATE_PAUSE_COUNT = 128; | 88 static const int _STATE_PAUSE_COUNT = 128; |
89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
90 | 89 |
91 /* Event handlers provided in constructor. */ | 90 /* Event handlers provided in constructor. */ |
92 _DataHandler<T> _onData; | 91 _DataHandler<T> _onData; |
93 Function _onError; | 92 Function _onError; |
94 _DoneHandler _onDone; | 93 _DoneHandler _onDone; |
95 final Zone _zone = Zone.current; | 94 final Zone _zone = Zone.current; |
96 | 95 |
97 /** Bit vector based on state-constants above. */ | 96 /** Bit vector based on state-constants above. */ |
98 int _state; | 97 int _state; |
99 | 98 |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
225 void _cancel() { | 224 void _cancel() { |
226 _state |= _STATE_CANCELED; | 225 _state |= _STATE_CANCELED; |
227 if (_hasPending) { | 226 if (_hasPending) { |
228 _pending.cancelSchedule(); | 227 _pending.cancelSchedule(); |
229 } | 228 } |
230 if (!_inCallback) _pending = null; | 229 if (!_inCallback) _pending = null; |
231 _cancelFuture = _onCancel(); | 230 _cancelFuture = _onCancel(); |
232 } | 231 } |
233 | 232 |
234 /** | 233 /** |
235 * Increment the pause count. | |
236 * | |
237 * Also marks input as paused. | |
238 */ | |
239 void _incrementPauseCount() { | |
240 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
241 } | |
242 | |
243 /** | |
244 * Decrements the pause count. | 234 * Decrements the pause count. |
245 * | 235 * |
246 * Does not automatically unpause the input (call [_onResume]) when | 236 * Does not automatically unpause the input (call [_onResume]) when |
247 * the pause count reaches zero. This is handled elsewhere, and only | 237 * the pause count reaches zero. This is handled elsewhere, and only |
248 * if there are no pending events buffered. | 238 * if there are no pending events buffered. |
249 */ | 239 */ |
250 void _decrementPauseCount() { | 240 void _decrementPauseCount() { |
251 assert(_isPaused); | 241 assert(_isPaused); |
252 _state -= _STATE_PAUSE_COUNT; | 242 _state -= _STATE_PAUSE_COUNT; |
253 } | 243 } |
(...skipping 452 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
706 } | 696 } |
707 event.perform(dispatch); | 697 event.perform(dispatch); |
708 } | 698 } |
709 | 699 |
710 void clear() { | 700 void clear() { |
711 if (isScheduled) cancelSchedule(); | 701 if (isScheduled) cancelSchedule(); |
712 firstPendingEvent = lastPendingEvent = null; | 702 firstPendingEvent = lastPendingEvent = null; |
713 } | 703 } |
714 } | 704 } |
715 | 705 |
716 class _BroadcastLinkedList { | |
717 _BroadcastLinkedList _next; | |
718 _BroadcastLinkedList _previous; | |
719 | |
720 void _unlink() { | |
721 _previous._next = _next; | |
722 _next._previous = _previous; | |
723 _next = _previous = this; | |
724 } | |
725 | |
726 void _insertBefore(_BroadcastLinkedList newNext) { | |
727 _BroadcastLinkedList newPrevious = newNext._previous; | |
728 newPrevious._next = this; | |
729 newNext._previous = _previous; | |
730 _previous._next = newNext; | |
731 _previous = newPrevious; | |
732 } | |
733 } | |
734 | |
735 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); | 706 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
736 | 707 |
737 /** | 708 /** |
738 * Done subscription that will send one done event as soon as possible. | 709 * Done subscription that will send one done event as soon as possible. |
739 */ | 710 */ |
740 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 711 class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
741 static const int _DONE_SENT = 1; | 712 static const int _DONE_SENT = 1; |
742 static const int _SCHEDULED = 2; | 713 static const int _SCHEDULED = 2; |
743 static const int _PAUSED = 4; | 714 static const int _PAUSED = 4; |
744 | 715 |
(...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
925 | 896 |
926 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | 897 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
927 throw new UnsupportedError( | 898 throw new UnsupportedError( |
928 "Cannot change handlers of asBroadcastStream source subscription."); | 899 "Cannot change handlers of asBroadcastStream source subscription."); |
929 } | 900 } |
930 } | 901 } |
931 | 902 |
932 | 903 |
933 /** | 904 /** |
934 * Simple implementation of [StreamIterator]. | 905 * Simple implementation of [StreamIterator]. |
906 * | |
907 * Pauses the stream between calles to [moveNext]. | |
935 */ | 908 */ |
936 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 909 class _StreamIterator<T> implements StreamIterator<T> { |
937 // Internal state of the stream iterator. | 910 // The stream iterator is always in one of four states. |
938 // At any time, it is in one of these states. | 911 // The value of the [_stateData] field depends on the state. |
939 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 912 // |
940 // In _STATE_MOVING, the _data field holds the most recently returned | 913 // When `_subscription == null` and `_stateData != null`: |
941 // future. | 914 // The stream iterator has been created, but [moveNext] has not been called |
942 // When in one of the _STATE_EXTRA_* states, the it may hold the | 915 // yet. The [_stateData] field contains the stream to listen to on the first |
943 // next data/error object, and the subscription is paused. | 916 // call to [moveNext] and [current] returns `null`. |
944 | 917 // |
945 /// The simple state where [_data] holds the data to return, and [moveNext] | 918 // When `_subscription != null` and `!_subscription.isPaused`: |
946 /// is allowed. The subscription is actively listening. | 919 // The user has called [moveNext] and the iterator is waiting for the next |
947 static const int _STATE_FOUND = 0; | 920 // event. The [_stateData] field contains the [_Future] returned by the |
948 /// State set after [moveNext] has returned false or an error, | 921 // [_moveNext] call and [current] returns `null.` |
949 /// or after calling [cancel]. The subscription is always canceled. | 922 // |
950 static const int _STATE_DONE = 1; | 923 // When `_subscription != null` and `_subscription.isPaused`: |
951 /// State set after calling [moveNext], but before its returned future has | 924 // The most recent call to [moveNext] has completed with a `true` value |
952 /// completed. Calling [moveNext] again is not allowed in this state. | 925 // and [current] provides the value of the data event. |
953 /// The subscription is actively listening. | 926 // The [_stateData] field contains the [current] value. |
954 static const int _STATE_MOVING = 2; | 927 // |
955 /// States set when another event occurs while in _STATE_FOUND. | 928 // When `_subscription == null` and `_stateData == null`: |
956 /// This extra overflow event is cached until the next call to [moveNext], | 929 // The stream has completed or been canceled using [cancel]. |
957 /// which will complete as if it received the event normally. | 930 // The stream completes on either a done event or an error event. |
958 /// The subscription is paused in these states, so we only ever get one | 931 // The last call to [moveNext] has completed with `false` and [current] |
959 /// event too many. | 932 // returns `null`. |
960 static const int _STATE_EXTRA_DATA = 3; | |
961 static const int _STATE_EXTRA_ERROR = 4; | |
962 static const int _STATE_EXTRA_DONE = 5; | |
963 | 933 |
964 /// Subscription being listened to. | 934 /// Subscription being listened to. |
935 /// | |
936 /// Set to `null` when the stream subscription is done or canceled. | |
965 StreamSubscription _subscription; | 937 StreamSubscription _subscription; |
966 | 938 |
967 /// The current element represented by the most recent call to moveNext. | 939 /// Data value depending on the current state. |
968 /// | 940 /// |
969 /// Is null between the time moveNext is called and its future completes. | 941 /// Before first call to [moveNext]: The stream to listen to. |
970 T _current = null; | 942 /// |
943 /// After calling [moveNext] but before the returned future completes: | |
944 /// The returned future. | |
945 /// | |
946 /// After calling [moveNext] and the returned future has completed | |
947 /// with `true`: The value of [current]. | |
948 /// | |
949 /// After calling [moveNext] and the returned future has completed | |
950 /// with `false`, or after calling [cancel]: `null`. | |
951 Object _stateData; | |
971 | 952 |
972 /// The future returned by the most recent call to [moveNext]. | 953 _StreamIterator(final Stream<T> stream) : _stateData = stream; |
973 /// | |
974 /// Also used to store the next value/error in case the stream provides an | |
975 /// event before [moveNext] is called again. In that case, the stream will | |
976 /// be paused to prevent further events. | |
977 var/*Future<bool> or T*/ _futureOrPrefetch = null; | |
978 | 954 |
979 /// The current state. | 955 T get current { |
980 int _state = _STATE_FOUND; | 956 if (_subscription != null && _subscription.isPaused) { |
981 | 957 return _stateData; |
floitsch
2016/07/14 13:29:25
Please make sure that this is still strong-mode cl
| |
982 _StreamIteratorImpl(final Stream<T> stream) { | 958 } |
983 _subscription = stream.listen(_onData, | 959 return null; |
984 onError: _onError, | |
985 onDone: _onDone, | |
986 cancelOnError: true); | |
987 } | 960 } |
988 | 961 |
989 T get current => _current; | |
990 | |
991 Future<bool> moveNext() { | 962 Future<bool> moveNext() { |
992 if (_state == _STATE_DONE) { | 963 if (_subscription != null) { |
993 return new _Future<bool>.immediate(false); | 964 if (_subscription.isPaused) { |
994 } | 965 var future = new _Future<bool>(); |
995 if (_state == _STATE_MOVING) { | 966 _stateData = future; |
967 _subscription.resume(); | |
968 return future; | |
969 } | |
996 throw new StateError("Already waiting for next."); | 970 throw new StateError("Already waiting for next."); |
997 } | 971 } |
998 if (_state == _STATE_FOUND) { | 972 return _initializeOrDone(); |
999 _state = _STATE_MOVING; | |
1000 _current = null; | |
1001 var result = new _Future<bool>(); | |
1002 _futureOrPrefetch = result; | |
1003 return result; | |
1004 } else { | |
1005 assert(_state >= _STATE_EXTRA_DATA); | |
1006 switch (_state) { | |
1007 case _STATE_EXTRA_DATA: | |
1008 _state = _STATE_FOUND; | |
1009 _current = _futureOrPrefetch as Object /*=T*/; | |
1010 _futureOrPrefetch = null; | |
1011 _subscription.resume(); | |
1012 return new _Future<bool>.immediate(true); | |
1013 case _STATE_EXTRA_ERROR: | |
1014 AsyncError prefetch = _futureOrPrefetch; | |
1015 _clear(); | |
1016 return new _Future<bool>.immediateError(prefetch.error, | |
1017 prefetch.stackTrace); | |
1018 case _STATE_EXTRA_DONE: | |
1019 _clear(); | |
1020 return new _Future<bool>.immediate(false); | |
1021 } | |
1022 } | |
1023 } | 973 } |
1024 | 974 |
1025 /** Clears up the internal state when the iterator ends. */ | 975 /// Called if there is no active subscription when [moveNext] is called. |
1026 void _clear() { | 976 /// |
1027 _subscription = null; | 977 /// Either starts listening on the stream if this is the first call to |
1028 _futureOrPrefetch = null; | 978 /// [moveNext], or returns a `false` future because the stream has already |
1029 _current = null; | 979 /// ended. |
1030 _state = _STATE_DONE; | 980 Future<bool> _initializeOrDone() { |
981 assert(_subscription == null); | |
982 var stateData = _stateData; | |
983 if (stateData != null) { | |
984 Stream<T> stream = stateData; | |
985 _subscription = stream.listen( | |
986 _onData, onError: _onError, onDone: _onDone, cancelOnError: true); | |
987 var future = new _Future<bool>(); | |
988 _stateData = future; | |
989 return future; | |
990 } | |
991 return new _Future.immediate(false); | |
1031 } | 992 } |
1032 | 993 |
1033 Future cancel() { | 994 Future cancel() { |
1034 StreamSubscription subscription = _subscription; | 995 StreamSubscription<T> subscription = _subscription; |
1035 if (subscription == null) return null; | 996 Object stateData = _stateData; |
1036 if (_state == _STATE_MOVING) { | 997 _stateData = null; |
1037 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 998 if (subscription != null) { |
1038 _clear(); | 999 _subscription = null; |
1039 hasNext._complete(false); | 1000 if (!subscription.isPaused) { |
1040 } else { | 1001 _Future<bool> future = stateData; |
1041 _clear(); | 1002 future._asyncComplete(false); |
1003 } | |
1004 return subscription.cancel(); | |
1042 } | 1005 } |
1043 return subscription.cancel(); | 1006 return null; |
1044 } | 1007 } |
1045 | 1008 |
1046 void _onData(T data) { | 1009 void _onData(T data) { |
1047 if (_state == _STATE_MOVING) { | 1010 assert(_subscription != null && !_subscription.isPaused); |
1048 _current = data; | 1011 _Future<bool> moveNextFuture = _stateData; |
1049 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1012 _stateData = data; |
1050 _futureOrPrefetch = null; | |
1051 _state = _STATE_FOUND; | |
1052 hasNext._complete(true); | |
1053 return; | |
1054 } | |
1055 _subscription.pause(); | 1013 _subscription.pause(); |
1056 assert(_futureOrPrefetch == null); | 1014 moveNextFuture._complete(true); |
1057 _futureOrPrefetch = data; | |
1058 _state = _STATE_EXTRA_DATA; | |
1059 } | 1015 } |
1060 | 1016 |
1061 void _onError(Object error, [StackTrace stackTrace]) { | 1017 void _onError(Object error, [StackTrace stackTrace]) { |
1062 if (_state == _STATE_MOVING) { | 1018 assert(_subscription != null && !_subscription.isPaused); |
1063 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1019 _Future<bool> moveNextFuture = _stateData; |
1064 // We have cancelOnError: true, so the subscription is canceled. | 1020 _subscription = null; |
1065 _clear(); | 1021 _stateData = null; |
1066 hasNext._completeError(error, stackTrace); | 1022 moveNextFuture._completeError(error, stackTrace); |
1067 return; | |
1068 } | |
1069 _subscription.pause(); | |
1070 assert(_futureOrPrefetch == null); | |
1071 _futureOrPrefetch = new AsyncError(error, stackTrace); | |
1072 _state = _STATE_EXTRA_ERROR; | |
1073 } | 1023 } |
1074 | 1024 |
1075 void _onDone() { | 1025 void _onDone() { |
1076 if (_state == _STATE_MOVING) { | 1026 assert(_subscription != null && !_subscription.isPaused); |
1077 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1027 _Future<bool> moveNextFuture = _stateData; |
1078 _clear(); | 1028 _subscription = null; |
1079 hasNext._complete(false); | 1029 _stateData = null; |
1080 return; | 1030 moveNextFuture._complete(false); |
1081 } | |
1082 _subscription.pause(); | |
1083 _futureOrPrefetch = null; | |
1084 _state = _STATE_EXTRA_DONE; | |
1085 } | 1031 } |
1086 } | 1032 } |
1087 | 1033 |
1088 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1034 /** An empty broadcast stream, sending a done event as soon as possible. */ |
1089 class _EmptyStream<T> extends Stream<T> { | 1035 class _EmptyStream<T> extends Stream<T> { |
1090 const _EmptyStream() : super._internal(); | 1036 const _EmptyStream() : super._internal(); |
1091 bool get isBroadcast => true; | 1037 bool get isBroadcast => true; |
1092 StreamSubscription<T> listen(void onData(T data), | 1038 StreamSubscription<T> listen(void onData(T data), |
1093 {Function onError, | 1039 {Function onError, |
1094 void onDone(), | 1040 void onDone(), |
1095 bool cancelOnError}) { | 1041 bool cancelOnError}) { |
1096 return new _DoneStreamSubscription<T>(onDone); | 1042 return new _DoneStreamSubscription<T>(onDone); |
1097 } | 1043 } |
1098 } | 1044 } |
OLD | NEW |