Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 2202533003: Return futures on Stream.cancel when possible. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Don't make Pipe.cancel wait for the null future. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 8c9af64bb9e01fd4bfe9511e9024117bcce3e8e8..cb2e20d0f1373c8ec59700d85e76e81e2dc91f5a 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -188,9 +188,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// error or done event pending (waiting for the cancel to be done) discard
// that event.
_state &= ~_STATE_WAIT_FOR_CANCEL;
- if (_isCanceled) return _cancelFuture;
- _cancel();
- return _cancelFuture;
+ if (!_isCanceled) {
+ _cancel();
+ }
+ return _cancelFuture ?? Future._nullFuture;
}
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
@@ -199,8 +200,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// Overwrite the onDone and onError handlers.
_onDone = () { result._complete(futureValue); };
_onError = (error, stackTrace) {
- cancel();
- result._completeError(error, stackTrace);
+ Future cancelFuture = cancel();
+ if (!identical(cancelFuture, Future._nullFuture)) {
+ cancelFuture.whenComplete(() {
+ result._completeError(error, stackTrace);
+ });
+ } else {
+ result._completeError(error, stackTrace);
+ }
};
return result;
@@ -361,7 +368,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
if (_cancelOnError) {
_state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
- if (_cancelFuture is Future) {
+ if (_cancelFuture is Future &&
+ !identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendError);
} else {
sendError();
@@ -389,7 +397,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_cancel();
_state |= _STATE_WAIT_FOR_CANCEL;
- if (_cancelFuture is Future) {
+ if (_cancelFuture is Future &&
+ !identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendDone);
} else {
sendDone();
@@ -778,7 +787,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
}
}
- Future cancel() => null;
+ Future cancel() => Future._nullFuture;
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
_Future/*<E>*/ result = new _Future/*<E>*/();
@@ -916,7 +925,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
Future cancel() {
_stream._cancelSubscription();
- return null;
+ return Future._nullFuture;
}
bool get isPaused {
@@ -1032,7 +1041,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
Future cancel() {
StreamSubscription subscription = _subscription;
- if (subscription == null) return null;
+ if (subscription == null) return Future._nullFuture;
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_clear();
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698