Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index f5b487ebfa5b2f4906ae6ec6729dac7eb4bb017a..e9fe3d33d5f1d683cf28b9966d220343dd3a3270 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -66,7 +66,9 @@ abstract class StreamController<T> implements StreamSink<T> { |
| * |
| * The [onListen] callback is called when the stream |
| * receives its listener and [onCancel] when the listener ends |
| - * its subscription. |
| + * its subscription. If [onCancel] needs to perform an asynchronous operation, |
| + * [onCancel] can return a [Future]. The [StreamSubscription] will then get |
|
floitsch
2013/10/16 14:43:44
should return a future that completes when the can
Anders Johnsen
2013/10/21 08:01:46
Done.
|
| + * that future, and can wait for it before continuing. |
| * |
| * If the stream is canceled before the controller needs new data the |
| * [onResume] call might not be executed. |
| @@ -74,7 +76,7 @@ abstract class StreamController<T> implements StreamSink<T> { |
| factory StreamController({void onListen(), |
| void onPause(), |
| void onResume(), |
| - void onCancel(), |
| + onCancel(), |
| bool sync: false}) { |
| if (onListen == null && onPause == null && |
| onResume == null && onCancel == null) { |
| @@ -172,7 +174,7 @@ abstract class _StreamControllerLifecycle<T> { |
| StreamSubscription<T> _subscribe(bool cancelOnError); |
| void _recordPause(StreamSubscription<T> subscription) {} |
| void _recordResume(StreamSubscription<T> subscription) {} |
| - void _recordCancel(StreamSubscription<T> subscription) {} |
| + Future _recordCancel(StreamSubscription<T> subscription) => null; |
| } |
| /** |
| @@ -463,7 +465,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
| return subscription; |
| } |
| - void _recordCancel(StreamSubscription<T> subscription) { |
| + Future _recordCancel(StreamSubscription<T> subscription) { |
| if (_isAddingStream) { |
| _StreamControllerAddStreamState addState = _varData; |
| addState.cancel(); |
| @@ -471,10 +473,11 @@ abstract class _StreamController<T> implements StreamController<T>, |
| _varData = null; |
| _state = |
| (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| - _runGuarded(_onCancel); |
| + Future future = _runGuarded(_onCancel); |
|
floitsch
2013/10/16 14:43:44
If the doneFuture and the cancel are not linked (s
Anders Johnsen
2013/10/21 08:01:46
Done.
|
| if (_doneFuture != null && _doneFuture._mayComplete) { |
| _doneFuture._asyncComplete(null); |
| } |
| + return future; |
| } |
| void _recordPause(StreamSubscription<T> subscription) { |
| @@ -537,7 +540,7 @@ class _AsyncStreamController<T> extends _StreamController<T> |
| _AsyncStreamController(void this._onListen(), |
| void this._onPause(), |
| void this._onResume(), |
| - void this._onCancel()); |
| + this._onCancel()); |
| } |
| class _SyncStreamController<T> extends _StreamController<T> |
| @@ -550,7 +553,7 @@ class _SyncStreamController<T> extends _StreamController<T> |
| _SyncStreamController(void this._onListen(), |
| void this._onPause(), |
| void this._onResume(), |
| - void this._onCancel()); |
| + this._onCancel()); |
| } |
| abstract class _NoCallbacks { |
| @@ -566,12 +569,14 @@ class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
| class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
| with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| -typedef void _NotificationHandler(); |
| +typedef _NotificationHandler(); |
| -void _runGuarded(_NotificationHandler notificationHandler) { |
| - if (notificationHandler == null) return; |
| +Future _runGuarded(_NotificationHandler notificationHandler) { |
| + if (notificationHandler == null) return null; |
| try { |
| - notificationHandler(); |
| + var result = notificationHandler(); |
| + if (result is Future) return result; |
| + return null; |
| } catch (e, s) { |
| Zone.current.handleUncaughtError(_asyncError(e, s), s); |
| } |
| @@ -605,8 +610,8 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| _ControllerSubscription(this._controller, bool cancelOnError) |
| : super(cancelOnError); |
| - void _onCancel() { |
| - _controller._recordCancel(this); |
| + Future _onCancel() { |
| + return _controller._recordCancel(this); |
| } |
| void _onPause() { |
| @@ -636,7 +641,7 @@ class _StreamSinkWrapper<T> implements StreamSink<T> { |
| * Object containing the state used to handle [StreamController.addStream]. |
| */ |
| class _AddStreamState<T> { |
| - // [_FutureImpl] returned by call to addStream. |
| + // [_Future] returned by call to addStream. |
| _Future addStreamFuture; |
| // Subscription on stream argument to addStream. |