Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index abfd6485859eb1872c5a52f4413f3df712d2d73e..67e4971f359b0d95eb37e04d595728d28d148574 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -125,7 +125,7 @@ abstract class StreamController<T> implements EventSink<T> { |
| /** |
| * Returns a view of this object that only exposes the [EventSink] interface. |
| */ |
| - EventSink<T> get sink; |
| + StreamSink<T> get sink; |
| /** |
| * Whether the stream is closed for adding more events. |
| @@ -185,6 +185,12 @@ abstract class _StreamController<T> implements StreamController<T>, |
| final _NotificationHandler _onResume; |
| final _NotificationHandler _onCancel; |
| _StreamImpl<T> _stream; |
| + /** |
| + * Cached value returned by [sink]. |
| + * |
| + * Used to pause the stream if necessary. |
| + */ |
| + _ControllerStreamSink _sink; |
| // An active subscription on the stream, or null if no subscripton is active. |
| _ControllerSubscription<T> _subscription; |
| @@ -207,7 +213,9 @@ abstract class _StreamController<T> implements StreamController<T>, |
| /** |
| * Returns a view of this object that only exposes the [EventSink] interface. |
| */ |
| - EventSink<T> get sink => new _EventSinkView<T>(this); |
| + StreamSink<T> get sink => |
| + (_sink != null) ? _sink |
| + : _sink = new _ControllerStreamSink<T>(this); |
| /** |
| * Whether a listener has existed and been cancelled. |
| @@ -298,14 +306,17 @@ abstract class _StreamController<T> implements StreamController<T>, |
| assert(identical(_subscription, subscription)); |
| _subscription = null; |
| _state |= _STATE_CANCELLED; |
| + if (_sink != null) _sink._cancel(); |
| _runGuarded(_onCancel); |
| } |
| void _recordPause(StreamSubscription<T> subscription) { |
| + if (_sink != null) _sink._pause(); |
| _runGuarded(_onPause); |
|
floitsch
2013/06/06 15:08:29
Do we want to call onPause when there is an addStr
|
| } |
| void _recordResume(StreamSubscription<T> subscription) { |
| + if (_sink != null) _sink._resume(); |
| _runGuarded(_onResume); |
|
floitsch
2013/06/06 15:08:29
ditto.
|
| } |
| } |
| @@ -495,6 +506,10 @@ abstract class _BroadcastStreamController<T> |
| _BroadcastSubscriptionLink _next; |
| _BroadcastSubscriptionLink _previous; |
| + // Cached return value of [sink]. Used to cancel a `sink.addStream` |
|
floitsch
2013/06/06 15:08:29
cancel, pause and resume a `sink.addStream`.
|
| + // when the stream ends. |
| + _ControllerStreamSink _sink; |
| + |
| _BroadcastStreamController(this._onListen, this._onCancel) |
| : _state = _STATE_INITIAL { |
| _next = _previous = this; |
| @@ -504,7 +519,8 @@ abstract class _BroadcastStreamController<T> |
| Stream<T> get stream => new _BroadcastStream<T>(this); |
| - EventSink<T> get sink => new _EventSinkView<T>(this); |
| + StreamSink<T> get sink => |
| + (_sink != null) ? _sink : _sink = new _ControllerStreamSink<T>(this); |
| bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| @@ -536,12 +552,13 @@ abstract class _BroadcastStreamController<T> |
| subscription._eventState = (_state & _STATE_EVENT_ID); |
| } |
| - void _removeListener(_BroadcastSubscription<T> subscription) { |
| + bool _removeListener(_BroadcastSubscription<T> subscription) { |
| assert(identical(subscription._controller, this)); |
| assert(!identical(subscription._next, subscription)); |
| subscription._previous._next = subscription._next; |
| subscription._next._previous = subscription._previous; |
| subscription._next = subscription._previous = subscription; |
| + return true; |
| } |
| // _StreamControllerLifecycle interface. |
| @@ -555,6 +572,8 @@ abstract class _BroadcastStreamController<T> |
| } |
| void _recordCancel(_BroadcastSubscription<T> subscription) { |
| + // If already removed by the stream, don't remove it again. |
| + if (identical(subscription._next, subscription)) return; |
| if (subscription._isFiring) { |
| subscription._setRemoveAfterFiring(); |
| } else { |
| @@ -636,6 +655,9 @@ abstract class _BroadcastStreamController<T> |
| } |
| void _callOnCancel() { |
| + if (_sink != null && isClosed) { |
|
floitsch
2013/06/06 15:08:29
add comment explaining why you look at `isClosed`.
|
| + _sink._cancel(); |
| + } |
| _runGuarded(_onCancel); |
| } |
| } |
| @@ -772,3 +794,97 @@ class _AsBroadcastStreamController<T> |
| super._callOnCancel(); |
| } |
| } |
| + |
| + |
| +/** |
| + * [EventSink] wrapper that only exposes a [StreamSink] interface. |
| + */ |
| +class _ControllerStreamSink<T> implements StreamSink<T> { |
| + final EventSink<T> _sink; |
| + // Future completed when then controller stream is closed. |
| + _FutureImpl _doneFuture; |
| + // [_FutureImpl] returned by latest call to addStream. |
| + // Set to null while not processing an [addStream] stream. |
| + _FutureImpl _addStreamFuture; |
| + // Subscription of latest call to addStream. |
| + // Set to null while not processing an [addStream] stream. |
| + StreamSubscription _subscription; |
| + |
| + _ControllerStreamSink(this._sink); |
| + |
| + bool get _isAddStreamActive => _addStreamFuture != null; |
| + |
| + void _pause() { |
| + if (_subscription != null) _subscription.pause(); |
| + } |
| + |
| + void _resume() { |
| + if (_subscription != null) _subscription.resume(); |
| + } |
| + |
| + void _cancel() { |
| + if (_isAddStreamActive) { |
| + StreamSubscription subscription = _subscription; |
| + _FutureImpl future = _addStreamFuture; |
| + _subscription = null; |
| + _addStreamFuture = null; |
| + subscription.cancel(); |
| + future._setValue(null); |
| + } |
| + if (_doneFuture != null) { |
| + _doneFuture._setValue(null); |
| + } |
| + } |
| + |
| + void add(T value) { |
| + if (_isAddStreamActive) { |
| + throw new StateError("Cannot add events while addStream is running."); |
| + } |
| + _sink.add(value); |
| + } |
| + |
| + void addError(error) { |
| + if (_isAddStreamActive) { |
| + throw new StateError("Cannot add events while addStream is running."); |
| + } |
| + _sink.addError(error); |
| + } |
| + |
| + Future close() { |
| + if (_isAddStreamActive) { |
| + throw new StateError("Cannot add events while addStream is running."); |
| + } |
| + if (_doneFuture == null) _doneFuture = new _FutureImpl(); |
| + _sink.close(); |
| + return _doneFuture; |
| + } |
| + |
| + Future addStream(Stream<T> stream) { |
| + if (_isAddStreamActive) { |
| + throw new StateError("Cannot add a new stream while " |
| + "addStream is running."); |
| + } |
| + _addStreamFuture = new _FutureImpl(); |
| + _subscription = stream.listen( |
| + _sink.add, |
| + onError: (error) { |
| + _FutureImpl future = _addStreamFuture; |
| + _addStreamFuture = null; |
| + _subscription = null; |
| + future._setError(error); |
| + }, |
| + onDone: () { |
| + _FutureImpl future = _addStreamFuture; |
| + _addStreamFuture = null; |
| + _subscription = null; |
| + future._setValue(null); |
| + }, |
| + cancelOnError: true |
| + ); |
| + return _addStreamFuture; |
| + } |
| + |
| + Future get done => |
| + (_addStreamFuture != null) ? _addStreamFuture |
| + : new _FutureImpl.immediate(null); |
| +} |