Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(298)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 12049013: Change singleSubscription/multiSubscription to normal/broadcast. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed comments, renamed .multiSubscription to .broadcast. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index ba8ac9e1863ab503848e440efecbe82de799aed9..e5b8b457c13bc4416b5b8001c34420939c70b20d 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -5,12 +5,11 @@
part of dart.async;
// -------------------------------------------------------------------
-// Default implementation of a stream with a controller for adding
-// events to the stream.
+// Controller for creating and adding events to a stream.
// -------------------------------------------------------------------
/**
- * A controller and the stream it controls.
+ * A controller with the stream it controls.
floitsch 2013/01/24 13:03:42 containing?
*
* This controller allows sending data, error and done events on
* its [stream].
@@ -21,12 +20,11 @@ part of dart.async;
* it has subscribers or not, as well as getting a callback when either of
* these change.
*/
-class StreamController<T> extends Stream<T> implements StreamSink<T> {
- _StreamImpl<T> _stream;
- Stream<T> get stream => _stream;
+class StreamController<T> implements StreamSink<T> {
+ final _StreamImpl<T> stream;
/**
- * A controller with a [stream] that supports multiple subscribers.
+ * A controller with a broadcast [stream]..
floitsch 2013/01/24 13:03:42 remove second ".".
*
* The [onPauseStateChange] function is called when the stream becomes
* paused or resumes after being paused. The current pause state can
@@ -36,13 +34,14 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
* receives its first listener or loses its last. The current subscription
* state can be read from [hasSubscribers]. Ignored if [:null:].
*/
- StreamController.multiSubscription({void onPauseStateChange(),
- void onSubscriptionStateChange()}) {
- _stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
- onPauseStateChange);
- }
+ StreamController.broadcast({void onPauseStateChange(),
+ void onSubscriptionStateChange()})
+ : stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
+ onPauseStateChange);
+
/**
* A controller with a [stream] that supports only one single subscriber.
+ *
* The controller will buffer all incoming events until the subscriber is
* registered.
*
@@ -55,24 +54,9 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
* state can be read from [hasSubscribers]. Ignored if [:null:].
*/
StreamController({void onPauseStateChange(),
- void onSubscriptionStateChange()}) {
- _stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
- onPauseStateChange);
- }
-
- bool get isSingleSubscription => _stream.isSingleSubscription;
-
- Stream<T> asMultiSubscriptionStream() => _stream.asMultiSubscriptionStream();
-
- StreamSubscription listen(void onData(T data),
- { void onError(AsyncError error),
- void onDone(),
- bool unsubscribeOnError}) {
- return _stream.listen(onData,
- onError: onError,
- onDone: onDone,
- unsubscribeOnError: unsubscribeOnError);
- }
+ void onSubscriptionStateChange()})
+ : stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
+ onPauseStateChange);
/**
* Returns a view of this object that only exposes the [StreamSink] interface.
@@ -80,15 +64,15 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
StreamSink<T> get sink => new StreamSinkView<T>(this);
/** Whether one or more active subscribers have requested a pause. */
- bool get isPaused => _stream._isPaused;
+ bool get isPaused => stream._isPaused;
/** Whether there are currently any subscribers on this [Stream]. */
- bool get hasSubscribers => _stream._hasSubscribers;
+ bool get hasSubscribers => stream._hasSubscribers;
/**
* Send or queue a data event.
*/
- void add(T value) => _stream._add(value);
+ void add(T value) => stream._add(value);
/**
* Send or enqueue an error event.
@@ -109,7 +93,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
} else {
asyncError = new AsyncError(error, stackTrace);
}
- _stream._signalError(asyncError);
+ stream._signalError(asyncError);
}
/**
@@ -118,19 +102,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
* The "done" message should be sent at most once by a stream, and it
* should be the last message sent.
*/
- void close() { _stream._close(); }
-
- void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) {
- _stream._forEachSubscriber(() {
- try {
- action();
- } on AsyncError catch (e) {
- e.throwDelayed();
- } catch (e, s) {
- new AsyncError(e, s).throwDelayed();
- }
- });
- }
+ void close() { stream._close(); }
}
typedef void _NotificationHandler();

Powered by Google App Engine
This is Rietveld 408576698