| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index 501b588e19ed63777cddc004e528d35290e44c52..04112c82b37f7b96031268f840a495305a45ec0d 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -46,7 +46,81 @@ part of dart.async;
|
| * the stream at all, and won't trigger callbacks. From the controller's point
|
| * of view, the stream is completely inert when has completed.
|
| */
|
| -class StreamController<T> extends EventSink<T> {
|
| +abstract class StreamController<T> implements EventSink<T> {
|
| + /** The stream that this controller is controlling. */
|
| + Stream<T> get stream;
|
| +
|
| + /**
|
| + * A controller with a [stream] that supports only one single subscriber.
|
| + *
|
| + * The controller will buffer all incoming events until the subscriber is
|
| + * registered.
|
| + *
|
| + * The [onPause] function is called when the stream becomes
|
| + * paused. [onResume] is called when the stream resumed.
|
| + *
|
| + * The [onListen] callback is called when the stream
|
| + * receives its listener and [onCancel] when the listener ends
|
| + * its subscription.
|
| + *
|
| + * If the stream is canceled before the controller needs new data the
|
| + * [onResume] call might not be executed.
|
| + */
|
| + factory StreamController({void onListen(),
|
| + void onPause(),
|
| + void onResume(),
|
| + void onCancel()})
|
| + => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel);
|
| +
|
| + /**
|
| + * A controller where [stream] creates new stream each time it is read.
|
| + *
|
| + * The controller distributes any events to all currently subscribed streams.
|
| + *
|
| + * The [onListen] callback is called when the first listener is subscribed,
|
| + * and the [onCancel] is called when there is no longer any active listeners.
|
| + * If a listener is added again later, after the [onCancel] was called,
|
| + * the [onListen] will be called again.
|
| + */
|
| + factory StreamController.multiplex({void onListen(), void onCancel()}) {
|
| + return new _MultiplexStreamController<T>(onListen, onCancel);
|
| + }
|
| +
|
| + /**
|
| + * Returns a view of this object that only exposes the [EventSink] interface.
|
| + */
|
| + EventSink<T> get sink;
|
| +
|
| + /**
|
| + * Whether the stream is closed for adding more events.
|
| + *
|
| + * If true, the "done" event might not have fired yet, but it has been
|
| + * scheduled, and it is too late to add more events.
|
| + */
|
| + bool get isClosed;
|
| +
|
| + /** Whether the subscription is active and paused. */
|
| + bool get isPaused;
|
| +
|
| + /** Whether there is a subscriber on the [Stream]. */
|
| + bool get hasListener;
|
| +}
|
| +
|
| +
|
| +abstract class _StreamControllerLifecycle<T> {
|
| + void _recordListen(StreamSubscription<T> subscription) {}
|
| + void _recordPause(StreamSubscription<T> subscription) {}
|
| + void _recordResume(StreamSubscription<T> subscription) {}
|
| + void _recordCancel(StreamSubscription<T> subscription) {}
|
| +}
|
| +
|
| +/**
|
| + * Default implementation of [StreamController].
|
| + *
|
| + * Controls a stream that only supports a single controller.
|
| + */
|
| +class _StreamControllerImpl<T> implements StreamController<T>,
|
| + _StreamControllerLifecycle<T> {
|
| static const int _STATE_OPEN = 0;
|
| static const int _STATE_CANCELLED = 1;
|
| static const int _STATE_CLOSED = 2;
|
| @@ -66,30 +140,10 @@ class StreamController<T> extends EventSink<T> {
|
| // Events added to the stream before it has an active subscription.
|
| _PendingEvents _pendingEvents = null;
|
|
|
| - /**
|
| - * A controller with a [stream] that supports only one single subscriber.
|
| - *
|
| - * The controller will buffer all incoming events until the subscriber is
|
| - * registered.
|
| - *
|
| - * The [onPause] function is called when the stream becomes
|
| - * paused. [onResume] is called when the stream resumed.
|
| - *
|
| - * The [onListen] callback is called when the stream
|
| - * receives its listener and [onCancel] when the listener ends
|
| - * its subscription.
|
| - *
|
| - * If the stream is canceled before the controller needs new data the
|
| - * [onResume] call might not be executed.
|
| - */
|
| - StreamController({void onListen(),
|
| - void onPause(),
|
| - void onResume(),
|
| - void onCancel()})
|
| - : _onListen = onListen,
|
| - _onPause = onPause,
|
| - _onResume = onResume,
|
| - _onCancel = onCancel {
|
| + _StreamControllerImpl(this._onListen,
|
| + this._onPause,
|
| + this._onResume,
|
| + this._onCancel) {
|
| _stream = new _ControllerStream<T>(this);
|
| }
|
|
|
| @@ -107,18 +161,10 @@ class StreamController<T> extends EventSink<T> {
|
| */
|
| bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
|
|
|
| - /**
|
| - * Whether the stream is closed for adding more events.
|
| - *
|
| - * If true, the "done" event might not have fired yet, but it has been
|
| - * scheduled, and it is too late to add more events.
|
| - */
|
| bool get isClosed => (_state & _STATE_CLOSED) != 0;
|
|
|
| - /** Whether the subscription is active and paused. */
|
| bool get isPaused => _subscription != null && _subscription._isInputPaused;
|
|
|
| - /** Whether there are currently a subscriber on the [Stream]. */
|
| bool get hasListener => _subscription != null;
|
|
|
| /**
|
| @@ -180,7 +226,7 @@ class StreamController<T> extends EventSink<T> {
|
| events.add(event);
|
| }
|
|
|
| - void _recordListen(_BufferingStreamSubscription subscription) {
|
| + void _recordListen(_BufferingStreamSubscription<T> subscription) {
|
| assert(_subscription == null);
|
| _subscription = subscription;
|
| subscription._setPendingEvents(_pendingEvents);
|
| @@ -190,17 +236,18 @@ class StreamController<T> extends EventSink<T> {
|
| });
|
| }
|
|
|
| - void _recordCancel() {
|
| + void _recordCancel(StreamSubscription<T> subscription) {
|
| + assert(identical(_subscription, subscription));
|
| _subscription = null;
|
| _state |= _STATE_CANCELLED;
|
| _runGuarded(_onCancel);
|
| }
|
|
|
| - void _recordPause() {
|
| + void _recordPause(StreamSubscription<T> subscription) {
|
| _runGuarded(_onPause);
|
| }
|
|
|
| - void _recordResume() {
|
| + void _recordResume(StreamSubscription<T> subscription) {
|
| _runGuarded(_onResume);
|
| }
|
| }
|
| @@ -217,7 +264,7 @@ void _runGuarded(_NotificationHandler notificationHandler) {
|
| }
|
|
|
| class _ControllerStream<T> extends _StreamImpl<T> {
|
| - StreamController _controller;
|
| + _StreamControllerLifecycle<T> _controller;
|
| bool _hasListener = false;
|
|
|
| _ControllerStream(this._controller);
|
| @@ -241,25 +288,115 @@ class _ControllerStream<T> extends _StreamImpl<T> {
|
| }
|
|
|
| class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
| - final StreamController _controller;
|
| + final _StreamControllerLifecycle<T> _controller;
|
|
|
| - _ControllerSubscription(StreamController controller,
|
| + _ControllerSubscription(this._controller,
|
| void onData(T data),
|
| void onError(Object error),
|
| void onDone(),
|
| bool cancelOnError)
|
| - : _controller = controller,
|
| - super(onData, onError, onDone, cancelOnError);
|
| + : super(onData, onError, onDone, cancelOnError);
|
|
|
| void _onCancel() {
|
| - _controller._recordCancel();
|
| + _controller._recordCancel(this);
|
| }
|
|
|
| void _onPause() {
|
| - _controller._recordPause();
|
| + _controller._recordPause(this);
|
| }
|
|
|
| void _onResume() {
|
| - _controller._recordResume();
|
| + _controller._recordResume(this);
|
| }
|
| }
|
| +
|
| +class _MultiplexStreamController<T> implements StreamController<T>,
|
| + _StreamControllerLifecycle<T> {
|
| + final _NotificationHandler _onListen;
|
| + final _NotificationHandler _onCancel;
|
| + /** Set when the [close] method is called. */
|
| + bool _isClosed = false;
|
| +
|
| + // TODO(lrn): Make a more efficient implementation of these subscriptions,
|
| + // e.g., the traditional double-linked list with concurrent add and remove
|
| + // while firing.
|
| + Set<_BufferingStreamSubscription<T>> _streams;
|
| +
|
| + _MultiplexStreamController(this._onListen, this._onCancel)
|
| + : _streams = new Set<_BufferingStreamSubscription<T>>();
|
| +
|
| + // StreamController interface.
|
| +
|
| + Stream<T> get stream => new _ControllerStream<T>(this);
|
| +
|
| + EventSink<T> get sink => new _EventSinkView<T>(this);
|
| +
|
| + bool get isClosed => _isClosed;
|
| +
|
| + /**
|
| + * A multiplex controller is never paused.
|
| + *
|
| + * Each receiving stream may be paused individually, and they handle their
|
| + * own buffering.
|
| + */
|
| + bool get isPaused => false;
|
| +
|
| + /** Whether there are currently a subscriber on the [Stream]. */
|
| + bool get hasListener => !_streams.isEmpty;
|
| +
|
| + // _StreamControllerLifecycle interface.
|
| +
|
| + void _recordListen(_BufferingStreamSubscription<T> subscription) {
|
| + bool isFirst = _streams.isEmpty;
|
| + _streams.add(subscription);
|
| + if (isFirst) {
|
| + _runGuarded(_onListen);
|
| + }
|
| + }
|
| +
|
| + void _recordCancel(_BufferingStreamSubscription<T> subscription) {
|
| + _streams.remove(subscription);
|
| + if (_streams.isEmpty) {
|
| + _runGuarded(_onCancel);
|
| + }
|
| + }
|
| +
|
| + void _recordPause(StreamSubscription<T> subscription) {}
|
| + void _recordResume(StreamSubscription<T> subscription) {}
|
| +
|
| + // EventSink interface.
|
| +
|
| + void add(T data) {
|
| + if (_streams.isEmpty) return;
|
| + _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| + subscription._add(data);
|
| + });
|
| + }
|
| +
|
| + void addError(Object error) {
|
| + if (_streams.isEmpty) return;
|
| + _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| + subscription._addError(error);
|
| + });
|
| + }
|
| +
|
| + void close() {
|
| + _isClosed = true;
|
| + if (_streams.isEmpty) return;
|
| + _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| + _streams.remove(subscription);
|
| + subscription._close();
|
| + });
|
| + }
|
| +
|
| + void _forEachListener(
|
| + void action(_BufferingStreamSubscription<T> subscription)) {
|
| + List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList();
|
| + for (_BufferingStreamSubscription<T> subscription in subscriptions) {
|
| + if (_streams.contains(subscription)) {
|
| + action(subscription);
|
| + }
|
| + }
|
| + }
|
| +}
|
| +
|
|
|