OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
79 * * an error is sent, and [cancelOnError] is true, or | 79 * * an error is sent, and [cancelOnError] is true, or |
80 * * a done event is sent. | 80 * * a done event is sent. |
81 * | 81 * |
82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the | 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
83 * state is unset, and no furher events must be delivered. | 83 * state is unset, and no furher events must be delivered. |
84 */ | 84 */ |
85 static const int _STATE_WAIT_FOR_CANCEL = 16; | 85 static const int _STATE_WAIT_FOR_CANCEL = 16; |
86 static const int _STATE_IN_CALLBACK = 32; | 86 static const int _STATE_IN_CALLBACK = 32; |
87 static const int _STATE_HAS_PENDING = 64; | 87 static const int _STATE_HAS_PENDING = 64; |
88 static const int _STATE_PAUSE_COUNT = 128; | 88 static const int _STATE_PAUSE_COUNT = 128; |
89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
90 | 89 |
91 /* Event handlers provided in constructor. */ | 90 /* Event handlers provided in constructor. */ |
92 _DataHandler<T> _onData; | 91 _DataHandler<T> _onData; |
93 Function _onError; | 92 Function _onError; |
94 _DoneHandler _onDone; | 93 _DoneHandler _onDone; |
95 final Zone _zone = Zone.current; | 94 final Zone _zone = Zone.current; |
96 | 95 |
97 /** Bit vector based on state-constants above. */ | 96 /** Bit vector based on state-constants above. */ |
98 int _state; | 97 int _state; |
99 | 98 |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
232 void _cancel() { | 231 void _cancel() { |
233 _state |= _STATE_CANCELED; | 232 _state |= _STATE_CANCELED; |
234 if (_hasPending) { | 233 if (_hasPending) { |
235 _pending.cancelSchedule(); | 234 _pending.cancelSchedule(); |
236 } | 235 } |
237 if (!_inCallback) _pending = null; | 236 if (!_inCallback) _pending = null; |
238 _cancelFuture = _onCancel(); | 237 _cancelFuture = _onCancel(); |
239 } | 238 } |
240 | 239 |
241 /** | 240 /** |
242 * Increment the pause count. | |
243 * | |
244 * Also marks input as paused. | |
245 */ | |
246 void _incrementPauseCount() { | |
247 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
248 } | |
249 | |
250 /** | |
251 * Decrements the pause count. | 241 * Decrements the pause count. |
252 * | 242 * |
253 * Does not automatically unpause the input (call [_onResume]) when | 243 * Does not automatically unpause the input (call [_onResume]) when |
254 * the pause count reaches zero. This is handled elsewhere, and only | 244 * the pause count reaches zero. This is handled elsewhere, and only |
255 * if there are no pending events buffered. | 245 * if there are no pending events buffered. |
256 */ | 246 */ |
257 void _decrementPauseCount() { | 247 void _decrementPauseCount() { |
258 assert(_isPaused); | 248 assert(_isPaused); |
259 _state -= _STATE_PAUSE_COUNT; | 249 _state -= _STATE_PAUSE_COUNT; |
260 } | 250 } |
(...skipping 454 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
715 } | 705 } |
716 event.perform(dispatch); | 706 event.perform(dispatch); |
717 } | 707 } |
718 | 708 |
719 void clear() { | 709 void clear() { |
720 if (isScheduled) cancelSchedule(); | 710 if (isScheduled) cancelSchedule(); |
721 firstPendingEvent = lastPendingEvent = null; | 711 firstPendingEvent = lastPendingEvent = null; |
722 } | 712 } |
723 } | 713 } |
724 | 714 |
725 class _BroadcastLinkedList { | |
726 _BroadcastLinkedList _next; | |
727 _BroadcastLinkedList _previous; | |
728 | |
729 void _unlink() { | |
730 _previous._next = _next; | |
731 _next._previous = _previous; | |
732 _next = _previous = this; | |
733 } | |
734 | |
735 void _insertBefore(_BroadcastLinkedList newNext) { | |
736 _BroadcastLinkedList newPrevious = newNext._previous; | |
737 newPrevious._next = this; | |
738 newNext._previous = _previous; | |
739 _previous._next = newNext; | |
740 _previous = newPrevious; | |
741 } | |
742 } | |
743 | |
744 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); | 715 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
745 | 716 |
746 /** | 717 /** |
747 * Done subscription that will send one done event as soon as possible. | 718 * Done subscription that will send one done event as soon as possible. |
748 */ | 719 */ |
749 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 720 class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
750 static const int _DONE_SENT = 1; | 721 static const int _DONE_SENT = 1; |
751 static const int _SCHEDULED = 2; | 722 static const int _SCHEDULED = 2; |
752 static const int _PAUSED = 4; | 723 static const int _PAUSED = 4; |
753 | 724 |
(...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
934 | 905 |
935 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | 906 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
936 throw new UnsupportedError( | 907 throw new UnsupportedError( |
937 "Cannot change handlers of asBroadcastStream source subscription."); | 908 "Cannot change handlers of asBroadcastStream source subscription."); |
938 } | 909 } |
939 } | 910 } |
940 | 911 |
941 | 912 |
942 /** | 913 /** |
943 * Simple implementation of [StreamIterator]. | 914 * Simple implementation of [StreamIterator]. |
| 915 * |
| 916 * Pauses the stream between calls to [moveNext]. |
944 */ | 917 */ |
945 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 918 class _StreamIterator<T> implements StreamIterator<T> { |
946 // Internal state of the stream iterator. | 919 // The stream iterator is always in one of four states. |
947 // At any time, it is in one of these states. | 920 // The value of the [_stateData] field depends on the state. |
948 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 921 // |
949 // In _STATE_MOVING, the _data field holds the most recently returned | 922 // When `_subscription == null` and `_stateData != null`: |
950 // future. | 923 // The stream iterator has been created, but [moveNext] has not been called |
951 // When in one of the _STATE_EXTRA_* states, the it may hold the | 924 // yet. The [_stateData] field contains the stream to listen to on the first |
952 // next data/error object, and the subscription is paused. | 925 // call to [moveNext] and [current] returns `null`. |
953 | 926 // |
954 /// The simple state where [_data] holds the data to return, and [moveNext] | 927 // When `_subscription != null` and `!_isPaused`: |
955 /// is allowed. The subscription is actively listening. | 928 // The user has called [moveNext] and the iterator is waiting for the next |
956 static const int _STATE_FOUND = 0; | 929 // event. The [_stateData] field contains the [_Future] returned by the |
957 /// State set after [moveNext] has returned false or an error, | 930 // [_moveNext] call and [current] returns `null.` |
958 /// or after calling [cancel]. The subscription is always canceled. | 931 // |
959 static const int _STATE_DONE = 1; | 932 // When `_subscription != null` and `_isPaused`: |
960 /// State set after calling [moveNext], but before its returned future has | 933 // The most recent call to [moveNext] has completed with a `true` value |
961 /// completed. Calling [moveNext] again is not allowed in this state. | 934 // and [current] provides the value of the data event. |
962 /// The subscription is actively listening. | 935 // The [_stateData] field contains the [current] value. |
963 static const int _STATE_MOVING = 2; | 936 // |
964 /// States set when another event occurs while in _STATE_FOUND. | 937 // When `_subscription == null` and `_stateData == null`: |
965 /// This extra overflow event is cached until the next call to [moveNext], | 938 // The stream has completed or been canceled using [cancel]. |
966 /// which will complete as if it received the event normally. | 939 // The stream completes on either a done event or an error event. |
967 /// The subscription is paused in these states, so we only ever get one | 940 // The last call to [moveNext] has completed with `false` and [current] |
968 /// event too many. | 941 // returns `null`. |
969 static const int _STATE_EXTRA_DATA = 3; | |
970 static const int _STATE_EXTRA_ERROR = 4; | |
971 static const int _STATE_EXTRA_DONE = 5; | |
972 | 942 |
973 /// Subscription being listened to. | 943 /// Subscription being listened to. |
| 944 /// |
| 945 /// Set to `null` when the stream subscription is done or canceled. |
974 StreamSubscription _subscription; | 946 StreamSubscription _subscription; |
975 | 947 |
976 /// The current element represented by the most recent call to moveNext. | 948 /// Data value depending on the current state. |
977 /// | 949 /// |
978 /// Is null between the time moveNext is called and its future completes. | 950 /// Before first call to [moveNext]: The stream to listen to. |
979 T _current = null; | 951 /// |
| 952 /// After calling [moveNext] but before the returned future completes: |
| 953 /// The returned future. |
| 954 /// |
| 955 /// After calling [moveNext] and the returned future has completed |
| 956 /// with `true`: The value of [current]. |
| 957 /// |
| 958 /// After calling [moveNext] and the returned future has completed |
| 959 /// with `false`, or after calling [cancel]: `null`. |
| 960 Object _stateData; |
980 | 961 |
981 /// The future returned by the most recent call to [moveNext]. | 962 /// Whether the iterator is between calls to `moveNext`. |
982 /// | 963 /// This will usually cause the [_subscription] to be paused, but as an |
983 /// Also used to store the next value/error in case the stream provides an | 964 /// optimization, we only pause after the [moveNext] future has been |
984 /// event before [moveNext] is called again. In that case, the stream will | 965 /// completed. |
985 /// be paused to prevent further events. | 966 bool _isPaused = false; |
986 var/*Future<bool> or T*/ _futureOrPrefetch = null; | |
987 | 967 |
988 /// The current state. | 968 _StreamIterator(final Stream<T> stream) : _stateData = stream; |
989 int _state = _STATE_FOUND; | |
990 | 969 |
991 _StreamIteratorImpl(final Stream<T> stream) { | 970 T get current { |
992 _subscription = stream.listen(_onData, | 971 if (_subscription != null && _isPaused) { |
993 onError: _onError, | 972 return _stateData as Object /*=T*/; |
994 onDone: _onDone, | 973 } |
995 cancelOnError: true); | 974 return null; |
996 } | 975 } |
997 | 976 |
998 T get current => _current; | |
999 | |
1000 Future<bool> moveNext() { | 977 Future<bool> moveNext() { |
1001 if (_state == _STATE_DONE) { | 978 if (_subscription != null) { |
1002 return new _Future<bool>.immediate(false); | 979 if (_isPaused) { |
1003 } | 980 var future = new _Future<bool>(); |
1004 if (_state == _STATE_MOVING) { | 981 _stateData = future; |
| 982 _isPaused = false; |
| 983 _subscription.resume(); |
| 984 return future; |
| 985 } |
1005 throw new StateError("Already waiting for next."); | 986 throw new StateError("Already waiting for next."); |
1006 } | 987 } |
1007 if (_state == _STATE_FOUND) { | 988 return _initializeOrDone(); |
1008 _state = _STATE_MOVING; | |
1009 _current = null; | |
1010 var result = new _Future<bool>(); | |
1011 _futureOrPrefetch = result; | |
1012 return result; | |
1013 } else { | |
1014 assert(_state >= _STATE_EXTRA_DATA); | |
1015 switch (_state) { | |
1016 case _STATE_EXTRA_DATA: | |
1017 _state = _STATE_FOUND; | |
1018 _current = _futureOrPrefetch as Object /*=T*/; | |
1019 _futureOrPrefetch = null; | |
1020 _subscription.resume(); | |
1021 return new _Future<bool>.immediate(true); | |
1022 case _STATE_EXTRA_ERROR: | |
1023 AsyncError prefetch = _futureOrPrefetch; | |
1024 _clear(); | |
1025 return new _Future<bool>.immediateError(prefetch.error, | |
1026 prefetch.stackTrace); | |
1027 case _STATE_EXTRA_DONE: | |
1028 _clear(); | |
1029 return new _Future<bool>.immediate(false); | |
1030 } | |
1031 } | |
1032 } | 989 } |
1033 | 990 |
1034 /** Clears up the internal state when the iterator ends. */ | 991 /// Called if there is no active subscription when [moveNext] is called. |
1035 void _clear() { | 992 /// |
1036 _subscription = null; | 993 /// Either starts listening on the stream if this is the first call to |
1037 _futureOrPrefetch = null; | 994 /// [moveNext], or returns a `false` future because the stream has already |
1038 _current = null; | 995 /// ended. |
1039 _state = _STATE_DONE; | 996 Future<bool> _initializeOrDone() { |
| 997 assert(_subscription == null); |
| 998 var stateData = _stateData; |
| 999 if (stateData != null) { |
| 1000 Stream<T> stream = stateData as Object /*=Stream<T>*/; |
| 1001 _subscription = stream.listen( |
| 1002 _onData, onError: _onError, onDone: _onDone, cancelOnError: true); |
| 1003 var future = new _Future<bool>(); |
| 1004 _stateData = future; |
| 1005 return future; |
| 1006 } |
| 1007 return new _Future<bool>.immediate(false); |
1040 } | 1008 } |
1041 | 1009 |
1042 Future cancel() { | 1010 Future cancel() { |
1043 StreamSubscription subscription = _subscription; | 1011 StreamSubscription<T> subscription = _subscription; |
1044 if (subscription == null) return Future._nullFuture; | 1012 Object stateData = _stateData; |
1045 if (_state == _STATE_MOVING) { | 1013 _stateData = null; |
1046 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1014 if (subscription != null) { |
1047 _clear(); | 1015 _subscription = null; |
1048 hasNext._complete(false); | 1016 if (!_isPaused) { |
1049 } else { | 1017 _Future<bool> future = stateData as Object /*=_Future<bool>*/; |
1050 _clear(); | 1018 future._asyncComplete(false); |
| 1019 } |
| 1020 return subscription.cancel(); |
1051 } | 1021 } |
1052 return subscription.cancel(); | 1022 return Future._nullFuture; |
1053 } | 1023 } |
1054 | 1024 |
1055 void _onData(T data) { | 1025 void _onData(T data) { |
1056 if (_state == _STATE_MOVING) { | 1026 assert(_subscription != null && !_isPaused); |
1057 _current = data; | 1027 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
1058 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1028 _stateData = data; |
1059 _futureOrPrefetch = null; | 1029 _isPaused = true; |
1060 _state = _STATE_FOUND; | 1030 moveNextFuture._complete(true); |
1061 hasNext._complete(true); | 1031 if (_subscription != null && _isPaused) _subscription.pause(); |
1062 return; | |
1063 } | |
1064 _subscription.pause(); | |
1065 assert(_futureOrPrefetch == null); | |
1066 _futureOrPrefetch = data; | |
1067 _state = _STATE_EXTRA_DATA; | |
1068 } | 1032 } |
1069 | 1033 |
1070 void _onError(Object error, [StackTrace stackTrace]) { | 1034 void _onError(Object error, [StackTrace stackTrace]) { |
1071 if (_state == _STATE_MOVING) { | 1035 assert(_subscription != null && !_isPaused); |
1072 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1036 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
1073 // We have cancelOnError: true, so the subscription is canceled. | 1037 _subscription = null; |
1074 _clear(); | 1038 _stateData = null; |
1075 hasNext._completeError(error, stackTrace); | 1039 moveNextFuture._completeError(error, stackTrace); |
1076 return; | |
1077 } | |
1078 _subscription.pause(); | |
1079 assert(_futureOrPrefetch == null); | |
1080 _futureOrPrefetch = new AsyncError(error, stackTrace); | |
1081 _state = _STATE_EXTRA_ERROR; | |
1082 } | 1040 } |
1083 | 1041 |
1084 void _onDone() { | 1042 void _onDone() { |
1085 if (_state == _STATE_MOVING) { | 1043 assert(_subscription != null && !_isPaused); |
1086 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1044 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
1087 _clear(); | 1045 _subscription = null; |
1088 hasNext._complete(false); | 1046 _stateData = null; |
1089 return; | 1047 moveNextFuture._complete(false); |
1090 } | |
1091 _subscription.pause(); | |
1092 _futureOrPrefetch = null; | |
1093 _state = _STATE_EXTRA_DONE; | |
1094 } | 1048 } |
1095 } | 1049 } |
1096 | 1050 |
1097 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1051 /** An empty broadcast stream, sending a done event as soon as possible. */ |
1098 class _EmptyStream<T> extends Stream<T> { | 1052 class _EmptyStream<T> extends Stream<T> { |
1099 const _EmptyStream() : super._internal(); | 1053 const _EmptyStream() : super._internal(); |
1100 bool get isBroadcast => true; | 1054 bool get isBroadcast => true; |
1101 StreamSubscription<T> listen(void onData(T data), | 1055 StreamSubscription<T> listen(void onData(T data), |
1102 {Function onError, | 1056 {Function onError, |
1103 void onDone(), | 1057 void onDone(), |
1104 bool cancelOnError}) { | 1058 bool cancelOnError}) { |
1105 return new _DoneStreamSubscription<T>(onDone); | 1059 return new _DoneStreamSubscription<T>(onDone); |
1106 } | 1060 } |
1107 } | 1061 } |
OLD | NEW |