OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
57 * registered. | 57 * registered. |
58 * | 58 * |
59 * The [onPause] function is called when the stream becomes | 59 * The [onPause] function is called when the stream becomes |
60 * paused. [onResume] is called when the stream resumed. | 60 * paused. [onResume] is called when the stream resumed. |
61 * | 61 * |
62 * The [onListen] callback is called when the stream | 62 * The [onListen] callback is called when the stream |
63 * receives its listener and [onCancel] when the listener ends | 63 * receives its listener and [onCancel] when the listener ends |
64 * its subscription. | 64 * its subscription. |
65 * | 65 * |
66 * If the stream is canceled before the controller needs new data the | 66 * If the stream is canceled before the controller needs new data the |
67 * [onResume] call might not be executed. | 67 * [onResume] call might not be executed. |
floitsch
2013/05/30 12:13:48
Comment `sync`.
Lasse Reichstein Nielsen
2013/05/31 05:51:59
Done.
| |
68 */ | 68 */ |
69 factory StreamController({void onListen(), | 69 factory StreamController({void onListen(), |
70 void onPause(), | 70 void onPause(), |
71 void onResume(), | 71 void onResume(), |
72 void onCancel()}) | 72 void onCancel(), |
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); | 73 bool sync: false}) |
74 => sync | |
75 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | |
76 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | |
74 | 77 |
75 /** | 78 /** |
76 * A controller where [stream] can be listened to more than once. | 79 * A controller where [stream] can be listened to more than once. |
77 * | 80 * |
78 * The [Stream] returned by [stream] is a broadcast stream. It can be listened | 81 * The [Stream] returned by [stream] is a broadcast stream. It can be listened |
79 * to more than once. | 82 * to more than once. |
80 * | 83 * |
81 * The controller distributes any events to all currently subscribed | 84 * The controller distributes any events to all currently subscribed |
82 * listeners. | 85 * listeners. |
83 * It is not allowed to call [add], [addError], or [close] before a previous | 86 * It is not allowed to call [add], [addError], or [close] before a previous |
84 * call has returned. | 87 * call has returned. |
85 * | 88 * |
86 * Each listener is handled independently, and if they pause, only the pausing | 89 * Each listener is handled independently, and if they pause, only the pausing |
87 * listener is affected. A paused listener will buffer events internally until | 90 * listener is affected. A paused listener will buffer events internally until |
88 * unpaused or canceled. | 91 * unpaused or canceled. |
89 * | 92 * |
90 * The [onListen] callback is called when the first listener is subscribed, | 93 * The [onListen] callback is called when the first listener is subscribed, |
91 * and the [onCancel] is called when there are no longer any active listeners. | 94 * and the [onCancel] is called when there are no longer any active listeners. |
92 * If a listener is added again later, after the [onCancel] was called, | 95 * If a listener is added again later, after the [onCancel] was called, |
93 * the [onListen] will be called again. | 96 * the [onListen] will be called again. |
94 */ | 97 */ |
95 factory StreamController.broadcast({void onListen(), void onCancel()}) { | 98 factory StreamController.broadcast({void onListen(), |
96 return new _MultiplexStreamController<T>(onListen, onCancel); | 99 void onCancel(), |
100 bool sync: false}) { | |
101 return sync | |
102 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | |
103 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | |
97 } | 104 } |
98 | 105 |
99 /** | 106 /** |
100 * Returns a view of this object that only exposes the [EventSink] interface. | 107 * Returns a view of this object that only exposes the [EventSink] interface. |
101 */ | 108 */ |
102 EventSink<T> get sink; | 109 EventSink<T> get sink; |
103 | 110 |
104 /** | 111 /** |
105 * Whether the stream is closed for adding more events. | 112 * Whether the stream is closed for adding more events. |
106 * | 113 * |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
140 void _recordPause(StreamSubscription<T> subscription) {} | 147 void _recordPause(StreamSubscription<T> subscription) {} |
141 void _recordResume(StreamSubscription<T> subscription) {} | 148 void _recordResume(StreamSubscription<T> subscription) {} |
142 void _recordCancel(StreamSubscription<T> subscription) {} | 149 void _recordCancel(StreamSubscription<T> subscription) {} |
143 } | 150 } |
144 | 151 |
145 /** | 152 /** |
146 * Default implementation of [StreamController]. | 153 * Default implementation of [StreamController]. |
147 * | 154 * |
148 * Controls a stream that only supports a single controller. | 155 * Controls a stream that only supports a single controller. |
149 */ | 156 */ |
150 class _StreamControllerImpl<T> implements StreamController<T>, | 157 abstract class _StreamController<T> implements StreamController<T>, |
151 _StreamControllerLifecycle<T> { | 158 _StreamControllerLifecycle<T>, |
159 _EventDispatch<T> { | |
152 static const int _STATE_OPEN = 0; | 160 static const int _STATE_OPEN = 0; |
153 static const int _STATE_CANCELLED = 1; | 161 static const int _STATE_CANCELLED = 1; |
154 static const int _STATE_CLOSED = 2; | 162 static const int _STATE_CLOSED = 2; |
155 | 163 |
156 final _NotificationHandler _onListen; | 164 final _NotificationHandler _onListen; |
157 final _NotificationHandler _onPause; | 165 final _NotificationHandler _onPause; |
158 final _NotificationHandler _onResume; | 166 final _NotificationHandler _onResume; |
159 final _NotificationHandler _onCancel; | 167 final _NotificationHandler _onCancel; |
160 _StreamImpl<T> _stream; | 168 _StreamImpl<T> _stream; |
161 | 169 |
162 // An active subscription on the stream, or null if no subscripton is active. | 170 // An active subscription on the stream, or null if no subscripton is active. |
163 _ControllerSubscription<T> _subscription; | 171 _ControllerSubscription<T> _subscription; |
164 | 172 |
165 // Whether we have sent a "done" event. | 173 // Whether we have sent a "done" event. |
166 int _state = _STATE_OPEN; | 174 int _state = _STATE_OPEN; |
167 | 175 |
168 // Events added to the stream before it has an active subscription. | 176 // Events added to the stream before it has an active subscription. |
169 _PendingEvents _pendingEvents = null; | 177 _PendingEvents _pendingEvents = null; |
170 | 178 |
171 _StreamControllerImpl(this._onListen, | 179 _StreamController(this._onListen, |
172 this._onPause, | 180 this._onPause, |
173 this._onResume, | 181 this._onResume, |
174 this._onCancel) { | 182 this._onCancel) { |
175 _stream = new _ControllerStream<T>(this); | 183 _stream = new _ControllerStream<T>(this); |
176 } | 184 } |
177 | 185 |
178 Stream<T> get stream => _stream; | 186 Stream<T> get stream => _stream; |
179 | 187 |
180 /** | 188 /** |
181 * Returns a view of this object that only exposes the [EventSink] interface. | 189 * Returns a view of this object that only exposes the [EventSink] interface. |
182 */ | 190 */ |
183 EventSink<T> get sink => new _EventSinkView<T>(this); | 191 EventSink<T> get sink => new _EventSinkView<T>(this); |
184 | 192 |
(...skipping 10 matching lines...) Expand all Loading... | |
195 : !_isCancelled; | 203 : !_isCancelled; |
196 | 204 |
197 bool get hasListener => _subscription != null; | 205 bool get hasListener => _subscription != null; |
198 | 206 |
199 /** | 207 /** |
200 * Send or queue a data event. | 208 * Send or queue a data event. |
201 */ | 209 */ |
202 void add(T value) { | 210 void add(T value) { |
203 if (isClosed) throw new StateError("Adding event after close"); | 211 if (isClosed) throw new StateError("Adding event after close"); |
204 if (_subscription != null) { | 212 if (_subscription != null) { |
205 _subscription._add(value); | 213 _sendData(value); |
206 } else if (!_isCancelled) { | 214 } else if (!_isCancelled) { |
207 _addPendingEvent(new _DelayedData<T>(value)); | 215 _addPendingEvent(new _DelayedData<T>(value)); |
208 } | 216 } |
209 } | 217 } |
210 | 218 |
211 /** | 219 /** |
212 * Send or enqueue an error event. | 220 * Send or enqueue an error event. |
213 */ | 221 */ |
214 void addError(Object error, [Object stackTrace]) { | 222 void addError(Object error, [Object stackTrace]) { |
215 if (isClosed) throw new StateError("Adding event after close"); | 223 if (isClosed) throw new StateError("Adding event after close"); |
216 if (stackTrace != null) { | 224 if (stackTrace != null) { |
217 // Force stack trace overwrite. Even if the error already contained | 225 // Force stack trace overwrite. Even if the error already contained |
218 // a stack trace. | 226 // a stack trace. |
219 _attachStackTrace(error, stackTrace); | 227 _attachStackTrace(error, stackTrace); |
220 } | 228 } |
221 if (_subscription != null) { | 229 if (_subscription != null) { |
222 _subscription._addError(error); | 230 _sendError(error); |
223 } else if (!_isCancelled) { | 231 } else if (!_isCancelled) { |
224 _addPendingEvent(new _DelayedError(error)); | 232 _addPendingEvent(new _DelayedError(error)); |
225 } | 233 } |
226 } | 234 } |
227 | 235 |
228 /** | 236 /** |
229 * Closes this controller. | 237 * Closes this controller. |
230 * | 238 * |
231 * After closing, no further events may be added using [add] or [addError]. | 239 * After closing, no further events may be added using [add] or [addError]. |
232 * | 240 * |
233 * You are allowed to close the controller more than once, but only the first | 241 * You are allowed to close the controller more than once, but only the first |
234 * call has any effect. | 242 * call has any effect. |
235 * | 243 * |
236 * The first time a controller is closed, a "done" event is sent to its | 244 * The first time a controller is closed, a "done" event is sent to its |
237 * stream. | 245 * stream. |
238 */ | 246 */ |
239 void close() { | 247 void close() { |
240 if (isClosed) return; | 248 if (isClosed) return; |
241 _state |= _STATE_CLOSED; | 249 _state |= _STATE_CLOSED; |
242 if (_subscription != null) { | 250 if (_subscription != null) { |
243 _subscription._close(); | 251 _sendDone(); |
244 } else if (!_isCancelled) { | 252 } else if (!_isCancelled) { |
245 _addPendingEvent(const _DelayedDone()); | 253 _addPendingEvent(const _DelayedDone()); |
246 } | 254 } |
247 } | 255 } |
248 | 256 |
257 // EventDispatch interface | |
258 | |
249 void _addPendingEvent(_DelayedEvent event) { | 259 void _addPendingEvent(_DelayedEvent event) { |
250 if (_isCancelled) return; | 260 if (_isCancelled) return; |
251 _StreamImplEvents events = _pendingEvents; | 261 _StreamImplEvents events = _pendingEvents; |
252 if (events == null) { | 262 if (events == null) { |
253 events = _pendingEvents = new _StreamImplEvents(); | 263 events = _pendingEvents = new _StreamImplEvents(); |
254 } | 264 } |
255 events.add(event); | 265 events.add(event); |
256 } | 266 } |
257 | 267 |
258 void _recordListen(_BufferingStreamSubscription<T> subscription) { | 268 void _recordListen(_BufferingStreamSubscription<T> subscription) { |
(...skipping 15 matching lines...) Expand all Loading... | |
274 | 284 |
275 void _recordPause(StreamSubscription<T> subscription) { | 285 void _recordPause(StreamSubscription<T> subscription) { |
276 _runGuarded(_onPause); | 286 _runGuarded(_onPause); |
277 } | 287 } |
278 | 288 |
279 void _recordResume(StreamSubscription<T> subscription) { | 289 void _recordResume(StreamSubscription<T> subscription) { |
280 _runGuarded(_onResume); | 290 _runGuarded(_onResume); |
281 } | 291 } |
282 } | 292 } |
283 | 293 |
294 class _SyncStreamController<T> extends _StreamController<T> { | |
295 _SyncStreamController(void onListen(), | |
296 void onPause(), | |
297 void onResume(), | |
298 void onCancel()) | |
299 : super(onListen, onPause, onResume, onCancel); | |
300 | |
301 void _sendData(T data) { | |
302 _subscription._add(data); | |
303 } | |
304 | |
305 void _sendError(Object error) { | |
306 _subscription._addError(error); | |
307 } | |
308 | |
309 void _sendDone() { | |
310 _subscription._close(); | |
311 } | |
312 } | |
313 | |
314 class _AsyncStreamController<T> extends _StreamController<T> { | |
315 _AsyncStreamController(void onListen(), | |
316 void onPause(), | |
317 void onResume(), | |
318 void onCancel()) | |
319 : super(onListen, onPause, onResume, onCancel); | |
320 | |
321 void _sendData(T data) { | |
322 runAsync(() { | |
floitsch
2013/05/30 12:13:48
Try to use addPending instead.
Lasse Reichstein Nielsen
2013/05/31 05:51:59
Done.
| |
323 if (_subscription == null) return; | |
324 _subscription._add(data); | |
325 }); | |
326 } | |
327 | |
328 void _sendError(Object error) { | |
329 runAsync(() { | |
330 if (_subscription == null) return; | |
331 _subscription._addError(error); | |
332 }); | |
333 } | |
334 | |
335 void _sendDone() { | |
336 runAsync(_subscription._close); | |
337 } | |
338 } | |
339 | |
284 typedef void _NotificationHandler(); | 340 typedef void _NotificationHandler(); |
285 | 341 |
286 void _runGuarded(_NotificationHandler notificationHandler) { | 342 void _runGuarded(_NotificationHandler notificationHandler) { |
287 if (notificationHandler == null) return; | 343 if (notificationHandler == null) return; |
288 try { | 344 try { |
289 notificationHandler(); | 345 notificationHandler(); |
290 } catch (e, s) { | 346 } catch (e, s) { |
291 _throwDelayed(e, s); | 347 _throwDelayed(e, s); |
292 } | 348 } |
293 } | 349 } |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
332 | 388 |
333 void _onPause() { | 389 void _onPause() { |
334 _controller._recordPause(this); | 390 _controller._recordPause(this); |
335 } | 391 } |
336 | 392 |
337 void _onResume() { | 393 void _onResume() { |
338 _controller._recordResume(this); | 394 _controller._recordResume(this); |
339 } | 395 } |
340 } | 396 } |
341 | 397 |
342 class _MultiplexStream<T> extends _StreamImpl<T> { | 398 class _BroadcastStream<T> extends _StreamImpl<T> { |
343 _MultiplexStreamController _controller; | 399 _BroadcastStreamController _controller; |
344 | 400 |
345 _MultiplexStream(this._controller); | 401 _BroadcastStream(this._controller); |
346 | 402 |
347 bool get isBroadcast => true; | 403 bool get isBroadcast => true; |
348 | 404 |
349 StreamSubscription<T> _createSubscription( | 405 StreamSubscription<T> _createSubscription( |
350 void onData(T data), | 406 void onData(T data), |
351 void onError(Object error), | 407 void onError(Object error), |
352 void onDone(), | 408 void onDone(), |
353 bool cancelOnError) { | 409 bool cancelOnError) { |
354 return new _MultiplexSubscription<T>( | 410 return new _BroadcastSubscription<T>( |
355 _controller, onData, onError, onDone, cancelOnError); | 411 _controller, onData, onError, onDone, cancelOnError); |
356 } | 412 } |
357 | 413 |
358 void _onListen(_BufferingStreamSubscription subscription) { | 414 void _onListen(_BufferingStreamSubscription subscription) { |
359 _controller._recordListen(subscription); | 415 _controller._recordListen(subscription); |
360 } | 416 } |
361 } | 417 } |
362 | 418 |
363 abstract class _MultiplexSubscriptionLink { | 419 abstract class _BroadcastSubscriptionLink { |
364 _MultiplexSubscriptionLink _next; | 420 _BroadcastSubscriptionLink _next; |
365 _MultiplexSubscriptionLink _previous; | 421 _BroadcastSubscriptionLink _previous; |
366 } | 422 } |
367 | 423 |
368 class _MultiplexSubscription<T> extends _ControllerSubscription<T> | 424 class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
369 implements _MultiplexSubscriptionLink { | 425 implements _BroadcastSubscriptionLink { |
370 static const int _STATE_EVENT_ID = 1; | 426 static const int _STATE_EVENT_ID = 1; |
371 static const int _STATE_FIRING = 2; | 427 static const int _STATE_FIRING = 2; |
372 static const int _STATE_REMOVE_AFTER_FIRING = 4; | 428 static const int _STATE_REMOVE_AFTER_FIRING = 4; |
373 int _eventState; | 429 int _eventState; |
374 | 430 |
375 _MultiplexSubscriptionLink _next; | 431 _BroadcastSubscriptionLink _next; |
376 _MultiplexSubscriptionLink _previous; | 432 _BroadcastSubscriptionLink _previous; |
377 | 433 |
378 _MultiplexSubscription(_StreamControllerLifecycle controller, | 434 _BroadcastSubscription(_StreamControllerLifecycle controller, |
379 void onData(T data), | 435 void onData(T data), |
380 void onError(Object error), | 436 void onError(Object error), |
381 void onDone(), | 437 void onDone(), |
382 bool cancelOnError) | 438 bool cancelOnError) |
383 : super(controller, onData, onError, onDone, cancelOnError) { | 439 : super(controller, onData, onError, onDone, cancelOnError) { |
384 _next = _previous = this; | 440 _next = _previous = this; |
385 } | 441 } |
386 | 442 |
387 _MultiplexStreamController get _controller => super._controller; | 443 _BroadcastStreamController get _controller => super._controller; |
388 | 444 |
389 bool _expectsEvent(int eventId) { | 445 bool _expectsEvent(int eventId) { |
390 return (_eventState & _STATE_EVENT_ID) == eventId; | 446 return (_eventState & _STATE_EVENT_ID) == eventId; |
391 } | 447 } |
392 | 448 |
393 void _toggleEventId() { | 449 void _toggleEventId() { |
394 _eventState ^= _STATE_EVENT_ID; | 450 _eventState ^= _STATE_EVENT_ID; |
395 } | 451 } |
396 | 452 |
397 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | 453 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; |
398 | 454 |
399 bool _setRemoveAfterFiring() { | 455 bool _setRemoveAfterFiring() { |
400 assert(_isFiring); | 456 assert(_isFiring); |
401 _eventState |= _STATE_REMOVE_AFTER_FIRING; | 457 _eventState |= _STATE_REMOVE_AFTER_FIRING; |
402 } | 458 } |
403 | 459 |
404 bool get _removeAfterFiring => | 460 bool get _removeAfterFiring => |
405 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | 461 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; |
406 } | 462 } |
407 | 463 |
408 | 464 |
409 class _MultiplexStreamController<T> implements StreamController<T>, | 465 abstract class _BroadcastStreamController<T> |
410 _StreamControllerLifecycle<T>, | 466 implements StreamController<T>, |
411 _MultiplexSubscriptionLink { | 467 _StreamControllerLifecycle<T>, |
468 _BroadcastSubscriptionLink, | |
469 _EventDispatch<T> { | |
412 static const int _STATE_INITIAL = 0; | 470 static const int _STATE_INITIAL = 0; |
413 static const int _STATE_EVENT_ID = 1; | 471 static const int _STATE_EVENT_ID = 1; |
414 static const int _STATE_FIRING = 2; | 472 static const int _STATE_FIRING = 2; |
415 static const int _STATE_CLOSED = 4; | 473 static const int _STATE_CLOSED = 4; |
416 | 474 |
417 final _NotificationHandler _onListen; | 475 final _NotificationHandler _onListen; |
418 final _NotificationHandler _onCancel; | 476 final _NotificationHandler _onCancel; |
419 | 477 |
420 // State of the controller. | 478 // State of the controller. |
421 int _state; | 479 int _state; |
422 | 480 |
423 // Double-linked list of active listeners. | 481 // Double-linked list of active listeners. |
424 _MultiplexSubscriptionLink _next; | 482 _BroadcastSubscriptionLink _next; |
425 _MultiplexSubscriptionLink _previous; | 483 _BroadcastSubscriptionLink _previous; |
426 | 484 |
427 _MultiplexStreamController(this._onListen, this._onCancel) | 485 _BroadcastStreamController(this._onListen, this._onCancel) |
428 : _state = _STATE_INITIAL { | 486 : _state = _STATE_INITIAL { |
429 _next = _previous = this; | 487 _next = _previous = this; |
430 } | 488 } |
431 | 489 |
432 // StreamController interface. | 490 // StreamController interface. |
433 | 491 |
434 Stream<T> get stream => new _MultiplexStream<T>(this); | 492 Stream<T> get stream => new _BroadcastStream<T>(this); |
435 | 493 |
436 EventSink<T> get sink => new _EventSinkView<T>(this); | 494 EventSink<T> get sink => new _EventSinkView<T>(this); |
437 | 495 |
438 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 496 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
439 | 497 |
440 /** | 498 /** |
441 * A multiplex controller is never paused. | 499 * A broadcast controller is never paused. |
442 * | 500 * |
443 * Each receiving stream may be paused individually, and they handle their | 501 * Each receiving stream may be paused individually, and they handle their |
444 * own buffering. | 502 * own buffering. |
445 */ | 503 */ |
446 bool get isPaused => false; | 504 bool get isPaused => false; |
447 | 505 |
448 /** Whether there are currently a subscriber on the [Stream]. */ | 506 /** Whether there are currently a subscriber on the [Stream]. */ |
449 bool get hasListener => !_isEmpty; | 507 bool get hasListener => !_isEmpty; |
450 | 508 |
451 /** Whether an event is being fired (sent to some, but not all, listeners). */ | 509 /** Whether an event is being fired (sent to some, but not all, listeners). */ |
452 bool get _isFiring => (_state & _STATE_FIRING) != 0; | 510 bool get _isFiring => (_state & _STATE_FIRING) != 0; |
453 | 511 |
454 // Linked list helpers | 512 // Linked list helpers |
455 | 513 |
456 bool get _isEmpty => identical(_next, this); | 514 bool get _isEmpty => identical(_next, this); |
457 | 515 |
458 /** Adds subscription to linked list of active listeners. */ | 516 /** Adds subscription to linked list of active listeners. */ |
459 void _addListener(_MultiplexSubscription<T> subscription) { | 517 void _addListener(_BroadcastSubscription<T> subscription) { |
460 _MultiplexSubscriptionLink previous = _previous; | 518 _BroadcastSubscriptionLink previous = _previous; |
461 previous._next = subscription; | 519 previous._next = subscription; |
462 _previous = subscription._previous; | 520 _previous = subscription._previous; |
463 subscription._previous._next = this; | 521 subscription._previous._next = this; |
464 subscription._previous = previous; | 522 subscription._previous = previous; |
465 subscription._eventState = (_state & _STATE_EVENT_ID); | 523 subscription._eventState = (_state & _STATE_EVENT_ID); |
466 } | 524 } |
467 | 525 |
468 void _removeListener(_MultiplexSubscription<T> subscription) { | 526 void _removeListener(_BroadcastSubscription<T> subscription) { |
469 assert(identical(subscription._controller, this)); | 527 assert(identical(subscription._controller, this)); |
470 assert(!identical(subscription._next, subscription)); | 528 assert(!identical(subscription._next, subscription)); |
471 subscription._previous._next = subscription._next; | 529 subscription._previous._next = subscription._next; |
472 subscription._next._previous = subscription._previous; | 530 subscription._next._previous = subscription._previous; |
473 subscription._next = subscription._previous = subscription; | 531 subscription._next = subscription._previous = subscription; |
474 } | 532 } |
475 | 533 |
476 // _StreamControllerLifecycle interface. | 534 // _StreamControllerLifecycle interface. |
477 | 535 |
478 void _recordListen(_MultiplexSubscription<T> subscription) { | 536 void _recordListen(_BroadcastSubscription<T> subscription) { |
479 _addListener(subscription); | 537 _addListener(subscription); |
480 if (identical(_next, _previous)) { | 538 if (identical(_next, _previous)) { |
481 // Only one listener, so it must be the first listener. | 539 // Only one listener, so it must be the first listener. |
482 _runGuarded(_onListen); | 540 _runGuarded(_onListen); |
483 } | 541 } |
484 } | 542 } |
485 | 543 |
486 void _recordCancel(_MultiplexSubscription<T> subscription) { | 544 void _recordCancel(_BroadcastSubscription<T> subscription) { |
487 if (subscription._isFiring) { | 545 if (subscription._isFiring) { |
488 subscription._setRemoveAfterFiring(); | 546 subscription._setRemoveAfterFiring(); |
489 } else { | 547 } else { |
490 _removeListener(subscription); | 548 _removeListener(subscription); |
491 // If we are currently firing an event, the empty-check is performed at | 549 // If we are currently firing an event, the empty-check is performed at |
492 // the end of the listener loop instead of here. | 550 // the end of the listener loop instead of here. |
493 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | 551 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
494 _callOnCancel(); | 552 _callOnCancel(); |
495 } | 553 } |
496 } | 554 } |
(...skipping 20 matching lines...) Expand all Loading... | |
517 } | 575 } |
518 | 576 |
519 void close() { | 577 void close() { |
520 if (isClosed) { | 578 if (isClosed) { |
521 throw new StateError("Cannot add new events after calling close()"); | 579 throw new StateError("Cannot add new events after calling close()"); |
522 } | 580 } |
523 _state |= _STATE_CLOSED; | 581 _state |= _STATE_CLOSED; |
524 _sendDone(); | 582 _sendDone(); |
525 } | 583 } |
526 | 584 |
527 // EventDispatch interface. | |
528 | |
529 void _sendData(T data) { | |
530 if (_isEmpty) return; | |
531 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
532 subscription._add(data); | |
533 }); | |
534 } | |
535 | |
536 void _sendError(Object error) { | |
537 if (_isEmpty) return; | |
538 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
539 subscription._addError(error); | |
540 }); | |
541 } | |
542 | |
543 void _sendDone() { | |
544 if (_isEmpty) return; | |
545 _forEachListener((_MultiplexSubscription<T> subscription) { | |
546 subscription._close(); | |
547 subscription._eventState |= | |
548 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; | |
549 }); | |
550 } | |
551 | |
552 void _forEachListener( | 585 void _forEachListener( |
553 void action(_BufferingStreamSubscription<T> subscription)) { | 586 void action(_BufferingStreamSubscription<T> subscription)) { |
554 if (_isFiring) { | 587 if (_isFiring) { |
555 throw new StateError( | 588 throw new StateError( |
556 "Cannot fire new event. Controller is already firing an event"); | 589 "Cannot fire new event. Controller is already firing an event"); |
557 } | 590 } |
558 if (_isEmpty) return; | 591 if (_isEmpty) return; |
559 | 592 |
560 // Get event id of this event. | 593 // Get event id of this event. |
561 int id = (_state & _STATE_EVENT_ID); | 594 int id = (_state & _STATE_EVENT_ID); |
562 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 595 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
563 // callbacks while firing, and we prevent reentrancy of this function. | 596 // callbacks while firing, and we prevent reentrancy of this function. |
564 // | 597 // |
565 // Set [_state]'s event id to the next event's id. | 598 // Set [_state]'s event id to the next event's id. |
566 // Any listeners added while firing this event will expect the next event, | 599 // Any listeners added while firing this event will expect the next event, |
567 // not this one, and won't get notified. | 600 // not this one, and won't get notified. |
568 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | 601 _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
569 _MultiplexSubscriptionLink link = _next; | 602 _BroadcastSubscriptionLink link = _next; |
570 while (!identical(link, this)) { | 603 while (!identical(link, this)) { |
571 _MultiplexSubscription<T> subscription = link; | 604 _BroadcastSubscription<T> subscription = link; |
572 if (subscription._expectsEvent(id)) { | 605 if (subscription._expectsEvent(id)) { |
573 subscription._eventState |= _MultiplexSubscription._STATE_FIRING; | 606 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
574 action(subscription); | 607 action(subscription); |
575 subscription._toggleEventId(); | 608 subscription._toggleEventId(); |
576 link = subscription._next; | 609 link = subscription._next; |
577 if (subscription._removeAfterFiring) { | 610 if (subscription._removeAfterFiring) { |
578 _removeListener(subscription); | 611 _removeListener(subscription); |
579 } | 612 } |
580 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; | 613 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
581 } else { | 614 } else { |
582 link = subscription._next; | 615 link = subscription._next; |
583 } | 616 } |
584 } | 617 } |
585 _state &= ~_STATE_FIRING; | 618 _state &= ~_STATE_FIRING; |
586 | 619 |
587 if (_isEmpty) { | 620 if (_isEmpty) { |
588 _callOnCancel(); | 621 _callOnCancel(); |
589 } | 622 } |
590 } | 623 } |
591 | 624 |
592 void _callOnCancel() { | 625 void _callOnCancel() { |
593 _runGuarded(_onCancel); | 626 _runGuarded(_onCancel); |
594 } | 627 } |
595 } | 628 } |
596 | 629 |
597 class _BufferingMultiplexStreamController<T> | 630 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
598 extends _MultiplexStreamController<T> | 631 _SyncBroadcastStreamController(void onListen(), void onCancel()) |
632 : super(onListen, onCancel); | |
633 | |
634 // EventDispatch interface. | |
635 | |
636 void _sendData(T data) { | |
637 if (_isEmpty) return; | |
638 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
639 subscription._add(data); | |
640 }); | |
641 } | |
642 | |
643 void _sendError(Object error) { | |
644 if (_isEmpty) return; | |
645 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
646 subscription._addError(error); | |
647 }); | |
648 } | |
649 | |
650 void _sendDone() { | |
651 if (_isEmpty) return; | |
652 _forEachListener((_BroadcastSubscription<T> subscription) { | |
653 subscription._close(); | |
654 subscription._eventState |= | |
655 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; | |
656 }); | |
657 } | |
658 } | |
659 | |
660 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
661 _AsyncBroadcastStreamController(void onListen(), void onCancel()) | |
662 : super(onListen, onCancel); | |
663 | |
664 // EventDispatch interface. | |
665 | |
666 void _sendData(T data) { | |
667 runAsync(() { | |
668 if (_isEmpty) return; | |
669 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
670 subscription._add(data); | |
671 }); | |
672 }); | |
673 } | |
674 | |
675 void _sendError(Object error) { | |
676 runAsync(() { | |
677 if (_isEmpty) return; | |
678 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
679 subscription._addError(error); | |
680 }); | |
681 }); | |
682 } | |
683 | |
684 void _sendDone() { | |
685 runAsync(() { | |
686 if (_isEmpty) return; | |
687 _forEachListener((_BroadcastSubscription<T> subscription) { | |
688 subscription._close(); | |
689 subscription._eventState |= | |
690 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; | |
691 }); | |
692 }); | |
693 } | |
694 } | |
695 | |
696 /** | |
697 * Stream controller that is used by [Stream.asBroadcastStream]. | |
698 * | |
699 * This stream controller allows incoming events while it is firing | |
700 * other events. This is handled by delaying the events until the | |
701 * current event is done firing, and then fire the pending events. | |
702 * | |
703 * This class extends [_SyncBroadcastStreamController]. Events of | |
704 * an "asBroadcastStream" stream are always initiated by events | |
705 * on another stream, and it is fine to forward them synchronously. | |
706 */ | |
707 class _AsBroadcastStreamController<T> | |
708 extends _SyncBroadcastStreamController<T> | |
599 implements _EventDispatch<T> { | 709 implements _EventDispatch<T> { |
600 _StreamImplEvents _pending; | 710 _StreamImplEvents _pending; |
601 | 711 |
602 _BufferingMultiplexStreamController(void onListen(), void onCancel()) | 712 _AsBroadcastStreamController(void onListen(), void onCancel()) |
603 : super(onListen, onCancel); | 713 : super(onListen, onCancel); |
604 | 714 |
605 bool get _hasPending => _pending != null && ! _pending.isEmpty; | 715 bool get _hasPending => _pending != null && ! _pending.isEmpty; |
606 | 716 |
607 void _addPendingEvent(_DelayedEvent event) { | 717 void _addPendingEvent(_DelayedEvent event) { |
608 if (_pending == null) { | 718 if (_pending == null) { |
609 _pending = new _StreamImplEvents(); | 719 _pending = new _StreamImplEvents(); |
610 } | 720 } |
611 _pending.add(event); | 721 _pending.add(event); |
612 } | 722 } |
(...skipping 29 matching lines...) Expand all Loading... | |
642 super.close(); | 752 super.close(); |
643 assert(!_hasPending); | 753 assert(!_hasPending); |
644 } | 754 } |
645 | 755 |
646 void _callOnCancel() { | 756 void _callOnCancel() { |
647 if (_hasPending) { | 757 if (_hasPending) { |
648 _pending.clear(); | 758 _pending.clear(); |
649 _pending = null; | 759 _pending = null; |
650 } | 760 } |
651 super._callOnCancel(); | 761 super._callOnCancel(); |
652 | |
653 } | 762 } |
654 } | 763 } |
OLD | NEW |