Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index c8fbe70f29ef6e07131376f3f14b6e55e244c147..dfbfa1ac2949ba41d692f711043c9d3c4ab9a642 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -85,6 +85,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| /** Bit vector based on state-constants above. */ |
| int _state; |
| + _FutureImpl _cancelFuture; |
|
floitsch
2013/07/12 16:42:34
Add TODO that this should be in some field that is
Lasse Reichstein Nielsen
2013/07/17 07:28:46
And add documentation saying what it is, and where
|
| + |
| /** |
| * Queue of pending events. |
| * |
| @@ -177,16 +179,25 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| } |
| } |
| - void cancel() { |
| - if (_isCanceled) return; |
| + void _chainCancelFuture(_FutureImpl future) { |
|
Lasse Reichstein Nielsen
2013/07/17 07:28:46
_cancelFuture._setOrChainValue(future);
It handle
|
| + if (future == null) { |
| + _cancelFuture._setValue(null); |
| + } else { |
| + future._chain(_cancelFuture); |
| + } |
| + } |
| + |
| + Future cancel() { |
| + if (_isCanceled) return _cancelFuture; |
| _cancel(); |
| if (!_inCallback) { |
| // otherwise checkState will be called after firing or callback completes. |
| _state |= _STATE_IN_CALLBACK; |
| - _onCancel(); |
| + _chainCancelFuture(_onCancel()); |
|
Lasse Reichstein Nielsen
2013/07/17 07:28:46
So this line could just be:
_cancelFuture.setOrCh
|
| _pending = null; |
| _state &= ~_STATE_IN_CALLBACK; |
| } |
| + return _cancelFuture; |
| } |
| Future asFuture([var futureValue]) { |
| @@ -218,6 +229,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| bool get isPaused => _isPaused; |
| void _cancel() { |
| + _cancelFuture = new _FutureImpl(); |
| _state |= _STATE_CANCELED; |
| _zone.cancelCallbackExpectation(); |
| if (_hasPending) { |
| @@ -290,7 +302,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| assert(!_isInputPaused); |
| } |
| - void _onCancel() { |
| + _FutureImpl _onCancel() { |
| assert(_isCanceled); |
| } |
| @@ -394,7 +406,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| // make a new state-change callback. Loop until the state didn't change. |
| while (true) { |
| if (_isCanceled) { |
| - _onCancel(); |
| + _chainCancelFuture(_onCancel()); |
|
Lasse Reichstein Nielsen
2013/07/17 07:28:46
_cancelFuture._setOrChainValue(_onCancel());
|
| _pending = null; |
| return; |
| } |