Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(655)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove dir stuff. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index ee390cfc7f1bb8acd59e58c9f1bf911dba02624a..ecbd4dc10a4489ecaf55f673ffd51838c6e96aa2 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -71,10 +71,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
* when `cancelOnError` is true.
*/
static const int _STATE_CANCELED = 8;
- static const int _STATE_IN_CALLBACK = 16;
- static const int _STATE_HAS_PENDING = 32;
- static const int _STATE_PAUSE_COUNT = 64;
- static const int _STATE_PAUSE_COUNT_SHIFT = 6;
+ static const int _STATE_IN_ERROR_CANCEL = 16;
floitsch 2013/10/12 18:53:57 Add comment that _STATE_IN_ERROR_CANCEL implies th
Lasse Reichstein Nielsen 2013/10/14 11:32:33 And document what it means. In painful detail, bec
Anders Johnsen 2013/10/16 11:52:21 Done.
+ static const int _STATE_IN_CALLBACK = 32;
+ static const int _STATE_HAS_PENDING = 64;
+ static const int _STATE_PAUSE_COUNT = 128;
+ static const int _STATE_PAUSE_COUNT_SHIFT = 7;
/* Event handlers provided in constructor. */
_DataHandler<T> _onData;
@@ -85,6 +86,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
/** Bit vector based on state-constants above. */
int _state;
+ _Future _cancelFuture;
floitsch 2013/10/12 18:53:57 Add "TODO(floitsch): reuse another field"
Anders Johnsen 2013/10/16 11:52:21 Done.
+
/**
* Queue of pending events.
*
@@ -179,16 +182,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
}
}
- void cancel() {
- if (_isCanceled) return;
+ Future cancel() {
+ _state &= ~_STATE_IN_ERROR_CANCEL;
floitsch 2013/10/12 18:53:57 Add comment. // The user doesn't want to receive a
Lasse Reichstein Nielsen 2013/10/14 11:32:33 any events anymore -> any further events. This co
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
+ if (_isCanceled) return _cancelFuture;
_cancel();
- if (!_inCallback) {
- // otherwise checkState will be called after firing or callback completes.
- _state |= _STATE_IN_CALLBACK;
- _onCancel();
- _pending = null;
- _state &= ~_STATE_IN_CALLBACK;
- }
+ return _cancelFuture;
}
Future asFuture([var futureValue]) {
@@ -209,6 +207,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
bool get _isClosed => (_state & _STATE_CLOSED) != 0;
bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
+ bool get _inErrorCancel => (_state & _STATE_IN_ERROR_CANCEL) != 0;
bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
@@ -224,6 +223,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
if (_hasPending) {
_pending.cancelSchedule();
}
+ if (!_inCallback) _pending = null;
+ _cancelFuture = _onCancel();
floitsch 2013/10/12 18:53:57 I'm not sure we are allowed to call "_onCancel" wh
Lasse Reichstein Nielsen 2013/10/14 11:32:33 I accepted that we had to call _onCancel during an
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
}
/**
@@ -291,7 +292,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
assert(!_isInputPaused);
}
- void _onCancel() {
+ Future _onCancel() {
assert(_isCanceled);
}
@@ -333,18 +334,31 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
assert(!_isPaused);
assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
- _state |= _STATE_IN_CALLBACK;
- if (!_zone.inSameErrorZone(Zone.current)) {
- // Errors are not allowed to traverse zone boundaries.
- Zone.current.handleUncaughtError(error);
- } else {
- _zone.runUnaryGuarded(_onError, error);
+ void sendError() {
floitsch 2013/10/12 18:53:57 one line before and one line after the function.
Anders Johnsen 2013/10/16 11:52:21 Done.
+ if (!_isCanceled || _inErrorCancel) {
floitsch 2013/10/12 18:53:57 Comment: If the subscription has been canceled whi
Anders Johnsen 2013/10/16 11:52:21 Done.
+ _state |= _STATE_IN_CALLBACK;
+ if (!_zone.inSameErrorZone(Zone.current)) {
+ // Errors are not allowed to traverse zone boundaries.
+ Zone.current.handleUncaughtError(error);
+ } else {
+ _zone.runUnaryGuarded(_onError, error);
+ }
+ _state &= ~_STATE_IN_CALLBACK;
+ }
}
- _state &= ~_STATE_IN_CALLBACK;
if (_cancelOnError) {
+ _state |= _STATE_IN_ERROR_CANCEL;
_cancel();
+ if (_cancelFuture != null) {
+ _cancelFuture.whenComplete(sendError);
+ } else {
+ sendError();
+ }
+ } else {
+ sendError();
+ // Only check state if not cancelOnError.
+ _checkState(wasInputPaused);
}
- _checkState(wasInputPaused);
}
void _sendDone() {
@@ -353,7 +367,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
assert(!_inCallback);
_state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
_zone.runGuarded(_onDone);
- _onCancel(); // No checkState after cancel, it is always the last event.
+ // TODO(ajohnsen): Run it before _onDone and wait for future?
Anders Johnsen 2013/10/11 12:32:40 Please comment on this one - I'm not sure what the
floitsch 2013/10/12 18:53:57 Neither. The done event should *not* call cancel.
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
+ _cancel(); // No checkState after cancel, it is always the last event.
_state &= ~_STATE_IN_CALLBACK;
}
@@ -395,7 +410,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// make a new state-change callback. Loop until the state didn't change.
while (true) {
if (_isCanceled) {
- _onCancel();
_pending = null;
return;
}
@@ -714,7 +728,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> {
void resume() {
if (_pauseCounter > 0) _pauseCounter--;
}
- void cancel() {}
+ Future cancel() => null;
bool get isPaused => _pauseCounter > 0;
Future asFuture([futureValue]) => new _Future();
@@ -837,8 +851,9 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
_stream._resumeSubscription();
}
- void cancel() {
+ Future cancel() {
_stream._cancelSubscription();
+ return null;
}
bool get isPaused {
@@ -949,7 +964,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
_state = _STATE_DONE;
}
- void cancel() {
+ Future cancel() {
StreamSubscription subscription = _subscription;
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch;
@@ -958,7 +973,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
} else {
_clear();
}
- subscription.cancel();
+ return subscription.cancel();
}
void _onData(T data) {

Powered by Google App Engine
This is Rietveld 408576698