Index: sdk/lib/async/stream_controller.dart |
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
index a313f2a992df557ac3339f10ecae3c04bcef4b48..3078a30364092ed1ce04e043dfa5d60dccce6d05 100644 |
--- a/sdk/lib/async/stream_controller.dart |
+++ b/sdk/lib/async/stream_controller.dart |
@@ -448,6 +448,9 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
/** Whether there are currently a subscriber on the [Stream]. */ |
bool get hasListener => !_isEmpty; |
+ /** Whether an event is being fired (sent to some, but not all, listeners). */ |
+ bool get _isFiring => (_state & _STATE_FIRING) != 0; |
+ |
// Linked list helpers |
bool get _isEmpty => identical(_next, this); |
@@ -500,23 +503,38 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
void add(T data) { |
assert(!isClosed); |
+ _sendData(data); |
+ } |
+ |
+ void addError(Object error, [Object stackTrace]) { |
+ assert(!isClosed); |
+ // TODO(lrn): Handle stacktrace. |
floitsch
2013/05/29 09:39:58
That should be less than 5 lines of code.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
|
+ _sendError(error); |
+ } |
+ |
+ void close() { |
+ assert(!isClosed); |
+ _state |= _STATE_CLOSED; |
+ _sendDone(); |
+ } |
+ |
+ // EventDispatch interface. |
+ |
+ void _sendData(T data) { |
if (_isEmpty) return; |
_forEachListener((_BufferingStreamSubscription<T> subscription) { |
subscription._add(data); |
}); |
} |
- void addError(Object error, [Object stackTrace]) { |
- assert(!isClosed); |
+ void _sendError(Object error) { |
if (_isEmpty) return; |
_forEachListener((_BufferingStreamSubscription<T> subscription) { |
subscription._addError(error); |
}); |
} |
- void close() { |
- assert(!isClosed); |
- _state |= _STATE_CLOSED; |
+ void _sendDone() { |
if (_isEmpty) return; |
_forEachListener((_MultiplexSubscription<T> subscription) { |
subscription._close(); |
@@ -527,7 +545,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
void _forEachListener( |
void action(_BufferingStreamSubscription<T> subscription)) { |
- if ((_state & _STATE_FIRING) != 0) { |
+ if (_isFiring) { |
throw new StateError( |
"Cannot fire new event. Controller is already firing an event"); |
} |
@@ -565,3 +583,53 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
} |
} |
} |
+ |
+class _BufferingMultiplexStreamController<T> |
+ extends _MultiplexStreamController<T> |
+ implements _EventDispatch<T> { |
+ _StreamImplEvents _pending; |
+ |
+ _BufferingMultiplexStreamController(void onListen(), void onCancel()) |
+ : super(onListen, onCancel); |
+ |
+ bool get _hasPending => _pending != null && ! _pending.isEmpty; |
+ |
+ void _addPendingEvent(_DelayedEvent event) { |
+ if (_pending == null) { |
+ _pending = new _StreamImplEvents(); |
+ } |
+ _pending.add(event); |
+ } |
+ |
+ void add(T data) { |
+ if (_isFiring || _hasPending) { |
floitsch
2013/05/29 09:39:58
add comment when _hasPending but not _isFiring.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
It's only during the onCancel callback after remov
|
+ _addPendingEvent(new _DelayedData<T>(data)); |
+ return; |
+ } |
+ super.add(data); |
+ while (_hasPending) { |
+ _pending.handleNext(this); |
+ } |
+ } |
+ |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ if (_isFiring || _hasPending) { |
+ _addPendingEvent(new _DelayedError(error)); |
+ return; |
+ } |
+ super.addError(error, stackTrace); |
+ while (_hasPending) { |
+ _pending.handleNext(this); |
+ } |
+ } |
+ |
+ void close() { |
+ if (_isFiring || _hasPending) { |
+ _addPendingEvent(const _DelayedDone()); |
+ _state |= _STATE_CLOSED; |
+ return; |
+ } |
+ super.close(); |
+ assert(!_hasPending); |
+ } |
+} |