Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(343)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Mark failing tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 7f128ed658fc756e7fe6631e76cedd44f5ea9ba4..bf7c8f015c9a51ddf3ba7a3f3a4775fb85237adc 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -73,10 +73,20 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
* when `cancelOnError` is true.
*/
static const int _STATE_CANCELED = 8;
- static const int _STATE_IN_CALLBACK = 16;
- static const int _STATE_HAS_PENDING = 32;
- static const int _STATE_PAUSE_COUNT = 64;
- static const int _STATE_PAUSE_COUNT_SHIFT = 6;
+ /**
+ * Set when either:
+ *
+ * * an error is sent, and [cancelOnError] is true, or
+ * * a done event is sent.
+ *
+ * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
+ * state is unset, and no furher events must be delivered.
+ */
+ static const int _STATE_WAIT_FOR_CANCEL = 16;
+ static const int _STATE_IN_CALLBACK = 32;
+ static const int _STATE_HAS_PENDING = 64;
+ static const int _STATE_PAUSE_COUNT = 128;
+ static const int _STATE_PAUSE_COUNT_SHIFT = 7;
/* Event handlers provided in constructor. */
_DataHandler<T> _onData;
@@ -87,6 +97,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
/** Bit vector based on state-constants above. */
int _state;
+ // TODO(floitsch): reuse another field
+ /** The future [_onCancel] may return. */
+ Future _cancelFuture;
+
/**
* Queue of pending events.
*
@@ -171,16 +185,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
}
}
- void cancel() {
- if (_isCanceled) return;
+ Future cancel() {
+ // The user doesn't want to receive any further events. If there is an
+ // error or done event pending (waiting for the cancel to be done) discard
+ // that event.
+ _state &= ~_STATE_WAIT_FOR_CANCEL;
+ if (_isCanceled) return _cancelFuture;
_cancel();
- if (!_inCallback) {
- // otherwise checkState will be called after firing or callback completes.
- _state |= _STATE_IN_CALLBACK;
- _onCancel();
- _pending = null;
- _state &= ~_STATE_IN_CALLBACK;
- }
+ return _cancelFuture;
}
Future asFuture([var futureValue]) {
@@ -201,6 +213,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
bool get _isClosed => (_state & _STATE_CLOSED) != 0;
bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
+ bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0;
bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
@@ -216,6 +229,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
if (_hasPending) {
_pending.cancelSchedule();
}
+ if (!_inCallback) _pending = null;
+ _cancelFuture = _onCancel();
}
/**
@@ -283,7 +298,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
assert(!_isInputPaused);
}
- void _onCancel() {
+ Future _onCancel() {
assert(_isCanceled);
}
@@ -325,30 +340,59 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
assert(!_isPaused);
assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
- _state |= _STATE_IN_CALLBACK;
- if (!_zone.inSameErrorZone(Zone.current)) {
- // Errors are not allowed to traverse zone boundaries.
- Zone.current.handleUncaughtError(error, stackTrace);
- } else if (_onError is ZoneBinaryCallback) {
- _zone.runBinaryGuarded(_onError, error, stackTrace);
- } else {
- _zone.runUnaryGuarded(_onError, error);
+
+ void sendError() {
+ // If the subscription has been canceled while waiting for the cancel
+ // future to finish we must not report the error.
+ if (_isCanceled && !_waitsForCancel) return;
+ _state |= _STATE_IN_CALLBACK;
+ if (!_zone.inSameErrorZone(Zone.current)) {
+ // Errors are not allowed to traverse zone boundaries.
+ Zone.current.handleUncaughtError(error, stackTrace);
+ } else if (_onError is ZoneBinaryCallback) {
+ _zone.runBinaryGuarded(_onError, error, stackTrace);
+ } else {
+ _zone.runUnaryGuarded(_onError, error);
+ }
+ _state &= ~_STATE_IN_CALLBACK;
}
- _state &= ~_STATE_IN_CALLBACK;
+
if (_cancelOnError) {
+ _state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
+ if (_cancelFuture is Future) {
+ _cancelFuture.whenComplete(sendError);
+ } else {
+ sendError();
+ }
+ } else {
+ sendError();
+ // Only check state if not cancelOnError.
+ _checkState(wasInputPaused);
}
- _checkState(wasInputPaused);
}
void _sendDone() {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
- _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
- _zone.runGuarded(_onDone);
- _onCancel(); // No checkState after cancel, it is always the last event.
- _state &= ~_STATE_IN_CALLBACK;
+
+ void sendDone() {
+ // If the subscription has been canceled while waiting for the cancel
+ // future to finish we must not report the done event.
+ if (!_waitsForCancel) return;
+ _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
+ _zone.runGuarded(_onDone);
+ _state &= ~_STATE_IN_CALLBACK;
+ }
+
+ _cancel();
+ _state |= _STATE_WAIT_FOR_CANCEL;
+ if (_cancelFuture is Future) {
+ _cancelFuture.whenComplete(sendDone);
+ } else {
+ sendDone();
+ }
}
/**
@@ -389,7 +433,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// make a new state-change callback. Loop until the state didn't change.
while (true) {
if (_isCanceled) {
- _onCancel();
_pending = null;
return;
}
@@ -699,7 +742,7 @@ class _DummyStreamSubscription<T> implements StreamSubscription<T> {
void resume() {
if (_pauseCounter > 0) _pauseCounter--;
}
- void cancel() {}
+ Future cancel() => null;
bool get isPaused => _pauseCounter > 0;
Future asFuture([futureValue]) => new _Future();
@@ -823,8 +866,9 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
_stream._resumeSubscription();
}
- void cancel() {
+ Future cancel() {
_stream._cancelSubscription();
+ return null;
}
bool get isPaused {
@@ -935,7 +979,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
_state = _STATE_DONE;
}
- void cancel() {
+ Future cancel() {
StreamSubscription subscription = _subscription;
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch;
@@ -944,7 +988,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
} else {
_clear();
}
- subscription.cancel();
+ return subscription.cancel();
}
void _onData(T data) {
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698