OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 28 matching lines...) Expand all Loading... |
39 * Whether to invoke a callback depends only on the state before and after | 39 * Whether to invoke a callback depends only on the state before and after |
40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
42 * callback is performed. | 42 * callback is performed. |
43 * | 43 * |
44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
48 */ | 48 */ |
49 class StreamController<T> extends EventSink<T> { | 49 abstract class StreamController<T> implements EventSink<T> { |
| 50 /** The stream that this controller is controlling. */ |
| 51 Stream<T> get stream; |
| 52 |
| 53 /** |
| 54 * A controller with a [stream] that supports only one single subscriber. |
| 55 * |
| 56 * The controller will buffer all incoming events until the subscriber is |
| 57 * registered. |
| 58 * |
| 59 * The [onPause] function is called when the stream becomes |
| 60 * paused. [onResume] is called when the stream resumed. |
| 61 * |
| 62 * The [onListen] callback is called when the stream |
| 63 * receives its listener and [onCancel] when the listener ends |
| 64 * its subscription. |
| 65 * |
| 66 * If the stream is canceled before the controller needs new data the |
| 67 * [onResume] call might not be executed. |
| 68 */ |
| 69 factory StreamController({void onListen(), |
| 70 void onPause(), |
| 71 void onResume(), |
| 72 void onCancel()}) |
| 73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); |
| 74 |
| 75 /** |
| 76 * A controller where [stream] creates new stream each time it is read. |
| 77 * |
| 78 * The controller distributes any events to all currently subscribed streams. |
| 79 * |
| 80 * The [onListen] callback is called when the first listener is subscribed, |
| 81 * and the [onCancel] is called when there is no longer any active listeners. |
| 82 * If a listener is added again later, after the [onCancel] was called, |
| 83 * the [onListen] will be called again. |
| 84 */ |
| 85 factory StreamController.multiplex({void onListen(), void onCancel()}) { |
| 86 return new _MultiplexStreamController<T>(onListen, onCancel); |
| 87 } |
| 88 |
| 89 /** |
| 90 * Returns a view of this object that only exposes the [EventSink] interface. |
| 91 */ |
| 92 EventSink<T> get sink; |
| 93 |
| 94 /** |
| 95 * Whether the stream is closed for adding more events. |
| 96 * |
| 97 * If true, the "done" event might not have fired yet, but it has been |
| 98 * scheduled, and it is too late to add more events. |
| 99 */ |
| 100 bool get isClosed; |
| 101 |
| 102 /** Whether the subscription is active and paused. */ |
| 103 bool get isPaused; |
| 104 |
| 105 /** Whether there is a subscriber on the [Stream]. */ |
| 106 bool get hasListener; |
| 107 } |
| 108 |
| 109 |
| 110 abstract class _StreamControllerLifecycle<T> { |
| 111 void _recordListen(StreamSubscription<T> subscription) {} |
| 112 void _recordPause(StreamSubscription<T> subscription) {} |
| 113 void _recordResume(StreamSubscription<T> subscription) {} |
| 114 void _recordCancel(StreamSubscription<T> subscription) {} |
| 115 } |
| 116 |
| 117 /** |
| 118 * Default implementation of [StreamController]. |
| 119 * |
| 120 * Controls a stream that only supports a single controller. |
| 121 */ |
| 122 class _StreamControllerImpl<T> implements StreamController<T>, |
| 123 _StreamControllerLifecycle<T> { |
50 static const int _STATE_OPEN = 0; | 124 static const int _STATE_OPEN = 0; |
51 static const int _STATE_CANCELLED = 1; | 125 static const int _STATE_CANCELLED = 1; |
52 static const int _STATE_CLOSED = 2; | 126 static const int _STATE_CLOSED = 2; |
53 | 127 |
54 final _NotificationHandler _onListen; | 128 final _NotificationHandler _onListen; |
55 final _NotificationHandler _onPause; | 129 final _NotificationHandler _onPause; |
56 final _NotificationHandler _onResume; | 130 final _NotificationHandler _onResume; |
57 final _NotificationHandler _onCancel; | 131 final _NotificationHandler _onCancel; |
58 _StreamImpl<T> _stream; | 132 _StreamImpl<T> _stream; |
59 | 133 |
60 // An active subscription on the stream, or null if no subscripton is active. | 134 // An active subscription on the stream, or null if no subscripton is active. |
61 _ControllerSubscription<T> _subscription; | 135 _ControllerSubscription<T> _subscription; |
62 | 136 |
63 // Whether we have sent a "done" event. | 137 // Whether we have sent a "done" event. |
64 int _state = _STATE_OPEN; | 138 int _state = _STATE_OPEN; |
65 | 139 |
66 // Events added to the stream before it has an active subscription. | 140 // Events added to the stream before it has an active subscription. |
67 _PendingEvents _pendingEvents = null; | 141 _PendingEvents _pendingEvents = null; |
68 | 142 |
69 /** | 143 _StreamControllerImpl(this._onListen, |
70 * A controller with a [stream] that supports only one single subscriber. | 144 this._onPause, |
71 * | 145 this._onResume, |
72 * The controller will buffer all incoming events until the subscriber is | 146 this._onCancel) { |
73 * registered. | |
74 * | |
75 * The [onPause] function is called when the stream becomes | |
76 * paused. [onResume] is called when the stream resumed. | |
77 * | |
78 * The [onListen] callback is called when the stream | |
79 * receives its listener and [onCancel] when the listener ends | |
80 * its subscription. | |
81 * | |
82 * If the stream is canceled before the controller needs new data the | |
83 * [onResume] call might not be executed. | |
84 */ | |
85 StreamController({void onListen(), | |
86 void onPause(), | |
87 void onResume(), | |
88 void onCancel()}) | |
89 : _onListen = onListen, | |
90 _onPause = onPause, | |
91 _onResume = onResume, | |
92 _onCancel = onCancel { | |
93 _stream = new _ControllerStream<T>(this); | 147 _stream = new _ControllerStream<T>(this); |
94 } | 148 } |
95 | 149 |
96 Stream<T> get stream => _stream; | 150 Stream<T> get stream => _stream; |
97 | 151 |
98 /** | 152 /** |
99 * Returns a view of this object that only exposes the [EventSink] interface. | 153 * Returns a view of this object that only exposes the [EventSink] interface. |
100 */ | 154 */ |
101 EventSink<T> get sink => new _EventSinkView<T>(this); | 155 EventSink<T> get sink => new _EventSinkView<T>(this); |
102 | 156 |
103 /** | 157 /** |
104 * Whether a listener has existed and been cancelled. | 158 * Whether a listener has existed and been cancelled. |
105 * | 159 * |
106 * After this, adding more events will be ignored. | 160 * After this, adding more events will be ignored. |
107 */ | 161 */ |
108 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; | 162 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; |
109 | 163 |
110 /** | |
111 * Whether the stream is closed for adding more events. | |
112 * | |
113 * If true, the "done" event might not have fired yet, but it has been | |
114 * scheduled, and it is too late to add more events. | |
115 */ | |
116 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 164 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
117 | 165 |
118 /** Whether the subscription is active and paused. */ | |
119 bool get isPaused => _subscription != null && _subscription._isInputPaused; | 166 bool get isPaused => _subscription != null && _subscription._isInputPaused; |
120 | 167 |
121 /** Whether there are currently a subscriber on the [Stream]. */ | |
122 bool get hasListener => _subscription != null; | 168 bool get hasListener => _subscription != null; |
123 | 169 |
124 /** | 170 /** |
125 * Send or queue a data event. | 171 * Send or queue a data event. |
126 */ | 172 */ |
127 void add(T value) { | 173 void add(T value) { |
128 if (isClosed) throw new StateError("Adding event after close"); | 174 if (isClosed) throw new StateError("Adding event after close"); |
129 if (_subscription != null) { | 175 if (_subscription != null) { |
130 _subscription._add(value); | 176 _subscription._add(value); |
131 } else if (!_isCancelled) { | 177 } else if (!_isCancelled) { |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
173 | 219 |
174 void _addPendingEvent(_DelayedEvent event) { | 220 void _addPendingEvent(_DelayedEvent event) { |
175 if (_isCancelled) return; | 221 if (_isCancelled) return; |
176 _StreamImplEvents events = _pendingEvents; | 222 _StreamImplEvents events = _pendingEvents; |
177 if (events == null) { | 223 if (events == null) { |
178 events = _pendingEvents = new _StreamImplEvents(); | 224 events = _pendingEvents = new _StreamImplEvents(); |
179 } | 225 } |
180 events.add(event); | 226 events.add(event); |
181 } | 227 } |
182 | 228 |
183 void _recordListen(_BufferingStreamSubscription subscription) { | 229 void _recordListen(_BufferingStreamSubscription<T> subscription) { |
184 assert(_subscription == null); | 230 assert(_subscription == null); |
185 _subscription = subscription; | 231 _subscription = subscription; |
186 subscription._setPendingEvents(_pendingEvents); | 232 subscription._setPendingEvents(_pendingEvents); |
187 _pendingEvents = null; | 233 _pendingEvents = null; |
188 subscription._guardCallback(() { | 234 subscription._guardCallback(() { |
189 _runGuarded(_onListen); | 235 _runGuarded(_onListen); |
190 }); | 236 }); |
191 } | 237 } |
192 | 238 |
193 void _recordCancel() { | 239 void _recordCancel(StreamSubscription<T> subscription) { |
| 240 assert(identical(_subscription, subscription)); |
194 _subscription = null; | 241 _subscription = null; |
195 _state |= _STATE_CANCELLED; | 242 _state |= _STATE_CANCELLED; |
196 _runGuarded(_onCancel); | 243 _runGuarded(_onCancel); |
197 } | 244 } |
198 | 245 |
199 void _recordPause() { | 246 void _recordPause(StreamSubscription<T> subscription) { |
200 _runGuarded(_onPause); | 247 _runGuarded(_onPause); |
201 } | 248 } |
202 | 249 |
203 void _recordResume() { | 250 void _recordResume(StreamSubscription<T> subscription) { |
204 _runGuarded(_onResume); | 251 _runGuarded(_onResume); |
205 } | 252 } |
206 } | 253 } |
207 | 254 |
208 typedef void _NotificationHandler(); | 255 typedef void _NotificationHandler(); |
209 | 256 |
210 void _runGuarded(_NotificationHandler notificationHandler) { | 257 void _runGuarded(_NotificationHandler notificationHandler) { |
211 if (notificationHandler == null) return; | 258 if (notificationHandler == null) return; |
212 try { | 259 try { |
213 notificationHandler(); | 260 notificationHandler(); |
214 } catch (e, s) { | 261 } catch (e, s) { |
215 _throwDelayed(e, s); | 262 _throwDelayed(e, s); |
216 } | 263 } |
217 } | 264 } |
218 | 265 |
219 class _ControllerStream<T> extends _StreamImpl<T> { | 266 class _ControllerStream<T> extends _StreamImpl<T> { |
220 StreamController _controller; | 267 _StreamControllerLifecycle<T> _controller; |
221 bool _hasListener = false; | 268 bool _hasListener = false; |
222 | 269 |
223 _ControllerStream(this._controller); | 270 _ControllerStream(this._controller); |
224 | 271 |
225 StreamSubscription<T> _createSubscription( | 272 StreamSubscription<T> _createSubscription( |
226 void onData(T data), | 273 void onData(T data), |
227 void onError(Object error), | 274 void onError(Object error), |
228 void onDone(), | 275 void onDone(), |
229 bool cancelOnError) { | 276 bool cancelOnError) { |
230 if (_hasListener) { | 277 if (_hasListener) { |
231 throw new StateError("The stream has already been listened to."); | 278 throw new StateError("The stream has already been listened to."); |
232 } | 279 } |
233 _hasListener = true; | 280 _hasListener = true; |
234 return new _ControllerSubscription<T>( | 281 return new _ControllerSubscription<T>( |
235 _controller, onData, onError, onDone, cancelOnError); | 282 _controller, onData, onError, onDone, cancelOnError); |
236 } | 283 } |
237 | 284 |
238 void _onListen(_BufferingStreamSubscription subscription) { | 285 void _onListen(_BufferingStreamSubscription subscription) { |
239 _controller._recordListen(subscription); | 286 _controller._recordListen(subscription); |
240 } | 287 } |
241 } | 288 } |
242 | 289 |
243 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 290 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
244 final StreamController _controller; | 291 final _StreamControllerLifecycle<T> _controller; |
245 | 292 |
246 _ControllerSubscription(StreamController controller, | 293 _ControllerSubscription(this._controller, |
247 void onData(T data), | 294 void onData(T data), |
248 void onError(Object error), | 295 void onError(Object error), |
249 void onDone(), | 296 void onDone(), |
250 bool cancelOnError) | 297 bool cancelOnError) |
251 : _controller = controller, | 298 : super(onData, onError, onDone, cancelOnError); |
252 super(onData, onError, onDone, cancelOnError); | |
253 | 299 |
254 void _onCancel() { | 300 void _onCancel() { |
255 _controller._recordCancel(); | 301 _controller._recordCancel(this); |
256 } | 302 } |
257 | 303 |
258 void _onPause() { | 304 void _onPause() { |
259 _controller._recordPause(); | 305 _controller._recordPause(this); |
260 } | 306 } |
261 | 307 |
262 void _onResume() { | 308 void _onResume() { |
263 _controller._recordResume(); | 309 _controller._recordResume(this); |
264 } | 310 } |
265 } | 311 } |
| 312 |
| 313 class _MultiplexStreamController<T> implements StreamController<T>, |
| 314 _StreamControllerLifecycle<T> { |
| 315 final _NotificationHandler _onListen; |
| 316 final _NotificationHandler _onCancel; |
| 317 /** Set when the [close] method is called. */ |
| 318 bool _isClosed = false; |
| 319 |
| 320 // TODO(lrn): Make a more efficient implementation of these subscriptions, |
| 321 // e.g., the traditional double-linked list with concurrent add and remove |
| 322 // while firing. |
| 323 Set<_BufferingStreamSubscription<T>> _streams; |
| 324 |
| 325 _MultiplexStreamController(this._onListen, this._onCancel) |
| 326 : _streams = new Set<_BufferingStreamSubscription<T>>(); |
| 327 |
| 328 // StreamController interface. |
| 329 |
| 330 Stream<T> get stream => new _ControllerStream<T>(this); |
| 331 |
| 332 EventSink<T> get sink => new _EventSinkView<T>(this); |
| 333 |
| 334 bool get isClosed => _isClosed; |
| 335 |
| 336 /** |
| 337 * A multiplex controller is never paused. |
| 338 * |
| 339 * Each receiving stream may be paused individually, and they handle their |
| 340 * own buffering. |
| 341 */ |
| 342 bool get isPaused => false; |
| 343 |
| 344 /** Whether there are currently a subscriber on the [Stream]. */ |
| 345 bool get hasListener => !_streams.isEmpty; |
| 346 |
| 347 // _StreamControllerLifecycle interface. |
| 348 |
| 349 void _recordListen(_BufferingStreamSubscription<T> subscription) { |
| 350 bool isFirst = _streams.isEmpty; |
| 351 _streams.add(subscription); |
| 352 if (isFirst) { |
| 353 _runGuarded(_onListen); |
| 354 } |
| 355 } |
| 356 |
| 357 void _recordCancel(_BufferingStreamSubscription<T> subscription) { |
| 358 _streams.remove(subscription); |
| 359 if (_streams.isEmpty) { |
| 360 _runGuarded(_onCancel); |
| 361 } |
| 362 } |
| 363 |
| 364 void _recordPause(StreamSubscription<T> subscription) {} |
| 365 void _recordResume(StreamSubscription<T> subscription) {} |
| 366 |
| 367 // EventSink interface. |
| 368 |
| 369 void add(T data) { |
| 370 if (_streams.isEmpty) return; |
| 371 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 372 subscription._add(data); |
| 373 }); |
| 374 } |
| 375 |
| 376 void addError(Object error) { |
| 377 if (_streams.isEmpty) return; |
| 378 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 379 subscription._addError(error); |
| 380 }); |
| 381 } |
| 382 |
| 383 void close() { |
| 384 _isClosed = true; |
| 385 if (_streams.isEmpty) return; |
| 386 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 387 _streams.remove(subscription); |
| 388 subscription._close(); |
| 389 }); |
| 390 } |
| 391 |
| 392 void _forEachListener( |
| 393 void action(_BufferingStreamSubscription<T> subscription)) { |
| 394 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList(); |
| 395 for (_BufferingStreamSubscription<T> subscription in subscriptions) { |
| 396 if (_streams.contains(subscription)) { |
| 397 action(subscription); |
| 398 } |
| 399 } |
| 400 } |
| 401 } |
| 402 |
OLD | NEW |