OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 28 matching lines...) Expand all Loading... |
39 * Whether to invoke a callback depends only on the state before and after | 39 * Whether to invoke a callback depends only on the state before and after |
40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
42 * callback is performed. | 42 * callback is performed. |
43 * | 43 * |
44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
48 */ | 48 */ |
49 abstract class StreamController<T> implements EventSink<T> { | 49 class StreamController<T> extends EventSink<T> { |
50 /** The stream that this controller is controlling. */ | 50 final _StreamImpl<T> stream; |
51 Stream<T> get stream; | |
52 | 51 |
53 /** | 52 /** |
54 * A controller with a [stream] that supports only one single subscriber. | 53 * A controller with a [stream] that supports only one single subscriber. |
55 * | 54 * |
56 * The controller will buffer all incoming events until the subscriber is | 55 * The controller will buffer all incoming events until the subscriber is |
57 * registered. | 56 * registered. |
58 * | 57 * |
59 * The [onPause] function is called when the stream becomes | 58 * The [onPause] function is called when the stream becomes |
60 * paused. [onResume] is called when the stream resumed. | 59 * paused. [onResume] is called when the stream resumed. |
61 * | 60 * |
62 * The [onListen] callback is called when the stream | 61 * The [onListen] callback is called when the stream |
63 * receives its listener and [onCancel] when the listener ends | 62 * receives its listener. [onCancel] when the listener cancels |
64 * its subscription. | 63 * its subscription. |
65 * | 64 * |
66 * If the stream is canceled before the controller needs new data the | 65 * If the stream is canceled before the controller needs new data the |
67 * [onResume] call might not be executed. | 66 * [onResume] call might not be executed. |
68 */ | 67 */ |
69 factory StreamController({void onListen(), | 68 StreamController({void onListen(), |
70 void onPause(), | 69 void onPause(), |
71 void onResume(), | 70 void onResume(), |
72 void onCancel()}) | 71 void onCancel()}) |
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); | 72 : stream = new _SingleControllerStream<T>( |
74 | 73 onListen, onPause, onResume, onCancel); |
75 /** | |
76 * A controller where [stream] creates new stream each time it is read. | |
77 * | |
78 * The controller distributes any events to all currently subscribed streams. | |
79 * | |
80 * The [onListen] callback is called when the first listener is subscribed, | |
81 * and the [onCancel] is called when there is no longer any active listeners. | |
82 * If a listener is added again later, after the [onCancel] was called, | |
83 * the [onListen] will be called again. | |
84 */ | |
85 factory StreamController.multiplex({void onListen(), void onCancel()}) { | |
86 return new _MultiplexStreamController<T>(onListen, onCancel); | |
87 } | |
88 | 74 |
89 /** | 75 /** |
90 * Returns a view of this object that only exposes the [EventSink] interface. | 76 * Returns a view of this object that only exposes the [EventSink] interface. |
91 */ | 77 */ |
92 EventSink<T> get sink; | 78 EventSink<T> get sink => new _EventSinkView<T>(this); |
93 | 79 |
94 /** | 80 /** |
95 * Whether the stream is closed for adding more events. | 81 * Whether the stream is closed for adding more events. |
96 * | 82 * |
97 * If true, the "done" event might not have fired yet, but it has been | 83 * If true, the "done" event might not have fired yet, but it has been |
98 * scheduled, and it is too late to add more events. | 84 * scheduled, and it is too late to add more events. |
99 */ | 85 */ |
100 bool get isClosed; | 86 bool get isClosed => stream._isClosed; |
101 | 87 |
102 /** Whether the subscription is active and paused. */ | 88 /** Whether one or more active subscribers have requested a pause. */ |
103 bool get isPaused; | 89 bool get isPaused => stream._isInputPaused; |
104 | 90 |
105 /** Whether there is a subscriber on the [Stream]. */ | 91 /** Whether there are currently any subscribers on this [Stream]. */ |
106 bool get hasListener; | 92 bool get hasListener => stream._hasListener; |
| 93 |
| 94 /** |
| 95 * Send or queue a data event. |
| 96 */ |
| 97 void add(T value) => stream._add(value); |
107 | 98 |
108 /** | 99 /** |
109 * Send or enqueue an error event. | 100 * Send or enqueue an error event. |
110 * | 101 * |
111 * Also allows an objection stack trace object, on top of what [EventSink] | 102 * If a subscription has requested to be unsubscribed on errors, |
112 * allows. | 103 * it will be unsubscribed after receiving this event. |
113 */ | |
114 void addError(Object error, [Object stackTrace]); | |
115 } | |
116 | |
117 | |
118 abstract class _StreamControllerLifecycle<T> { | |
119 void _recordListen(StreamSubscription<T> subscription) {} | |
120 void _recordPause(StreamSubscription<T> subscription) {} | |
121 void _recordResume(StreamSubscription<T> subscription) {} | |
122 void _recordCancel(StreamSubscription<T> subscription) {} | |
123 } | |
124 | |
125 /** | |
126 * Default implementation of [StreamController]. | |
127 * | |
128 * Controls a stream that only supports a single controller. | |
129 */ | |
130 class _StreamControllerImpl<T> implements StreamController<T>, | |
131 _StreamControllerLifecycle<T> { | |
132 static const int _STATE_OPEN = 0; | |
133 static const int _STATE_CANCELLED = 1; | |
134 static const int _STATE_CLOSED = 2; | |
135 | |
136 final _NotificationHandler _onListen; | |
137 final _NotificationHandler _onPause; | |
138 final _NotificationHandler _onResume; | |
139 final _NotificationHandler _onCancel; | |
140 _StreamImpl<T> _stream; | |
141 | |
142 // An active subscription on the stream, or null if no subscripton is active. | |
143 _ControllerSubscription<T> _subscription; | |
144 | |
145 // Whether we have sent a "done" event. | |
146 int _state = _STATE_OPEN; | |
147 | |
148 // Events added to the stream before it has an active subscription. | |
149 _PendingEvents _pendingEvents = null; | |
150 | |
151 _StreamControllerImpl(this._onListen, | |
152 this._onPause, | |
153 this._onResume, | |
154 this._onCancel) { | |
155 _stream = new _ControllerStream<T>(this); | |
156 } | |
157 | |
158 Stream<T> get stream => _stream; | |
159 | |
160 /** | |
161 * Returns a view of this object that only exposes the [EventSink] interface. | |
162 */ | |
163 EventSink<T> get sink => new _EventSinkView<T>(this); | |
164 | |
165 /** | |
166 * Whether a listener has existed and been cancelled. | |
167 * | |
168 * After this, adding more events will be ignored. | |
169 */ | |
170 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; | |
171 | |
172 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
173 | |
174 bool get isPaused => _subscription != null && _subscription._isInputPaused; | |
175 | |
176 bool get hasListener => _subscription != null; | |
177 | |
178 /** | |
179 * Send or queue a data event. | |
180 */ | |
181 void add(T value) { | |
182 if (isClosed) throw new StateError("Adding event after close"); | |
183 if (_subscription != null) { | |
184 _subscription._add(value); | |
185 } else if (!_isCancelled) { | |
186 _addPendingEvent(new _DelayedData<T>(value)); | |
187 } | |
188 } | |
189 | |
190 /** | |
191 * Send or enqueue an error event. | |
192 */ | 104 */ |
193 void addError(Object error, [Object stackTrace]) { | 105 void addError(Object error, [Object stackTrace]) { |
194 if (isClosed) throw new StateError("Adding event after close"); | |
195 if (stackTrace != null) { | 106 if (stackTrace != null) { |
196 // Force stack trace overwrite. Even if the error already contained | 107 // Force stack trace overwrite. Even if the error already contained |
197 // a stack trace. | 108 // a stack trace. |
198 _attachStackTrace(error, stackTrace); | 109 _attachStackTrace(error, stackTrace); |
199 } | 110 } |
200 if (_subscription != null) { | 111 stream._addError(error); |
201 _subscription._addError(error); | |
202 } else if (!_isCancelled) { | |
203 _addPendingEvent(new _DelayedError(error)); | |
204 } | |
205 } | 112 } |
206 | 113 |
207 /** | 114 /** |
208 * Closes this controller. | 115 * Send or enqueue a "done" message. |
209 * | 116 * |
210 * After closing, no further events may be added using [add] or [addError]. | 117 * The "done" message should be sent at most once by a stream, and it |
211 * | 118 * should be the last message sent. |
212 * You are allowed to close the controller more than once, but only the first | |
213 * call has any effect. | |
214 * | |
215 * The first time a controller is closed, a "done" event is sent to its | |
216 * stream. | |
217 */ | 119 */ |
218 void close() { | 120 void close() { stream._close(); } |
219 if (isClosed) return; | |
220 _state |= _STATE_CLOSED; | |
221 if (_subscription != null) { | |
222 _subscription._close(); | |
223 } else if (!_isCancelled) { | |
224 _addPendingEvent(const _DelayedDone()); | |
225 } | |
226 } | |
227 | |
228 void _addPendingEvent(_DelayedEvent event) { | |
229 if (_isCancelled) return; | |
230 _StreamImplEvents events = _pendingEvents; | |
231 if (events == null) { | |
232 events = _pendingEvents = new _StreamImplEvents(); | |
233 } | |
234 events.add(event); | |
235 } | |
236 | |
237 void _recordListen(_BufferingStreamSubscription<T> subscription) { | |
238 assert(_subscription == null); | |
239 _subscription = subscription; | |
240 subscription._setPendingEvents(_pendingEvents); | |
241 _pendingEvents = null; | |
242 subscription._guardCallback(() { | |
243 _runGuarded(_onListen); | |
244 }); | |
245 } | |
246 | |
247 void _recordCancel(StreamSubscription<T> subscription) { | |
248 assert(identical(_subscription, subscription)); | |
249 _subscription = null; | |
250 _state |= _STATE_CANCELLED; | |
251 _runGuarded(_onCancel); | |
252 } | |
253 | |
254 void _recordPause(StreamSubscription<T> subscription) { | |
255 _runGuarded(_onPause); | |
256 } | |
257 | |
258 void _recordResume(StreamSubscription<T> subscription) { | |
259 _runGuarded(_onResume); | |
260 } | |
261 } | 121 } |
262 | 122 |
263 typedef void _NotificationHandler(); | 123 typedef void _NotificationHandler(); |
264 | 124 |
265 void _runGuarded(_NotificationHandler notificationHandler) { | 125 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { |
266 if (notificationHandler == null) return; | 126 _NotificationHandler _onListen; |
267 try { | 127 _NotificationHandler _onPause; |
268 notificationHandler(); | 128 _NotificationHandler _onResume; |
269 } catch (e, s) { | 129 _NotificationHandler _onCancel; |
270 _throwDelayed(e, s); | |
271 } | |
272 } | |
273 | 130 |
274 class _ControllerStream<T> extends _StreamImpl<T> { | 131 // TODO(floitsch): share this code with _MultiControllerStream. |
275 _StreamControllerLifecycle<T> _controller; | 132 _runGuarded(_NotificationHandler notificationHandler) { |
276 bool _hasListener = false; | 133 if (notificationHandler == null) return; |
277 | 134 try { |
278 _ControllerStream(this._controller); | 135 notificationHandler(); |
279 | 136 } catch (e, s) { |
280 StreamSubscription<T> _createSubscription( | 137 _throwDelayed(e, s); |
281 void onData(T data), | |
282 void onError(Object error), | |
283 void onDone(), | |
284 bool cancelOnError) { | |
285 if (_hasListener) { | |
286 throw new StateError("The stream has already been listened to."); | |
287 } | |
288 _hasListener = true; | |
289 return new _ControllerSubscription<T>( | |
290 _controller, onData, onError, onDone, cancelOnError); | |
291 } | |
292 | |
293 void _onListen(_BufferingStreamSubscription subscription) { | |
294 _controller._recordListen(subscription); | |
295 } | |
296 } | |
297 | |
298 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | |
299 final _StreamControllerLifecycle<T> _controller; | |
300 | |
301 _ControllerSubscription(this._controller, | |
302 void onData(T data), | |
303 void onError(Object error), | |
304 void onDone(), | |
305 bool cancelOnError) | |
306 : super(onData, onError, onDone, cancelOnError); | |
307 | |
308 void _onCancel() { | |
309 _controller._recordCancel(this); | |
310 } | |
311 | |
312 void _onPause() { | |
313 _controller._recordPause(this); | |
314 } | |
315 | |
316 void _onResume() { | |
317 _controller._recordResume(this); | |
318 } | |
319 } | |
320 | |
321 class _MultiplexStreamController<T> implements StreamController<T>, | |
322 _StreamControllerLifecycle<T> { | |
323 final _NotificationHandler _onListen; | |
324 final _NotificationHandler _onCancel; | |
325 /** Set when the [close] method is called. */ | |
326 bool _isClosed = false; | |
327 | |
328 // TODO(lrn): Make a more efficient implementation of these subscriptions, | |
329 // e.g., the traditional double-linked list with concurrent add and remove | |
330 // while firing. | |
331 Set<_BufferingStreamSubscription<T>> _streams; | |
332 | |
333 _MultiplexStreamController(this._onListen, this._onCancel) | |
334 : _streams = new Set<_BufferingStreamSubscription<T>>(); | |
335 | |
336 // StreamController interface. | |
337 | |
338 Stream<T> get stream => new _ControllerStream<T>(this); | |
339 | |
340 EventSink<T> get sink => new _EventSinkView<T>(this); | |
341 | |
342 bool get isClosed => _isClosed; | |
343 | |
344 /** | |
345 * A multiplex controller is never paused. | |
346 * | |
347 * Each receiving stream may be paused individually, and they handle their | |
348 * own buffering. | |
349 */ | |
350 bool get isPaused => false; | |
351 | |
352 /** Whether there are currently a subscriber on the [Stream]. */ | |
353 bool get hasListener => !_streams.isEmpty; | |
354 | |
355 // _StreamControllerLifecycle interface. | |
356 | |
357 void _recordListen(_BufferingStreamSubscription<T> subscription) { | |
358 bool isFirst = _streams.isEmpty; | |
359 _streams.add(subscription); | |
360 if (isFirst) { | |
361 _runGuarded(_onListen); | |
362 } | 138 } |
363 } | 139 } |
364 | 140 |
365 void _recordCancel(_BufferingStreamSubscription<T> subscription) { | 141 _SingleControllerStream(this._onListen, |
366 _streams.remove(subscription); | 142 this._onPause, |
367 if (_streams.isEmpty) { | 143 this._onResume, |
368 _runGuarded(_onCancel); | 144 this._onCancel); |
369 } | 145 |
| 146 void _onSubscriptionStateChange() { |
| 147 _runGuarded(_hasListener ? _onListen : _onCancel); |
370 } | 148 } |
371 | 149 |
372 void _recordPause(StreamSubscription<T> subscription) {} | 150 void _onPauseStateChange() { |
373 void _recordResume(StreamSubscription<T> subscription) {} | 151 _runGuarded(_isPaused ? _onPause : _onResume); |
374 | |
375 // EventSink interface. | |
376 | |
377 void add(T data) { | |
378 if (_streams.isEmpty) return; | |
379 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
380 subscription._add(data); | |
381 }); | |
382 } | |
383 | |
384 void addError(Object error, [Object stackTrace]) { | |
385 if (_streams.isEmpty) return; | |
386 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
387 subscription._addError(error); | |
388 }); | |
389 } | |
390 | |
391 void close() { | |
392 _isClosed = true; | |
393 if (_streams.isEmpty) return; | |
394 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
395 _streams.remove(subscription); | |
396 subscription._close(); | |
397 }); | |
398 } | |
399 | |
400 void _forEachListener( | |
401 void action(_BufferingStreamSubscription<T> subscription)) { | |
402 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList(); | |
403 for (_BufferingStreamSubscription<T> subscription in subscriptions) { | |
404 if (_streams.contains(subscription)) { | |
405 action(subscription); | |
406 } | |
407 } | |
408 } | 152 } |
409 } | 153 } |
410 | |
OLD | NEW |