Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index e0ba58ab2ff33b7fcbba56491d5f19bbb9dbd5aa..cf30908d4437640294424adcc8ba6b2af5f02040 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -74,7 +74,7 @@ abstract class StreamController<T> implements StreamSink<T> { |
| factory StreamController({void onListen(), |
| void onPause(), |
| void onResume(), |
| - void onCancel(), |
| + onCancel(), |
| bool sync: false}) { |
| if (onListen == null && onPause == null && |
| onResume == null && onCancel == null) { |
| @@ -175,7 +175,7 @@ abstract class _StreamControllerLifecycle<T> { |
| bool cancelOnError); |
| void _recordPause(StreamSubscription<T> subscription) {} |
| void _recordResume(StreamSubscription<T> subscription) {} |
| - void _recordCancel(StreamSubscription<T> subscription) {} |
| + _FutureImpl _recordCancel(StreamSubscription<T> subscription) => null; |
| } |
| /** |
| @@ -469,7 +469,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
| return subscription; |
| } |
| - void _recordCancel(StreamSubscription<T> subscription) { |
| + _FutureImpl _recordCancel(StreamSubscription<T> subscription) { |
|
Lasse Reichstein Nielsen
2013/07/17 07:28:46
Why _FutureImpl if the user can provide a future t
|
| if (_isAddingStream) { |
| _StreamControllerAddStreamState addState = _varData; |
| addState.cancel(); |
| @@ -477,10 +477,11 @@ abstract class _StreamController<T> implements StreamController<T>, |
| _varData = null; |
| _state = |
| (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| - _runGuarded(_onCancel); |
| + var future = _runGuardedFutureResult(_onCancel); |
|
floitsch
2013/07/12 16:42:34
_runGuardedFutureResult may return `null`. Documen
Lasse Reichstein Nielsen
2013/07/17 07:28:46
I think we should not guarantee that there is no e
|
| if (_doneFuture != null && _doneFuture._mayComplete) { |
| _doneFuture._asyncSetValue(null); |
| } |
| + return future; |
| } |
| void _recordPause(StreamSubscription<T> subscription) { |
| @@ -543,7 +544,7 @@ class _AsyncStreamController<T> extends _StreamController<T> |
| _AsyncStreamController(void this._onListen(), |
| void this._onPause(), |
| void this._onResume(), |
| - void this._onCancel()); |
| + this._onCancel()); |
| } |
| class _SyncStreamController<T> extends _StreamController<T> |
| @@ -556,7 +557,7 @@ class _SyncStreamController<T> extends _StreamController<T> |
| _SyncStreamController(void this._onListen(), |
| void this._onPause(), |
| void this._onResume(), |
| - void this._onCancel()); |
| + this._onCancel()); |
| } |
| abstract class _NoCallbacks { |
| @@ -583,6 +584,20 @@ void _runGuarded(_NotificationHandler notificationHandler) { |
| } |
| } |
| +_FutureImpl _runGuardedFutureResult(_NotificationHandler notificationHandler) { |
| + if (notificationHandler == null) return null; |
| + try { |
| + var value = notificationHandler(); |
| + if (value is _FutureImpl) { |
| + return value; |
| + } else { |
|
Lasse Reichstein Nielsen
2013/07/17 07:28:46
else if (value is Future) { // You can't assume t
|
| + return new Future(() => value); |
|
Lasse Reichstein Nielsen
2013/07/17 07:28:46
return new _FutureImpl.immediate(value);
|
| + } |
| + } catch (e, s) { |
| + _Zone.current.handleUncaughtError(_asyncError(e, s)); |
| + } |
| +} |
| + |
| class _ControllerStream<T> extends _StreamImpl<T> { |
| _StreamControllerLifecycle<T> _controller; |
| @@ -618,8 +633,8 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| bool cancelOnError) |
| : super(onData, onError, onDone, cancelOnError); |
| - void _onCancel() { |
| - _controller._recordCancel(this); |
| + _FutureImpl _onCancel() { |
| + return _controller._recordCancel(this); |
| } |
| void _onPause() { |