Index: sdk/lib/async/stream_controller.dart |
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
index ba8ac9e1863ab503848e440efecbe82de799aed9..e5b8b457c13bc4416b5b8001c34420939c70b20d 100644 |
--- a/sdk/lib/async/stream_controller.dart |
+++ b/sdk/lib/async/stream_controller.dart |
@@ -5,12 +5,11 @@ |
part of dart.async; |
// ------------------------------------------------------------------- |
-// Default implementation of a stream with a controller for adding |
-// events to the stream. |
+// Controller for creating and adding events to a stream. |
// ------------------------------------------------------------------- |
/** |
- * A controller and the stream it controls. |
+ * A controller with the stream it controls. |
floitsch
2013/01/24 13:03:42
containing?
|
* |
* This controller allows sending data, error and done events on |
* its [stream]. |
@@ -21,12 +20,11 @@ part of dart.async; |
* it has subscribers or not, as well as getting a callback when either of |
* these change. |
*/ |
-class StreamController<T> extends Stream<T> implements StreamSink<T> { |
- _StreamImpl<T> _stream; |
- Stream<T> get stream => _stream; |
+class StreamController<T> implements StreamSink<T> { |
+ final _StreamImpl<T> stream; |
/** |
- * A controller with a [stream] that supports multiple subscribers. |
+ * A controller with a broadcast [stream].. |
floitsch
2013/01/24 13:03:42
remove second ".".
|
* |
* The [onPauseStateChange] function is called when the stream becomes |
* paused or resumes after being paused. The current pause state can |
@@ -36,13 +34,14 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
* receives its first listener or loses its last. The current subscription |
* state can be read from [hasSubscribers]. Ignored if [:null:]. |
*/ |
- StreamController.multiSubscription({void onPauseStateChange(), |
- void onSubscriptionStateChange()}) { |
- _stream = new _MultiControllerStream<T>(onSubscriptionStateChange, |
- onPauseStateChange); |
- } |
+ StreamController.broadcast({void onPauseStateChange(), |
+ void onSubscriptionStateChange()}) |
+ : stream = new _MultiControllerStream<T>(onSubscriptionStateChange, |
+ onPauseStateChange); |
+ |
/** |
* A controller with a [stream] that supports only one single subscriber. |
+ * |
* The controller will buffer all incoming events until the subscriber is |
* registered. |
* |
@@ -55,24 +54,9 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
* state can be read from [hasSubscribers]. Ignored if [:null:]. |
*/ |
StreamController({void onPauseStateChange(), |
- void onSubscriptionStateChange()}) { |
- _stream = new _SingleControllerStream<T>(onSubscriptionStateChange, |
- onPauseStateChange); |
- } |
- |
- bool get isSingleSubscription => _stream.isSingleSubscription; |
- |
- Stream<T> asMultiSubscriptionStream() => _stream.asMultiSubscriptionStream(); |
- |
- StreamSubscription listen(void onData(T data), |
- { void onError(AsyncError error), |
- void onDone(), |
- bool unsubscribeOnError}) { |
- return _stream.listen(onData, |
- onError: onError, |
- onDone: onDone, |
- unsubscribeOnError: unsubscribeOnError); |
- } |
+ void onSubscriptionStateChange()}) |
+ : stream = new _SingleControllerStream<T>(onSubscriptionStateChange, |
+ onPauseStateChange); |
/** |
* Returns a view of this object that only exposes the [StreamSink] interface. |
@@ -80,15 +64,15 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
StreamSink<T> get sink => new StreamSinkView<T>(this); |
/** Whether one or more active subscribers have requested a pause. */ |
- bool get isPaused => _stream._isPaused; |
+ bool get isPaused => stream._isPaused; |
/** Whether there are currently any subscribers on this [Stream]. */ |
- bool get hasSubscribers => _stream._hasSubscribers; |
+ bool get hasSubscribers => stream._hasSubscribers; |
/** |
* Send or queue a data event. |
*/ |
- void add(T value) => _stream._add(value); |
+ void add(T value) => stream._add(value); |
/** |
* Send or enqueue an error event. |
@@ -109,7 +93,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
} else { |
asyncError = new AsyncError(error, stackTrace); |
} |
- _stream._signalError(asyncError); |
+ stream._signalError(asyncError); |
} |
/** |
@@ -118,19 +102,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> { |
* The "done" message should be sent at most once by a stream, and it |
* should be the last message sent. |
*/ |
- void close() { _stream._close(); } |
- |
- void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) { |
- _stream._forEachSubscriber(() { |
- try { |
- action(); |
- } on AsyncError catch (e) { |
- e.throwDelayed(); |
- } catch (e, s) { |
- new AsyncError(e, s).throwDelayed(); |
- } |
- }); |
- } |
+ void close() { stream._close(); } |
} |
typedef void _NotificationHandler(); |