Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 15673006: Implement asBroadcast using a _MultiplexStreamController. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Don't keep pending events when calling onCancel. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index a313f2a992df557ac3339f10ecae3c04bcef4b48..b057565063d99e65b75af216e42711cc74670f6f 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -448,6 +448,9 @@ class _MultiplexStreamController<T> implements StreamController<T>,
/** Whether there are currently a subscriber on the [Stream]. */
bool get hasListener => !_isEmpty;
+ /** Whether an event is being fired (sent to some, but not all, listeners). */
+ bool get _isFiring => (_state & _STATE_FIRING) != 0;
+
// Linked list helpers
bool get _isEmpty => identical(_next, this);
@@ -488,7 +491,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
// If we are currently firing an event, the empty-check is performed at
// the end of the listener loop instead of here.
if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
- _runGuarded(_onCancel);
+ _callOnCancel();
}
}
}
@@ -499,24 +502,45 @@ class _MultiplexStreamController<T> implements StreamController<T>,
// EventSink interface.
void add(T data) {
- assert(!isClosed);
+ if (isClosed) {
+ throw new StateError("Cannot add new events after calling close()");
+ }
+ _sendData(data);
+ }
+
+ void addError(Object error, [Object stackTrace]) {
+ if (isClosed) {
+ throw new StateError("Cannot add new events after calling close()");
+ }
+ if (stackTrace != null) _attachStackTrace(error, stackTrace);
+ _sendError(error);
+ }
+
+ void close() {
+ if (isClosed) {
+ throw new StateError("Cannot add new events after calling close()");
+ }
+ _state |= _STATE_CLOSED;
+ _sendDone();
+ }
+
+ // EventDispatch interface.
+
+ void _sendData(T data) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
}
- void addError(Object error, [Object stackTrace]) {
- assert(!isClosed);
+ void _sendError(Object error) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._addError(error);
});
}
- void close() {
- assert(!isClosed);
- _state |= _STATE_CLOSED;
+ void _sendDone() {
if (_isEmpty) return;
_forEachListener((_MultiplexSubscription<T> subscription) {
subscription._close();
@@ -527,7 +551,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
void _forEachListener(
void action(_BufferingStreamSubscription<T> subscription)) {
- if ((_state & _STATE_FIRING) != 0) {
+ if (_isFiring) {
throw new StateError(
"Cannot fire new event. Controller is already firing an event");
}
@@ -561,7 +585,70 @@ class _MultiplexStreamController<T> implements StreamController<T>,
_state &= ~_STATE_FIRING;
if (_isEmpty) {
- _runGuarded(_onCancel);
+ _callOnCancel();
}
}
+
+ void _callOnCancel() {
+ _runGuarded(_onCancel);
+ }
+}
+
+class _BufferingMultiplexStreamController<T>
+ extends _MultiplexStreamController<T>
+ implements _EventDispatch<T> {
+ _StreamImplEvents _pending;
+
+ _BufferingMultiplexStreamController(void onListen(), void onCancel())
+ : super(onListen, onCancel);
+
+ bool get _hasPending => _pending != null && ! _pending.isEmpty;
+
+ void _addPendingEvent(_DelayedEvent event) {
+ if (_pending == null) {
+ _pending = new _StreamImplEvents();
+ }
+ _pending.add(event);
+ }
+
+ void add(T data) {
+ if (_isFiring) {
+ _addPendingEvent(new _DelayedData<T>(data));
+ return;
+ }
+ super.add(data);
+ while (_hasPending) {
+ _pending.handleNext(this);
+ }
+ }
+
+ void addError(Object error, [StackTrace stackTrace]) {
+ if (_isFiring) {
+ _addPendingEvent(new _DelayedError(error));
+ return;
+ }
+ super.addError(error, stackTrace);
+ while (_hasPending) {
+ _pending.handleNext(this);
+ }
+ }
+
+ void close() {
+ if (_isFiring) {
+ _addPendingEvent(const _DelayedDone());
+ _state |= _STATE_CLOSED;
+ return;
+ }
+ super.close();
+ assert(!_hasPending);
+ }
+
+ void _callOnCancel() {
+ if (_hasPending) {
+ _pending.clear();
+ _pending = null;
+ }
+ super._callOnCancel();
+
+ }
}
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698