Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index c8fbe70f29ef6e07131376f3f14b6e55e244c147..dfbfa1ac2949ba41d692f711043c9d3c4ab9a642 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -85,6 +85,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
/** Bit vector based on state-constants above. */ |
int _state; |
+ _FutureImpl _cancelFuture; |
floitsch
2013/07/12 16:42:34
Add TODO that this should be in some field that is
Lasse Reichstein Nielsen
2013/07/17 07:28:46
And add documentation saying what it is, and where
|
+ |
/** |
* Queue of pending events. |
* |
@@ -177,16 +179,25 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
} |
} |
- void cancel() { |
- if (_isCanceled) return; |
+ void _chainCancelFuture(_FutureImpl future) { |
Lasse Reichstein Nielsen
2013/07/17 07:28:46
_cancelFuture._setOrChainValue(future);
It handle
|
+ if (future == null) { |
+ _cancelFuture._setValue(null); |
+ } else { |
+ future._chain(_cancelFuture); |
+ } |
+ } |
+ |
+ Future cancel() { |
+ if (_isCanceled) return _cancelFuture; |
_cancel(); |
if (!_inCallback) { |
// otherwise checkState will be called after firing or callback completes. |
_state |= _STATE_IN_CALLBACK; |
- _onCancel(); |
+ _chainCancelFuture(_onCancel()); |
Lasse Reichstein Nielsen
2013/07/17 07:28:46
So this line could just be:
_cancelFuture.setOrCh
|
_pending = null; |
_state &= ~_STATE_IN_CALLBACK; |
} |
+ return _cancelFuture; |
} |
Future asFuture([var futureValue]) { |
@@ -218,6 +229,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
bool get isPaused => _isPaused; |
void _cancel() { |
+ _cancelFuture = new _FutureImpl(); |
_state |= _STATE_CANCELED; |
_zone.cancelCallbackExpectation(); |
if (_hasPending) { |
@@ -290,7 +302,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
assert(!_isInputPaused); |
} |
- void _onCancel() { |
+ _FutureImpl _onCancel() { |
assert(_isCanceled); |
} |
@@ -394,7 +406,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
// make a new state-change callback. Loop until the state didn't change. |
while (true) { |
if (_isCanceled) { |
- _onCancel(); |
+ _chainCancelFuture(_onCancel()); |
Lasse Reichstein Nielsen
2013/07/17 07:28:46
_cancelFuture._setOrChainValue(_onCancel());
|
_pending = null; |
return; |
} |