Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(388)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 16125005: Make new StreamController be async by default. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index b057565063d99e65b75af216e42711cc74670f6f..030c5906f8f0f43599f019348738fa5f1bc3d830 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -69,8 +69,11 @@ abstract class StreamController<T> implements EventSink<T> {
factory StreamController({void onListen(),
void onPause(),
void onResume(),
- void onCancel()})
- => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel);
+ void onCancel(),
+ bool sync: false})
+ => sync
+ ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
+ : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
/**
* A controller where [stream] can be listened to more than once.
@@ -92,8 +95,12 @@ abstract class StreamController<T> implements EventSink<T> {
* If a listener is added again later, after the [onCancel] was called,
* the [onListen] will be called again.
*/
- factory StreamController.broadcast({void onListen(), void onCancel()}) {
- return new _MultiplexStreamController<T>(onListen, onCancel);
+ factory StreamController.broadcast({void onListen(),
+ void onCancel(),
+ bool sync: false}) {
+ return sync
+ ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
+ : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
/**
@@ -147,8 +154,9 @@ abstract class _StreamControllerLifecycle<T> {
*
* Controls a stream that only supports a single controller.
*/
-class _StreamControllerImpl<T> implements StreamController<T>,
- _StreamControllerLifecycle<T> {
+abstract class _StreamController<T> implements StreamController<T>,
+ _StreamControllerLifecycle<T>,
+ _EventDispatch<T> {
static const int _STATE_OPEN = 0;
static const int _STATE_CANCELLED = 1;
static const int _STATE_CLOSED = 2;
@@ -168,10 +176,10 @@ class _StreamControllerImpl<T> implements StreamController<T>,
// Events added to the stream before it has an active subscription.
_PendingEvents _pendingEvents = null;
- _StreamControllerImpl(this._onListen,
- this._onPause,
- this._onResume,
- this._onCancel) {
+ _StreamController(this._onListen,
+ this._onPause,
+ this._onResume,
+ this._onCancel) {
_stream = new _ControllerStream<T>(this);
}
@@ -202,7 +210,7 @@ class _StreamControllerImpl<T> implements StreamController<T>,
void add(T value) {
if (isClosed) throw new StateError("Adding event after close");
if (_subscription != null) {
- _subscription._add(value);
+ _sendData(value);
} else if (!_isCancelled) {
_addPendingEvent(new _DelayedData<T>(value));
}
@@ -219,7 +227,7 @@ class _StreamControllerImpl<T> implements StreamController<T>,
_attachStackTrace(error, stackTrace);
}
if (_subscription != null) {
- _subscription._addError(error);
+ _sendError(error);
} else if (!_isCancelled) {
_addPendingEvent(new _DelayedError(error));
}
@@ -240,12 +248,14 @@ class _StreamControllerImpl<T> implements StreamController<T>,
if (isClosed) return;
_state |= _STATE_CLOSED;
if (_subscription != null) {
- _subscription._close();
+ _sendDone();
} else if (!_isCancelled) {
_addPendingEvent(const _DelayedDone());
}
}
+ // EventDispatch interface
+
void _addPendingEvent(_DelayedEvent event) {
if (_isCancelled) return;
_StreamImplEvents events = _pendingEvents;
@@ -281,6 +291,52 @@ class _StreamControllerImpl<T> implements StreamController<T>,
}
}
+class _SyncStreamController<T> extends _StreamController<T> {
+ _SyncStreamController(void onListen(),
+ void onPause(),
+ void onResume(),
+ void onCancel())
+ : super(onListen, onPause, onResume, onCancel);
+
+ void _sendData(T data) {
+ _subscription._add(data);
+ }
+
+ void _sendError(Object error) {
+ _subscription._addError(error);
+ }
+
+ void _sendDone() {
+ _subscription._close();
+ }
+}
+
+class _AsyncStreamController<T> extends _StreamController<T> {
+ _AsyncStreamController(void onListen(),
+ void onPause(),
+ void onResume(),
+ void onCancel())
+ : super(onListen, onPause, onResume, onCancel);
+
+ void _sendData(T data) {
+ runAsync(() {
floitsch 2013/05/30 12:13:48 Try to use addPending instead.
Lasse Reichstein Nielsen 2013/05/31 05:51:59 Done.
+ if (_subscription == null) return;
+ _subscription._add(data);
+ });
+ }
+
+ void _sendError(Object error) {
+ runAsync(() {
+ if (_subscription == null) return;
+ _subscription._addError(error);
+ });
+ }
+
+ void _sendDone() {
+ runAsync(_subscription._close);
+ }
+}
+
typedef void _NotificationHandler();
void _runGuarded(_NotificationHandler notificationHandler) {
@@ -339,10 +395,10 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
}
}
-class _MultiplexStream<T> extends _StreamImpl<T> {
- _MultiplexStreamController _controller;
+class _BroadcastStream<T> extends _StreamImpl<T> {
+ _BroadcastStreamController _controller;
- _MultiplexStream(this._controller);
+ _BroadcastStream(this._controller);
bool get isBroadcast => true;
@@ -351,7 +407,7 @@ class _MultiplexStream<T> extends _StreamImpl<T> {
void onError(Object error),
void onDone(),
bool cancelOnError) {
- return new _MultiplexSubscription<T>(
+ return new _BroadcastSubscription<T>(
_controller, onData, onError, onDone, cancelOnError);
}
@@ -360,22 +416,22 @@ class _MultiplexStream<T> extends _StreamImpl<T> {
}
}
-abstract class _MultiplexSubscriptionLink {
- _MultiplexSubscriptionLink _next;
- _MultiplexSubscriptionLink _previous;
+abstract class _BroadcastSubscriptionLink {
+ _BroadcastSubscriptionLink _next;
+ _BroadcastSubscriptionLink _previous;
}
-class _MultiplexSubscription<T> extends _ControllerSubscription<T>
- implements _MultiplexSubscriptionLink {
+class _BroadcastSubscription<T> extends _ControllerSubscription<T>
+ implements _BroadcastSubscriptionLink {
static const int _STATE_EVENT_ID = 1;
static const int _STATE_FIRING = 2;
static const int _STATE_REMOVE_AFTER_FIRING = 4;
int _eventState;
- _MultiplexSubscriptionLink _next;
- _MultiplexSubscriptionLink _previous;
+ _BroadcastSubscriptionLink _next;
+ _BroadcastSubscriptionLink _previous;
- _MultiplexSubscription(_StreamControllerLifecycle controller,
+ _BroadcastSubscription(_StreamControllerLifecycle controller,
void onData(T data),
void onError(Object error),
void onDone(),
@@ -384,7 +440,7 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T>
_next = _previous = this;
}
- _MultiplexStreamController get _controller => super._controller;
+ _BroadcastStreamController get _controller => super._controller;
bool _expectsEvent(int eventId) {
return (_eventState & _STATE_EVENT_ID) == eventId;
@@ -406,9 +462,11 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T>
}
-class _MultiplexStreamController<T> implements StreamController<T>,
- _StreamControllerLifecycle<T>,
- _MultiplexSubscriptionLink {
+abstract class _BroadcastStreamController<T>
+ implements StreamController<T>,
+ _StreamControllerLifecycle<T>,
+ _BroadcastSubscriptionLink,
+ _EventDispatch<T> {
static const int _STATE_INITIAL = 0;
static const int _STATE_EVENT_ID = 1;
static const int _STATE_FIRING = 2;
@@ -421,24 +479,24 @@ class _MultiplexStreamController<T> implements StreamController<T>,
int _state;
// Double-linked list of active listeners.
- _MultiplexSubscriptionLink _next;
- _MultiplexSubscriptionLink _previous;
+ _BroadcastSubscriptionLink _next;
+ _BroadcastSubscriptionLink _previous;
- _MultiplexStreamController(this._onListen, this._onCancel)
+ _BroadcastStreamController(this._onListen, this._onCancel)
: _state = _STATE_INITIAL {
_next = _previous = this;
}
// StreamController interface.
- Stream<T> get stream => new _MultiplexStream<T>(this);
+ Stream<T> get stream => new _BroadcastStream<T>(this);
EventSink<T> get sink => new _EventSinkView<T>(this);
bool get isClosed => (_state & _STATE_CLOSED) != 0;
/**
- * A multiplex controller is never paused.
+ * A broadcast controller is never paused.
*
* Each receiving stream may be paused individually, and they handle their
* own buffering.
@@ -456,8 +514,8 @@ class _MultiplexStreamController<T> implements StreamController<T>,
bool get _isEmpty => identical(_next, this);
/** Adds subscription to linked list of active listeners. */
- void _addListener(_MultiplexSubscription<T> subscription) {
- _MultiplexSubscriptionLink previous = _previous;
+ void _addListener(_BroadcastSubscription<T> subscription) {
+ _BroadcastSubscriptionLink previous = _previous;
previous._next = subscription;
_previous = subscription._previous;
subscription._previous._next = this;
@@ -465,7 +523,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
subscription._eventState = (_state & _STATE_EVENT_ID);
}
- void _removeListener(_MultiplexSubscription<T> subscription) {
+ void _removeListener(_BroadcastSubscription<T> subscription) {
assert(identical(subscription._controller, this));
assert(!identical(subscription._next, subscription));
subscription._previous._next = subscription._next;
@@ -475,7 +533,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
// _StreamControllerLifecycle interface.
- void _recordListen(_MultiplexSubscription<T> subscription) {
+ void _recordListen(_BroadcastSubscription<T> subscription) {
_addListener(subscription);
if (identical(_next, _previous)) {
// Only one listener, so it must be the first listener.
@@ -483,7 +541,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
}
}
- void _recordCancel(_MultiplexSubscription<T> subscription) {
+ void _recordCancel(_BroadcastSubscription<T> subscription) {
if (subscription._isFiring) {
subscription._setRemoveAfterFiring();
} else {
@@ -524,31 +582,6 @@ class _MultiplexStreamController<T> implements StreamController<T>,
_sendDone();
}
- // EventDispatch interface.
-
- void _sendData(T data) {
- if (_isEmpty) return;
- _forEachListener((_BufferingStreamSubscription<T> subscription) {
- subscription._add(data);
- });
- }
-
- void _sendError(Object error) {
- if (_isEmpty) return;
- _forEachListener((_BufferingStreamSubscription<T> subscription) {
- subscription._addError(error);
- });
- }
-
- void _sendDone() {
- if (_isEmpty) return;
- _forEachListener((_MultiplexSubscription<T> subscription) {
- subscription._close();
- subscription._eventState |=
- _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING;
- });
- }
-
void _forEachListener(
void action(_BufferingStreamSubscription<T> subscription)) {
if (_isFiring) {
@@ -566,18 +599,18 @@ class _MultiplexStreamController<T> implements StreamController<T>,
// Any listeners added while firing this event will expect the next event,
// not this one, and won't get notified.
_state ^= _STATE_EVENT_ID | _STATE_FIRING;
- _MultiplexSubscriptionLink link = _next;
+ _BroadcastSubscriptionLink link = _next;
while (!identical(link, this)) {
- _MultiplexSubscription<T> subscription = link;
+ _BroadcastSubscription<T> subscription = link;
if (subscription._expectsEvent(id)) {
- subscription._eventState |= _MultiplexSubscription._STATE_FIRING;
+ subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
action(subscription);
subscription._toggleEventId();
link = subscription._next;
if (subscription._removeAfterFiring) {
_removeListener(subscription);
}
- subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING;
+ subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
} else {
link = subscription._next;
}
@@ -594,12 +627,89 @@ class _MultiplexStreamController<T> implements StreamController<T>,
}
}
-class _BufferingMultiplexStreamController<T>
- extends _MultiplexStreamController<T>
+class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
+ _SyncBroadcastStreamController(void onListen(), void onCancel())
+ : super(onListen, onCancel);
+
+ // EventDispatch interface.
+
+ void _sendData(T data) {
+ if (_isEmpty) return;
+ _forEachListener((_BufferingStreamSubscription<T> subscription) {
+ subscription._add(data);
+ });
+ }
+
+ void _sendError(Object error) {
+ if (_isEmpty) return;
+ _forEachListener((_BufferingStreamSubscription<T> subscription) {
+ subscription._addError(error);
+ });
+ }
+
+ void _sendDone() {
+ if (_isEmpty) return;
+ _forEachListener((_BroadcastSubscription<T> subscription) {
+ subscription._close();
+ subscription._eventState |=
+ _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
+ });
+ }
+}
+
+class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
+ _AsyncBroadcastStreamController(void onListen(), void onCancel())
+ : super(onListen, onCancel);
+
+ // EventDispatch interface.
+
+ void _sendData(T data) {
+ runAsync(() {
+ if (_isEmpty) return;
+ _forEachListener((_BufferingStreamSubscription<T> subscription) {
+ subscription._add(data);
+ });
+ });
+ }
+
+ void _sendError(Object error) {
+ runAsync(() {
+ if (_isEmpty) return;
+ _forEachListener((_BufferingStreamSubscription<T> subscription) {
+ subscription._addError(error);
+ });
+ });
+ }
+
+ void _sendDone() {
+ runAsync(() {
+ if (_isEmpty) return;
+ _forEachListener((_BroadcastSubscription<T> subscription) {
+ subscription._close();
+ subscription._eventState |=
+ _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
+ });
+ });
+ }
+}
+
+/**
+ * Stream controller that is used by [Stream.asBroadcastStream].
+ *
+ * This stream controller allows incoming events while it is firing
+ * other events. This is handled by delaying the events until the
+ * current event is done firing, and then fire the pending events.
+ *
+ * This class extends [_SyncBroadcastStreamController]. Events of
+ * an "asBroadcastStream" stream are always initiated by events
+ * on another stream, and it is fine to forward them synchronously.
+ */
+class _AsBroadcastStreamController<T>
+ extends _SyncBroadcastStreamController<T>
implements _EventDispatch<T> {
_StreamImplEvents _pending;
- _BufferingMultiplexStreamController(void onListen(), void onCancel())
+ _AsBroadcastStreamController(void onListen(), void onCancel())
: super(onListen, onCancel);
bool get _hasPending => _pending != null && ! _pending.isEmpty;
@@ -649,6 +759,5 @@ class _BufferingMultiplexStreamController<T>
_pending = null;
}
super._callOnCancel();
-
}
}

Powered by Google App Engine
This is Rietveld 408576698