Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 79 * * an error is sent, and [cancelOnError] is true, or | 79 * * an error is sent, and [cancelOnError] is true, or |
| 80 * * a done event is sent. | 80 * * a done event is sent. |
| 81 * | 81 * |
| 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the | 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
| 83 * state is unset, and no furher events must be delivered. | 83 * state is unset, and no furher events must be delivered. |
| 84 */ | 84 */ |
| 85 static const int _STATE_WAIT_FOR_CANCEL = 16; | 85 static const int _STATE_WAIT_FOR_CANCEL = 16; |
| 86 static const int _STATE_IN_CALLBACK = 32; | 86 static const int _STATE_IN_CALLBACK = 32; |
| 87 static const int _STATE_HAS_PENDING = 64; | 87 static const int _STATE_HAS_PENDING = 64; |
| 88 static const int _STATE_PAUSE_COUNT = 128; | 88 static const int _STATE_PAUSE_COUNT = 128; |
| 89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
| 90 | 89 |
| 91 /* Event handlers provided in constructor. */ | 90 /* Event handlers provided in constructor. */ |
| 92 _DataHandler<T> _onData; | 91 _DataHandler<T> _onData; |
| 93 Function _onError; | 92 Function _onError; |
| 94 _DoneHandler _onDone; | 93 _DoneHandler _onDone; |
| 95 final Zone _zone = Zone.current; | 94 final Zone _zone = Zone.current; |
| 96 | 95 |
| 97 /** Bit vector based on state-constants above. */ | 96 /** Bit vector based on state-constants above. */ |
| 98 int _state; | 97 int _state; |
| 99 | 98 |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 225 void _cancel() { | 224 void _cancel() { |
| 226 _state |= _STATE_CANCELED; | 225 _state |= _STATE_CANCELED; |
| 227 if (_hasPending) { | 226 if (_hasPending) { |
| 228 _pending.cancelSchedule(); | 227 _pending.cancelSchedule(); |
| 229 } | 228 } |
| 230 if (!_inCallback) _pending = null; | 229 if (!_inCallback) _pending = null; |
| 231 _cancelFuture = _onCancel(); | 230 _cancelFuture = _onCancel(); |
| 232 } | 231 } |
| 233 | 232 |
| 234 /** | 233 /** |
| 235 * Increment the pause count. | |
| 236 * | |
| 237 * Also marks input as paused. | |
| 238 */ | |
| 239 void _incrementPauseCount() { | |
| 240 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
| 241 } | |
| 242 | |
| 243 /** | |
| 244 * Decrements the pause count. | 234 * Decrements the pause count. |
| 245 * | 235 * |
| 246 * Does not automatically unpause the input (call [_onResume]) when | 236 * Does not automatically unpause the input (call [_onResume]) when |
| 247 * the pause count reaches zero. This is handled elsewhere, and only | 237 * the pause count reaches zero. This is handled elsewhere, and only |
| 248 * if there are no pending events buffered. | 238 * if there are no pending events buffered. |
| 249 */ | 239 */ |
| 250 void _decrementPauseCount() { | 240 void _decrementPauseCount() { |
| 251 assert(_isPaused); | 241 assert(_isPaused); |
| 252 _state -= _STATE_PAUSE_COUNT; | 242 _state -= _STATE_PAUSE_COUNT; |
| 253 } | 243 } |
| (...skipping 452 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 706 } | 696 } |
| 707 event.perform(dispatch); | 697 event.perform(dispatch); |
| 708 } | 698 } |
| 709 | 699 |
| 710 void clear() { | 700 void clear() { |
| 711 if (isScheduled) cancelSchedule(); | 701 if (isScheduled) cancelSchedule(); |
| 712 firstPendingEvent = lastPendingEvent = null; | 702 firstPendingEvent = lastPendingEvent = null; |
| 713 } | 703 } |
| 714 } | 704 } |
| 715 | 705 |
| 716 class _BroadcastLinkedList { | |
| 717 _BroadcastLinkedList _next; | |
| 718 _BroadcastLinkedList _previous; | |
| 719 | |
| 720 void _unlink() { | |
| 721 _previous._next = _next; | |
| 722 _next._previous = _previous; | |
| 723 _next = _previous = this; | |
| 724 } | |
| 725 | |
| 726 void _insertBefore(_BroadcastLinkedList newNext) { | |
| 727 _BroadcastLinkedList newPrevious = newNext._previous; | |
| 728 newPrevious._next = this; | |
| 729 newNext._previous = _previous; | |
| 730 _previous._next = newNext; | |
| 731 _previous = newPrevious; | |
| 732 } | |
| 733 } | |
| 734 | |
| 735 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); | 706 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
| 736 | 707 |
| 737 /** | 708 /** |
| 738 * Done subscription that will send one done event as soon as possible. | 709 * Done subscription that will send one done event as soon as possible. |
| 739 */ | 710 */ |
| 740 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 711 class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
| 741 static const int _DONE_SENT = 1; | 712 static const int _DONE_SENT = 1; |
| 742 static const int _SCHEDULED = 2; | 713 static const int _SCHEDULED = 2; |
| 743 static const int _PAUSED = 4; | 714 static const int _PAUSED = 4; |
| 744 | 715 |
| (...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 925 | 896 |
| 926 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | 897 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
| 927 throw new UnsupportedError( | 898 throw new UnsupportedError( |
| 928 "Cannot change handlers of asBroadcastStream source subscription."); | 899 "Cannot change handlers of asBroadcastStream source subscription."); |
| 929 } | 900 } |
| 930 } | 901 } |
| 931 | 902 |
| 932 | 903 |
| 933 /** | 904 /** |
| 934 * Simple implementation of [StreamIterator]. | 905 * Simple implementation of [StreamIterator]. |
| 906 * | |
| 907 * Pauses the stream between calles to [moveNext]. | |
| 935 */ | 908 */ |
| 936 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 909 class _StreamIterator<T> implements StreamIterator<T> { |
| 937 // Internal state of the stream iterator. | 910 // The stream iterator is always in one of four states. |
| 938 // At any time, it is in one of these states. | 911 // The value of the [_stateData] field depends on the state. |
| 939 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 912 // |
| 940 // In _STATE_MOVING, the _data field holds the most recently returned | 913 // When `_subscription == null` and `_stateData != null`: |
| 941 // future. | 914 // The stream iterator has been created, but [moveNext] has not been called |
| 942 // When in one of the _STATE_EXTRA_* states, the it may hold the | 915 // yet. The [_stateData] field contains the stream to listen to on the first |
| 943 // next data/error object, and the subscription is paused. | 916 // call to [moveNext] and [current] returns `null`. |
| 944 | 917 // |
| 945 /// The simple state where [_data] holds the data to return, and [moveNext] | 918 // When `_subscription != null` and `!_subscription.isPaused`: |
| 946 /// is allowed. The subscription is actively listening. | 919 // The user has called [moveNext] and the iterator is waiting for the next |
| 947 static const int _STATE_FOUND = 0; | 920 // event. The [_stateData] field contains the [_Future] returned by the |
| 948 /// State set after [moveNext] has returned false or an error, | 921 // [_moveNext] call and [current] returns `null.` |
| 949 /// or after calling [cancel]. The subscription is always canceled. | 922 // |
| 950 static const int _STATE_DONE = 1; | 923 // When `_subscription != null` and `_subscription.isPaused`: |
| 951 /// State set after calling [moveNext], but before its returned future has | 924 // The most recent call to [moveNext] has completed with a `true` value |
| 952 /// completed. Calling [moveNext] again is not allowed in this state. | 925 // and [current] provides the value of the data event. |
| 953 /// The subscription is actively listening. | 926 // The [_stateData] field contains the [current] value. |
| 954 static const int _STATE_MOVING = 2; | 927 // |
| 955 /// States set when another event occurs while in _STATE_FOUND. | 928 // When `_subscription == null` and `_stateData == null`: |
| 956 /// This extra overflow event is cached until the next call to [moveNext], | 929 // The stream has completed or been canceled using [cancel]. |
| 957 /// which will complete as if it received the event normally. | 930 // The stream completes on either a done event or an error event. |
| 958 /// The subscription is paused in these states, so we only ever get one | 931 // The last call to [moveNext] has completed with `false` and [current] |
| 959 /// event too many. | 932 // returns `null`. |
| 960 static const int _STATE_EXTRA_DATA = 3; | |
| 961 static const int _STATE_EXTRA_ERROR = 4; | |
| 962 static const int _STATE_EXTRA_DONE = 5; | |
| 963 | 933 |
| 964 /// Subscription being listened to. | 934 /// Subscription being listened to. |
| 935 /// | |
| 936 /// Set to `null` when the stream subscription is done or canceled. | |
| 965 StreamSubscription _subscription; | 937 StreamSubscription _subscription; |
| 966 | 938 |
| 967 /// The current element represented by the most recent call to moveNext. | 939 /// Data value depending on the current state. |
| 968 /// | 940 /// |
| 969 /// Is null between the time moveNext is called and its future completes. | 941 /// Before first call to [moveNext]: The stream to listen to. |
| 970 T _current = null; | 942 /// |
| 943 /// After calling [moveNext] but before the returned future completes: | |
| 944 /// The returned future. | |
| 945 /// | |
| 946 /// After calling [moveNext] and the returned future has completed | |
| 947 /// with `true`: The value of [current]. | |
| 948 /// | |
| 949 /// After calling [moveNext] and the returned future has completed | |
| 950 /// with `false`, or after calling [cancel]: `null`. | |
| 951 Object _stateData; | |
| 971 | 952 |
| 972 /// The future returned by the most recent call to [moveNext]. | 953 _StreamIterator(final Stream<T> stream) : _stateData = stream; |
| 973 /// | |
| 974 /// Also used to store the next value/error in case the stream provides an | |
| 975 /// event before [moveNext] is called again. In that case, the stream will | |
| 976 /// be paused to prevent further events. | |
| 977 var/*Future<bool> or T*/ _futureOrPrefetch = null; | |
| 978 | 954 |
| 979 /// The current state. | 955 T get current { |
| 980 int _state = _STATE_FOUND; | 956 if (_subscription != null && _subscription.isPaused) { |
| 981 | 957 return _stateData; |
|
floitsch
2016/07/14 13:29:25
Please make sure that this is still strong-mode cl
| |
| 982 _StreamIteratorImpl(final Stream<T> stream) { | 958 } |
| 983 _subscription = stream.listen(_onData, | 959 return null; |
| 984 onError: _onError, | |
| 985 onDone: _onDone, | |
| 986 cancelOnError: true); | |
| 987 } | 960 } |
| 988 | 961 |
| 989 T get current => _current; | |
| 990 | |
| 991 Future<bool> moveNext() { | 962 Future<bool> moveNext() { |
| 992 if (_state == _STATE_DONE) { | 963 if (_subscription != null) { |
| 993 return new _Future<bool>.immediate(false); | 964 if (_subscription.isPaused) { |
| 994 } | 965 var future = new _Future<bool>(); |
| 995 if (_state == _STATE_MOVING) { | 966 _stateData = future; |
| 967 _subscription.resume(); | |
| 968 return future; | |
| 969 } | |
| 996 throw new StateError("Already waiting for next."); | 970 throw new StateError("Already waiting for next."); |
| 997 } | 971 } |
| 998 if (_state == _STATE_FOUND) { | 972 return _initializeOrDone(); |
| 999 _state = _STATE_MOVING; | |
| 1000 _current = null; | |
| 1001 var result = new _Future<bool>(); | |
| 1002 _futureOrPrefetch = result; | |
| 1003 return result; | |
| 1004 } else { | |
| 1005 assert(_state >= _STATE_EXTRA_DATA); | |
| 1006 switch (_state) { | |
| 1007 case _STATE_EXTRA_DATA: | |
| 1008 _state = _STATE_FOUND; | |
| 1009 _current = _futureOrPrefetch as Object /*=T*/; | |
| 1010 _futureOrPrefetch = null; | |
| 1011 _subscription.resume(); | |
| 1012 return new _Future<bool>.immediate(true); | |
| 1013 case _STATE_EXTRA_ERROR: | |
| 1014 AsyncError prefetch = _futureOrPrefetch; | |
| 1015 _clear(); | |
| 1016 return new _Future<bool>.immediateError(prefetch.error, | |
| 1017 prefetch.stackTrace); | |
| 1018 case _STATE_EXTRA_DONE: | |
| 1019 _clear(); | |
| 1020 return new _Future<bool>.immediate(false); | |
| 1021 } | |
| 1022 } | |
| 1023 } | 973 } |
| 1024 | 974 |
| 1025 /** Clears up the internal state when the iterator ends. */ | 975 /// Called if there is no active subscription when [moveNext] is called. |
| 1026 void _clear() { | 976 /// |
| 1027 _subscription = null; | 977 /// Either starts listening on the stream if this is the first call to |
| 1028 _futureOrPrefetch = null; | 978 /// [moveNext], or returns a `false` future because the stream has already |
| 1029 _current = null; | 979 /// ended. |
| 1030 _state = _STATE_DONE; | 980 Future<bool> _initializeOrDone() { |
| 981 assert(_subscription == null); | |
| 982 var stateData = _stateData; | |
| 983 if (stateData != null) { | |
| 984 Stream<T> stream = stateData; | |
| 985 _subscription = stream.listen( | |
| 986 _onData, onError: _onError, onDone: _onDone, cancelOnError: true); | |
| 987 var future = new _Future<bool>(); | |
| 988 _stateData = future; | |
| 989 return future; | |
| 990 } | |
| 991 return new _Future.immediate(false); | |
| 1031 } | 992 } |
| 1032 | 993 |
| 1033 Future cancel() { | 994 Future cancel() { |
| 1034 StreamSubscription subscription = _subscription; | 995 StreamSubscription<T> subscription = _subscription; |
| 1035 if (subscription == null) return null; | 996 Object stateData = _stateData; |
| 1036 if (_state == _STATE_MOVING) { | 997 _stateData = null; |
| 1037 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 998 if (subscription != null) { |
| 1038 _clear(); | 999 _subscription = null; |
| 1039 hasNext._complete(false); | 1000 if (!subscription.isPaused) { |
| 1040 } else { | 1001 _Future<bool> future = stateData; |
| 1041 _clear(); | 1002 future._asyncComplete(false); |
| 1003 } | |
| 1004 return subscription.cancel(); | |
| 1042 } | 1005 } |
| 1043 return subscription.cancel(); | 1006 return null; |
| 1044 } | 1007 } |
| 1045 | 1008 |
| 1046 void _onData(T data) { | 1009 void _onData(T data) { |
| 1047 if (_state == _STATE_MOVING) { | 1010 assert(_subscription != null && !_subscription.isPaused); |
| 1048 _current = data; | 1011 _Future<bool> moveNextFuture = _stateData; |
| 1049 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1012 _stateData = data; |
| 1050 _futureOrPrefetch = null; | |
| 1051 _state = _STATE_FOUND; | |
| 1052 hasNext._complete(true); | |
| 1053 return; | |
| 1054 } | |
| 1055 _subscription.pause(); | 1013 _subscription.pause(); |
| 1056 assert(_futureOrPrefetch == null); | 1014 moveNextFuture._complete(true); |
| 1057 _futureOrPrefetch = data; | |
| 1058 _state = _STATE_EXTRA_DATA; | |
| 1059 } | 1015 } |
| 1060 | 1016 |
| 1061 void _onError(Object error, [StackTrace stackTrace]) { | 1017 void _onError(Object error, [StackTrace stackTrace]) { |
| 1062 if (_state == _STATE_MOVING) { | 1018 assert(_subscription != null && !_subscription.isPaused); |
| 1063 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1019 _Future<bool> moveNextFuture = _stateData; |
| 1064 // We have cancelOnError: true, so the subscription is canceled. | 1020 _subscription = null; |
| 1065 _clear(); | 1021 _stateData = null; |
| 1066 hasNext._completeError(error, stackTrace); | 1022 moveNextFuture._completeError(error, stackTrace); |
| 1067 return; | |
| 1068 } | |
| 1069 _subscription.pause(); | |
| 1070 assert(_futureOrPrefetch == null); | |
| 1071 _futureOrPrefetch = new AsyncError(error, stackTrace); | |
| 1072 _state = _STATE_EXTRA_ERROR; | |
| 1073 } | 1023 } |
| 1074 | 1024 |
| 1075 void _onDone() { | 1025 void _onDone() { |
| 1076 if (_state == _STATE_MOVING) { | 1026 assert(_subscription != null && !_subscription.isPaused); |
| 1077 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | 1027 _Future<bool> moveNextFuture = _stateData; |
| 1078 _clear(); | 1028 _subscription = null; |
| 1079 hasNext._complete(false); | 1029 _stateData = null; |
| 1080 return; | 1030 moveNextFuture._complete(false); |
| 1081 } | |
| 1082 _subscription.pause(); | |
| 1083 _futureOrPrefetch = null; | |
| 1084 _state = _STATE_EXTRA_DONE; | |
| 1085 } | 1031 } |
| 1086 } | 1032 } |
| 1087 | 1033 |
| 1088 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1034 /** An empty broadcast stream, sending a done event as soon as possible. */ |
| 1089 class _EmptyStream<T> extends Stream<T> { | 1035 class _EmptyStream<T> extends Stream<T> { |
| 1090 const _EmptyStream() : super._internal(); | 1036 const _EmptyStream() : super._internal(); |
| 1091 bool get isBroadcast => true; | 1037 bool get isBroadcast => true; |
| 1092 StreamSubscription<T> listen(void onData(T data), | 1038 StreamSubscription<T> listen(void onData(T data), |
| 1093 {Function onError, | 1039 {Function onError, |
| 1094 void onDone(), | 1040 void onDone(), |
| 1095 bool cancelOnError}) { | 1041 bool cancelOnError}) { |
| 1096 return new _DoneStreamSubscription<T>(onDone); | 1042 return new _DoneStreamSubscription<T>(onDone); |
| 1097 } | 1043 } |
| 1098 } | 1044 } |
| OLD | NEW |