Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 7f128ed658fc756e7fe6631e76cedd44f5ea9ba4..a3856ab16836da9c3e00d3e767be8292e63f59ad 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -73,10 +73,19 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| * when `cancelOnError` is true. |
| */ |
| static const int _STATE_CANCELED = 8; |
| - static const int _STATE_IN_CALLBACK = 16; |
| - static const int _STATE_HAS_PENDING = 32; |
| - static const int _STATE_PAUSE_COUNT = 64; |
| - static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
| + /** |
| + * Set when either: |
| + * * An error is sent, and [cancelOnError] is true. |
|
floitsch
2013/10/16 14:43:44
Nit (you also need the new line as otherwise dartd
Anders Johnsen
2013/10/21 08:01:46
Done.
|
| + * * A done is sent. |
| + * |
| + * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
| + * state is unset, and no furher events should be delivered. |
|
floitsch
2013/10/16 14:43:44
"should" is not helpful.
I guess "is" is more accu
Anders Johnsen
2013/10/21 08:01:46
Using 'must', as the comment is dictating how the
|
| + */ |
| + static const int _STATE_WAIT_FOR_CANCEL = 16; |
| + static const int _STATE_IN_CALLBACK = 32; |
| + static const int _STATE_HAS_PENDING = 64; |
| + static const int _STATE_PAUSE_COUNT = 128; |
| + static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
| /* Event handlers provided in constructor. */ |
| _DataHandler<T> _onData; |
| @@ -87,6 +96,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| /** Bit vector based on state-constants above. */ |
| int _state; |
| + // TODO(floitsch): reuse another field |
| + /** The future [_onCancel] may return. */ |
| + Future _cancelFuture; |
| + |
| /** |
| * Queue of pending events. |
| * |
| @@ -171,16 +184,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| } |
| } |
| - void cancel() { |
| - if (_isCanceled) return; |
| + Future cancel() { |
| + // The user doesn't want to receive any further events. If there is an |
| + // error or done event pending (waiting for the cancel to be done) discard |
| + // that event. |
| + _state &= ~_STATE_WAIT_FOR_CANCEL; |
| + if (_isCanceled) return _cancelFuture; |
| _cancel(); |
| - if (!_inCallback) { |
| - // otherwise checkState will be called after firing or callback completes. |
| - _state |= _STATE_IN_CALLBACK; |
| - _onCancel(); |
| - _pending = null; |
| - _state &= ~_STATE_IN_CALLBACK; |
| - } |
| + return _cancelFuture; |
| } |
| Future asFuture([var futureValue]) { |
| @@ -201,6 +212,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| + bool get _waitForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; |
|
floitsch
2013/10/16 14:43:44
_waitsForCancel
Anders Johnsen
2013/10/21 08:01:46
Done.
|
| bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
| bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
| bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
| @@ -216,6 +228,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| if (_hasPending) { |
| _pending.cancelSchedule(); |
| } |
| + if (!_inCallback) _pending = null; |
| + _cancelFuture = _onCancel(); |
| } |
| /** |
| @@ -283,7 +297,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| assert(!_isInputPaused); |
| } |
| - void _onCancel() { |
| + Future _onCancel() { |
| assert(_isCanceled); |
| } |
| @@ -325,30 +339,59 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| assert(!_isPaused); |
| assert(!_inCallback); |
| bool wasInputPaused = _isInputPaused; |
| - _state |= _STATE_IN_CALLBACK; |
| - if (!_zone.inSameErrorZone(Zone.current)) { |
| - // Errors are not allowed to traverse zone boundaries. |
| - Zone.current.handleUncaughtError(error, stackTrace); |
| - } else if (_onError is ZoneBinaryCallback) { |
| - _zone.runBinaryGuarded(_onError, error, stackTrace); |
| - } else { |
| - _zone.runUnaryGuarded(_onError, error); |
| + |
| + void sendError() { |
| + // If the subscription has been canceled while waiting for the cancel |
| + // future to finish we must not report the error. |
| + if (_isCanceled && !_waitForCancel) return; |
| + _state |= _STATE_IN_CALLBACK; |
| + if (!_zone.inSameErrorZone(Zone.current)) { |
| + // Errors are not allowed to traverse zone boundaries. |
| + Zone.current.handleUncaughtError(error, stackTrace); |
| + } else if (_onError is ZoneBinaryCallback) { |
| + _zone.runBinaryGuarded(_onError, error, stackTrace); |
| + } else { |
| + _zone.runUnaryGuarded(_onError, error); |
| + } |
| + _state &= ~_STATE_IN_CALLBACK; |
| } |
| - _state &= ~_STATE_IN_CALLBACK; |
| + |
| if (_cancelOnError) { |
| + _state |= _STATE_WAIT_FOR_CANCEL; |
| _cancel(); |
| + if (_cancelFuture is Future) { |
| + _cancelFuture.whenComplete(sendError); |
| + } else { |
| + sendError(); |
| + } |
| + } else { |
| + sendError(); |
| + // Only check state if not cancelOnError. |
| + _checkState(wasInputPaused); |
| } |
| - _checkState(wasInputPaused); |
| } |
| void _sendDone() { |
| assert(!_isCanceled); |
| assert(!_isPaused); |
| assert(!_inCallback); |
| - _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| - _zone.runGuarded(_onDone); |
| - _onCancel(); // No checkState after cancel, it is always the last event. |
| - _state &= ~_STATE_IN_CALLBACK; |
| + |
| + void sendDone() { |
| + // If the subscription has been canceled while waiting for the cancel |
| + // future to finish we must not report the done event. |
| + if (!_waitForCancel) return; |
| + _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| + _zone.runGuarded(_onDone); |
| + _state &= ~_STATE_IN_CALLBACK; |
| + } |
| + |
| + _cancel(); |
| + _state |= _STATE_WAIT_FOR_CANCEL; |
| + if (_cancelFuture is Future) { |
| + _cancelFuture.whenComplete(sendDone); |
| + } else { |
| + sendDone(); |
| + } |
| } |
| /** |
| @@ -389,7 +432,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| // make a new state-change callback. Loop until the state didn't change. |
| while (true) { |
| if (_isCanceled) { |
| - _onCancel(); |
| _pending = null; |
| return; |
| } |
| @@ -699,7 +741,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
| void resume() { |
| if (_pauseCounter > 0) _pauseCounter--; |
| } |
| - void cancel() {} |
| + Future cancel() => null; |
| bool get isPaused => _pauseCounter > 0; |
| Future asFuture([futureValue]) => new _Future(); |
| @@ -823,8 +865,9 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
| _stream._resumeSubscription(); |
| } |
| - void cancel() { |
| + Future cancel() { |
| _stream._cancelSubscription(); |
| + return null; |
| } |
| bool get isPaused { |
| @@ -935,7 +978,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| _state = _STATE_DONE; |
| } |
| - void cancel() { |
| + Future cancel() { |
| StreamSubscription subscription = _subscription; |
| if (_state == _STATE_MOVING) { |
| _Future<bool> hasNext = _futureOrPrefetch; |
| @@ -944,7 +987,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| } else { |
| _clear(); |
| } |
| - subscription.cancel(); |
| + return subscription.cancel(); |
| } |
| void _onData(T data) { |