| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index a313f2a992df557ac3339f10ecae3c04bcef4b48..b057565063d99e65b75af216e42711cc74670f6f 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -448,6 +448,9 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| /** Whether there are currently a subscriber on the [Stream]. */
|
| bool get hasListener => !_isEmpty;
|
|
|
| + /** Whether an event is being fired (sent to some, but not all, listeners). */
|
| + bool get _isFiring => (_state & _STATE_FIRING) != 0;
|
| +
|
| // Linked list helpers
|
|
|
| bool get _isEmpty => identical(_next, this);
|
| @@ -488,7 +491,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| // If we are currently firing an event, the empty-check is performed at
|
| // the end of the listener loop instead of here.
|
| if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
|
| - _runGuarded(_onCancel);
|
| + _callOnCancel();
|
| }
|
| }
|
| }
|
| @@ -499,24 +502,45 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| // EventSink interface.
|
|
|
| void add(T data) {
|
| - assert(!isClosed);
|
| + if (isClosed) {
|
| + throw new StateError("Cannot add new events after calling close()");
|
| + }
|
| + _sendData(data);
|
| + }
|
| +
|
| + void addError(Object error, [Object stackTrace]) {
|
| + if (isClosed) {
|
| + throw new StateError("Cannot add new events after calling close()");
|
| + }
|
| + if (stackTrace != null) _attachStackTrace(error, stackTrace);
|
| + _sendError(error);
|
| + }
|
| +
|
| + void close() {
|
| + if (isClosed) {
|
| + throw new StateError("Cannot add new events after calling close()");
|
| + }
|
| + _state |= _STATE_CLOSED;
|
| + _sendDone();
|
| + }
|
| +
|
| + // EventDispatch interface.
|
| +
|
| + void _sendData(T data) {
|
| if (_isEmpty) return;
|
| _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| subscription._add(data);
|
| });
|
| }
|
|
|
| - void addError(Object error, [Object stackTrace]) {
|
| - assert(!isClosed);
|
| + void _sendError(Object error) {
|
| if (_isEmpty) return;
|
| _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| subscription._addError(error);
|
| });
|
| }
|
|
|
| - void close() {
|
| - assert(!isClosed);
|
| - _state |= _STATE_CLOSED;
|
| + void _sendDone() {
|
| if (_isEmpty) return;
|
| _forEachListener((_MultiplexSubscription<T> subscription) {
|
| subscription._close();
|
| @@ -527,7 +551,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
|
|
| void _forEachListener(
|
| void action(_BufferingStreamSubscription<T> subscription)) {
|
| - if ((_state & _STATE_FIRING) != 0) {
|
| + if (_isFiring) {
|
| throw new StateError(
|
| "Cannot fire new event. Controller is already firing an event");
|
| }
|
| @@ -561,7 +585,70 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| _state &= ~_STATE_FIRING;
|
|
|
| if (_isEmpty) {
|
| - _runGuarded(_onCancel);
|
| + _callOnCancel();
|
| }
|
| }
|
| +
|
| + void _callOnCancel() {
|
| + _runGuarded(_onCancel);
|
| + }
|
| +}
|
| +
|
| +class _BufferingMultiplexStreamController<T>
|
| + extends _MultiplexStreamController<T>
|
| + implements _EventDispatch<T> {
|
| + _StreamImplEvents _pending;
|
| +
|
| + _BufferingMultiplexStreamController(void onListen(), void onCancel())
|
| + : super(onListen, onCancel);
|
| +
|
| + bool get _hasPending => _pending != null && ! _pending.isEmpty;
|
| +
|
| + void _addPendingEvent(_DelayedEvent event) {
|
| + if (_pending == null) {
|
| + _pending = new _StreamImplEvents();
|
| + }
|
| + _pending.add(event);
|
| + }
|
| +
|
| + void add(T data) {
|
| + if (_isFiring) {
|
| + _addPendingEvent(new _DelayedData<T>(data));
|
| + return;
|
| + }
|
| + super.add(data);
|
| + while (_hasPending) {
|
| + _pending.handleNext(this);
|
| + }
|
| + }
|
| +
|
| + void addError(Object error, [StackTrace stackTrace]) {
|
| + if (_isFiring) {
|
| + _addPendingEvent(new _DelayedError(error));
|
| + return;
|
| + }
|
| + super.addError(error, stackTrace);
|
| + while (_hasPending) {
|
| + _pending.handleNext(this);
|
| + }
|
| + }
|
| +
|
| + void close() {
|
| + if (_isFiring) {
|
| + _addPendingEvent(const _DelayedDone());
|
| + _state |= _STATE_CLOSED;
|
| + return;
|
| + }
|
| + super.close();
|
| + assert(!_hasPending);
|
| + }
|
| +
|
| + void _callOnCancel() {
|
| + if (_hasPending) {
|
| + _pending.clear();
|
| + _pending = null;
|
| + }
|
| + super._callOnCancel();
|
| +
|
| + }
|
| }
|
|
|