Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(132)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 14196003: Change StreamController constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments and rebase. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/io/file_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index bd70b1074aeb97898623d8d1ddf76c98a4120a81..7378bef79897de76f5d0c9dc1f3004dd15f3c91e 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -50,20 +50,24 @@ class StreamController<T> extends EventSink<T> {
final _StreamImpl<T> stream;
/**
- * A controller with a broadcast [stream]..
+ * A controller with a broadcast [stream].
*
- * The [onPauseStateChange] function is called when the stream becomes
- * paused or resumes after being paused. The current pause state can
- * be read from [isPaused]. Ignored if [:null:].
+ * The [onPause] function is called when the stream becomes
+ * paused. [onResume] is called when the stream resumed.
*
- * The [onSubscriptionStateChange] function is called when the stream
- * receives its first listener or loses its last. The current subscription
- * state can be read from [hasListener]. Ignored if [:null:].
+ * The [onListen] callback is called when the stream
+ * receives its first listener. [onCancel] when the last listener cancels
+ * its subscription.
+ *
+ * If the stream is canceled before the controller needs new data the
+ * [onResume] call might not be executed.
*/
- StreamController.broadcast({void onPauseStateChange(),
- void onSubscriptionStateChange()})
- : stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
- onPauseStateChange);
+ StreamController.broadcast({void onListen(),
+ void onPause(),
+ void onResume(),
+ void onCancel()})
+ : stream = new _MultiControllerStream<T>(
+ onListen, onPause, onResume, onCancel);
/**
* A controller with a [stream] that supports only one single subscriber.
@@ -71,18 +75,22 @@ class StreamController<T> extends EventSink<T> {
* The controller will buffer all incoming events until the subscriber is
* registered.
*
- * The [onPauseStateChange] function is called when the stream becomes
- * paused or resumes after being paused. The current pause state can
- * be read from [isPaused]. Ignored if [:null:].
+ * The [onPause] function is called when the stream becomes
+ * paused. [onResume] is called when the stream resumed.
+ *
+ * The [onListen] callback is called when the stream
+ * receives its listener. [onCancel] when the listener cancels
+ * its subscription.
*
- * The [onSubscriptionStateChange] function is called when the stream
- * receives its first listener or loses its last. The current subscription
- * state can be read from [hasListener]. Ignored if [:null:].
+ * If the stream is canceled before the controller needs new data the
+ * [onResume] call might not be executed.
*/
- StreamController({void onPauseStateChange(),
- void onSubscriptionStateChange()})
- : stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
- onPauseStateChange);
+ StreamController({void onListen(),
+ void onPause(),
+ void onResume(),
+ void onCancel()})
+ : stream = new _SingleControllerStream<T>(
+ onListen, onPause, onResume, onCancel);
/**
* Returns a view of this object that only exposes the [EventSink] interface.
@@ -142,55 +150,61 @@ class StreamController<T> extends EventSink<T> {
typedef void _NotificationHandler();
class _MultiControllerStream<T> extends _MultiStreamImpl<T> {
- _NotificationHandler _subscriptionHandler;
- _NotificationHandler _pauseHandler;
+ _NotificationHandler _onListen;
+ _NotificationHandler _onPause;
+ _NotificationHandler _onResume;
+ _NotificationHandler _onCancel;
+
+ // TODO(floitsch): share this code with _SingleControllerStream.
+ void _runGuarded(_NotificationHandler notificationHandler) {
+ if (notificationHandler == null) return;
+ try {
+ notificationHandler();
+ } catch (e, s) {
+ new AsyncError(e, s).throwDelayed();
+ }
+ }
- _MultiControllerStream(this._subscriptionHandler, this._pauseHandler);
+ _MultiControllerStream(this._onListen,
+ this._onPause,
+ this._onResume,
+ this._onCancel);
void _onSubscriptionStateChange() {
- if (_subscriptionHandler != null) {
- try {
- _subscriptionHandler();
- } catch (e, s) {
- new AsyncError(e, s).throwDelayed();
- }
- }
+ _runGuarded(_hasListener ? _onListen : _onCancel);
}
void _onPauseStateChange() {
- if (_pauseHandler != null) {
- try {
- _pauseHandler();
- } catch (e, s) {
- new AsyncError(e, s).throwDelayed();
- }
- }
+ _runGuarded(_isPaused ? _onPause : _onResume);
}
}
class _SingleControllerStream<T> extends _SingleStreamImpl<T> {
- _NotificationHandler _subscriptionHandler;
- _NotificationHandler _pauseHandler;
+ _NotificationHandler _onListen;
+ _NotificationHandler _onPause;
+ _NotificationHandler _onResume;
+ _NotificationHandler _onCancel;
+
+ // TODO(floitsch): share this code with _MultiControllerStream.
+ _runGuarded(_NotificationHandler notificationHandler) {
+ if (notificationHandler == null) return;
+ try {
+ notificationHandler();
+ } catch (e, s) {
+ new AsyncError(e, s).throwDelayed();
+ }
+ }
- _SingleControllerStream(this._subscriptionHandler, this._pauseHandler);
+ _SingleControllerStream(this._onListen,
+ this._onPause,
+ this._onResume,
+ this._onCancel);
void _onSubscriptionStateChange() {
- if (_subscriptionHandler != null) {
- try {
- _subscriptionHandler();
- } catch (e, s) {
- new AsyncError(e, s).throwDelayed();
- }
- }
+ _runGuarded(_hasListener ? _onListen : _onCancel);
}
void _onPauseStateChange() {
- if (_pauseHandler != null) {
- try {
- _pauseHandler();
- } catch (e, s) {
- new AsyncError(e, s).throwDelayed();
- }
- }
+ _runGuarded(_isPaused ? _onPause : _onResume);
}
}
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/io/file_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698