OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
48 */ | 48 */ |
49 abstract class StreamController<T> implements EventSink<T> { | 49 abstract class StreamController<T> implements EventSink<T> { |
50 /** The stream that this controller is controlling. */ | 50 /** The stream that this controller is controlling. */ |
51 Stream<T> get stream; | 51 Stream<T> get stream; |
52 | 52 |
53 /** | 53 /** |
54 * A controller with a [stream] that supports only one single subscriber. | 54 * A controller with a [stream] that supports only one single subscriber. |
55 * | 55 * |
| 56 * If [sync] is true, events may be passed directly to the stream's listener |
| 57 * during an [add], [addError] or [close] call. If [sync] is false, the event |
| 58 * will be passed to the listener at a later time, after the code creating |
| 59 * the event has returned. |
| 60 * |
56 * The controller will buffer all incoming events until the subscriber is | 61 * The controller will buffer all incoming events until the subscriber is |
57 * registered. | 62 * registered. |
58 * | 63 * |
59 * The [onPause] function is called when the stream becomes | 64 * The [onPause] function is called when the stream becomes |
60 * paused. [onResume] is called when the stream resumed. | 65 * paused. [onResume] is called when the stream resumed. |
61 * | 66 * |
62 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
63 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
64 * its subscription. | 69 * its subscription. |
65 * | 70 * |
66 * If the stream is canceled before the controller needs new data the | 71 * If the stream is canceled before the controller needs new data the |
67 * [onResume] call might not be executed. | 72 * [onResume] call might not be executed. |
68 */ | 73 */ |
69 factory StreamController({void onListen(), | 74 factory StreamController({void onListen(), |
70 void onPause(), | 75 void onPause(), |
71 void onResume(), | 76 void onResume(), |
72 void onCancel()}) | 77 void onCancel(), |
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); | 78 bool sync: false}) |
| 79 => sync |
| 80 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 81 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
74 | 82 |
75 /** | 83 /** |
76 * A controller where [stream] can be listened to more than once. | 84 * A controller where [stream] can be listened to more than once. |
77 * | 85 * |
78 * The [Stream] returned by [stream] is a broadcast stream. It can be listened | 86 * The [Stream] returned by [stream] is a broadcast stream. It can be listened |
79 * to more than once. | 87 * to more than once. |
80 * | 88 * |
81 * The controller distributes any events to all currently subscribed | 89 * The controller distributes any events to all currently subscribed |
82 * listeners. | 90 * listeners. |
83 * It is not allowed to call [add], [addError], or [close] before a previous | 91 * It is not allowed to call [add], [addError], or [close] before a previous |
84 * call has returned. | 92 * call has returned. |
85 * | 93 * |
| 94 * If [sync] is true, events may be passed directly to the stream's listener |
| 95 * during an [add], [addError] or [close] call. If [sync] is false, the event |
| 96 * will be passed to the listener at a later time, after the code creating |
| 97 * the event has returned. |
| 98 * |
86 * Each listener is handled independently, and if they pause, only the pausing | 99 * Each listener is handled independently, and if they pause, only the pausing |
87 * listener is affected. A paused listener will buffer events internally until | 100 * listener is affected. A paused listener will buffer events internally until |
88 * unpaused or canceled. | 101 * unpaused or canceled. |
89 * | 102 * |
| 103 * If [sync] is false, no guarantees are given with regard to when |
| 104 * multiple listeners get the events, except that each listener will get |
| 105 * all events in the correct order. If two events are sent on an async |
| 106 * controller with two listeners, one of the listeners may get both events |
| 107 * before the other listener gets any. |
| 108 * A listener must be subscribed both when the event is initiated (that is, |
| 109 * when [add] is called) and when the event is later delivered, in order to |
| 110 * get the event. |
| 111 * |
90 * The [onListen] callback is called when the first listener is subscribed, | 112 * The [onListen] callback is called when the first listener is subscribed, |
91 * and the [onCancel] is called when there are no longer any active listeners. | 113 * and the [onCancel] is called when there are no longer any active listeners. |
92 * If a listener is added again later, after the [onCancel] was called, | 114 * If a listener is added again later, after the [onCancel] was called, |
93 * the [onListen] will be called again. | 115 * the [onListen] will be called again. |
94 */ | 116 */ |
95 factory StreamController.broadcast({void onListen(), void onCancel()}) { | 117 factory StreamController.broadcast({void onListen(), |
96 return new _MultiplexStreamController<T>(onListen, onCancel); | 118 void onCancel(), |
| 119 bool sync: false}) { |
| 120 return sync |
| 121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
97 } | 123 } |
98 | 124 |
99 /** | 125 /** |
100 * Returns a view of this object that only exposes the [EventSink] interface. | 126 * Returns a view of this object that only exposes the [EventSink] interface. |
101 */ | 127 */ |
102 EventSink<T> get sink; | 128 EventSink<T> get sink; |
103 | 129 |
104 /** | 130 /** |
105 * Whether the stream is closed for adding more events. | 131 * Whether the stream is closed for adding more events. |
106 * | 132 * |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
140 void _recordPause(StreamSubscription<T> subscription) {} | 166 void _recordPause(StreamSubscription<T> subscription) {} |
141 void _recordResume(StreamSubscription<T> subscription) {} | 167 void _recordResume(StreamSubscription<T> subscription) {} |
142 void _recordCancel(StreamSubscription<T> subscription) {} | 168 void _recordCancel(StreamSubscription<T> subscription) {} |
143 } | 169 } |
144 | 170 |
145 /** | 171 /** |
146 * Default implementation of [StreamController]. | 172 * Default implementation of [StreamController]. |
147 * | 173 * |
148 * Controls a stream that only supports a single controller. | 174 * Controls a stream that only supports a single controller. |
149 */ | 175 */ |
150 class _StreamControllerImpl<T> implements StreamController<T>, | 176 abstract class _StreamController<T> implements StreamController<T>, |
151 _StreamControllerLifecycle<T> { | 177 _StreamControllerLifecycle<T>, |
| 178 _EventDispatch<T> { |
152 static const int _STATE_OPEN = 0; | 179 static const int _STATE_OPEN = 0; |
153 static const int _STATE_CANCELLED = 1; | 180 static const int _STATE_CANCELLED = 1; |
154 static const int _STATE_CLOSED = 2; | 181 static const int _STATE_CLOSED = 2; |
155 | 182 |
156 final _NotificationHandler _onListen; | 183 final _NotificationHandler _onListen; |
157 final _NotificationHandler _onPause; | 184 final _NotificationHandler _onPause; |
158 final _NotificationHandler _onResume; | 185 final _NotificationHandler _onResume; |
159 final _NotificationHandler _onCancel; | 186 final _NotificationHandler _onCancel; |
160 _StreamImpl<T> _stream; | 187 _StreamImpl<T> _stream; |
161 | 188 |
162 // An active subscription on the stream, or null if no subscripton is active. | 189 // An active subscription on the stream, or null if no subscripton is active. |
163 _ControllerSubscription<T> _subscription; | 190 _ControllerSubscription<T> _subscription; |
164 | 191 |
165 // Whether we have sent a "done" event. | 192 // Whether we have sent a "done" event. |
166 int _state = _STATE_OPEN; | 193 int _state = _STATE_OPEN; |
167 | 194 |
168 // Events added to the stream before it has an active subscription. | 195 // Events added to the stream before it has an active subscription. |
169 _PendingEvents _pendingEvents = null; | 196 _PendingEvents _pendingEvents = null; |
170 | 197 |
171 _StreamControllerImpl(this._onListen, | 198 _StreamController(this._onListen, |
172 this._onPause, | 199 this._onPause, |
173 this._onResume, | 200 this._onResume, |
174 this._onCancel) { | 201 this._onCancel) { |
175 _stream = new _ControllerStream<T>(this); | 202 _stream = new _ControllerStream<T>(this); |
176 } | 203 } |
177 | 204 |
178 Stream<T> get stream => _stream; | 205 Stream<T> get stream => _stream; |
179 | 206 |
180 /** | 207 /** |
181 * Returns a view of this object that only exposes the [EventSink] interface. | 208 * Returns a view of this object that only exposes the [EventSink] interface. |
182 */ | 209 */ |
183 EventSink<T> get sink => new _EventSinkView<T>(this); | 210 EventSink<T> get sink => new _EventSinkView<T>(this); |
184 | 211 |
(...skipping 10 matching lines...) Expand all Loading... |
195 : !_isCancelled; | 222 : !_isCancelled; |
196 | 223 |
197 bool get hasListener => _subscription != null; | 224 bool get hasListener => _subscription != null; |
198 | 225 |
199 /** | 226 /** |
200 * Send or queue a data event. | 227 * Send or queue a data event. |
201 */ | 228 */ |
202 void add(T value) { | 229 void add(T value) { |
203 if (isClosed) throw new StateError("Adding event after close"); | 230 if (isClosed) throw new StateError("Adding event after close"); |
204 if (_subscription != null) { | 231 if (_subscription != null) { |
205 _subscription._add(value); | 232 _sendData(value); |
206 } else if (!_isCancelled) { | 233 } else if (!_isCancelled) { |
207 _addPendingEvent(new _DelayedData<T>(value)); | 234 _addPendingEvent(new _DelayedData<T>(value)); |
208 } | 235 } |
209 } | 236 } |
210 | 237 |
211 /** | 238 /** |
212 * Send or enqueue an error event. | 239 * Send or enqueue an error event. |
213 */ | 240 */ |
214 void addError(Object error, [Object stackTrace]) { | 241 void addError(Object error, [Object stackTrace]) { |
215 if (isClosed) throw new StateError("Adding event after close"); | 242 if (isClosed) throw new StateError("Adding event after close"); |
216 if (stackTrace != null) { | 243 if (stackTrace != null) { |
217 // Force stack trace overwrite. Even if the error already contained | 244 // Force stack trace overwrite. Even if the error already contained |
218 // a stack trace. | 245 // a stack trace. |
219 _attachStackTrace(error, stackTrace); | 246 _attachStackTrace(error, stackTrace); |
220 } | 247 } |
221 if (_subscription != null) { | 248 if (_subscription != null) { |
222 _subscription._addError(error); | 249 _sendError(error); |
223 } else if (!_isCancelled) { | 250 } else if (!_isCancelled) { |
224 _addPendingEvent(new _DelayedError(error)); | 251 _addPendingEvent(new _DelayedError(error)); |
225 } | 252 } |
226 } | 253 } |
227 | 254 |
228 /** | 255 /** |
229 * Closes this controller. | 256 * Closes this controller. |
230 * | 257 * |
231 * After closing, no further events may be added using [add] or [addError]. | 258 * After closing, no further events may be added using [add] or [addError]. |
232 * | 259 * |
233 * You are allowed to close the controller more than once, but only the first | 260 * You are allowed to close the controller more than once, but only the first |
234 * call has any effect. | 261 * call has any effect. |
235 * | 262 * |
236 * The first time a controller is closed, a "done" event is sent to its | 263 * The first time a controller is closed, a "done" event is sent to its |
237 * stream. | 264 * stream. |
238 */ | 265 */ |
239 void close() { | 266 void close() { |
240 if (isClosed) return; | 267 if (isClosed) return; |
241 _state |= _STATE_CLOSED; | 268 _state |= _STATE_CLOSED; |
242 if (_subscription != null) { | 269 if (_subscription != null) { |
243 _subscription._close(); | 270 _sendDone(); |
244 } else if (!_isCancelled) { | 271 } else if (!_isCancelled) { |
245 _addPendingEvent(const _DelayedDone()); | 272 _addPendingEvent(const _DelayedDone()); |
246 } | 273 } |
247 } | 274 } |
248 | 275 |
| 276 // EventDispatch interface |
| 277 |
249 void _addPendingEvent(_DelayedEvent event) { | 278 void _addPendingEvent(_DelayedEvent event) { |
250 if (_isCancelled) return; | 279 if (_isCancelled) return; |
251 _StreamImplEvents events = _pendingEvents; | 280 _StreamImplEvents events = _pendingEvents; |
252 if (events == null) { | 281 if (events == null) { |
253 events = _pendingEvents = new _StreamImplEvents(); | 282 events = _pendingEvents = new _StreamImplEvents(); |
254 } | 283 } |
255 events.add(event); | 284 events.add(event); |
256 } | 285 } |
257 | 286 |
258 void _recordListen(_BufferingStreamSubscription<T> subscription) { | 287 void _recordListen(_BufferingStreamSubscription<T> subscription) { |
(...skipping 15 matching lines...) Expand all Loading... |
274 | 303 |
275 void _recordPause(StreamSubscription<T> subscription) { | 304 void _recordPause(StreamSubscription<T> subscription) { |
276 _runGuarded(_onPause); | 305 _runGuarded(_onPause); |
277 } | 306 } |
278 | 307 |
279 void _recordResume(StreamSubscription<T> subscription) { | 308 void _recordResume(StreamSubscription<T> subscription) { |
280 _runGuarded(_onResume); | 309 _runGuarded(_onResume); |
281 } | 310 } |
282 } | 311 } |
283 | 312 |
| 313 class _SyncStreamController<T> extends _StreamController<T> { |
| 314 _SyncStreamController(void onListen(), |
| 315 void onPause(), |
| 316 void onResume(), |
| 317 void onCancel()) |
| 318 : super(onListen, onPause, onResume, onCancel); |
| 319 |
| 320 void _sendData(T data) { |
| 321 _subscription._add(data); |
| 322 } |
| 323 |
| 324 void _sendError(Object error) { |
| 325 _subscription._addError(error); |
| 326 } |
| 327 |
| 328 void _sendDone() { |
| 329 _subscription._close(); |
| 330 } |
| 331 } |
| 332 |
| 333 class _AsyncStreamController<T> extends _StreamController<T> { |
| 334 _AsyncStreamController(void onListen(), |
| 335 void onPause(), |
| 336 void onResume(), |
| 337 void onCancel()) |
| 338 : super(onListen, onPause, onResume, onCancel); |
| 339 |
| 340 void _sendData(T data) { |
| 341 _subscription._addPending(new _DelayedData(data)); |
| 342 } |
| 343 |
| 344 void _sendError(Object error) { |
| 345 _subscription._addPending(new _DelayedError(error)); |
| 346 } |
| 347 |
| 348 void _sendDone() { |
| 349 _subscription._addPending(const _DelayedDone()); |
| 350 } |
| 351 } |
| 352 |
284 typedef void _NotificationHandler(); | 353 typedef void _NotificationHandler(); |
285 | 354 |
286 void _runGuarded(_NotificationHandler notificationHandler) { | 355 void _runGuarded(_NotificationHandler notificationHandler) { |
287 if (notificationHandler == null) return; | 356 if (notificationHandler == null) return; |
288 try { | 357 try { |
289 notificationHandler(); | 358 notificationHandler(); |
290 } catch (e, s) { | 359 } catch (e, s) { |
291 _throwDelayed(e, s); | 360 _throwDelayed(e, s); |
292 } | 361 } |
293 } | 362 } |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
332 | 401 |
333 void _onPause() { | 402 void _onPause() { |
334 _controller._recordPause(this); | 403 _controller._recordPause(this); |
335 } | 404 } |
336 | 405 |
337 void _onResume() { | 406 void _onResume() { |
338 _controller._recordResume(this); | 407 _controller._recordResume(this); |
339 } | 408 } |
340 } | 409 } |
341 | 410 |
342 class _MultiplexStream<T> extends _StreamImpl<T> { | 411 class _BroadcastStream<T> extends _StreamImpl<T> { |
343 _MultiplexStreamController _controller; | 412 _BroadcastStreamController _controller; |
344 | 413 |
345 _MultiplexStream(this._controller); | 414 _BroadcastStream(this._controller); |
346 | 415 |
347 bool get isBroadcast => true; | 416 bool get isBroadcast => true; |
348 | 417 |
349 StreamSubscription<T> _createSubscription( | 418 StreamSubscription<T> _createSubscription( |
350 void onData(T data), | 419 void onData(T data), |
351 void onError(Object error), | 420 void onError(Object error), |
352 void onDone(), | 421 void onDone(), |
353 bool cancelOnError) { | 422 bool cancelOnError) { |
354 return new _MultiplexSubscription<T>( | 423 return new _BroadcastSubscription<T>( |
355 _controller, onData, onError, onDone, cancelOnError); | 424 _controller, onData, onError, onDone, cancelOnError); |
356 } | 425 } |
357 | 426 |
358 void _onListen(_BufferingStreamSubscription subscription) { | 427 void _onListen(_BufferingStreamSubscription subscription) { |
359 _controller._recordListen(subscription); | 428 _controller._recordListen(subscription); |
360 } | 429 } |
361 } | 430 } |
362 | 431 |
363 abstract class _MultiplexSubscriptionLink { | 432 abstract class _BroadcastSubscriptionLink { |
364 _MultiplexSubscriptionLink _next; | 433 _BroadcastSubscriptionLink _next; |
365 _MultiplexSubscriptionLink _previous; | 434 _BroadcastSubscriptionLink _previous; |
366 } | 435 } |
367 | 436 |
368 class _MultiplexSubscription<T> extends _ControllerSubscription<T> | 437 class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
369 implements _MultiplexSubscriptionLink { | 438 implements _BroadcastSubscriptionLink { |
370 static const int _STATE_EVENT_ID = 1; | 439 static const int _STATE_EVENT_ID = 1; |
371 static const int _STATE_FIRING = 2; | 440 static const int _STATE_FIRING = 2; |
372 static const int _STATE_REMOVE_AFTER_FIRING = 4; | 441 static const int _STATE_REMOVE_AFTER_FIRING = 4; |
373 int _eventState; | 442 int _eventState; |
374 | 443 |
375 _MultiplexSubscriptionLink _next; | 444 _BroadcastSubscriptionLink _next; |
376 _MultiplexSubscriptionLink _previous; | 445 _BroadcastSubscriptionLink _previous; |
377 | 446 |
378 _MultiplexSubscription(_StreamControllerLifecycle controller, | 447 _BroadcastSubscription(_StreamControllerLifecycle controller, |
379 void onData(T data), | 448 void onData(T data), |
380 void onError(Object error), | 449 void onError(Object error), |
381 void onDone(), | 450 void onDone(), |
382 bool cancelOnError) | 451 bool cancelOnError) |
383 : super(controller, onData, onError, onDone, cancelOnError) { | 452 : super(controller, onData, onError, onDone, cancelOnError) { |
384 _next = _previous = this; | 453 _next = _previous = this; |
385 } | 454 } |
386 | 455 |
387 _MultiplexStreamController get _controller => super._controller; | 456 _BroadcastStreamController get _controller => super._controller; |
388 | 457 |
389 bool _expectsEvent(int eventId) { | 458 bool _expectsEvent(int eventId) { |
390 return (_eventState & _STATE_EVENT_ID) == eventId; | 459 return (_eventState & _STATE_EVENT_ID) == eventId; |
391 } | 460 } |
392 | 461 |
393 void _toggleEventId() { | 462 void _toggleEventId() { |
394 _eventState ^= _STATE_EVENT_ID; | 463 _eventState ^= _STATE_EVENT_ID; |
395 } | 464 } |
396 | 465 |
397 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | 466 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; |
398 | 467 |
399 bool _setRemoveAfterFiring() { | 468 bool _setRemoveAfterFiring() { |
400 assert(_isFiring); | 469 assert(_isFiring); |
401 _eventState |= _STATE_REMOVE_AFTER_FIRING; | 470 _eventState |= _STATE_REMOVE_AFTER_FIRING; |
402 } | 471 } |
403 | 472 |
404 bool get _removeAfterFiring => | 473 bool get _removeAfterFiring => |
405 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | 474 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; |
406 } | 475 } |
407 | 476 |
408 | 477 |
409 class _MultiplexStreamController<T> implements StreamController<T>, | 478 abstract class _BroadcastStreamController<T> |
410 _StreamControllerLifecycle<T>, | 479 implements StreamController<T>, |
411 _MultiplexSubscriptionLink { | 480 _StreamControllerLifecycle<T>, |
| 481 _BroadcastSubscriptionLink, |
| 482 _EventDispatch<T> { |
412 static const int _STATE_INITIAL = 0; | 483 static const int _STATE_INITIAL = 0; |
413 static const int _STATE_EVENT_ID = 1; | 484 static const int _STATE_EVENT_ID = 1; |
414 static const int _STATE_FIRING = 2; | 485 static const int _STATE_FIRING = 2; |
415 static const int _STATE_CLOSED = 4; | 486 static const int _STATE_CLOSED = 4; |
416 | 487 |
417 final _NotificationHandler _onListen; | 488 final _NotificationHandler _onListen; |
418 final _NotificationHandler _onCancel; | 489 final _NotificationHandler _onCancel; |
419 | 490 |
420 // State of the controller. | 491 // State of the controller. |
421 int _state; | 492 int _state; |
422 | 493 |
423 // Double-linked list of active listeners. | 494 // Double-linked list of active listeners. |
424 _MultiplexSubscriptionLink _next; | 495 _BroadcastSubscriptionLink _next; |
425 _MultiplexSubscriptionLink _previous; | 496 _BroadcastSubscriptionLink _previous; |
426 | 497 |
427 _MultiplexStreamController(this._onListen, this._onCancel) | 498 _BroadcastStreamController(this._onListen, this._onCancel) |
428 : _state = _STATE_INITIAL { | 499 : _state = _STATE_INITIAL { |
429 _next = _previous = this; | 500 _next = _previous = this; |
430 } | 501 } |
431 | 502 |
432 // StreamController interface. | 503 // StreamController interface. |
433 | 504 |
434 Stream<T> get stream => new _MultiplexStream<T>(this); | 505 Stream<T> get stream => new _BroadcastStream<T>(this); |
435 | 506 |
436 EventSink<T> get sink => new _EventSinkView<T>(this); | 507 EventSink<T> get sink => new _EventSinkView<T>(this); |
437 | 508 |
438 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 509 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
439 | 510 |
440 /** | 511 /** |
441 * A multiplex controller is never paused. | 512 * A broadcast controller is never paused. |
442 * | 513 * |
443 * Each receiving stream may be paused individually, and they handle their | 514 * Each receiving stream may be paused individually, and they handle their |
444 * own buffering. | 515 * own buffering. |
445 */ | 516 */ |
446 bool get isPaused => false; | 517 bool get isPaused => false; |
447 | 518 |
448 /** Whether there are currently a subscriber on the [Stream]. */ | 519 /** Whether there are currently a subscriber on the [Stream]. */ |
449 bool get hasListener => !_isEmpty; | 520 bool get hasListener => !_isEmpty; |
450 | 521 |
451 /** Whether an event is being fired (sent to some, but not all, listeners). */ | 522 /** Whether an event is being fired (sent to some, but not all, listeners). */ |
452 bool get _isFiring => (_state & _STATE_FIRING) != 0; | 523 bool get _isFiring => (_state & _STATE_FIRING) != 0; |
453 | 524 |
454 // Linked list helpers | 525 // Linked list helpers |
455 | 526 |
456 bool get _isEmpty => identical(_next, this); | 527 bool get _isEmpty => identical(_next, this); |
457 | 528 |
458 /** Adds subscription to linked list of active listeners. */ | 529 /** Adds subscription to linked list of active listeners. */ |
459 void _addListener(_MultiplexSubscription<T> subscription) { | 530 void _addListener(_BroadcastSubscription<T> subscription) { |
460 _MultiplexSubscriptionLink previous = _previous; | 531 _BroadcastSubscriptionLink previous = _previous; |
461 previous._next = subscription; | 532 previous._next = subscription; |
462 _previous = subscription._previous; | 533 _previous = subscription._previous; |
463 subscription._previous._next = this; | 534 subscription._previous._next = this; |
464 subscription._previous = previous; | 535 subscription._previous = previous; |
465 subscription._eventState = (_state & _STATE_EVENT_ID); | 536 subscription._eventState = (_state & _STATE_EVENT_ID); |
466 } | 537 } |
467 | 538 |
468 void _removeListener(_MultiplexSubscription<T> subscription) { | 539 void _removeListener(_BroadcastSubscription<T> subscription) { |
469 assert(identical(subscription._controller, this)); | 540 assert(identical(subscription._controller, this)); |
470 assert(!identical(subscription._next, subscription)); | 541 assert(!identical(subscription._next, subscription)); |
471 subscription._previous._next = subscription._next; | 542 subscription._previous._next = subscription._next; |
472 subscription._next._previous = subscription._previous; | 543 subscription._next._previous = subscription._previous; |
473 subscription._next = subscription._previous = subscription; | 544 subscription._next = subscription._previous = subscription; |
474 } | 545 } |
475 | 546 |
476 // _StreamControllerLifecycle interface. | 547 // _StreamControllerLifecycle interface. |
477 | 548 |
478 void _recordListen(_MultiplexSubscription<T> subscription) { | 549 void _recordListen(_BroadcastSubscription<T> subscription) { |
479 _addListener(subscription); | 550 _addListener(subscription); |
480 if (identical(_next, _previous)) { | 551 if (identical(_next, _previous)) { |
481 // Only one listener, so it must be the first listener. | 552 // Only one listener, so it must be the first listener. |
482 _runGuarded(_onListen); | 553 _runGuarded(_onListen); |
483 } | 554 } |
484 } | 555 } |
485 | 556 |
486 void _recordCancel(_MultiplexSubscription<T> subscription) { | 557 void _recordCancel(_BroadcastSubscription<T> subscription) { |
487 if (subscription._isFiring) { | 558 if (subscription._isFiring) { |
488 subscription._setRemoveAfterFiring(); | 559 subscription._setRemoveAfterFiring(); |
489 } else { | 560 } else { |
490 _removeListener(subscription); | 561 _removeListener(subscription); |
491 // If we are currently firing an event, the empty-check is performed at | 562 // If we are currently firing an event, the empty-check is performed at |
492 // the end of the listener loop instead of here. | 563 // the end of the listener loop instead of here. |
493 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | 564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
494 _callOnCancel(); | 565 _callOnCancel(); |
495 } | 566 } |
496 } | 567 } |
(...skipping 20 matching lines...) Expand all Loading... |
517 } | 588 } |
518 | 589 |
519 void close() { | 590 void close() { |
520 if (isClosed) { | 591 if (isClosed) { |
521 throw new StateError("Cannot add new events after calling close()"); | 592 throw new StateError("Cannot add new events after calling close()"); |
522 } | 593 } |
523 _state |= _STATE_CLOSED; | 594 _state |= _STATE_CLOSED; |
524 _sendDone(); | 595 _sendDone(); |
525 } | 596 } |
526 | 597 |
527 // EventDispatch interface. | |
528 | |
529 void _sendData(T data) { | |
530 if (_isEmpty) return; | |
531 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
532 subscription._add(data); | |
533 }); | |
534 } | |
535 | |
536 void _sendError(Object error) { | |
537 if (_isEmpty) return; | |
538 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
539 subscription._addError(error); | |
540 }); | |
541 } | |
542 | |
543 void _sendDone() { | |
544 if (_isEmpty) return; | |
545 _forEachListener((_MultiplexSubscription<T> subscription) { | |
546 subscription._close(); | |
547 subscription._eventState |= | |
548 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; | |
549 }); | |
550 } | |
551 | |
552 void _forEachListener( | 598 void _forEachListener( |
553 void action(_BufferingStreamSubscription<T> subscription)) { | 599 void action(_BufferingStreamSubscription<T> subscription)) { |
554 if (_isFiring) { | 600 if (_isFiring) { |
555 throw new StateError( | 601 throw new StateError( |
556 "Cannot fire new event. Controller is already firing an event"); | 602 "Cannot fire new event. Controller is already firing an event"); |
557 } | 603 } |
558 if (_isEmpty) return; | 604 if (_isEmpty) return; |
559 | 605 |
560 // Get event id of this event. | 606 // Get event id of this event. |
561 int id = (_state & _STATE_EVENT_ID); | 607 int id = (_state & _STATE_EVENT_ID); |
562 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 608 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
563 // callbacks while firing, and we prevent reentrancy of this function. | 609 // callbacks while firing, and we prevent reentrancy of this function. |
564 // | 610 // |
565 // Set [_state]'s event id to the next event's id. | 611 // Set [_state]'s event id to the next event's id. |
566 // Any listeners added while firing this event will expect the next event, | 612 // Any listeners added while firing this event will expect the next event, |
567 // not this one, and won't get notified. | 613 // not this one, and won't get notified. |
568 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | 614 _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
569 _MultiplexSubscriptionLink link = _next; | 615 _BroadcastSubscriptionLink link = _next; |
570 while (!identical(link, this)) { | 616 while (!identical(link, this)) { |
571 _MultiplexSubscription<T> subscription = link; | 617 _BroadcastSubscription<T> subscription = link; |
572 if (subscription._expectsEvent(id)) { | 618 if (subscription._expectsEvent(id)) { |
573 subscription._eventState |= _MultiplexSubscription._STATE_FIRING; | 619 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
574 action(subscription); | 620 action(subscription); |
575 subscription._toggleEventId(); | 621 subscription._toggleEventId(); |
576 link = subscription._next; | 622 link = subscription._next; |
577 if (subscription._removeAfterFiring) { | 623 if (subscription._removeAfterFiring) { |
578 _removeListener(subscription); | 624 _removeListener(subscription); |
579 } | 625 } |
580 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; | 626 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
581 } else { | 627 } else { |
582 link = subscription._next; | 628 link = subscription._next; |
583 } | 629 } |
584 } | 630 } |
585 _state &= ~_STATE_FIRING; | 631 _state &= ~_STATE_FIRING; |
586 | 632 |
587 if (_isEmpty) { | 633 if (_isEmpty) { |
588 _callOnCancel(); | 634 _callOnCancel(); |
589 } | 635 } |
590 } | 636 } |
591 | 637 |
592 void _callOnCancel() { | 638 void _callOnCancel() { |
593 _runGuarded(_onCancel); | 639 _runGuarded(_onCancel); |
594 } | 640 } |
595 } | 641 } |
596 | 642 |
597 class _BufferingMultiplexStreamController<T> | 643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
598 extends _MultiplexStreamController<T> | 644 _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| 645 : super(onListen, onCancel); |
| 646 |
| 647 // EventDispatch interface. |
| 648 |
| 649 void _sendData(T data) { |
| 650 if (_isEmpty) return; |
| 651 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 652 subscription._add(data); |
| 653 }); |
| 654 } |
| 655 |
| 656 void _sendError(Object error) { |
| 657 if (_isEmpty) return; |
| 658 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 659 subscription._addError(error); |
| 660 }); |
| 661 } |
| 662 |
| 663 void _sendDone() { |
| 664 if (_isEmpty) return; |
| 665 _forEachListener((_BroadcastSubscription<T> subscription) { |
| 666 subscription._close(); |
| 667 subscription._eventState |= |
| 668 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
| 669 }); |
| 670 } |
| 671 } |
| 672 |
| 673 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| 674 _AsyncBroadcastStreamController(void onListen(), void onCancel()) |
| 675 : super(onListen, onCancel); |
| 676 |
| 677 // EventDispatch interface. |
| 678 |
| 679 void _sendData(T data) { |
| 680 for (_BroadcastSubscriptionLink link = _next; |
| 681 !identical(link, this); |
| 682 link = link._next) { |
| 683 _BroadcastSubscription<T> subscription = link; |
| 684 subscription._addPending(new _DelayedData(data)); |
| 685 } |
| 686 } |
| 687 |
| 688 void _sendError(Object error) { |
| 689 for (_BroadcastSubscriptionLink link = _next; |
| 690 !identical(link, this); |
| 691 link = link._next) { |
| 692 _BroadcastSubscription<T> subscription = link; |
| 693 subscription._addPending(new _DelayedError(error)); |
| 694 } |
| 695 } |
| 696 |
| 697 void _sendDone() { |
| 698 for (_BroadcastSubscriptionLink link = _next; |
| 699 !identical(link, this); |
| 700 link = link._next) { |
| 701 _BroadcastSubscription<T> subscription = link; |
| 702 subscription._addPending(const _DelayedDone()); |
| 703 } |
| 704 } |
| 705 } |
| 706 |
| 707 /** |
| 708 * Stream controller that is used by [Stream.asBroadcastStream]. |
| 709 * |
| 710 * This stream controller allows incoming events while it is firing |
| 711 * other events. This is handled by delaying the events until the |
| 712 * current event is done firing, and then fire the pending events. |
| 713 * |
| 714 * This class extends [_SyncBroadcastStreamController]. Events of |
| 715 * an "asBroadcastStream" stream are always initiated by events |
| 716 * on another stream, and it is fine to forward them synchronously. |
| 717 */ |
| 718 class _AsBroadcastStreamController<T> |
| 719 extends _SyncBroadcastStreamController<T> |
599 implements _EventDispatch<T> { | 720 implements _EventDispatch<T> { |
600 _StreamImplEvents _pending; | 721 _StreamImplEvents _pending; |
601 | 722 |
602 _BufferingMultiplexStreamController(void onListen(), void onCancel()) | 723 _AsBroadcastStreamController(void onListen(), void onCancel()) |
603 : super(onListen, onCancel); | 724 : super(onListen, onCancel); |
604 | 725 |
605 bool get _hasPending => _pending != null && ! _pending.isEmpty; | 726 bool get _hasPending => _pending != null && ! _pending.isEmpty; |
606 | 727 |
607 void _addPendingEvent(_DelayedEvent event) { | 728 void _addPendingEvent(_DelayedEvent event) { |
608 if (_pending == null) { | 729 if (_pending == null) { |
609 _pending = new _StreamImplEvents(); | 730 _pending = new _StreamImplEvents(); |
610 } | 731 } |
611 _pending.add(event); | 732 _pending.add(event); |
612 } | 733 } |
(...skipping 29 matching lines...) Expand all Loading... |
642 super.close(); | 763 super.close(); |
643 assert(!_hasPending); | 764 assert(!_hasPending); |
644 } | 765 } |
645 | 766 |
646 void _callOnCancel() { | 767 void _callOnCancel() { |
647 if (_hasPending) { | 768 if (_hasPending) { |
648 _pending.clear(); | 769 _pending.clear(); |
649 _pending = null; | 770 _pending = null; |
650 } | 771 } |
651 super._callOnCancel(); | 772 super._callOnCancel(); |
652 | |
653 } | 773 } |
654 } | 774 } |
OLD | NEW |