Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index ee390cfc7f1bb8acd59e58c9f1bf911dba02624a..ecbd4dc10a4489ecaf55f673ffd51838c6e96aa2 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -71,10 +71,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| * when `cancelOnError` is true. |
| */ |
| static const int _STATE_CANCELED = 8; |
| - static const int _STATE_IN_CALLBACK = 16; |
| - static const int _STATE_HAS_PENDING = 32; |
| - static const int _STATE_PAUSE_COUNT = 64; |
| - static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
| + static const int _STATE_IN_ERROR_CANCEL = 16; |
|
floitsch
2013/10/12 18:53:57
Add comment that _STATE_IN_ERROR_CANCEL implies th
Lasse Reichstein Nielsen
2013/10/14 11:32:33
And document what it means. In painful detail, bec
Anders Johnsen
2013/10/16 11:52:21
Done.
|
| + static const int _STATE_IN_CALLBACK = 32; |
| + static const int _STATE_HAS_PENDING = 64; |
| + static const int _STATE_PAUSE_COUNT = 128; |
| + static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
| /* Event handlers provided in constructor. */ |
| _DataHandler<T> _onData; |
| @@ -85,6 +86,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| /** Bit vector based on state-constants above. */ |
| int _state; |
| + _Future _cancelFuture; |
|
floitsch
2013/10/12 18:53:57
Add "TODO(floitsch): reuse another field"
Anders Johnsen
2013/10/16 11:52:21
Done.
|
| + |
| /** |
| * Queue of pending events. |
| * |
| @@ -179,16 +182,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| } |
| } |
| - void cancel() { |
| - if (_isCanceled) return; |
| + Future cancel() { |
| + _state &= ~_STATE_IN_ERROR_CANCEL; |
|
floitsch
2013/10/12 18:53:57
Add comment.
// The user doesn't want to receive a
Lasse Reichstein Nielsen
2013/10/14 11:32:33
any events anymore -> any further events.
This co
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
| + if (_isCanceled) return _cancelFuture; |
| _cancel(); |
| - if (!_inCallback) { |
| - // otherwise checkState will be called after firing or callback completes. |
| - _state |= _STATE_IN_CALLBACK; |
| - _onCancel(); |
| - _pending = null; |
| - _state &= ~_STATE_IN_CALLBACK; |
| - } |
| + return _cancelFuture; |
| } |
| Future asFuture([var futureValue]) { |
| @@ -209,6 +207,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| + bool get _inErrorCancel => (_state & _STATE_IN_ERROR_CANCEL) != 0; |
| bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
| bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
| bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
| @@ -224,6 +223,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| if (_hasPending) { |
| _pending.cancelSchedule(); |
| } |
| + if (!_inCallback) _pending = null; |
| + _cancelFuture = _onCancel(); |
|
floitsch
2013/10/12 18:53:57
I'm not sure we are allowed to call "_onCancel" wh
Lasse Reichstein Nielsen
2013/10/14 11:32:33
I accepted that we had to call _onCancel during an
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
| } |
| /** |
| @@ -291,7 +292,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| assert(!_isInputPaused); |
| } |
| - void _onCancel() { |
| + Future _onCancel() { |
| assert(_isCanceled); |
| } |
| @@ -333,18 +334,31 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| assert(!_isPaused); |
| assert(!_inCallback); |
| bool wasInputPaused = _isInputPaused; |
| - _state |= _STATE_IN_CALLBACK; |
| - if (!_zone.inSameErrorZone(Zone.current)) { |
| - // Errors are not allowed to traverse zone boundaries. |
| - Zone.current.handleUncaughtError(error); |
| - } else { |
| - _zone.runUnaryGuarded(_onError, error); |
| + void sendError() { |
|
floitsch
2013/10/12 18:53:57
one line before and one line after the function.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
| + if (!_isCanceled || _inErrorCancel) { |
|
floitsch
2013/10/12 18:53:57
Comment: If the subscription has been canceled whi
Anders Johnsen
2013/10/16 11:52:21
Done.
|
| + _state |= _STATE_IN_CALLBACK; |
| + if (!_zone.inSameErrorZone(Zone.current)) { |
| + // Errors are not allowed to traverse zone boundaries. |
| + Zone.current.handleUncaughtError(error); |
| + } else { |
| + _zone.runUnaryGuarded(_onError, error); |
| + } |
| + _state &= ~_STATE_IN_CALLBACK; |
| + } |
| } |
| - _state &= ~_STATE_IN_CALLBACK; |
| if (_cancelOnError) { |
| + _state |= _STATE_IN_ERROR_CANCEL; |
| _cancel(); |
| + if (_cancelFuture != null) { |
| + _cancelFuture.whenComplete(sendError); |
| + } else { |
| + sendError(); |
| + } |
| + } else { |
| + sendError(); |
| + // Only check state if not cancelOnError. |
| + _checkState(wasInputPaused); |
| } |
| - _checkState(wasInputPaused); |
| } |
| void _sendDone() { |
| @@ -353,7 +367,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| assert(!_inCallback); |
| _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| _zone.runGuarded(_onDone); |
| - _onCancel(); // No checkState after cancel, it is always the last event. |
| + // TODO(ajohnsen): Run it before _onDone and wait for future? |
|
Anders Johnsen
2013/10/11 12:32:40
Please comment on this one - I'm not sure what the
floitsch
2013/10/12 18:53:57
Neither.
The done event should *not* call cancel.
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
| + _cancel(); // No checkState after cancel, it is always the last event. |
| _state &= ~_STATE_IN_CALLBACK; |
| } |
| @@ -395,7 +410,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| // make a new state-change callback. Loop until the state didn't change. |
| while (true) { |
| if (_isCanceled) { |
| - _onCancel(); |
| _pending = null; |
| return; |
| } |
| @@ -714,7 +728,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
| void resume() { |
| if (_pauseCounter > 0) _pauseCounter--; |
| } |
| - void cancel() {} |
| + Future cancel() => null; |
| bool get isPaused => _pauseCounter > 0; |
| Future asFuture([futureValue]) => new _Future(); |
| @@ -837,8 +851,9 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
| _stream._resumeSubscription(); |
| } |
| - void cancel() { |
| + Future cancel() { |
| _stream._cancelSubscription(); |
| + return null; |
| } |
| bool get isPaused { |
| @@ -949,7 +964,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| _state = _STATE_DONE; |
| } |
| - void cancel() { |
| + Future cancel() { |
| StreamSubscription subscription = _subscription; |
| if (_state == _STATE_MOVING) { |
| _Future<bool> hasNext = _futureOrPrefetch; |
| @@ -958,7 +973,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| } else { |
| _clear(); |
| } |
| - subscription.cancel(); |
| + return subscription.cancel(); |
| } |
| void _onData(T data) { |