Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index bd70b1074aeb97898623d8d1ddf76c98a4120a81..6a1bbc3bde720d81c93155f8e5814f01aaf4c9c2 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -50,20 +50,24 @@ class StreamController<T> extends EventSink<T> { |
| final _StreamImpl<T> stream; |
| /** |
| - * A controller with a broadcast [stream].. |
| + * A controller with a broadcast [stream]. |
| * |
| - * The [onPauseStateChange] function is called when the stream becomes |
| - * paused or resumes after being paused. The current pause state can |
| - * be read from [isPaused]. Ignored if [:null:]. |
| + * The [onPause] function is called when the stream becomes |
| + * paused. [onResume] is called when the stream resumed. |
| * |
| - * The [onSubscriptionStateChange] function is called when the stream |
| - * receives its first listener or loses its last. The current subscription |
| - * state can be read from [hasListener]. Ignored if [:null:]. |
| + * The [onListen] callback is called when the stream |
| + * receives its first listener. [onCancel] when the last listener cancels |
| + * its subscription. |
| + * |
| + * If the stream is canceled before the controller needs new data the |
| + * [onResume] call might not be executed. |
| */ |
| - StreamController.broadcast({void onPauseStateChange(), |
| - void onSubscriptionStateChange()}) |
| - : stream = new _MultiControllerStream<T>(onSubscriptionStateChange, |
| - onPauseStateChange); |
| + StreamController.broadcast({void onListen(), |
|
Lasse Reichstein Nielsen
2013/04/15 10:07:43
Wasn't this constructor deprecated?
And should the
floitsch
2013/04/15 16:33:43
Yes. will happen.
|
| + void onPause(), |
| + void onResume(), |
| + void onCancel()}) |
| + : stream = new _MultiControllerStream<T>( |
| + onListen, onPause, onResume, onCancel); |
| /** |
| * A controller with a [stream] that supports only one single subscriber. |
| @@ -71,18 +75,22 @@ class StreamController<T> extends EventSink<T> { |
| * The controller will buffer all incoming events until the subscriber is |
| * registered. |
| * |
| - * The [onPauseStateChange] function is called when the stream becomes |
| - * paused or resumes after being paused. The current pause state can |
| - * be read from [isPaused]. Ignored if [:null:]. |
| + * The [onPause] function is called when the stream becomes |
| + * paused. [onResume] is called when the stream resumed. |
| + * |
| + * The [onListen] callback is called when the stream |
| + * receives its listener. [onCancel] when the listener cancels |
| + * its subscription. |
| * |
| - * The [onSubscriptionStateChange] function is called when the stream |
| - * receives its first listener or loses its last. The current subscription |
| - * state can be read from [hasListener]. Ignored if [:null:]. |
| + * If the stream is canceled before the controller needs new data the |
| + * [onResume] call might not be executed. |
| */ |
| - StreamController({void onPauseStateChange(), |
| - void onSubscriptionStateChange()}) |
| - : stream = new _SingleControllerStream<T>(onSubscriptionStateChange, |
| - onPauseStateChange); |
| + StreamController({void onListen(), |
| + void onPause(), |
| + void onResume(), |
| + void onCancel()}) |
| + : stream = new _SingleControllerStream<T>( |
| + onListen, onPause, onResume, onCancel); |
| /** |
| * Returns a view of this object that only exposes the [EventSink] interface. |
| @@ -142,55 +150,59 @@ class StreamController<T> extends EventSink<T> { |
| typedef void _NotificationHandler(); |
| class _MultiControllerStream<T> extends _MultiStreamImpl<T> { |
| - _NotificationHandler _subscriptionHandler; |
| - _NotificationHandler _pauseHandler; |
| + _NotificationHandler _onListen; |
| + _NotificationHandler _onPause; |
| + _NotificationHandler _onResume; |
| + _NotificationHandler _onCancel; |
| + |
| + _runGuarded(_NotificationHandler notificationHandler) { |
|
Lasse Reichstein Nielsen
2013/04/15 10:07:43
void return type.
floitsch
2013/04/15 16:33:43
Done.
|
| + if (notificationHandler == null) return; |
| + try { |
| + notificationHandler(); |
| + } catch (e, s) { |
| + new AsyncError(e, s).throwDelayed(); |
| + } |
| + } |
| - _MultiControllerStream(this._subscriptionHandler, this._pauseHandler); |
| + _MultiControllerStream(this._onListen, |
| + this._onPause, |
| + this._onResume, |
| + this._onCancel); |
| void _onSubscriptionStateChange() { |
| - if (_subscriptionHandler != null) { |
| - try { |
| - _subscriptionHandler(); |
| - } catch (e, s) { |
| - new AsyncError(e, s).throwDelayed(); |
| - } |
| - } |
| + _runGuarded(_hasListener ? _onListen : _onCancel); |
| } |
| void _onPauseStateChange() { |
| - if (_pauseHandler != null) { |
| - try { |
| - _pauseHandler(); |
| - } catch (e, s) { |
| - new AsyncError(e, s).throwDelayed(); |
| - } |
| - } |
| + _runGuarded(_isPaused ? _onPause : _onResume); |
| } |
| } |
| class _SingleControllerStream<T> extends _SingleStreamImpl<T> { |
| - _NotificationHandler _subscriptionHandler; |
| - _NotificationHandler _pauseHandler; |
| + _NotificationHandler _onListen; |
| + _NotificationHandler _onPause; |
| + _NotificationHandler _onResume; |
| + _NotificationHandler _onCancel; |
| + |
| + _runGuarded(_NotificationHandler notificationHandler) { |
|
Lasse Reichstein Nielsen
2013/04/15 10:07:43
void return type.
Duplicate code. Could it be a s
floitsch
2013/04/15 16:33:43
Added TODO. It could be a common super class, but
|
| + if (notificationHandler == null) return; |
| + try { |
| + notificationHandler(); |
| + } catch (e, s) { |
| + new AsyncError(e, s).throwDelayed(); |
| + } |
| + } |
| - _SingleControllerStream(this._subscriptionHandler, this._pauseHandler); |
| + _SingleControllerStream(this._onListen, |
| + this._onPause, |
| + this._onResume, |
| + this._onCancel); |
| void _onSubscriptionStateChange() { |
| - if (_subscriptionHandler != null) { |
| - try { |
| - _subscriptionHandler(); |
| - } catch (e, s) { |
| - new AsyncError(e, s).throwDelayed(); |
| - } |
| - } |
| + _runGuarded(_hasListener ? _onListen : _onCancel); |
| } |
| void _onPauseStateChange() { |
| - if (_pauseHandler != null) { |
| - try { |
| - _pauseHandler(); |
| - } catch (e, s) { |
| - new AsyncError(e, s).throwDelayed(); |
| - } |
| - } |
| + _runGuarded(_isPaused ? _onPause : _onResume); |
| } |
| } |