Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index ee390cfc7f1bb8acd59e58c9f1bf911dba02624a..ecbd4dc10a4489ecaf55f673ffd51838c6e96aa2 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -71,10 +71,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
* when `cancelOnError` is true. |
*/ |
static const int _STATE_CANCELED = 8; |
- static const int _STATE_IN_CALLBACK = 16; |
- static const int _STATE_HAS_PENDING = 32; |
- static const int _STATE_PAUSE_COUNT = 64; |
- static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
+ static const int _STATE_IN_ERROR_CANCEL = 16; |
floitsch
2013/10/12 18:53:57
Add comment that _STATE_IN_ERROR_CANCEL implies th
Lasse Reichstein Nielsen
2013/10/14 11:32:33
And document what it means. In painful detail, bec
Anders Johnsen
2013/10/16 11:52:21
Done.
|
+ static const int _STATE_IN_CALLBACK = 32; |
+ static const int _STATE_HAS_PENDING = 64; |
+ static const int _STATE_PAUSE_COUNT = 128; |
+ static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
/* Event handlers provided in constructor. */ |
_DataHandler<T> _onData; |
@@ -85,6 +86,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
/** Bit vector based on state-constants above. */ |
int _state; |
+ _Future _cancelFuture; |
floitsch
2013/10/12 18:53:57
Add "TODO(floitsch): reuse another field"
Anders Johnsen
2013/10/16 11:52:21
Done.
|
+ |
/** |
* Queue of pending events. |
* |
@@ -179,16 +182,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
} |
} |
- void cancel() { |
- if (_isCanceled) return; |
+ Future cancel() { |
+ _state &= ~_STATE_IN_ERROR_CANCEL; |
floitsch
2013/10/12 18:53:57
Add comment.
// The user doesn't want to receive a
Lasse Reichstein Nielsen
2013/10/14 11:32:33
any events anymore -> any further events.
This co
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
+ if (_isCanceled) return _cancelFuture; |
_cancel(); |
- if (!_inCallback) { |
- // otherwise checkState will be called after firing or callback completes. |
- _state |= _STATE_IN_CALLBACK; |
- _onCancel(); |
- _pending = null; |
- _state &= ~_STATE_IN_CALLBACK; |
- } |
+ return _cancelFuture; |
} |
Future asFuture([var futureValue]) { |
@@ -209,6 +207,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
+ bool get _inErrorCancel => (_state & _STATE_IN_ERROR_CANCEL) != 0; |
bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
@@ -224,6 +223,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
if (_hasPending) { |
_pending.cancelSchedule(); |
} |
+ if (!_inCallback) _pending = null; |
+ _cancelFuture = _onCancel(); |
floitsch
2013/10/12 18:53:57
I'm not sure we are allowed to call "_onCancel" wh
Lasse Reichstein Nielsen
2013/10/14 11:32:33
I accepted that we had to call _onCancel during an
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
} |
/** |
@@ -291,7 +292,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
assert(!_isInputPaused); |
} |
- void _onCancel() { |
+ Future _onCancel() { |
assert(_isCanceled); |
} |
@@ -333,18 +334,31 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
assert(!_isPaused); |
assert(!_inCallback); |
bool wasInputPaused = _isInputPaused; |
- _state |= _STATE_IN_CALLBACK; |
- if (!_zone.inSameErrorZone(Zone.current)) { |
- // Errors are not allowed to traverse zone boundaries. |
- Zone.current.handleUncaughtError(error); |
- } else { |
- _zone.runUnaryGuarded(_onError, error); |
+ void sendError() { |
floitsch
2013/10/12 18:53:57
one line before and one line after the function.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
+ if (!_isCanceled || _inErrorCancel) { |
floitsch
2013/10/12 18:53:57
Comment: If the subscription has been canceled whi
Anders Johnsen
2013/10/16 11:52:21
Done.
|
+ _state |= _STATE_IN_CALLBACK; |
+ if (!_zone.inSameErrorZone(Zone.current)) { |
+ // Errors are not allowed to traverse zone boundaries. |
+ Zone.current.handleUncaughtError(error); |
+ } else { |
+ _zone.runUnaryGuarded(_onError, error); |
+ } |
+ _state &= ~_STATE_IN_CALLBACK; |
+ } |
} |
- _state &= ~_STATE_IN_CALLBACK; |
if (_cancelOnError) { |
+ _state |= _STATE_IN_ERROR_CANCEL; |
_cancel(); |
+ if (_cancelFuture != null) { |
+ _cancelFuture.whenComplete(sendError); |
+ } else { |
+ sendError(); |
+ } |
+ } else { |
+ sendError(); |
+ // Only check state if not cancelOnError. |
+ _checkState(wasInputPaused); |
} |
- _checkState(wasInputPaused); |
} |
void _sendDone() { |
@@ -353,7 +367,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
assert(!_inCallback); |
_state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
_zone.runGuarded(_onDone); |
- _onCancel(); // No checkState after cancel, it is always the last event. |
+ // TODO(ajohnsen): Run it before _onDone and wait for future? |
Anders Johnsen
2013/10/11 12:32:40
Please comment on this one - I'm not sure what the
floitsch
2013/10/12 18:53:57
Neither.
The done event should *not* call cancel.
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
|
+ _cancel(); // No checkState after cancel, it is always the last event. |
_state &= ~_STATE_IN_CALLBACK; |
} |
@@ -395,7 +410,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
// make a new state-change callback. Loop until the state didn't change. |
while (true) { |
if (_isCanceled) { |
- _onCancel(); |
_pending = null; |
return; |
} |
@@ -714,7 +728,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
void resume() { |
if (_pauseCounter > 0) _pauseCounter--; |
} |
- void cancel() {} |
+ Future cancel() => null; |
bool get isPaused => _pauseCounter > 0; |
Future asFuture([futureValue]) => new _Future(); |
@@ -837,8 +851,9 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
_stream._resumeSubscription(); |
} |
- void cancel() { |
+ Future cancel() { |
_stream._cancelSubscription(); |
+ return null; |
} |
bool get isPaused { |
@@ -949,7 +964,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
_state = _STATE_DONE; |
} |
- void cancel() { |
+ Future cancel() { |
StreamSubscription subscription = _subscription; |
if (_state == _STATE_MOVING) { |
_Future<bool> hasNext = _futureOrPrefetch; |
@@ -958,7 +973,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
} else { |
_clear(); |
} |
- subscription.cancel(); |
+ return subscription.cancel(); |
} |
void _onData(T data) { |