Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index abfd6485859eb1872c5a52f4413f3df712d2d73e..67e4971f359b0d95eb37e04d595728d28d148574 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -125,7 +125,7 @@ abstract class StreamController<T> implements EventSink<T> {
/**
* Returns a view of this object that only exposes the [EventSink] interface.
*/
- EventSink<T> get sink;
+ StreamSink<T> get sink;
/**
* Whether the stream is closed for adding more events.
@@ -185,6 +185,12 @@ abstract class _StreamController<T> implements StreamController<T>,
final _NotificationHandler _onResume;
final _NotificationHandler _onCancel;
_StreamImpl<T> _stream;
+ /**
+ * Cached value returned by [sink].
+ *
+ * Used to pause the stream if necessary.
+ */
+ _ControllerStreamSink _sink;
// An active subscription on the stream, or null if no subscripton is active.
_ControllerSubscription<T> _subscription;
@@ -207,7 +213,9 @@ abstract class _StreamController<T> implements StreamController<T>,
/**
* Returns a view of this object that only exposes the [EventSink] interface.
*/
- EventSink<T> get sink => new _EventSinkView<T>(this);
+ StreamSink<T> get sink =>
+ (_sink != null) ? _sink
+ : _sink = new _ControllerStreamSink<T>(this);
/**
* Whether a listener has existed and been cancelled.
@@ -298,14 +306,17 @@ abstract class _StreamController<T> implements StreamController<T>,
assert(identical(_subscription, subscription));
_subscription = null;
_state |= _STATE_CANCELLED;
+ if (_sink != null) _sink._cancel();
_runGuarded(_onCancel);
}
void _recordPause(StreamSubscription<T> subscription) {
+ if (_sink != null) _sink._pause();
_runGuarded(_onPause);
floitsch 2013/06/06 15:08:29 Do we want to call onPause when there is an addStr
}
void _recordResume(StreamSubscription<T> subscription) {
+ if (_sink != null) _sink._resume();
_runGuarded(_onResume);
floitsch 2013/06/06 15:08:29 ditto.
}
}
@@ -495,6 +506,10 @@ abstract class _BroadcastStreamController<T>
_BroadcastSubscriptionLink _next;
_BroadcastSubscriptionLink _previous;
+ // Cached return value of [sink]. Used to cancel a `sink.addStream`
floitsch 2013/06/06 15:08:29 cancel, pause and resume a `sink.addStream`.
+ // when the stream ends.
+ _ControllerStreamSink _sink;
+
_BroadcastStreamController(this._onListen, this._onCancel)
: _state = _STATE_INITIAL {
_next = _previous = this;
@@ -504,7 +519,8 @@ abstract class _BroadcastStreamController<T>
Stream<T> get stream => new _BroadcastStream<T>(this);
- EventSink<T> get sink => new _EventSinkView<T>(this);
+ StreamSink<T> get sink =>
+ (_sink != null) ? _sink : _sink = new _ControllerStreamSink<T>(this);
bool get isClosed => (_state & _STATE_CLOSED) != 0;
@@ -536,12 +552,13 @@ abstract class _BroadcastStreamController<T>
subscription._eventState = (_state & _STATE_EVENT_ID);
}
- void _removeListener(_BroadcastSubscription<T> subscription) {
+ bool _removeListener(_BroadcastSubscription<T> subscription) {
assert(identical(subscription._controller, this));
assert(!identical(subscription._next, subscription));
subscription._previous._next = subscription._next;
subscription._next._previous = subscription._previous;
subscription._next = subscription._previous = subscription;
+ return true;
}
// _StreamControllerLifecycle interface.
@@ -555,6 +572,8 @@ abstract class _BroadcastStreamController<T>
}
void _recordCancel(_BroadcastSubscription<T> subscription) {
+ // If already removed by the stream, don't remove it again.
+ if (identical(subscription._next, subscription)) return;
if (subscription._isFiring) {
subscription._setRemoveAfterFiring();
} else {
@@ -636,6 +655,9 @@ abstract class _BroadcastStreamController<T>
}
void _callOnCancel() {
+ if (_sink != null && isClosed) {
floitsch 2013/06/06 15:08:29 add comment explaining why you look at `isClosed`.
+ _sink._cancel();
+ }
_runGuarded(_onCancel);
}
}
@@ -772,3 +794,97 @@ class _AsBroadcastStreamController<T>
super._callOnCancel();
}
}
+
+
+/**
+ * [EventSink] wrapper that only exposes a [StreamSink] interface.
+ */
+class _ControllerStreamSink<T> implements StreamSink<T> {
+ final EventSink<T> _sink;
+ // Future completed when then controller stream is closed.
+ _FutureImpl _doneFuture;
+ // [_FutureImpl] returned by latest call to addStream.
+ // Set to null while not processing an [addStream] stream.
+ _FutureImpl _addStreamFuture;
+ // Subscription of latest call to addStream.
+ // Set to null while not processing an [addStream] stream.
+ StreamSubscription _subscription;
+
+ _ControllerStreamSink(this._sink);
+
+ bool get _isAddStreamActive => _addStreamFuture != null;
+
+ void _pause() {
+ if (_subscription != null) _subscription.pause();
+ }
+
+ void _resume() {
+ if (_subscription != null) _subscription.resume();
+ }
+
+ void _cancel() {
+ if (_isAddStreamActive) {
+ StreamSubscription subscription = _subscription;
+ _FutureImpl future = _addStreamFuture;
+ _subscription = null;
+ _addStreamFuture = null;
+ subscription.cancel();
+ future._setValue(null);
+ }
+ if (_doneFuture != null) {
+ _doneFuture._setValue(null);
+ }
+ }
+
+ void add(T value) {
+ if (_isAddStreamActive) {
+ throw new StateError("Cannot add events while addStream is running.");
+ }
+ _sink.add(value);
+ }
+
+ void addError(error) {
+ if (_isAddStreamActive) {
+ throw new StateError("Cannot add events while addStream is running.");
+ }
+ _sink.addError(error);
+ }
+
+ Future close() {
+ if (_isAddStreamActive) {
+ throw new StateError("Cannot add events while addStream is running.");
+ }
+ if (_doneFuture == null) _doneFuture = new _FutureImpl();
+ _sink.close();
+ return _doneFuture;
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_isAddStreamActive) {
+ throw new StateError("Cannot add a new stream while "
+ "addStream is running.");
+ }
+ _addStreamFuture = new _FutureImpl();
+ _subscription = stream.listen(
+ _sink.add,
+ onError: (error) {
+ _FutureImpl future = _addStreamFuture;
+ _addStreamFuture = null;
+ _subscription = null;
+ future._setError(error);
+ },
+ onDone: () {
+ _FutureImpl future = _addStreamFuture;
+ _addStreamFuture = null;
+ _subscription = null;
+ future._setValue(null);
+ },
+ cancelOnError: true
+ );
+ return _addStreamFuture;
+ }
+
+ Future get done =>
+ (_addStreamFuture != null) ? _addStreamFuture
+ : new _FutureImpl.immediate(null);
+}

Powered by Google App Engine
This is Rietveld 408576698