Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 7f128ed658fc756e7fe6631e76cedd44f5ea9ba4..bf7c8f015c9a51ddf3ba7a3f3a4775fb85237adc 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -73,10 +73,20 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
* when `cancelOnError` is true. |
*/ |
static const int _STATE_CANCELED = 8; |
- static const int _STATE_IN_CALLBACK = 16; |
- static const int _STATE_HAS_PENDING = 32; |
- static const int _STATE_PAUSE_COUNT = 64; |
- static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
+ /** |
+ * Set when either: |
+ * |
+ * * an error is sent, and [cancelOnError] is true, or |
+ * * a done event is sent. |
+ * |
+ * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
+ * state is unset, and no furher events must be delivered. |
+ */ |
+ static const int _STATE_WAIT_FOR_CANCEL = 16; |
+ static const int _STATE_IN_CALLBACK = 32; |
+ static const int _STATE_HAS_PENDING = 64; |
+ static const int _STATE_PAUSE_COUNT = 128; |
+ static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
/* Event handlers provided in constructor. */ |
_DataHandler<T> _onData; |
@@ -87,6 +97,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
/** Bit vector based on state-constants above. */ |
int _state; |
+ // TODO(floitsch): reuse another field |
+ /** The future [_onCancel] may return. */ |
+ Future _cancelFuture; |
+ |
/** |
* Queue of pending events. |
* |
@@ -171,16 +185,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
} |
} |
- void cancel() { |
- if (_isCanceled) return; |
+ Future cancel() { |
+ // The user doesn't want to receive any further events. If there is an |
+ // error or done event pending (waiting for the cancel to be done) discard |
+ // that event. |
+ _state &= ~_STATE_WAIT_FOR_CANCEL; |
+ if (_isCanceled) return _cancelFuture; |
_cancel(); |
- if (!_inCallback) { |
- // otherwise checkState will be called after firing or callback completes. |
- _state |= _STATE_IN_CALLBACK; |
- _onCancel(); |
- _pending = null; |
- _state &= ~_STATE_IN_CALLBACK; |
- } |
+ return _cancelFuture; |
} |
Future asFuture([var futureValue]) { |
@@ -201,6 +213,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
+ bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; |
bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
@@ -216,6 +229,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
if (_hasPending) { |
_pending.cancelSchedule(); |
} |
+ if (!_inCallback) _pending = null; |
+ _cancelFuture = _onCancel(); |
} |
/** |
@@ -283,7 +298,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
assert(!_isInputPaused); |
} |
- void _onCancel() { |
+ Future _onCancel() { |
assert(_isCanceled); |
} |
@@ -325,30 +340,59 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
assert(!_isPaused); |
assert(!_inCallback); |
bool wasInputPaused = _isInputPaused; |
- _state |= _STATE_IN_CALLBACK; |
- if (!_zone.inSameErrorZone(Zone.current)) { |
- // Errors are not allowed to traverse zone boundaries. |
- Zone.current.handleUncaughtError(error, stackTrace); |
- } else if (_onError is ZoneBinaryCallback) { |
- _zone.runBinaryGuarded(_onError, error, stackTrace); |
- } else { |
- _zone.runUnaryGuarded(_onError, error); |
+ |
+ void sendError() { |
+ // If the subscription has been canceled while waiting for the cancel |
+ // future to finish we must not report the error. |
+ if (_isCanceled && !_waitsForCancel) return; |
+ _state |= _STATE_IN_CALLBACK; |
+ if (!_zone.inSameErrorZone(Zone.current)) { |
+ // Errors are not allowed to traverse zone boundaries. |
+ Zone.current.handleUncaughtError(error, stackTrace); |
+ } else if (_onError is ZoneBinaryCallback) { |
+ _zone.runBinaryGuarded(_onError, error, stackTrace); |
+ } else { |
+ _zone.runUnaryGuarded(_onError, error); |
+ } |
+ _state &= ~_STATE_IN_CALLBACK; |
} |
- _state &= ~_STATE_IN_CALLBACK; |
+ |
if (_cancelOnError) { |
+ _state |= _STATE_WAIT_FOR_CANCEL; |
_cancel(); |
+ if (_cancelFuture is Future) { |
+ _cancelFuture.whenComplete(sendError); |
+ } else { |
+ sendError(); |
+ } |
+ } else { |
+ sendError(); |
+ // Only check state if not cancelOnError. |
+ _checkState(wasInputPaused); |
} |
- _checkState(wasInputPaused); |
} |
void _sendDone() { |
assert(!_isCanceled); |
assert(!_isPaused); |
assert(!_inCallback); |
- _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
- _zone.runGuarded(_onDone); |
- _onCancel(); // No checkState after cancel, it is always the last event. |
- _state &= ~_STATE_IN_CALLBACK; |
+ |
+ void sendDone() { |
+ // If the subscription has been canceled while waiting for the cancel |
+ // future to finish we must not report the done event. |
+ if (!_waitsForCancel) return; |
+ _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
+ _zone.runGuarded(_onDone); |
+ _state &= ~_STATE_IN_CALLBACK; |
+ } |
+ |
+ _cancel(); |
+ _state |= _STATE_WAIT_FOR_CANCEL; |
+ if (_cancelFuture is Future) { |
+ _cancelFuture.whenComplete(sendDone); |
+ } else { |
+ sendDone(); |
+ } |
} |
/** |
@@ -389,7 +433,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
// make a new state-change callback. Loop until the state didn't change. |
while (true) { |
if (_isCanceled) { |
- _onCancel(); |
_pending = null; |
return; |
} |
@@ -699,7 +742,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
void resume() { |
if (_pauseCounter > 0) _pauseCounter--; |
} |
- void cancel() {} |
+ Future cancel() => null; |
bool get isPaused => _pauseCounter > 0; |
Future asFuture([futureValue]) => new _Future(); |
@@ -823,8 +866,9 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
_stream._resumeSubscription(); |
} |
- void cancel() { |
+ Future cancel() { |
_stream._cancelSubscription(); |
+ return null; |
} |
bool get isPaused { |
@@ -935,7 +979,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
_state = _STATE_DONE; |
} |
- void cancel() { |
+ Future cancel() { |
StreamSubscription subscription = _subscription; |
if (_state == _STATE_MOVING) { |
_Future<bool> hasNext = _futureOrPrefetch; |
@@ -944,7 +988,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
} else { |
_clear(); |
} |
- subscription.cancel(); |
+ return subscription.cancel(); |
} |
void _onData(T data) { |