Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index a313f2a992df557ac3339f10ecae3c04bcef4b48..3078a30364092ed1ce04e043dfa5d60dccce6d05 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -448,6 +448,9 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| /** Whether there are currently a subscriber on the [Stream]. */ |
| bool get hasListener => !_isEmpty; |
| + /** Whether an event is being fired (sent to some, but not all, listeners). */ |
| + bool get _isFiring => (_state & _STATE_FIRING) != 0; |
| + |
| // Linked list helpers |
| bool get _isEmpty => identical(_next, this); |
| @@ -500,23 +503,38 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| void add(T data) { |
| assert(!isClosed); |
| + _sendData(data); |
| + } |
| + |
| + void addError(Object error, [Object stackTrace]) { |
| + assert(!isClosed); |
| + // TODO(lrn): Handle stacktrace. |
|
floitsch
2013/05/29 09:39:58
That should be less than 5 lines of code.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
|
| + _sendError(error); |
| + } |
| + |
| + void close() { |
| + assert(!isClosed); |
| + _state |= _STATE_CLOSED; |
| + _sendDone(); |
| + } |
| + |
| + // EventDispatch interface. |
| + |
| + void _sendData(T data) { |
| if (_isEmpty) return; |
| _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| subscription._add(data); |
| }); |
| } |
| - void addError(Object error, [Object stackTrace]) { |
| - assert(!isClosed); |
| + void _sendError(Object error) { |
| if (_isEmpty) return; |
| _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| subscription._addError(error); |
| }); |
| } |
| - void close() { |
| - assert(!isClosed); |
| - _state |= _STATE_CLOSED; |
| + void _sendDone() { |
| if (_isEmpty) return; |
| _forEachListener((_MultiplexSubscription<T> subscription) { |
| subscription._close(); |
| @@ -527,7 +545,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| void _forEachListener( |
| void action(_BufferingStreamSubscription<T> subscription)) { |
| - if ((_state & _STATE_FIRING) != 0) { |
| + if (_isFiring) { |
| throw new StateError( |
| "Cannot fire new event. Controller is already firing an event"); |
| } |
| @@ -565,3 +583,53 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| } |
| } |
| } |
| + |
| +class _BufferingMultiplexStreamController<T> |
| + extends _MultiplexStreamController<T> |
| + implements _EventDispatch<T> { |
| + _StreamImplEvents _pending; |
| + |
| + _BufferingMultiplexStreamController(void onListen(), void onCancel()) |
| + : super(onListen, onCancel); |
| + |
| + bool get _hasPending => _pending != null && ! _pending.isEmpty; |
| + |
| + void _addPendingEvent(_DelayedEvent event) { |
| + if (_pending == null) { |
| + _pending = new _StreamImplEvents(); |
| + } |
| + _pending.add(event); |
| + } |
| + |
| + void add(T data) { |
| + if (_isFiring || _hasPending) { |
|
floitsch
2013/05/29 09:39:58
add comment when _hasPending but not _isFiring.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
It's only during the onCancel callback after remov
|
| + _addPendingEvent(new _DelayedData<T>(data)); |
| + return; |
| + } |
| + super.add(data); |
| + while (_hasPending) { |
| + _pending.handleNext(this); |
| + } |
| + } |
| + |
| + void addError(Object error, [StackTrace stackTrace]) { |
| + if (_isFiring || _hasPending) { |
| + _addPendingEvent(new _DelayedError(error)); |
| + return; |
| + } |
| + super.addError(error, stackTrace); |
| + while (_hasPending) { |
| + _pending.handleNext(this); |
| + } |
| + } |
| + |
| + void close() { |
| + if (_isFiring || _hasPending) { |
| + _addPendingEvent(const _DelayedDone()); |
| + _state |= _STATE_CLOSED; |
| + return; |
| + } |
| + super.close(); |
| + assert(!_hasPending); |
| + } |
| +} |