Index: sdk/lib/async/stream_controller.dart |
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
index bd70b1074aeb97898623d8d1ddf76c98a4120a81..7378bef79897de76f5d0c9dc1f3004dd15f3c91e 100644 |
--- a/sdk/lib/async/stream_controller.dart |
+++ b/sdk/lib/async/stream_controller.dart |
@@ -50,20 +50,24 @@ class StreamController<T> extends EventSink<T> { |
final _StreamImpl<T> stream; |
/** |
- * A controller with a broadcast [stream].. |
+ * A controller with a broadcast [stream]. |
* |
- * The [onPauseStateChange] function is called when the stream becomes |
- * paused or resumes after being paused. The current pause state can |
- * be read from [isPaused]. Ignored if [:null:]. |
+ * The [onPause] function is called when the stream becomes |
+ * paused. [onResume] is called when the stream resumed. |
* |
- * The [onSubscriptionStateChange] function is called when the stream |
- * receives its first listener or loses its last. The current subscription |
- * state can be read from [hasListener]. Ignored if [:null:]. |
+ * The [onListen] callback is called when the stream |
+ * receives its first listener. [onCancel] when the last listener cancels |
+ * its subscription. |
+ * |
+ * If the stream is canceled before the controller needs new data the |
+ * [onResume] call might not be executed. |
*/ |
- StreamController.broadcast({void onPauseStateChange(), |
- void onSubscriptionStateChange()}) |
- : stream = new _MultiControllerStream<T>(onSubscriptionStateChange, |
- onPauseStateChange); |
+ StreamController.broadcast({void onListen(), |
+ void onPause(), |
+ void onResume(), |
+ void onCancel()}) |
+ : stream = new _MultiControllerStream<T>( |
+ onListen, onPause, onResume, onCancel); |
/** |
* A controller with a [stream] that supports only one single subscriber. |
@@ -71,18 +75,22 @@ class StreamController<T> extends EventSink<T> { |
* The controller will buffer all incoming events until the subscriber is |
* registered. |
* |
- * The [onPauseStateChange] function is called when the stream becomes |
- * paused or resumes after being paused. The current pause state can |
- * be read from [isPaused]. Ignored if [:null:]. |
+ * The [onPause] function is called when the stream becomes |
+ * paused. [onResume] is called when the stream resumed. |
+ * |
+ * The [onListen] callback is called when the stream |
+ * receives its listener. [onCancel] when the listener cancels |
+ * its subscription. |
* |
- * The [onSubscriptionStateChange] function is called when the stream |
- * receives its first listener or loses its last. The current subscription |
- * state can be read from [hasListener]. Ignored if [:null:]. |
+ * If the stream is canceled before the controller needs new data the |
+ * [onResume] call might not be executed. |
*/ |
- StreamController({void onPauseStateChange(), |
- void onSubscriptionStateChange()}) |
- : stream = new _SingleControllerStream<T>(onSubscriptionStateChange, |
- onPauseStateChange); |
+ StreamController({void onListen(), |
+ void onPause(), |
+ void onResume(), |
+ void onCancel()}) |
+ : stream = new _SingleControllerStream<T>( |
+ onListen, onPause, onResume, onCancel); |
/** |
* Returns a view of this object that only exposes the [EventSink] interface. |
@@ -142,55 +150,61 @@ class StreamController<T> extends EventSink<T> { |
typedef void _NotificationHandler(); |
class _MultiControllerStream<T> extends _MultiStreamImpl<T> { |
- _NotificationHandler _subscriptionHandler; |
- _NotificationHandler _pauseHandler; |
+ _NotificationHandler _onListen; |
+ _NotificationHandler _onPause; |
+ _NotificationHandler _onResume; |
+ _NotificationHandler _onCancel; |
+ |
+ // TODO(floitsch): share this code with _SingleControllerStream. |
+ void _runGuarded(_NotificationHandler notificationHandler) { |
+ if (notificationHandler == null) return; |
+ try { |
+ notificationHandler(); |
+ } catch (e, s) { |
+ new AsyncError(e, s).throwDelayed(); |
+ } |
+ } |
- _MultiControllerStream(this._subscriptionHandler, this._pauseHandler); |
+ _MultiControllerStream(this._onListen, |
+ this._onPause, |
+ this._onResume, |
+ this._onCancel); |
void _onSubscriptionStateChange() { |
- if (_subscriptionHandler != null) { |
- try { |
- _subscriptionHandler(); |
- } catch (e, s) { |
- new AsyncError(e, s).throwDelayed(); |
- } |
- } |
+ _runGuarded(_hasListener ? _onListen : _onCancel); |
} |
void _onPauseStateChange() { |
- if (_pauseHandler != null) { |
- try { |
- _pauseHandler(); |
- } catch (e, s) { |
- new AsyncError(e, s).throwDelayed(); |
- } |
- } |
+ _runGuarded(_isPaused ? _onPause : _onResume); |
} |
} |
class _SingleControllerStream<T> extends _SingleStreamImpl<T> { |
- _NotificationHandler _subscriptionHandler; |
- _NotificationHandler _pauseHandler; |
+ _NotificationHandler _onListen; |
+ _NotificationHandler _onPause; |
+ _NotificationHandler _onResume; |
+ _NotificationHandler _onCancel; |
+ |
+ // TODO(floitsch): share this code with _MultiControllerStream. |
+ _runGuarded(_NotificationHandler notificationHandler) { |
+ if (notificationHandler == null) return; |
+ try { |
+ notificationHandler(); |
+ } catch (e, s) { |
+ new AsyncError(e, s).throwDelayed(); |
+ } |
+ } |
- _SingleControllerStream(this._subscriptionHandler, this._pauseHandler); |
+ _SingleControllerStream(this._onListen, |
+ this._onPause, |
+ this._onResume, |
+ this._onCancel); |
void _onSubscriptionStateChange() { |
- if (_subscriptionHandler != null) { |
- try { |
- _subscriptionHandler(); |
- } catch (e, s) { |
- new AsyncError(e, s).throwDelayed(); |
- } |
- } |
+ _runGuarded(_hasListener ? _onListen : _onCancel); |
} |
void _onPauseStateChange() { |
- if (_pauseHandler != null) { |
- try { |
- _pauseHandler(); |
- } catch (e, s) { |
- new AsyncError(e, s).throwDelayed(); |
- } |
- } |
+ _runGuarded(_isPaused ? _onPause : _onResume); |
} |
} |