OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 29 matching lines...) Expand all Loading... | |
40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
42 * callback is performed. | 42 * callback is performed. |
43 * | 43 * |
44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
48 */ | 48 */ |
49 class StreamController<T> extends EventSink<T> { | 49 class StreamController<T> extends EventSink<T> { |
50 final _StreamImpl<T> stream; | 50 static const int _STATE_CANCELLED = 1; |
51 static const int _STATE_CLOSED = 2; | |
52 | |
53 final _NotificationHandler _onListen; | |
54 final _NotificationHandler _onPause; | |
55 final _NotificationHandler _onResume; | |
56 final _NotificationHandler _onCancel; | |
57 _StreamImpl<T> _stream; | |
58 | |
59 // An active subscription on the stream, or null if no subscripton is active. | |
60 _ControllerSubscription<T> _subscription; | |
61 | |
62 // Whether we have sent a "done" event. | |
floitsch
2013/05/22 16:26:29
"Wether" is not right.
Also, what does "0" mean?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I see "Whether"?
floitsch
2013/05/24 13:53:41
I meant that this doesn't look a boolean. "Whether
| |
63 int _state = 0; | |
64 | |
65 // Events added to the stream before it has an active subscription. | |
66 _PendingEvents _pendingEvents = null; | |
51 | 67 |
52 /** | 68 /** |
53 * | |
54 * If the stream is canceled before the controller needs new data the | 69 * If the stream is canceled before the controller needs new data the |
55 * [onResume] call might not be executed. | 70 * [onResume] call might not be executed. |
56 : stream = new _MultiControllerStream<T>( | |
57 onListen, onPause, onResume, onCancel); | |
floitsch
2013/05/22 16:26:29
You will have to merge this. I removed lines 53 to
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ACK.
| |
58 * A controller with a [stream] that supports only one single subscriber. | 71 * A controller with a [stream] that supports only one single subscriber. |
59 * | 72 * |
60 * The controller will buffer all incoming events until the subscriber is | 73 * The controller will buffer all incoming events until the subscriber is |
61 * registered. | 74 * registered. |
62 * | 75 * |
63 * The [onPause] function is called when the stream becomes | 76 * The [onPause] function is called when the stream becomes |
64 * paused. [onResume] is called when the stream resumed. | 77 * paused. [onResume] is called when the stream resumed. |
65 * | 78 * |
66 * The [onListen] callback is called when the stream | 79 * The [onListen] callback is called when the stream |
67 * receives its listener. [onCancel] when the listener cancels | 80 * receives its listener. [onCancel] when the listener cancels |
68 * its subscription. | 81 * its subscription. |
69 * | 82 * |
70 * If the stream is canceled before the controller needs new data the | 83 * If the stream is canceled before the controller needs new data the |
71 * [onResume] call might not be executed. | 84 * [onResume] call might not be executed. |
72 */ | 85 */ |
73 StreamController({void onListen(), | 86 StreamController({void onListen(), |
74 void onPause(), | 87 void onPause(), |
75 void onResume(), | 88 void onResume(), |
76 void onCancel()}) | 89 void onCancel()}) |
77 : stream = new _SingleControllerStream<T>( | 90 : _onListen = onListen, |
78 onListen, onPause, onResume, onCancel); | 91 _onPause = onPause, |
92 _onResume = onResume, | |
93 _onCancel = onCancel { | |
94 _stream = new _ControllerStream<T>(this); | |
95 } | |
96 | |
97 Stream<T> get stream => _stream; | |
79 | 98 |
80 /** | 99 /** |
81 * Returns a view of this object that only exposes the [EventSink] interface. | 100 * Returns a view of this object that only exposes the [EventSink] interface. |
82 */ | 101 */ |
83 EventSink<T> get sink => new _EventSinkView<T>(this); | 102 EventSink<T> get sink => new _EventSinkView<T>(this); |
84 | 103 |
85 /** | 104 /** |
105 * Whether a listener has existed and been cancelled. | |
106 * | |
107 * After this, adding more events will be ignored. | |
108 */ | |
109 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; | |
110 | |
111 /** | |
86 * Whether the stream is closed for adding more events. | 112 * Whether the stream is closed for adding more events. |
87 * | 113 * |
88 * If true, the "done" event might not have fired yet, but it has been | 114 * If true, the "done" event might not have fired yet, but it has been |
89 * scheduled, and it is too late to add more events. | 115 * scheduled, and it is too late to add more events. |
90 */ | 116 */ |
91 bool get isClosed => stream._isClosed; | 117 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
92 | 118 |
93 /** Whether one or more active subscribers have requested a pause. */ | 119 /** Whether the subscription is active and paused. */ |
94 bool get isPaused => stream._isInputPaused; | 120 bool get isPaused => _subscription != null && _subscription._isInputPaused; |
95 | 121 |
96 /** Whether there are currently any subscribers on this [Stream]. */ | 122 /** Whether there are currently any subscribers on the [Stream]. */ |
floitsch
2013/05/22 16:26:29
there is currently a subscriber on the [Stream].
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
97 bool get hasListener => stream._hasListener; | 123 bool get hasListener => _subscription != null; |
98 | 124 |
99 /** | 125 /** |
100 * Send or queue a data event. | 126 * Send or queue a data event. |
101 */ | 127 */ |
102 void add(T value) => stream._add(value); | 128 void add(T value) { |
129 if (isClosed) throw new StateError("Adding event after close"); | |
130 if (_subscription != null) { | |
131 _subscription._add(value); | |
132 } else if (!_isCancelled) { | |
133 _addPendingEvent(new _DelayedData<T>(value)); | |
134 } | |
135 } | |
103 | 136 |
104 /** | 137 /** |
105 * Send or enqueue an error event. | 138 * Send or enqueue an error event. |
106 * | 139 * |
107 * If a subscription has requested to be unsubscribed on errors, | 140 * If the subscription has requested to be unsubscribed on errors, |
floitsch
2013/05/22 16:26:29
Should we keep this here? It's not really a Contro
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I'm fine with removing it.
| |
108 * it will be unsubscribed after receiving this event. | 141 * it will be unsubscribed after receiving this event. |
109 */ | 142 */ |
110 void addError(Object error, [Object stackTrace]) { | 143 void addError(Object error, [Object stackTrace]) { |
144 if (isClosed) throw new StateError("Adding event after close"); | |
111 if (stackTrace != null) { | 145 if (stackTrace != null) { |
112 // Force stack trace overwrite. Even if the error already contained | 146 // Force stack trace overwrite. Even if the error already contained |
113 // a stack trace. | 147 // a stack trace. |
114 _attachStackTrace(error, stackTrace); | 148 _attachStackTrace(error, stackTrace); |
115 } | 149 } |
116 stream._addError(error); | 150 if (_subscription != null) { |
151 _subscription._addError(error); | |
152 } else if (!_isCancelled) { | |
153 _addPendingEvent(new _DelayedError(error)); | |
154 } | |
117 } | 155 } |
118 | 156 |
119 /** | 157 /** |
120 * Send or enqueue a "done" message. | 158 * Send or enqueue a "done" message. |
121 * | 159 * |
122 * The "done" message should be sent at most once by a stream, and it | 160 * The "done" message should be sent at most once by a stream, and it |
floitsch
2013/05/22 16:26:29
Too many "should"s.
Maybe:
Closes this controller.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
sounds fine.
| |
123 * should be the last message sent. | 161 * should be the last message sent. |
124 */ | 162 */ |
125 void close() { stream._close(); } | 163 void close() { |
164 if (isClosed) return; | |
165 _state |= _STATE_CLOSED; | |
166 if (_subscription != null) { | |
167 _subscription._close(); | |
168 } else if (!_isCancelled) { | |
169 _addPendingEvent(const _DelayedDone()); | |
170 } | |
171 } | |
172 | |
173 void _addPendingEvent(_DelayedEvent event) { | |
174 if (_isCancelled) return; | |
175 _StreamImplEvents events = _pendingEvents; | |
176 if (events == null) { | |
177 events = _pendingEvents = new _StreamImplEvents(); | |
178 } | |
179 events.add(event); | |
180 } | |
181 | |
182 void _recordListen(_BufferingStreamSubscription subscription) { | |
183 assert(_subscription == null); | |
184 _subscription = subscription; | |
185 _pendingEvents = null; // These have been taken over by the stream. | |
floitsch
2013/05/22 16:26:29
by the subscription.
I would prefer if we transfe
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ok, will change to not pass them in the constructo
| |
186 _subscription._guardCallback(() { | |
187 _runGuarded(_onListen); | |
188 }); | |
189 } | |
190 | |
191 void _recordCancel() { | |
192 _subscription = null; | |
193 _state |= _STATE_CANCELLED; | |
194 _runGuarded(_onCancel); | |
195 } | |
196 | |
197 void _recordPause() { | |
198 _runGuarded(_onPause); | |
199 } | |
200 | |
201 void _recordResume() { | |
202 _runGuarded(_onResume); | |
203 } | |
126 } | 204 } |
127 | 205 |
128 typedef void _NotificationHandler(); | 206 typedef void _NotificationHandler(); |
129 | 207 |
130 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { | 208 void _runGuarded(_NotificationHandler notificationHandler) { |
131 _NotificationHandler _onListen; | 209 if (notificationHandler == null) return; |
132 _NotificationHandler _onPause; | 210 try { |
133 _NotificationHandler _onResume; | 211 notificationHandler(); |
134 _NotificationHandler _onCancel; | 212 } catch (e, s) { |
213 _throwDelayed(e, s); | |
214 } | |
215 } | |
135 | 216 |
136 // TODO(floitsch): share this code with _MultiControllerStream. | 217 class _ControllerStream<T> extends _StreamImpl<T> { |
137 _runGuarded(_NotificationHandler notificationHandler) { | 218 StreamController _controller; |
138 if (notificationHandler == null) return; | 219 bool _hasListener = false; |
139 try { | 220 |
140 notificationHandler(); | 221 _ControllerStream(this._controller); |
141 } catch (e, s) { | 222 |
142 _throwDelayed(e, s); | 223 StreamSubscription<T> _createSubscription( |
224 void onData(T data), | |
225 void onError(Object error), | |
226 void onDone(), | |
227 bool cancelOnError) { | |
228 if (_hasListener) { | |
229 try { | |
230 throw 0; | |
231 } catch (e, s) { | |
232 print("LISTEN TWICE(#$hashCode)\n$s"); | |
233 } | |
234 throw new StateError("The stream has already been listened to."); | |
143 } | 235 } |
236 //try { throw 0; } catch (e, s) { print("LISTEN ONCE(#$hashCode):\n$s"); } | |
237 _hasListener = true; | |
238 return new _ControllerSubscription<T>( | |
239 _controller, onData, onError, onDone, cancelOnError); | |
144 } | 240 } |
145 | 241 |
146 _SingleControllerStream(this._onListen, | 242 void _onListen(_BufferingStreamSubscription subscription) { |
147 this._onPause, | 243 _controller._recordListen(subscription); |
148 this._onResume, | 244 } |
149 this._onCancel); | 245 } |
150 | 246 |
151 void _onSubscriptionStateChange() { | 247 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
152 _runGuarded(_hasListener ? _onListen : _onCancel); | 248 final StreamController _controller; |
249 | |
250 _ControllerSubscription(StreamController controller, | |
251 void onData(T data), | |
252 void onError(Object error), | |
253 void onDone(), | |
254 bool cancelOnError) | |
255 : _controller = controller, | |
256 super(onData, onError, onDone, cancelOnError, | |
257 controller._pendingEvents); | |
258 | |
259 void _onCancel() { | |
260 _controller._recordCancel(); | |
153 } | 261 } |
154 | 262 |
155 void _onPauseStateChange() { | 263 void _onPause() { |
156 _runGuarded(_isPaused ? _onPause : _onResume); | 264 _controller._recordPause(); |
265 } | |
266 | |
267 void _onResume() { | |
268 _controller._recordResume(); | |
157 } | 269 } |
158 } | 270 } |
OLD | NEW |