Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(368)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebase Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index c8fbe70f29ef6e07131376f3f14b6e55e244c147..dfbfa1ac2949ba41d692f711043c9d3c4ab9a642 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -85,6 +85,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
/** Bit vector based on state-constants above. */
int _state;
+ _FutureImpl _cancelFuture;
floitsch 2013/07/12 16:42:34 Add TODO that this should be in some field that is
Lasse Reichstein Nielsen 2013/07/17 07:28:46 And add documentation saying what it is, and where
+
/**
* Queue of pending events.
*
@@ -177,16 +179,25 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
}
}
- void cancel() {
- if (_isCanceled) return;
+ void _chainCancelFuture(_FutureImpl future) {
Lasse Reichstein Nielsen 2013/07/17 07:28:46 _cancelFuture._setOrChainValue(future); It handle
+ if (future == null) {
+ _cancelFuture._setValue(null);
+ } else {
+ future._chain(_cancelFuture);
+ }
+ }
+
+ Future cancel() {
+ if (_isCanceled) return _cancelFuture;
_cancel();
if (!_inCallback) {
// otherwise checkState will be called after firing or callback completes.
_state |= _STATE_IN_CALLBACK;
- _onCancel();
+ _chainCancelFuture(_onCancel());
Lasse Reichstein Nielsen 2013/07/17 07:28:46 So this line could just be: _cancelFuture.setOrCh
_pending = null;
_state &= ~_STATE_IN_CALLBACK;
}
+ return _cancelFuture;
}
Future asFuture([var futureValue]) {
@@ -218,6 +229,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
bool get isPaused => _isPaused;
void _cancel() {
+ _cancelFuture = new _FutureImpl();
_state |= _STATE_CANCELED;
_zone.cancelCallbackExpectation();
if (_hasPending) {
@@ -290,7 +302,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
assert(!_isInputPaused);
}
- void _onCancel() {
+ _FutureImpl _onCancel() {
assert(_isCanceled);
}
@@ -394,7 +406,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// make a new state-change callback. Loop until the state didn't change.
while (true) {
if (_isCanceled) {
- _onCancel();
+ _chainCancelFuture(_onCancel());
Lasse Reichstein Nielsen 2013/07/17 07:28:46 _cancelFuture._setOrChainValue(_onCancel());
_pending = null;
return;
}

Powered by Google App Engine
This is Rietveld 408576698