| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 79 * * an error is sent, and [cancelOnError] is true, or | 79 * * an error is sent, and [cancelOnError] is true, or |
| 80 * * a done event is sent. | 80 * * a done event is sent. |
| 81 * | 81 * |
| 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the | 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
| 83 * state is unset, and no furher events must be delivered. | 83 * state is unset, and no furher events must be delivered. |
| 84 */ | 84 */ |
| 85 static const int _STATE_WAIT_FOR_CANCEL = 16; | 85 static const int _STATE_WAIT_FOR_CANCEL = 16; |
| 86 static const int _STATE_IN_CALLBACK = 32; | 86 static const int _STATE_IN_CALLBACK = 32; |
| 87 static const int _STATE_HAS_PENDING = 64; | 87 static const int _STATE_HAS_PENDING = 64; |
| 88 static const int _STATE_PAUSE_COUNT = 128; | 88 static const int _STATE_PAUSE_COUNT = 128; |
| 89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
| 90 | 89 |
| 91 /* Event handlers provided in constructor. */ | 90 /* Event handlers provided in constructor. */ |
| 92 _DataHandler<T> _onData; | 91 _DataHandler<T> _onData; |
| 93 Function _onError; | 92 Function _onError; |
| 94 _DoneHandler _onDone; | 93 _DoneHandler _onDone; |
| 95 final Zone _zone = Zone.current; | 94 final Zone _zone = Zone.current; |
| 96 | 95 |
| 97 /** Bit vector based on state-constants above. */ | 96 /** Bit vector based on state-constants above. */ |
| 98 int _state; | 97 int _state; |
| 99 | 98 |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 232 void _cancel() { | 231 void _cancel() { |
| 233 _state |= _STATE_CANCELED; | 232 _state |= _STATE_CANCELED; |
| 234 if (_hasPending) { | 233 if (_hasPending) { |
| 235 _pending.cancelSchedule(); | 234 _pending.cancelSchedule(); |
| 236 } | 235 } |
| 237 if (!_inCallback) _pending = null; | 236 if (!_inCallback) _pending = null; |
| 238 _cancelFuture = _onCancel(); | 237 _cancelFuture = _onCancel(); |
| 239 } | 238 } |
| 240 | 239 |
| 241 /** | 240 /** |
| 242 * Increment the pause count. | |
| 243 * | |
| 244 * Also marks input as paused. | |
| 245 */ | |
| 246 void _incrementPauseCount() { | |
| 247 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
| 248 } | |
| 249 | |
| 250 /** | |
| 251 * Decrements the pause count. | 241 * Decrements the pause count. |
| 252 * | 242 * |
| 253 * Does not automatically unpause the input (call [_onResume]) when | 243 * Does not automatically unpause the input (call [_onResume]) when |
| 254 * the pause count reaches zero. This is handled elsewhere, and only | 244 * the pause count reaches zero. This is handled elsewhere, and only |
| 255 * if there are no pending events buffered. | 245 * if there are no pending events buffered. |
| 256 */ | 246 */ |
| 257 void _decrementPauseCount() { | 247 void _decrementPauseCount() { |
| 258 assert(_isPaused); | 248 assert(_isPaused); |
| 259 _state -= _STATE_PAUSE_COUNT; | 249 _state -= _STATE_PAUSE_COUNT; |
| 260 } | 250 } |
| (...skipping 454 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 715 } | 705 } |
| 716 event.perform(dispatch); | 706 event.perform(dispatch); |
| 717 } | 707 } |
| 718 | 708 |
| 719 void clear() { | 709 void clear() { |
| 720 if (isScheduled) cancelSchedule(); | 710 if (isScheduled) cancelSchedule(); |
| 721 firstPendingEvent = lastPendingEvent = null; | 711 firstPendingEvent = lastPendingEvent = null; |
| 722 } | 712 } |
| 723 } | 713 } |
| 724 | 714 |
| 725 class _BroadcastLinkedList { | |
| 726 _BroadcastLinkedList _next; | |
| 727 _BroadcastLinkedList _previous; | |
| 728 | |
| 729 void _unlink() { | |
| 730 _previous._next = _next; | |
| 731 _next._previous = _previous; | |
| 732 _next = _previous = this; | |
| 733 } | |
| 734 | |
| 735 void _insertBefore(_BroadcastLinkedList newNext) { | |
| 736 _BroadcastLinkedList newPrevious = newNext._previous; | |
| 737 newPrevious._next = this; | |
| 738 newNext._previous = _previous; | |
| 739 _previous._next = newNext; | |
| 740 _previous = newPrevious; | |
| 741 } | |
| 742 } | |
| 743 | |
| 744 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); | 715 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
| 745 | 716 |
| 746 /** | 717 /** |
| 747 * Done subscription that will send one done event as soon as possible. | 718 * Done subscription that will send one done event as soon as possible. |
| 748 */ | 719 */ |
| 749 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 720 class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
| 750 static const int _DONE_SENT = 1; | 721 static const int _DONE_SENT = 1; |
| 751 static const int _SCHEDULED = 2; | 722 static const int _SCHEDULED = 2; |
| 752 static const int _PAUSED = 4; | 723 static const int _PAUSED = 4; |
| 753 | 724 |
| (...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 934 | 905 |
| 935 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | 906 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
| 936 throw new UnsupportedError( | 907 throw new UnsupportedError( |
| 937 "Cannot change handlers of asBroadcastStream source subscription."); | 908 "Cannot change handlers of asBroadcastStream source subscription."); |
| 938 } | 909 } |
| 939 } | 910 } |
| 940 | 911 |
| 941 | 912 |
| 942 /** | 913 /** |
| 943 * Simple implementation of [StreamIterator]. | 914 * Simple implementation of [StreamIterator]. |
| 915 * |
| 916 * Pauses the stream between calls to [moveNext]. |
| 944 */ | 917 */ |
| 945 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 918 class _StreamIterator<T> implements StreamIterator<T> { |
| 946 // Internal state of the stream iterator. | 919 // The stream iterator is always in one of four states. |
| 947 // At any time, it is in one of these states. | 920 // The value of the [_stateData] field depends on the state. |
| 948 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 921 // |
| 949 // In _STATE_MOVING, the _data field holds the most recently returned | 922 // When `_subscription == null` and `_stateData != null`: |
| 950 // future. | 923 // The stream iterator has been created, but [moveNext] has not been called |
| 951 // When in one of the _STATE_EXTRA_* states, the it may hold the | 924 // yet. The [_stateData] field contains the stream to listen to on the first |
| 952 // next data/error object, and the subscription is paused. | 925 // call to [moveNext] and [current] returns `null`. |
| 953 | 926 // |
| 954 /// The simple state where [_data] holds the data to return, and [moveNext] | 927 // When `_subscription != null` and `!_isPaused`: |
| 955 /// is allowed. The subscription is actively listening. | 928 // The user has called [moveNext] and the iterator is waiting for the next |
| 956 static const int _STATE_FOUND = 0; | 929 // event. The [_stateData] field contains the [_Future] returned by the |
| 957 /// State set after [moveNext] has returned false or an error, | 930 // [_moveNext] call and [current] returns `null.` |
| 958 /// or after calling [cancel]. The subscription is always canceled. | 931 // |
| 959 static const int _STATE_DONE = 1; | 932 // When `_subscription != null` and `_isPaused`: |
| 960 /// State set after calling [moveNext], but before its returned future has | 933 // The most recent call to [moveNext] has completed with a `true` value |
| 961 /// completed. Calling [moveNext] again is not allowed in this state. | 934 // and [current] provides the value of the data event. |
| 962 /// The subscription is actively listening. | 935 // The [_stateData] field contains the [current] value. |
| 963 static const int _STATE_MOVING = 2; | 936 // |
| 964 /// States set when another event occurs while in _STATE_FOUND. | 937 // When `_subscription == null` and `_stateData == null`: |
| 965 /// This extra overflow event is cached until the next call to [moveNext], | 938 // The stream has completed or been canceled using [cancel]. |
| 966 /// which will complete as if it received the event normally. | 939 // The stream completes on either a done event or an error event. |
| 967 /// The subscription is paused in these states, so we only ever get one | 940 // The last call to [moveNext] has completed with `false` and [current] |
| 968 /// event too many. | 941 // returns `null`. |
| 969 static const int _STATE_EXTRA_DATA = 3; | |
| 970 static const int _STATE_EXTRA_ERROR = 4; | |
| 971 static const int _STATE_EXTRA_DONE = 5; | |
| 972 | 942 |
| 973 /// Subscription being listened to. | 943 /// Subscription being listened to. |
| 944 /// |
| 945 /// Set to `null` when the stream subscription is done or canceled. |
| 974 StreamSubscription _subscription; | 946 StreamSubscription _subscription; |
| 975 | 947 |
| 976 /// The current element represented by the most recent call to moveNext. | 948 /// Data value depending on the current state. |
| 977 /// | 949 /// |
| 978 /// Is null between the time moveNext is called and its future completes. | 950 /// Before first call to [moveNext]: The stream to listen to. |
| 979 T _current = null; | 951 /// |
| 952 /// After calling [moveNext] but before the returned future completes: |
| 953 /// The returned future. |
| 954 /// |
| 955 /// After calling [moveNext] and the returned future has completed |
| 956 /// with `true`: The value of [current]. |
| 957 /// |
| 958 /// After calling [moveNext] and the returned future has completed |
| 959 /// with `false`, or after calling [cancel]: `null`. |
| 960 Object _stateData; |
| 980 | 961 |
| 981 /// The future returned by the most recent call to [moveNext]. | 962 /// Whether the iterator is between calls to `moveNext`. |
| 982 /// | 963 /// This will usually cause the [_subscription] to be paused, but as an |
| 983 /// Also used to store the next value/error in case the stream provides an | 964 /// optimization, we only pause after the [moveNext] future has been |
| 984 /// event before [moveNext] is called again. In that case, the stream will | 965 /// completed. |
| 985 /// be paused to prevent further events. | 966 bool _isPaused = false; |
| 986 var/*Future<bool> or T*/ _futureOrPrefetch = null; | |
| 987 | 967 |
| 988 /// The current state. | 968 _StreamIterator(final Stream<T> stream) : _stateData = stream; |
| 989 int _state = _STATE_FOUND; | |
| 990 | 969 |
| 991 _StreamIteratorImpl(final Stream<T> stream) { | 970 T get current { |
| 992 _subscription = stream.listen(_onData, | 971 if (_subscription != null && _isPaused) { |
| 993 onError: _onError, | 972 return _stateData as Object /*=T*/; |
| 994 onDone: _onDone, | 973 } |
| 995 cancelOnError: true); | 974 return null; |
| 996 } | 975 } |
| 997 | 976 |
| 998 T get current => _current; | |
| 999 | |
| 1000 Future<bool> moveNext() { | 977 Future<bool> moveNext() { |
| 1001 if (_state == _STATE_DONE) { | 978 if (_subscription != null) { |
| 1002 return new _Future<bool>.immediate(false); | 979 if (_isPaused) { |
| 1003 } | 980 var future = new _Future<bool>(); |
| 1004 if (_state == _STATE_MOVING) { | 981 _stateData = future; |
| 982 _isPaused = false; |
| 983 _subscription.resume(); |
| 984 return future; |
| 985 } |
| 1005 throw new StateError("Already waiting for next."); | 986 throw new StateError("Already waiting for next."); |
| 1006 } | 987 } |
| 1007 if (_state == _STATE_FOUND) { | 988 return _initializeOrDone(); |
| 1008 _state = _STATE_MOVING; | |
| 1009 _current = null; | |
| 1010 var result = new _Future<bool>(); | |
| 1011 _futureOrPrefetch = result; | |
| 1012 return result; | |
| 1013 } else { | |
| 1014 assert(_state >= _STATE_EXTRA_DATA); | |
| 1015 switch (_state) { | |
| 1016 case _STATE_EXTRA_DATA: | |
| 1017 _state = _STATE_FOUND; | |
| 1018 _current = _futureOrPrefetch as Object /*=T*/; | |
| 1019 _futureOrPrefetch = null; | |
| 1020 _subscription.resume(); | |
| 1021 return new _Future<bool>.immediate(true); | |
| 1022 case _STATE_EXTRA_ERROR: | |
| 1023 AsyncError prefetch = _futureOrPrefetch; | |
| 1024 _clear(); | |
| 1025 return new _Future<bool>.immediateError(prefetch.error, | |
| 1026 prefetch.stackTrace); | |
| 1027 case _STATE_EXTRA_DONE: | |
| 1028 _clear(); | |
| 1029 return new _Future<bool>.immediate(false); | |
| 1030 } | |
| 1031 } | |
| 1032 } | 989 } |
| 1033 | 990 |
| 1034 /** Clears up the internal state when the iterator ends. */ | 991 /// Called if there is no active subscription when [moveNext] is called. |
| 1035 void _clear() { | 992 /// |
| 1036 _subscription = null; | 993 /// Either starts listening on the stream if this is the first call to |
| 1037 _futureOrPrefetch = null; | 994 /// [moveNext], or returns a `false` future because the stream has already |
| 1038 _current = null; | 995 /// ended. |
| 1039 _state = _STATE_DONE; | 996 Future<bool> _initializeOrDone() { |
| 997 assert(_subscription == null); |
| 998 var stateData = _stateData; |
| 999 if (stateData != null) { |
| 1000 Stream<T> stream = stateData as Object /*=Stream<T>*/; |
| 1001 _subscription = stream.listen( |
| 1002 _onData, onError: _onError, onDone: _onDone, cancelOnError: true); |
| 1003 var future = new _Future<bool>(); |
| 1004 _stateData = future; |
| 1005 return future; |
| 1006 } |
| 1007 return new _Future<bool>.immediate(false); |
| 1040 } | 1008 } |
| 1041 | 1009 |
| 1042 Future cancel() { | 1010 Future cancel() { |
| 1043 StreamSubscription subscription = _subscription; | 1011 StreamSubscription<T> subscription = _subscription; |
| 1044 if (subscription == null) return Future._nullFuture; | 1012 Object stateData = _stateData; |
| 1045 if (_state == _STATE_MOVING) { | 1013 _stateData = null; |
| 1046 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1014 if (subscription != null) { |
| 1047 _clear(); | 1015 _subscription = null; |
| 1048 hasNext._complete(false); | 1016 if (!_isPaused) { |
| 1049 } else { | 1017 _Future<bool> future = stateData as Object /*=_Future<bool>*/; |
| 1050 _clear(); | 1018 future._asyncComplete(false); |
| 1019 } |
| 1020 return subscription.cancel(); |
| 1051 } | 1021 } |
| 1052 return subscription.cancel(); | 1022 return Future._nullFuture; |
| 1053 } | 1023 } |
| 1054 | 1024 |
| 1055 void _onData(T data) { | 1025 void _onData(T data) { |
| 1056 if (_state == _STATE_MOVING) { | 1026 assert(_subscription != null && !_isPaused); |
| 1057 _current = data; | 1027 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
| 1058 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1028 _stateData = data; |
| 1059 _futureOrPrefetch = null; | 1029 _isPaused = true; |
| 1060 _state = _STATE_FOUND; | 1030 moveNextFuture._complete(true); |
| 1061 hasNext._complete(true); | 1031 if (_subscription != null && _isPaused) _subscription.pause(); |
| 1062 return; | |
| 1063 } | |
| 1064 _subscription.pause(); | |
| 1065 assert(_futureOrPrefetch == null); | |
| 1066 _futureOrPrefetch = data; | |
| 1067 _state = _STATE_EXTRA_DATA; | |
| 1068 } | 1032 } |
| 1069 | 1033 |
| 1070 void _onError(Object error, [StackTrace stackTrace]) { | 1034 void _onError(Object error, [StackTrace stackTrace]) { |
| 1071 if (_state == _STATE_MOVING) { | 1035 assert(_subscription != null && !_isPaused); |
| 1072 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1036 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
| 1073 // We have cancelOnError: true, so the subscription is canceled. | 1037 _subscription = null; |
| 1074 _clear(); | 1038 _stateData = null; |
| 1075 hasNext._completeError(error, stackTrace); | 1039 moveNextFuture._completeError(error, stackTrace); |
| 1076 return; | |
| 1077 } | |
| 1078 _subscription.pause(); | |
| 1079 assert(_futureOrPrefetch == null); | |
| 1080 _futureOrPrefetch = new AsyncError(error, stackTrace); | |
| 1081 _state = _STATE_EXTRA_ERROR; | |
| 1082 } | 1040 } |
| 1083 | 1041 |
| 1084 void _onDone() { | 1042 void _onDone() { |
| 1085 if (_state == _STATE_MOVING) { | 1043 assert(_subscription != null && !_isPaused); |
| 1086 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1044 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
| 1087 _clear(); | 1045 _subscription = null; |
| 1088 hasNext._complete(false); | 1046 _stateData = null; |
| 1089 return; | 1047 moveNextFuture._complete(false); |
| 1090 } | |
| 1091 _subscription.pause(); | |
| 1092 _futureOrPrefetch = null; | |
| 1093 _state = _STATE_EXTRA_DONE; | |
| 1094 } | 1048 } |
| 1095 } | 1049 } |
| 1096 | 1050 |
| 1097 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1051 /** An empty broadcast stream, sending a done event as soon as possible. */ |
| 1098 class _EmptyStream<T> extends Stream<T> { | 1052 class _EmptyStream<T> extends Stream<T> { |
| 1099 const _EmptyStream() : super._internal(); | 1053 const _EmptyStream() : super._internal(); |
| 1100 bool get isBroadcast => true; | 1054 bool get isBroadcast => true; |
| 1101 StreamSubscription<T> listen(void onData(T data), | 1055 StreamSubscription<T> listen(void onData(T data), |
| 1102 {Function onError, | 1056 {Function onError, |
| 1103 void onDone(), | 1057 void onDone(), |
| 1104 bool cancelOnError}) { | 1058 bool cancelOnError}) { |
| 1105 return new _DoneStreamSubscription<T>(onDone); | 1059 return new _DoneStreamSubscription<T>(onDone); |
| 1106 } | 1060 } |
| 1107 } | 1061 } |
| OLD | NEW |