Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index ba8ac9e1863ab503848e440efecbe82de799aed9..e5b8b457c13bc4416b5b8001c34420939c70b20d 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -5,12 +5,11 @@ |
| part of dart.async; |
| // ------------------------------------------------------------------- |
| -// Default implementation of a stream with a controller for adding |
| -// events to the stream. |
| +// Controller for creating and adding events to a stream. |
| // ------------------------------------------------------------------- |
| /** |
| - * A controller and the stream it controls. |
| + * A controller with the stream it controls. |
|
floitsch
2013/01/24 13:03:42
containing?
|
| * |
| * This controller allows sending data, error and done events on |
| * its [stream]. |
| @@ -21,12 +20,11 @@ part of dart.async; |
| * it has subscribers or not, as well as getting a callback when either of |
| * these change. |
| */ |
| -class StreamController<T> extends Stream<T> implements StreamSink<T> { |
| - _StreamImpl<T> _stream; |
| - Stream<T> get stream => _stream; |
| +class StreamController<T> implements StreamSink<T> { |
| + final _StreamImpl<T> stream; |
| /** |
| - * A controller with a [stream] that supports multiple subscribers. |
| + * A controller with a broadcast [stream].. |
|
floitsch
2013/01/24 13:03:42
remove second ".".
|
| * |
| * The [onPauseStateChange] function is called when the stream becomes |
| * paused or resumes after being paused. The current pause state can |
| @@ -36,13 +34,14 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
| * receives its first listener or loses its last. The current subscription |
| * state can be read from [hasSubscribers]. Ignored if [:null:]. |
| */ |
| - StreamController.multiSubscription({void onPauseStateChange(), |
| - void onSubscriptionStateChange()}) { |
| - _stream = new _MultiControllerStream<T>(onSubscriptionStateChange, |
| - onPauseStateChange); |
| - } |
| + StreamController.broadcast({void onPauseStateChange(), |
| + void onSubscriptionStateChange()}) |
| + : stream = new _MultiControllerStream<T>(onSubscriptionStateChange, |
| + onPauseStateChange); |
| + |
| /** |
| * A controller with a [stream] that supports only one single subscriber. |
| + * |
| * The controller will buffer all incoming events until the subscriber is |
| * registered. |
| * |
| @@ -55,24 +54,9 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
| * state can be read from [hasSubscribers]. Ignored if [:null:]. |
| */ |
| StreamController({void onPauseStateChange(), |
| - void onSubscriptionStateChange()}) { |
| - _stream = new _SingleControllerStream<T>(onSubscriptionStateChange, |
| - onPauseStateChange); |
| - } |
| - |
| - bool get isSingleSubscription => _stream.isSingleSubscription; |
| - |
| - Stream<T> asMultiSubscriptionStream() => _stream.asMultiSubscriptionStream(); |
| - |
| - StreamSubscription listen(void onData(T data), |
| - { void onError(AsyncError error), |
| - void onDone(), |
| - bool unsubscribeOnError}) { |
| - return _stream.listen(onData, |
| - onError: onError, |
| - onDone: onDone, |
| - unsubscribeOnError: unsubscribeOnError); |
| - } |
| + void onSubscriptionStateChange()}) |
| + : stream = new _SingleControllerStream<T>(onSubscriptionStateChange, |
| + onPauseStateChange); |
| /** |
| * Returns a view of this object that only exposes the [StreamSink] interface. |
| @@ -80,15 +64,15 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
| StreamSink<T> get sink => new StreamSinkView<T>(this); |
| /** Whether one or more active subscribers have requested a pause. */ |
| - bool get isPaused => _stream._isPaused; |
| + bool get isPaused => stream._isPaused; |
| /** Whether there are currently any subscribers on this [Stream]. */ |
| - bool get hasSubscribers => _stream._hasSubscribers; |
| + bool get hasSubscribers => stream._hasSubscribers; |
| /** |
| * Send or queue a data event. |
| */ |
| - void add(T value) => _stream._add(value); |
| + void add(T value) => stream._add(value); |
| /** |
| * Send or enqueue an error event. |
| @@ -109,7 +93,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
| } else { |
| asyncError = new AsyncError(error, stackTrace); |
| } |
| - _stream._signalError(asyncError); |
| + stream._signalError(asyncError); |
| } |
| /** |
| @@ -118,19 +102,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
| * The "done" message should be sent at most once by a stream, and it |
| * should be the last message sent. |
| */ |
| - void close() { _stream._close(); } |
| - |
| - void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) { |
| - _stream._forEachSubscriber(() { |
| - try { |
| - action(); |
| - } on AsyncError catch (e) { |
| - e.throwDelayed(); |
| - } catch (e, s) { |
| - new AsyncError(e, s).throwDelayed(); |
| - } |
| - }); |
| - } |
| + void close() { stream._close(); } |
| } |
| typedef void _NotificationHandler(); |