Index: sdk/lib/async/stream_controller.dart |
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
index f5b487ebfa5b2f4906ae6ec6729dac7eb4bb017a..0d82c720a20ee997ce433a17b140ac1fb1535356 100644 |
--- a/sdk/lib/async/stream_controller.dart |
+++ b/sdk/lib/async/stream_controller.dart |
@@ -66,7 +66,9 @@ abstract class StreamController<T> implements StreamSink<T> { |
* |
* The [onListen] callback is called when the stream |
* receives its listener and [onCancel] when the listener ends |
- * its subscription. |
+ * its subscription. If [onCancel] needs to perform an asynchronous operation, |
+ * [onCancel] should return a future that completes when the cancel operation |
+ * is done. |
* |
* If the stream is canceled before the controller needs new data the |
* [onResume] call might not be executed. |
@@ -74,7 +76,7 @@ abstract class StreamController<T> implements StreamSink<T> { |
factory StreamController({void onListen(), |
void onPause(), |
void onResume(), |
- void onCancel(), |
+ onCancel(), |
bool sync: false}) { |
if (onListen == null && onPause == null && |
onResume == null && onCancel == null) { |
@@ -172,7 +174,7 @@ abstract class _StreamControllerLifecycle<T> { |
StreamSubscription<T> _subscribe(bool cancelOnError); |
void _recordPause(StreamSubscription<T> subscription) {} |
void _recordResume(StreamSubscription<T> subscription) {} |
- void _recordCancel(StreamSubscription<T> subscription) {} |
+ Future _recordCancel(StreamSubscription<T> subscription) => null; |
} |
/** |
@@ -463,7 +465,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
return subscription; |
} |
- void _recordCancel(StreamSubscription<T> subscription) { |
+ Future _recordCancel(StreamSubscription<T> subscription) { |
if (_isAddingStream) { |
_StreamControllerAddStreamState addState = _varData; |
addState.cancel(); |
@@ -471,10 +473,18 @@ abstract class _StreamController<T> implements StreamController<T>, |
_varData = null; |
_state = |
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
- _runGuarded(_onCancel); |
- if (_doneFuture != null && _doneFuture._mayComplete) { |
- _doneFuture._asyncComplete(null); |
+ void complete() { |
+ if (_doneFuture != null && _doneFuture._mayComplete) { |
+ _doneFuture._asyncComplete(null); |
+ } |
} |
+ Future future = _runGuarded(_onCancel); |
+ if (future != null) { |
+ future = future.whenComplete(complete); |
+ } else { |
+ complete(); |
+ } |
+ return future; |
} |
void _recordPause(StreamSubscription<T> subscription) { |
@@ -537,7 +547,7 @@ class _AsyncStreamController<T> extends _StreamController<T> |
_AsyncStreamController(void this._onListen(), |
void this._onPause(), |
void this._onResume(), |
- void this._onCancel()); |
+ this._onCancel()); |
} |
class _SyncStreamController<T> extends _StreamController<T> |
@@ -550,7 +560,7 @@ class _SyncStreamController<T> extends _StreamController<T> |
_SyncStreamController(void this._onListen(), |
void this._onPause(), |
void this._onResume(), |
- void this._onCancel()); |
+ this._onCancel()); |
} |
abstract class _NoCallbacks { |
@@ -566,12 +576,14 @@ class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
-typedef void _NotificationHandler(); |
+typedef _NotificationHandler(); |
-void _runGuarded(_NotificationHandler notificationHandler) { |
- if (notificationHandler == null) return; |
+Future _runGuarded(_NotificationHandler notificationHandler) { |
+ if (notificationHandler == null) return null; |
try { |
- notificationHandler(); |
+ var result = notificationHandler(); |
+ if (result is Future) return result; |
+ return null; |
} catch (e, s) { |
Zone.current.handleUncaughtError(_asyncError(e, s), s); |
} |
@@ -605,8 +617,8 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
_ControllerSubscription(this._controller, bool cancelOnError) |
: super(cancelOnError); |
- void _onCancel() { |
- _controller._recordCancel(this); |
+ Future _onCancel() { |
+ return _controller._recordCancel(this); |
} |
void _onPause() { |
@@ -636,7 +648,7 @@ class _StreamSinkWrapper<T> implements StreamSink<T> { |
* Object containing the state used to handle [StreamController.addStream]. |
*/ |
class _AddStreamState<T> { |
- // [_FutureImpl] returned by call to addStream. |
+ // [_Future] returned by call to addStream. |
_Future addStreamFuture; |
// Subscription on stream argument to addStream. |