OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
8 | 8 |
9 // Completion state of the stream. | 9 // Completion state of the stream. |
10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
59 // ------------------------------------------------------------------- | 59 // ------------------------------------------------------------------- |
60 // Common base class for single and multi-subscription streams. | 60 // Common base class for single and multi-subscription streams. |
61 // ------------------------------------------------------------------- | 61 // ------------------------------------------------------------------- |
62 abstract class _StreamImpl<T> extends Stream<T> { | 62 abstract class _StreamImpl<T> extends Stream<T> { |
63 /** Current state of the stream. */ | 63 /** Current state of the stream. */ |
64 int _state = _STREAM_OPEN; | 64 int _state = _STREAM_OPEN; |
65 | 65 |
66 /** | 66 /** |
67 * List of pending events. | 67 * List of pending events. |
68 * | 68 * |
69 * If events are added to the stream (using [_add], [_signalError] or [_done]) | 69 * If events are added to the stream (using [_add], [_addError] or [_done]) |
70 * while the stream is paused, or while another event is firing, events will | 70 * while the stream is paused, or while another event is firing, events will |
71 * stored here. | 71 * stored here. |
72 * Also supports scheduling the events for later execution. | 72 * Also supports scheduling the events for later execution. |
73 */ | 73 */ |
74 _PendingEvents _pendingEvents; | 74 _PendingEvents _pendingEvents; |
75 | 75 |
76 // ------------------------------------------------------------------ | 76 // ------------------------------------------------------------------ |
77 // Stream interface. | 77 // Stream interface. |
78 | 78 |
79 StreamSubscription<T> listen(void onData(T data), | 79 StreamSubscription<T> listen(void onData(T data), |
80 { void onError(AsyncError error), | 80 { void onError(AsyncError error), |
81 void onDone(), | 81 void onDone(), |
82 bool unsubscribeOnError }) { | 82 bool unsubscribeOnError }) { |
83 if (_isComplete) { | 83 if (_isComplete) { |
84 return new _DoneSubscription(onDone); | 84 return new _DoneSubscription(onDone); |
85 } | 85 } |
86 if (onData == null) onData = _nullDataHandler; | 86 if (onData == null) onData = _nullDataHandler; |
87 if (onError == null) onError = _nullErrorHandler; | 87 if (onError == null) onError = _nullErrorHandler; |
88 if (onDone == null) onDone = _nullDoneHandler; | 88 if (onDone == null) onDone = _nullDoneHandler; |
89 unsubscribeOnError = identical(true, unsubscribeOnError); | 89 unsubscribeOnError = identical(true, unsubscribeOnError); |
90 _StreamSubscriptionImpl subscription = | 90 _StreamSubscriptionImpl subscription = |
91 _createSubscription(onData, onError, onDone, unsubscribeOnError); | 91 _createSubscription(onData, onError, onDone, unsubscribeOnError); |
92 _addListener(subscription); | 92 _addListener(subscription); |
93 return subscription; | 93 return subscription; |
94 } | 94 } |
95 | 95 |
96 // ------------------------------------------------------------------ | 96 // ------------------------------------------------------------------ |
97 // StreamSink interface-like methods for sending events into the stream. | 97 // EventSink interface-like methods for sending events into the stream. |
98 // It's the responsibility of the caller to ensure that the stream is not | 98 // It's the responsibility of the caller to ensure that the stream is not |
99 // paused when adding events. If the stream is paused, the events will be | 99 // paused when adding events. If the stream is paused, the events will be |
100 // queued, but it's better to not send events at all. | 100 // queued, but it's better to not send events at all. |
101 | 101 |
102 /** | 102 /** |
103 * Send or queue a data event. | 103 * Send or queue a data event. |
104 */ | 104 */ |
105 void _add(T value) { | 105 void _add(T value) { |
106 if (_isClosed) throw new StateError("Sending on closed stream"); | 106 if (_isClosed) throw new StateError("Sending on closed stream"); |
107 if (!_mayFireState) { | 107 if (!_mayFireState) { |
108 // Not the time to send events. | 108 // Not the time to send events. |
109 _addPendingEvent(new _DelayedData<T>(value)); | 109 _addPendingEvent(new _DelayedData<T>(value)); |
110 return; | 110 return; |
111 } | 111 } |
112 if (_hasPendingEvent) { | 112 if (_hasPendingEvent) { |
113 _addPendingEvent(new _DelayedData<T>(value)); | 113 _addPendingEvent(new _DelayedData<T>(value)); |
114 } else { | 114 } else { |
115 _sendData(value); | 115 _sendData(value); |
116 } | 116 } |
117 _handlePendingEvents(); | 117 _handlePendingEvents(); |
118 } | 118 } |
119 | 119 |
120 /** | 120 /** |
121 * Send or enqueue an error event. | 121 * Send or enqueue an error event. |
122 * | 122 * |
123 * If a subscription has requested to be unsubscribed on errors, | 123 * If a subscription has requested to be unsubscribed on errors, |
124 * it will be unsubscribed after receiving this event. | 124 * it will be unsubscribed after receiving this event. |
125 */ | 125 */ |
126 void _signalError(AsyncError error) { | 126 void _addError(AsyncError error) { |
127 if (_isClosed) throw new StateError("Sending on closed stream"); | 127 if (_isClosed) throw new StateError("Sending on closed stream"); |
128 if (!_mayFireState) { | 128 if (!_mayFireState) { |
129 // Not the time to send events. | 129 // Not the time to send events. |
130 _addPendingEvent(new _DelayedError(error)); | 130 _addPendingEvent(new _DelayedError(error)); |
131 return; | 131 return; |
132 } | 132 } |
133 if (_hasPendingEvent) { | 133 if (_hasPendingEvent) { |
134 _addPendingEvent(new _DelayedError(error)); | 134 _addPendingEvent(new _DelayedError(error)); |
135 } else { | 135 } else { |
136 _sendError(error); | 136 _sendError(error); |
(...skipping 418 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
555 * | 555 * |
556 * The only public methods are those of [Stream], so instances of | 556 * The only public methods are those of [Stream], so instances of |
557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing | 557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
558 * internal functionality. | 558 * internal functionality. |
559 * | 559 * |
560 * The [StreamController] is a public facing version of this class, with | 560 * The [StreamController] is a public facing version of this class, with |
561 * some methods made public. | 561 * some methods made public. |
562 * | 562 * |
563 * The user interface of [_SingleStreamImpl] are the following methods: | 563 * The user interface of [_SingleStreamImpl] are the following methods: |
564 * * [_add]: Add a data event to the stream. | 564 * * [_add]: Add a data event to the stream. |
565 * * [_signalError]: Add an error event to the stream. | 565 * * [_addError]: Add an error event to the stream. |
566 * * [_close]: Request to close the stream. | 566 * * [_close]: Request to close the stream. |
567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or | 567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
568 * when losing the last subscriber. | 568 * when losing the last subscriber. |
569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
570 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 570 * * [_hasSubscribers]: Test whether there are currently any subscribers. |
571 * * [_isInputPaused]: Test whether the stream is currently paused. | 571 * * [_isInputPaused]: Test whether the stream is currently paused. |
572 * The user should not add new events while the stream is paused, but if it | 572 * The user should not add new events while the stream is paused, but if it |
573 * happens anyway, the stream will enqueue the events just as when new events | 573 * happens anyway, the stream will enqueue the events just as when new events |
574 * arrive while still firing an old event. | 574 * arrive while still firing an old event. |
575 */ | 575 */ |
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
669 * | 669 * |
670 * The only public methods are those of [Stream], so instances of | 670 * The only public methods are those of [Stream], so instances of |
671 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing | 671 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
672 * internal functionality. | 672 * internal functionality. |
673 * | 673 * |
674 * The [StreamController] is a public facing version of this class, with | 674 * The [StreamController] is a public facing version of this class, with |
675 * some methods made public. | 675 * some methods made public. |
676 * | 676 * |
677 * The user interface of [_MultiStreamImpl] are the following methods: | 677 * The user interface of [_MultiStreamImpl] are the following methods: |
678 * * [_add]: Add a data event to the stream. | 678 * * [_add]: Add a data event to the stream. |
679 * * [_signalError]: Add an error event to the stream. | 679 * * [_addError]: Add an error event to the stream. |
680 * * [_close]: Request to close the stream. | 680 * * [_close]: Request to close the stream. |
681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or | 681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
682 * when losing the last subscriber. | 682 * when losing the last subscriber. |
683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
684 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 684 * * [_hasSubscribers]: Test whether there are currently any subscribers. |
685 * * [_isPaused]: Test whether the stream is currently paused. | 685 * * [_isPaused]: Test whether the stream is currently paused. |
686 * The user should not add new events while the stream is paused, but if it | 686 * The user should not add new events while the stream is paused, but if it |
687 * happens anyway, the stream will enqueue the events just as when new events | 687 * happens anyway, the stream will enqueue the events just as when new events |
688 * arrive while still firing an old event. | 688 * arrive while still firing an old event. |
689 */ | 689 */ |
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
834 */ | 834 */ |
835 _GeneratedSingleStreamImpl(_PendingEvents events) { | 835 _GeneratedSingleStreamImpl(_PendingEvents events) { |
836 _pendingEvents = events; | 836 _pendingEvents = events; |
837 _setClosed(); // Closed for input since all events are already pending. | 837 _setClosed(); // Closed for input since all events are already pending. |
838 } | 838 } |
839 | 839 |
840 void _add(T value) { | 840 void _add(T value) { |
841 throw new UnsupportedError("Cannot inject events into generated stream"); | 841 throw new UnsupportedError("Cannot inject events into generated stream"); |
842 } | 842 } |
843 | 843 |
844 void _signalError(AsyncError value) { | 844 void _addError(AsyncError value) { |
845 throw new UnsupportedError("Cannot inject events into generated stream"); | 845 throw new UnsupportedError("Cannot inject events into generated stream"); |
846 } | 846 } |
847 | 847 |
848 void _close() { | 848 void _close() { |
849 throw new UnsupportedError("Cannot inject events into generated stream"); | 849 throw new UnsupportedError("Cannot inject events into generated stream"); |
850 } | 850 } |
851 } | 851 } |
852 | 852 |
853 | 853 |
854 /** Pending events object that gets its events from an [Iterable]. */ | 854 /** Pending events object that gets its events from an [Iterable]. */ |
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1091 otherNext._previousLink = listLast; | 1091 otherNext._previousLink = listLast; |
1092 _InternalLink otherLast = other._previousLink; | 1092 _InternalLink otherLast = other._previousLink; |
1093 list._previousLink = otherLast; | 1093 list._previousLink = otherLast; |
1094 otherLast._nextLink = list; | 1094 otherLast._nextLink = list; |
1095 // Clean up [other]. | 1095 // Clean up [other]. |
1096 other._nextLink = other._previousLink = other; | 1096 other._nextLink = other._previousLink = other; |
1097 } | 1097 } |
1098 } | 1098 } |
1099 | 1099 |
1100 /** Abstract type for an internal interface for sending events. */ | 1100 /** Abstract type for an internal interface for sending events. */ |
1101 abstract class _StreamOutputSink<T> { | 1101 abstract class _EventOutputSink<T> { |
1102 _sendData(T data); | 1102 _sendData(T data); |
1103 _sendError(AsyncError error); | 1103 _sendError(AsyncError error); |
1104 _sendDone(); | 1104 _sendDone(); |
1105 } | 1105 } |
1106 | 1106 |
1107 abstract class _StreamListener<T> extends _InternalLink | 1107 abstract class _StreamListener<T> extends _InternalLink |
1108 implements _StreamOutputSink<T> { | 1108 implements _EventOutputSink<T> { |
1109 final _StreamImpl _source; | 1109 final _StreamImpl _source; |
1110 int _state = _LISTENER_UNSUBSCRIBED; | 1110 int _state = _LISTENER_UNSUBSCRIBED; |
1111 | 1111 |
1112 _StreamListener(this._source); | 1112 _StreamListener(this._source); |
1113 | 1113 |
1114 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); | 1114 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
1115 | 1115 |
1116 bool get _isPendingUnsubscribe => | 1116 bool get _isPendingUnsubscribe => |
1117 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; | 1117 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
1118 | 1118 |
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1325 } | 1325 } |
1326 | 1326 |
1327 /** | 1327 /** |
1328 * Subscribe or unsubscribe on [_source] depending on whether | 1328 * Subscribe or unsubscribe on [_source] depending on whether |
1329 * [_stream] has subscribers. | 1329 * [_stream] has subscribers. |
1330 */ | 1330 */ |
1331 void _onSubscriptionStateChange() { | 1331 void _onSubscriptionStateChange() { |
1332 if (_hasSubscribers) { | 1332 if (_hasSubscribers) { |
1333 assert(_subscription == null); | 1333 assert(_subscription == null); |
1334 _subscription = _source.listen(this._add, | 1334 _subscription = _source.listen(this._add, |
1335 onError: this._signalError, | 1335 onError: this._addError, |
1336 onDone: this._close); | 1336 onDone: this._close); |
1337 } else { | 1337 } else { |
1338 // TODO(lrn): Check why this can happen. | 1338 // TODO(lrn): Check why this can happen. |
1339 if (_subscription == null) return; | 1339 if (_subscription == null) return; |
1340 _subscription.cancel(); | 1340 _subscription.cancel(); |
1341 _subscription = null; | 1341 _subscription = null; |
1342 } | 1342 } |
1343 } | 1343 } |
1344 } | 1344 } |
OLD | NEW |