Index: sdk/lib/async/stream_controller.dart |
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
index a313f2a992df557ac3339f10ecae3c04bcef4b48..b057565063d99e65b75af216e42711cc74670f6f 100644 |
--- a/sdk/lib/async/stream_controller.dart |
+++ b/sdk/lib/async/stream_controller.dart |
@@ -448,6 +448,9 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
/** Whether there are currently a subscriber on the [Stream]. */ |
bool get hasListener => !_isEmpty; |
+ /** Whether an event is being fired (sent to some, but not all, listeners). */ |
+ bool get _isFiring => (_state & _STATE_FIRING) != 0; |
+ |
// Linked list helpers |
bool get _isEmpty => identical(_next, this); |
@@ -488,7 +491,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
// If we are currently firing an event, the empty-check is performed at |
// the end of the listener loop instead of here. |
if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
- _runGuarded(_onCancel); |
+ _callOnCancel(); |
} |
} |
} |
@@ -499,24 +502,45 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
// EventSink interface. |
void add(T data) { |
- assert(!isClosed); |
+ if (isClosed) { |
+ throw new StateError("Cannot add new events after calling close()"); |
+ } |
+ _sendData(data); |
+ } |
+ |
+ void addError(Object error, [Object stackTrace]) { |
+ if (isClosed) { |
+ throw new StateError("Cannot add new events after calling close()"); |
+ } |
+ if (stackTrace != null) _attachStackTrace(error, stackTrace); |
+ _sendError(error); |
+ } |
+ |
+ void close() { |
+ if (isClosed) { |
+ throw new StateError("Cannot add new events after calling close()"); |
+ } |
+ _state |= _STATE_CLOSED; |
+ _sendDone(); |
+ } |
+ |
+ // EventDispatch interface. |
+ |
+ void _sendData(T data) { |
if (_isEmpty) return; |
_forEachListener((_BufferingStreamSubscription<T> subscription) { |
subscription._add(data); |
}); |
} |
- void addError(Object error, [Object stackTrace]) { |
- assert(!isClosed); |
+ void _sendError(Object error) { |
if (_isEmpty) return; |
_forEachListener((_BufferingStreamSubscription<T> subscription) { |
subscription._addError(error); |
}); |
} |
- void close() { |
- assert(!isClosed); |
- _state |= _STATE_CLOSED; |
+ void _sendDone() { |
if (_isEmpty) return; |
_forEachListener((_MultiplexSubscription<T> subscription) { |
subscription._close(); |
@@ -527,7 +551,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
void _forEachListener( |
void action(_BufferingStreamSubscription<T> subscription)) { |
- if ((_state & _STATE_FIRING) != 0) { |
+ if (_isFiring) { |
throw new StateError( |
"Cannot fire new event. Controller is already firing an event"); |
} |
@@ -561,7 +585,70 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
_state &= ~_STATE_FIRING; |
if (_isEmpty) { |
- _runGuarded(_onCancel); |
+ _callOnCancel(); |
} |
} |
+ |
+ void _callOnCancel() { |
+ _runGuarded(_onCancel); |
+ } |
+} |
+ |
+class _BufferingMultiplexStreamController<T> |
+ extends _MultiplexStreamController<T> |
+ implements _EventDispatch<T> { |
+ _StreamImplEvents _pending; |
+ |
+ _BufferingMultiplexStreamController(void onListen(), void onCancel()) |
+ : super(onListen, onCancel); |
+ |
+ bool get _hasPending => _pending != null && ! _pending.isEmpty; |
+ |
+ void _addPendingEvent(_DelayedEvent event) { |
+ if (_pending == null) { |
+ _pending = new _StreamImplEvents(); |
+ } |
+ _pending.add(event); |
+ } |
+ |
+ void add(T data) { |
+ if (_isFiring) { |
+ _addPendingEvent(new _DelayedData<T>(data)); |
+ return; |
+ } |
+ super.add(data); |
+ while (_hasPending) { |
+ _pending.handleNext(this); |
+ } |
+ } |
+ |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ if (_isFiring) { |
+ _addPendingEvent(new _DelayedError(error)); |
+ return; |
+ } |
+ super.addError(error, stackTrace); |
+ while (_hasPending) { |
+ _pending.handleNext(this); |
+ } |
+ } |
+ |
+ void close() { |
+ if (_isFiring) { |
+ _addPendingEvent(const _DelayedDone()); |
+ _state |= _STATE_CLOSED; |
+ return; |
+ } |
+ super.close(); |
+ assert(!_hasPending); |
+ } |
+ |
+ void _callOnCancel() { |
+ if (_hasPending) { |
+ _pending.clear(); |
+ _pending = null; |
+ } |
+ super._callOnCancel(); |
+ |
+ } |
} |