Index: sdk/lib/async/stream_controller.dart |
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
index b057565063d99e65b75af216e42711cc74670f6f..030c5906f8f0f43599f019348738fa5f1bc3d830 100644 |
--- a/sdk/lib/async/stream_controller.dart |
+++ b/sdk/lib/async/stream_controller.dart |
@@ -69,8 +69,11 @@ abstract class StreamController<T> implements EventSink<T> { |
factory StreamController({void onListen(), |
void onPause(), |
void onResume(), |
- void onCancel()}) |
- => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); |
+ void onCancel(), |
+ bool sync: false}) |
+ => sync |
+ ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
+ : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
/** |
* A controller where [stream] can be listened to more than once. |
@@ -92,8 +95,12 @@ abstract class StreamController<T> implements EventSink<T> { |
* If a listener is added again later, after the [onCancel] was called, |
* the [onListen] will be called again. |
*/ |
- factory StreamController.broadcast({void onListen(), void onCancel()}) { |
- return new _MultiplexStreamController<T>(onListen, onCancel); |
+ factory StreamController.broadcast({void onListen(), |
+ void onCancel(), |
+ bool sync: false}) { |
+ return sync |
+ ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
+ : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
} |
/** |
@@ -147,8 +154,9 @@ abstract class _StreamControllerLifecycle<T> { |
* |
* Controls a stream that only supports a single controller. |
*/ |
-class _StreamControllerImpl<T> implements StreamController<T>, |
- _StreamControllerLifecycle<T> { |
+abstract class _StreamController<T> implements StreamController<T>, |
+ _StreamControllerLifecycle<T>, |
+ _EventDispatch<T> { |
static const int _STATE_OPEN = 0; |
static const int _STATE_CANCELLED = 1; |
static const int _STATE_CLOSED = 2; |
@@ -168,10 +176,10 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
// Events added to the stream before it has an active subscription. |
_PendingEvents _pendingEvents = null; |
- _StreamControllerImpl(this._onListen, |
- this._onPause, |
- this._onResume, |
- this._onCancel) { |
+ _StreamController(this._onListen, |
+ this._onPause, |
+ this._onResume, |
+ this._onCancel) { |
_stream = new _ControllerStream<T>(this); |
} |
@@ -202,7 +210,7 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
void add(T value) { |
if (isClosed) throw new StateError("Adding event after close"); |
if (_subscription != null) { |
- _subscription._add(value); |
+ _sendData(value); |
} else if (!_isCancelled) { |
_addPendingEvent(new _DelayedData<T>(value)); |
} |
@@ -219,7 +227,7 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
_attachStackTrace(error, stackTrace); |
} |
if (_subscription != null) { |
- _subscription._addError(error); |
+ _sendError(error); |
} else if (!_isCancelled) { |
_addPendingEvent(new _DelayedError(error)); |
} |
@@ -240,12 +248,14 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
if (isClosed) return; |
_state |= _STATE_CLOSED; |
if (_subscription != null) { |
- _subscription._close(); |
+ _sendDone(); |
} else if (!_isCancelled) { |
_addPendingEvent(const _DelayedDone()); |
} |
} |
+ // EventDispatch interface |
+ |
void _addPendingEvent(_DelayedEvent event) { |
if (_isCancelled) return; |
_StreamImplEvents events = _pendingEvents; |
@@ -281,6 +291,52 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
} |
} |
+class _SyncStreamController<T> extends _StreamController<T> { |
+ _SyncStreamController(void onListen(), |
+ void onPause(), |
+ void onResume(), |
+ void onCancel()) |
+ : super(onListen, onPause, onResume, onCancel); |
+ |
+ void _sendData(T data) { |
+ _subscription._add(data); |
+ } |
+ |
+ void _sendError(Object error) { |
+ _subscription._addError(error); |
+ } |
+ |
+ void _sendDone() { |
+ _subscription._close(); |
+ } |
+} |
+ |
+class _AsyncStreamController<T> extends _StreamController<T> { |
+ _AsyncStreamController(void onListen(), |
+ void onPause(), |
+ void onResume(), |
+ void onCancel()) |
+ : super(onListen, onPause, onResume, onCancel); |
+ |
+ void _sendData(T data) { |
+ runAsync(() { |
floitsch
2013/05/30 12:13:48
Try to use addPending instead.
Lasse Reichstein Nielsen
2013/05/31 05:51:59
Done.
|
+ if (_subscription == null) return; |
+ _subscription._add(data); |
+ }); |
+ } |
+ |
+ void _sendError(Object error) { |
+ runAsync(() { |
+ if (_subscription == null) return; |
+ _subscription._addError(error); |
+ }); |
+ } |
+ |
+ void _sendDone() { |
+ runAsync(_subscription._close); |
+ } |
+} |
+ |
typedef void _NotificationHandler(); |
void _runGuarded(_NotificationHandler notificationHandler) { |
@@ -339,10 +395,10 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
} |
} |
-class _MultiplexStream<T> extends _StreamImpl<T> { |
- _MultiplexStreamController _controller; |
+class _BroadcastStream<T> extends _StreamImpl<T> { |
+ _BroadcastStreamController _controller; |
- _MultiplexStream(this._controller); |
+ _BroadcastStream(this._controller); |
bool get isBroadcast => true; |
@@ -351,7 +407,7 @@ class _MultiplexStream<T> extends _StreamImpl<T> { |
void onError(Object error), |
void onDone(), |
bool cancelOnError) { |
- return new _MultiplexSubscription<T>( |
+ return new _BroadcastSubscription<T>( |
_controller, onData, onError, onDone, cancelOnError); |
} |
@@ -360,22 +416,22 @@ class _MultiplexStream<T> extends _StreamImpl<T> { |
} |
} |
-abstract class _MultiplexSubscriptionLink { |
- _MultiplexSubscriptionLink _next; |
- _MultiplexSubscriptionLink _previous; |
+abstract class _BroadcastSubscriptionLink { |
+ _BroadcastSubscriptionLink _next; |
+ _BroadcastSubscriptionLink _previous; |
} |
-class _MultiplexSubscription<T> extends _ControllerSubscription<T> |
- implements _MultiplexSubscriptionLink { |
+class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
+ implements _BroadcastSubscriptionLink { |
static const int _STATE_EVENT_ID = 1; |
static const int _STATE_FIRING = 2; |
static const int _STATE_REMOVE_AFTER_FIRING = 4; |
int _eventState; |
- _MultiplexSubscriptionLink _next; |
- _MultiplexSubscriptionLink _previous; |
+ _BroadcastSubscriptionLink _next; |
+ _BroadcastSubscriptionLink _previous; |
- _MultiplexSubscription(_StreamControllerLifecycle controller, |
+ _BroadcastSubscription(_StreamControllerLifecycle controller, |
void onData(T data), |
void onError(Object error), |
void onDone(), |
@@ -384,7 +440,7 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T> |
_next = _previous = this; |
} |
- _MultiplexStreamController get _controller => super._controller; |
+ _BroadcastStreamController get _controller => super._controller; |
bool _expectsEvent(int eventId) { |
return (_eventState & _STATE_EVENT_ID) == eventId; |
@@ -406,9 +462,11 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T> |
} |
-class _MultiplexStreamController<T> implements StreamController<T>, |
- _StreamControllerLifecycle<T>, |
- _MultiplexSubscriptionLink { |
+abstract class _BroadcastStreamController<T> |
+ implements StreamController<T>, |
+ _StreamControllerLifecycle<T>, |
+ _BroadcastSubscriptionLink, |
+ _EventDispatch<T> { |
static const int _STATE_INITIAL = 0; |
static const int _STATE_EVENT_ID = 1; |
static const int _STATE_FIRING = 2; |
@@ -421,24 +479,24 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
int _state; |
// Double-linked list of active listeners. |
- _MultiplexSubscriptionLink _next; |
- _MultiplexSubscriptionLink _previous; |
+ _BroadcastSubscriptionLink _next; |
+ _BroadcastSubscriptionLink _previous; |
- _MultiplexStreamController(this._onListen, this._onCancel) |
+ _BroadcastStreamController(this._onListen, this._onCancel) |
: _state = _STATE_INITIAL { |
_next = _previous = this; |
} |
// StreamController interface. |
- Stream<T> get stream => new _MultiplexStream<T>(this); |
+ Stream<T> get stream => new _BroadcastStream<T>(this); |
EventSink<T> get sink => new _EventSinkView<T>(this); |
bool get isClosed => (_state & _STATE_CLOSED) != 0; |
/** |
- * A multiplex controller is never paused. |
+ * A broadcast controller is never paused. |
* |
* Each receiving stream may be paused individually, and they handle their |
* own buffering. |
@@ -456,8 +514,8 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
bool get _isEmpty => identical(_next, this); |
/** Adds subscription to linked list of active listeners. */ |
- void _addListener(_MultiplexSubscription<T> subscription) { |
- _MultiplexSubscriptionLink previous = _previous; |
+ void _addListener(_BroadcastSubscription<T> subscription) { |
+ _BroadcastSubscriptionLink previous = _previous; |
previous._next = subscription; |
_previous = subscription._previous; |
subscription._previous._next = this; |
@@ -465,7 +523,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
subscription._eventState = (_state & _STATE_EVENT_ID); |
} |
- void _removeListener(_MultiplexSubscription<T> subscription) { |
+ void _removeListener(_BroadcastSubscription<T> subscription) { |
assert(identical(subscription._controller, this)); |
assert(!identical(subscription._next, subscription)); |
subscription._previous._next = subscription._next; |
@@ -475,7 +533,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
// _StreamControllerLifecycle interface. |
- void _recordListen(_MultiplexSubscription<T> subscription) { |
+ void _recordListen(_BroadcastSubscription<T> subscription) { |
_addListener(subscription); |
if (identical(_next, _previous)) { |
// Only one listener, so it must be the first listener. |
@@ -483,7 +541,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
} |
} |
- void _recordCancel(_MultiplexSubscription<T> subscription) { |
+ void _recordCancel(_BroadcastSubscription<T> subscription) { |
if (subscription._isFiring) { |
subscription._setRemoveAfterFiring(); |
} else { |
@@ -524,31 +582,6 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
_sendDone(); |
} |
- // EventDispatch interface. |
- |
- void _sendData(T data) { |
- if (_isEmpty) return; |
- _forEachListener((_BufferingStreamSubscription<T> subscription) { |
- subscription._add(data); |
- }); |
- } |
- |
- void _sendError(Object error) { |
- if (_isEmpty) return; |
- _forEachListener((_BufferingStreamSubscription<T> subscription) { |
- subscription._addError(error); |
- }); |
- } |
- |
- void _sendDone() { |
- if (_isEmpty) return; |
- _forEachListener((_MultiplexSubscription<T> subscription) { |
- subscription._close(); |
- subscription._eventState |= |
- _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; |
- }); |
- } |
- |
void _forEachListener( |
void action(_BufferingStreamSubscription<T> subscription)) { |
if (_isFiring) { |
@@ -566,18 +599,18 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
// Any listeners added while firing this event will expect the next event, |
// not this one, and won't get notified. |
_state ^= _STATE_EVENT_ID | _STATE_FIRING; |
- _MultiplexSubscriptionLink link = _next; |
+ _BroadcastSubscriptionLink link = _next; |
while (!identical(link, this)) { |
- _MultiplexSubscription<T> subscription = link; |
+ _BroadcastSubscription<T> subscription = link; |
if (subscription._expectsEvent(id)) { |
- subscription._eventState |= _MultiplexSubscription._STATE_FIRING; |
+ subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
action(subscription); |
subscription._toggleEventId(); |
link = subscription._next; |
if (subscription._removeAfterFiring) { |
_removeListener(subscription); |
} |
- subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; |
+ subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
} else { |
link = subscription._next; |
} |
@@ -594,12 +627,89 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
} |
} |
-class _BufferingMultiplexStreamController<T> |
- extends _MultiplexStreamController<T> |
+class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
+ _SyncBroadcastStreamController(void onListen(), void onCancel()) |
+ : super(onListen, onCancel); |
+ |
+ // EventDispatch interface. |
+ |
+ void _sendData(T data) { |
+ if (_isEmpty) return; |
+ _forEachListener((_BufferingStreamSubscription<T> subscription) { |
+ subscription._add(data); |
+ }); |
+ } |
+ |
+ void _sendError(Object error) { |
+ if (_isEmpty) return; |
+ _forEachListener((_BufferingStreamSubscription<T> subscription) { |
+ subscription._addError(error); |
+ }); |
+ } |
+ |
+ void _sendDone() { |
+ if (_isEmpty) return; |
+ _forEachListener((_BroadcastSubscription<T> subscription) { |
+ subscription._close(); |
+ subscription._eventState |= |
+ _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
+ }); |
+ } |
+} |
+ |
+class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
+ _AsyncBroadcastStreamController(void onListen(), void onCancel()) |
+ : super(onListen, onCancel); |
+ |
+ // EventDispatch interface. |
+ |
+ void _sendData(T data) { |
+ runAsync(() { |
+ if (_isEmpty) return; |
+ _forEachListener((_BufferingStreamSubscription<T> subscription) { |
+ subscription._add(data); |
+ }); |
+ }); |
+ } |
+ |
+ void _sendError(Object error) { |
+ runAsync(() { |
+ if (_isEmpty) return; |
+ _forEachListener((_BufferingStreamSubscription<T> subscription) { |
+ subscription._addError(error); |
+ }); |
+ }); |
+ } |
+ |
+ void _sendDone() { |
+ runAsync(() { |
+ if (_isEmpty) return; |
+ _forEachListener((_BroadcastSubscription<T> subscription) { |
+ subscription._close(); |
+ subscription._eventState |= |
+ _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
+ }); |
+ }); |
+ } |
+} |
+ |
+/** |
+ * Stream controller that is used by [Stream.asBroadcastStream]. |
+ * |
+ * This stream controller allows incoming events while it is firing |
+ * other events. This is handled by delaying the events until the |
+ * current event is done firing, and then fire the pending events. |
+ * |
+ * This class extends [_SyncBroadcastStreamController]. Events of |
+ * an "asBroadcastStream" stream are always initiated by events |
+ * on another stream, and it is fine to forward them synchronously. |
+ */ |
+class _AsBroadcastStreamController<T> |
+ extends _SyncBroadcastStreamController<T> |
implements _EventDispatch<T> { |
_StreamImplEvents _pending; |
- _BufferingMultiplexStreamController(void onListen(), void onCancel()) |
+ _AsBroadcastStreamController(void onListen(), void onCancel()) |
: super(onListen, onCancel); |
bool get _hasPending => _pending != null && ! _pending.isEmpty; |
@@ -649,6 +759,5 @@ class _BufferingMultiplexStreamController<T> |
_pending = null; |
} |
super._callOnCancel(); |
- |
} |
} |