Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index 501b588e19ed63777cddc004e528d35290e44c52..51c3e30e08d24d476a7d2c53f7119c1c68bbc4e2 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -46,7 +46,67 @@ part of dart.async; |
| * the stream at all, and won't trigger callbacks. From the controller's point |
| * of view, the stream is completely inert when has completed. |
| */ |
| -class StreamController<T> extends EventSink<T> { |
| +abstract class StreamController<T> implements EventSink<T> { |
| + /** The stream that this controller is controlling. */ |
| + Stream<T> get stream; |
| + |
| + /** |
| + * A controller with a [stream] that supports only one single subscriber. |
| + * |
| + * The controller will buffer all incoming events until the subscriber is |
| + * registered. |
| + * |
| + * The [onPause] function is called when the stream becomes |
| + * paused. [onResume] is called when the stream resumed. |
| + * |
| + * The [onListen] callback is called when the stream |
| + * receives its listener and [onCancel] when the listener ends |
| + * its subscription. |
| + * |
| + * If the stream is canceled before the controller needs new data the |
| + * [onResume] call might not be executed. |
| + */ |
| + factory StreamController({void onListen(), |
| + void onPause(), |
| + void onResume(), |
| + void onCancel()}) |
| + => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); |
| + |
| + /** |
| + * A controller where [stream] creates new stream each time it is read. |
| + * |
| + * The controller distributes any events to all currently subscribed streams. |
| + * |
| + * The [onListen] callback is called when the first listener is subscribed, |
| + * and the [onCancel] is called when there is no longer any active listeners. |
| + * If a listener is added again later, after the [onCancel] was called, |
| + * the [onListen] will be called again. |
| + */ |
| + factory StreamController.multiplex({void onListen(), void onCancel()}) { |
| + return new _MultiplexStreamController<T>(onListen, onCancel); |
| + } |
| + |
| + /** |
| + * Returns a view of this object that only exposes the [EventSink] interface. |
| + */ |
| + EventSink<T> get sink; |
| +} |
| + |
| + |
| +abstract class _StreamControllerLifecycle<T> { |
| + void _recordListen(StreamSubscription<T> subscription) {} |
| + void _recordPause(StreamSubscription<T> subscription) {} |
| + void _recordResume(StreamSubscription<T> subscription) {} |
| + void _recordCancel(StreamSubscription<T> subscription) {} |
| +} |
| + |
| +/** |
| + * Default implementation of [StreamController]. |
| + * |
| + * Controls a stream that only supports a single controller. |
| + */ |
| +class _StreamControllerImpl<T> implements StreamController<T>, |
| + _StreamControllerLifecycle<T> { |
| static const int _STATE_OPEN = 0; |
| static const int _STATE_CANCELLED = 1; |
| static const int _STATE_CLOSED = 2; |
| @@ -66,30 +126,10 @@ class StreamController<T> extends EventSink<T> { |
| // Events added to the stream before it has an active subscription. |
| _PendingEvents _pendingEvents = null; |
| - /** |
| - * A controller with a [stream] that supports only one single subscriber. |
| - * |
| - * The controller will buffer all incoming events until the subscriber is |
| - * registered. |
| - * |
| - * The [onPause] function is called when the stream becomes |
| - * paused. [onResume] is called when the stream resumed. |
| - * |
| - * The [onListen] callback is called when the stream |
| - * receives its listener and [onCancel] when the listener ends |
| - * its subscription. |
| - * |
| - * If the stream is canceled before the controller needs new data the |
| - * [onResume] call might not be executed. |
| - */ |
| - StreamController({void onListen(), |
| - void onPause(), |
| - void onResume(), |
| - void onCancel()}) |
| - : _onListen = onListen, |
| - _onPause = onPause, |
| - _onResume = onResume, |
| - _onCancel = onCancel { |
| + _StreamControllerImpl(this._onListen, |
| + this._onPause, |
| + this._onResume, |
| + this._onCancel) { |
| _stream = new _ControllerStream<T>(this); |
| } |
| @@ -180,27 +220,28 @@ class StreamController<T> extends EventSink<T> { |
| events.add(event); |
| } |
| - void _recordListen(_BufferingStreamSubscription subscription) { |
| + void _recordListen(StreamSubscription<T> subscription) { |
|
floitsch
2013/05/24 20:51:42
Why not keep the type?
Lasse Reichstein Nielsen
2013/05/27 08:04:12
True, it is true for this subclass, and I avoid a
|
| + _BufferingStreamSubscription bufferingSubscription = subscription; |
| assert(_subscription == null); |
| - _subscription = subscription; |
| - subscription._setPendingEvents(_pendingEvents); |
| + _subscription = bufferingSubscription; |
| + bufferingSubscription._setPendingEvents(_pendingEvents); |
| _pendingEvents = null; |
| - subscription._guardCallback(() { |
| + bufferingSubscription._guardCallback(() { |
| _runGuarded(_onListen); |
| }); |
| } |
| - void _recordCancel() { |
| + void _recordCancel(StreamSubscription<T> subscription) { |
| _subscription = null; |
|
floitsch
2013/05/24 20:51:42
assert that the subscription is the same?
Lasse Reichstein Nielsen
2013/05/27 08:04:12
Done.
|
| _state |= _STATE_CANCELLED; |
| _runGuarded(_onCancel); |
| } |
| - void _recordPause() { |
| + void _recordPause(StreamSubscription<T> subscription) { |
| _runGuarded(_onPause); |
| } |
| - void _recordResume() { |
| + void _recordResume(StreamSubscription<T> subscription) { |
| _runGuarded(_onResume); |
| } |
| } |
| @@ -217,7 +258,7 @@ void _runGuarded(_NotificationHandler notificationHandler) { |
| } |
| class _ControllerStream<T> extends _StreamImpl<T> { |
| - StreamController _controller; |
| + _StreamControllerLifecycle<T> _controller; |
| bool _hasListener = false; |
| _ControllerStream(this._controller); |
| @@ -241,25 +282,98 @@ class _ControllerStream<T> extends _StreamImpl<T> { |
| } |
| class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| - final StreamController _controller; |
| + final _StreamControllerLifecycle<T> _controller; |
| - _ControllerSubscription(StreamController controller, |
| + _ControllerSubscription(this._controller, |
| void onData(T data), |
| void onError(Object error), |
| void onDone(), |
| bool cancelOnError) |
| - : _controller = controller, |
| - super(onData, onError, onDone, cancelOnError); |
| + : super(onData, onError, onDone, cancelOnError); |
| void _onCancel() { |
| - _controller._recordCancel(); |
| + _controller._recordCancel(this); |
| } |
| void _onPause() { |
| - _controller._recordPause(); |
| + _controller._recordPause(this); |
| } |
| void _onResume() { |
| - _controller._recordResume(); |
| + _controller._recordResume(this); |
| + } |
| +} |
| + |
| +class _MultiplexStreamController<T> implements StreamController<T>, |
| + _StreamControllerLifecycle<T> { |
| + final _NotificationHandler _onListen; |
| + final _NotificationHandler _onCancel; |
| + // TODO(lrn): Make a more efficient implementation of these subscriptions, |
| + // e.g., the traditional double-linked list with concurrent add and remove |
| + // while firing. |
| + Set<_BufferingStreamSubscription<T>> _streams; |
| + |
| + _MultiplexStreamController(this._onListen, this._onCancel) |
| + : _streams = new Set<_BufferingStreamSubscription<T>>(); |
| + |
| + // StreamController interface. |
| + |
| + Stream<T> get stream => new _ControllerStream<T>(this); |
| + |
| + EventSink<T> get sink => new _EventSinkView<T>(this); |
| + |
| + // _StreamControllerLifecycle interface. |
| + |
| + void _recordListen(_BufferingStreamSubscription<T> subscription) { |
| + bool isFirst = _streams.isEmpty; |
| + _streams.add(subscription); |
| + if (isFirst) { |
| + _runGuarded(_onListen); |
| + } |
| + } |
| + |
| + void _recordCancel(_BufferingStreamSubscription<T> subscription) { |
| + _streams.remove(subscription); |
| + if (_streams.isEmpty) { |
| + _runGuarded(_onCancel); |
| + } |
| + } |
| + |
| + void _recordPause(StreamSubscription<T> subscription) {} |
| + void _recordResume(StreamSubscription<T> subscription) {} |
| + |
| + // EventSink interface. |
| + |
| + void add(T data) { |
| + if (_streams.isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._add(data); |
| + }); |
| + } |
| + |
| + void addError(Object error) { |
| + if (_streams.isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._addError(error); |
| + }); |
| + } |
| + |
| + void close() { |
| + if (_streams.isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + _streams.remove(subscription); |
| + subscription._close(); |
| + }); |
| + } |
| + |
| + void _forEachListener( |
| + void action(_BufferingStreamSubscription<T> subscription)) { |
| + List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList(); |
| + for (_BufferingStreamSubscription<T> subscription in subscriptions) { |
| + if (_streams.contains(subscription)) { |
| + action(subscription); |
| + } |
| + } |
| } |
| } |
| + |