Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(565)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 15673006: Implement asBroadcast using a _MultiplexStreamController. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index a313f2a992df557ac3339f10ecae3c04bcef4b48..3078a30364092ed1ce04e043dfa5d60dccce6d05 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -448,6 +448,9 @@ class _MultiplexStreamController<T> implements StreamController<T>,
/** Whether there are currently a subscriber on the [Stream]. */
bool get hasListener => !_isEmpty;
+ /** Whether an event is being fired (sent to some, but not all, listeners). */
+ bool get _isFiring => (_state & _STATE_FIRING) != 0;
+
// Linked list helpers
bool get _isEmpty => identical(_next, this);
@@ -500,23 +503,38 @@ class _MultiplexStreamController<T> implements StreamController<T>,
void add(T data) {
assert(!isClosed);
+ _sendData(data);
+ }
+
+ void addError(Object error, [Object stackTrace]) {
+ assert(!isClosed);
+ // TODO(lrn): Handle stacktrace.
floitsch 2013/05/29 09:39:58 That should be less than 5 lines of code.
Lasse Reichstein Nielsen 2013/05/29 10:39:12 Done.
+ _sendError(error);
+ }
+
+ void close() {
+ assert(!isClosed);
+ _state |= _STATE_CLOSED;
+ _sendDone();
+ }
+
+ // EventDispatch interface.
+
+ void _sendData(T data) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
}
- void addError(Object error, [Object stackTrace]) {
- assert(!isClosed);
+ void _sendError(Object error) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._addError(error);
});
}
- void close() {
- assert(!isClosed);
- _state |= _STATE_CLOSED;
+ void _sendDone() {
if (_isEmpty) return;
_forEachListener((_MultiplexSubscription<T> subscription) {
subscription._close();
@@ -527,7 +545,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
void _forEachListener(
void action(_BufferingStreamSubscription<T> subscription)) {
- if ((_state & _STATE_FIRING) != 0) {
+ if (_isFiring) {
throw new StateError(
"Cannot fire new event. Controller is already firing an event");
}
@@ -565,3 +583,53 @@ class _MultiplexStreamController<T> implements StreamController<T>,
}
}
}
+
+class _BufferingMultiplexStreamController<T>
+ extends _MultiplexStreamController<T>
+ implements _EventDispatch<T> {
+ _StreamImplEvents _pending;
+
+ _BufferingMultiplexStreamController(void onListen(), void onCancel())
+ : super(onListen, onCancel);
+
+ bool get _hasPending => _pending != null && ! _pending.isEmpty;
+
+ void _addPendingEvent(_DelayedEvent event) {
+ if (_pending == null) {
+ _pending = new _StreamImplEvents();
+ }
+ _pending.add(event);
+ }
+
+ void add(T data) {
+ if (_isFiring || _hasPending) {
floitsch 2013/05/29 09:39:58 add comment when _hasPending but not _isFiring.
Lasse Reichstein Nielsen 2013/05/29 10:39:12 It's only during the onCancel callback after remov
+ _addPendingEvent(new _DelayedData<T>(data));
+ return;
+ }
+ super.add(data);
+ while (_hasPending) {
+ _pending.handleNext(this);
+ }
+ }
+
+ void addError(Object error, [StackTrace stackTrace]) {
+ if (_isFiring || _hasPending) {
+ _addPendingEvent(new _DelayedError(error));
+ return;
+ }
+ super.addError(error, stackTrace);
+ while (_hasPending) {
+ _pending.handleNext(this);
+ }
+ }
+
+ void close() {
+ if (_isFiring || _hasPending) {
+ _addPendingEvent(const _DelayedDone());
+ _state |= _STATE_CLOSED;
+ return;
+ }
+ super.close();
+ assert(!_hasPending);
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698