| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index f5b487ebfa5b2f4906ae6ec6729dac7eb4bb017a..0d82c720a20ee997ce433a17b140ac1fb1535356 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -66,7 +66,9 @@ abstract class StreamController<T> implements StreamSink<T> {
|
| *
|
| * The [onListen] callback is called when the stream
|
| * receives its listener and [onCancel] when the listener ends
|
| - * its subscription.
|
| + * its subscription. If [onCancel] needs to perform an asynchronous operation,
|
| + * [onCancel] should return a future that completes when the cancel operation
|
| + * is done.
|
| *
|
| * If the stream is canceled before the controller needs new data the
|
| * [onResume] call might not be executed.
|
| @@ -74,7 +76,7 @@ abstract class StreamController<T> implements StreamSink<T> {
|
| factory StreamController({void onListen(),
|
| void onPause(),
|
| void onResume(),
|
| - void onCancel(),
|
| + onCancel(),
|
| bool sync: false}) {
|
| if (onListen == null && onPause == null &&
|
| onResume == null && onCancel == null) {
|
| @@ -172,7 +174,7 @@ abstract class _StreamControllerLifecycle<T> {
|
| StreamSubscription<T> _subscribe(bool cancelOnError);
|
| void _recordPause(StreamSubscription<T> subscription) {}
|
| void _recordResume(StreamSubscription<T> subscription) {}
|
| - void _recordCancel(StreamSubscription<T> subscription) {}
|
| + Future _recordCancel(StreamSubscription<T> subscription) => null;
|
| }
|
|
|
| /**
|
| @@ -463,7 +465,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| return subscription;
|
| }
|
|
|
| - void _recordCancel(StreamSubscription<T> subscription) {
|
| + Future _recordCancel(StreamSubscription<T> subscription) {
|
| if (_isAddingStream) {
|
| _StreamControllerAddStreamState addState = _varData;
|
| addState.cancel();
|
| @@ -471,10 +473,18 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| _varData = null;
|
| _state =
|
| (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
|
| - _runGuarded(_onCancel);
|
| - if (_doneFuture != null && _doneFuture._mayComplete) {
|
| - _doneFuture._asyncComplete(null);
|
| + void complete() {
|
| + if (_doneFuture != null && _doneFuture._mayComplete) {
|
| + _doneFuture._asyncComplete(null);
|
| + }
|
| }
|
| + Future future = _runGuarded(_onCancel);
|
| + if (future != null) {
|
| + future = future.whenComplete(complete);
|
| + } else {
|
| + complete();
|
| + }
|
| + return future;
|
| }
|
|
|
| void _recordPause(StreamSubscription<T> subscription) {
|
| @@ -537,7 +547,7 @@ class _AsyncStreamController<T> extends _StreamController<T>
|
| _AsyncStreamController(void this._onListen(),
|
| void this._onPause(),
|
| void this._onResume(),
|
| - void this._onCancel());
|
| + this._onCancel());
|
| }
|
|
|
| class _SyncStreamController<T> extends _StreamController<T>
|
| @@ -550,7 +560,7 @@ class _SyncStreamController<T> extends _StreamController<T>
|
| _SyncStreamController(void this._onListen(),
|
| void this._onPause(),
|
| void this._onResume(),
|
| - void this._onCancel());
|
| + this._onCancel());
|
| }
|
|
|
| abstract class _NoCallbacks {
|
| @@ -566,12 +576,14 @@ class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/
|
| class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/
|
| with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
|
|
|
| -typedef void _NotificationHandler();
|
| +typedef _NotificationHandler();
|
|
|
| -void _runGuarded(_NotificationHandler notificationHandler) {
|
| - if (notificationHandler == null) return;
|
| +Future _runGuarded(_NotificationHandler notificationHandler) {
|
| + if (notificationHandler == null) return null;
|
| try {
|
| - notificationHandler();
|
| + var result = notificationHandler();
|
| + if (result is Future) return result;
|
| + return null;
|
| } catch (e, s) {
|
| Zone.current.handleUncaughtError(_asyncError(e, s), s);
|
| }
|
| @@ -605,8 +617,8 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
| _ControllerSubscription(this._controller, bool cancelOnError)
|
| : super(cancelOnError);
|
|
|
| - void _onCancel() {
|
| - _controller._recordCancel(this);
|
| + Future _onCancel() {
|
| + return _controller._recordCancel(this);
|
| }
|
|
|
| void _onPause() {
|
| @@ -636,7 +648,7 @@ class _StreamSinkWrapper<T> implements StreamSink<T> {
|
| * Object containing the state used to handle [StreamController.addStream].
|
| */
|
| class _AddStreamState<T> {
|
| - // [_FutureImpl] returned by call to addStream.
|
| + // [_Future] returned by call to addStream.
|
| _Future addStreamFuture;
|
|
|
| // Subscription on stream argument to addStream.
|
|
|