| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index f856b61526d1addf93a50ba25bfa134929433fa6..c8a24654f97a6f237953e853f7f17aae8efe857c 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -9,6 +9,18 @@ part of dart.async;
|
| // -------------------------------------------------------------------
|
|
|
| /**
|
| + * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks.
|
| + */
|
| +typedef void ControllerCallback();
|
| +
|
| +/**
|
| + * Type of stream controller `onCancel` callbacks.
|
| + *
|
| + * The callback may return either `void` or a future.
|
| + */
|
| +typedef ControllerCancelCallback();
|
| +
|
| +/**
|
| * A controller with the stream it controls.
|
| *
|
| * This controller allows sending data, error and done events on
|
| @@ -133,39 +145,43 @@ abstract class StreamController<T> implements StreamSink<T> {
|
| }
|
|
|
| /**
|
| - * Sets the callback which is called when the stream is listened to.
|
| + * The callback which is called when the stream is listened to.
|
| *
|
| - * This overrides the previous callback, or clears it if the [onListenHandler]
|
| - * is `null`.
|
| + * May be set to `null`, in which case no callback will happen.
|
| */
|
| + ControllerCallback get onListen;
|
| +
|
| void set onListen(void onListenHandler());
|
|
|
| /**
|
| - * Sets the callback which is called when the stream is paused.
|
| + * The callback which is called when the stream is paused.
|
| *
|
| - * This overrides the previous callback, or clears it if the [onPauseHandler]
|
| - * is `null`.
|
| + * May be set to `null`, in which case no callback will happen.
|
| *
|
| * Pause related callbacks are not supported on broadcast stream controllers.
|
| */
|
| + ControllerCallback get onPause;
|
| +
|
| void set onPause(void onPauseHandler());
|
|
|
| /**
|
| - * Sets the callback which is called when the stream is resumed.
|
| + * The callback which is called when the stream is resumed.
|
| *
|
| - * This overrides the previous callback, or clears it if the [onResumeHandler]
|
| - * is `null`.
|
| + * May be set to `null`, in which case no callback will happen.
|
| *
|
| * Pause related callbacks are not supported on broadcast stream controllers.
|
| */
|
| + ControllerCallback get onResume;
|
| +
|
| void set onResume(void onResumeHandler());
|
|
|
| /**
|
| - * Sets the callback which is called when the stream is canceled.
|
| + * The callback which is called when the stream is canceled.
|
| *
|
| - * This overrides the previous callback, or clears it if the [onCancelHandler]
|
| - * is `null`.
|
| + * May be set to `null`, in which case no callback will happen.
|
| */
|
| + ControllerCancelCallback get onCancel;
|
| +
|
| void set onCancel(onCancelHandler());
|
|
|
| /**
|
| @@ -413,23 +429,15 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| // accessed earlier, or if close is called before subscribing.
|
| _Future _doneFuture;
|
|
|
| - _NotificationHandler _onListen;
|
| - _NotificationHandler _onPause;
|
| - _NotificationHandler _onResume;
|
| - _NotificationHandler _onCancel;
|
| -
|
| - _StreamController(void this._onListen(),
|
| - void this._onPause(),
|
| - void this._onResume(),
|
| - this._onCancel());
|
| -
|
| - void set onListen(void onListenHandler()) { _onListen = onListenHandler; }
|
| -
|
| - void set onPause(void onPauseHandler()) { _onPause = onPauseHandler; }
|
| -
|
| - void set onResume(void onResumeHandler()) { _onResume = onResumeHandler; }
|
| + ControllerCallback onListen;
|
| + ControllerCallback onPause;
|
| + ControllerCallback onResume;
|
| + ControllerCancelCallback onCancel;
|
|
|
| - void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; }
|
| + _StreamController(this.onListen,
|
| + this.onPause,
|
| + this.onResume,
|
| + this.onCancel);
|
|
|
| // Return a new stream every time. The streams are equal, but not identical.
|
| Stream<T> get stream => new _ControllerStream<T>(this);
|
| @@ -653,7 +661,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| }
|
| subscription._setPendingEvents(pendingEvents);
|
| subscription._guardCallback(() {
|
| - _runGuarded(_onListen);
|
| + _runGuarded(onListen);
|
| });
|
|
|
| return subscription;
|
| @@ -661,8 +669,8 @@ abstract class _StreamController<T> implements StreamController<T>,
|
|
|
| Future _recordCancel(StreamSubscription<T> subscription) {
|
| // When we cancel, we first cancel any stream being added,
|
| - // Then we call _onCancel, and finally the _doneFuture is completed.
|
| - // If either of addStream's cancel or _onCancel returns a future,
|
| + // Then we call `onCancel`, and finally the _doneFuture is completed.
|
| + // If either of addStream's cancel or `onCancel` returns a future,
|
| // we wait for it before continuing.
|
| // Any error during this process ends up in the returned future.
|
| // If more errors happen, we act as if it happens inside nested try/finallys
|
| @@ -677,12 +685,12 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| _state =
|
| (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
|
|
|
| - if (_onCancel != null) {
|
| + if (onCancel != null) {
|
| if (result == null) {
|
| // Only introduce a future if one is needed.
|
| // If _onCancel returns null, no future is needed.
|
| try {
|
| - result = _onCancel();
|
| + result = onCancel();
|
| } catch (e, s) {
|
| // Return the error in the returned future.
|
| // Complete it asynchronously, so there is time for a listener
|
| @@ -691,7 +699,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| }
|
| } else {
|
| // Simpler case when we already know that we will return a future.
|
| - result = result.whenComplete(_onCancel);
|
| + result = result.whenComplete(onCancel);
|
| }
|
| }
|
|
|
| @@ -715,7 +723,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| _StreamControllerAddStreamState addState = _varData;
|
| addState.pause();
|
| }
|
| - _runGuarded(_onPause);
|
| + _runGuarded(onPause);
|
| }
|
|
|
| void _recordResume(StreamSubscription<T> subscription) {
|
| @@ -723,7 +731,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| _StreamControllerAddStreamState addState = _varData;
|
| addState.resume();
|
| }
|
| - _runGuarded(_onResume);
|
| + _runGuarded(onResume);
|
| }
|
| }
|
|
|
|
|