| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index 8c9af64bb9e01fd4bfe9511e9024117bcce3e8e8..cb2e20d0f1373c8ec59700d85e76e81e2dc91f5a 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -188,9 +188,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| // error or done event pending (waiting for the cancel to be done) discard
|
| // that event.
|
| _state &= ~_STATE_WAIT_FOR_CANCEL;
|
| - if (_isCanceled) return _cancelFuture;
|
| - _cancel();
|
| - return _cancelFuture;
|
| + if (!_isCanceled) {
|
| + _cancel();
|
| + }
|
| + return _cancelFuture ?? Future._nullFuture;
|
| }
|
|
|
| Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| @@ -199,8 +200,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| // Overwrite the onDone and onError handlers.
|
| _onDone = () { result._complete(futureValue); };
|
| _onError = (error, stackTrace) {
|
| - cancel();
|
| - result._completeError(error, stackTrace);
|
| + Future cancelFuture = cancel();
|
| + if (!identical(cancelFuture, Future._nullFuture)) {
|
| + cancelFuture.whenComplete(() {
|
| + result._completeError(error, stackTrace);
|
| + });
|
| + } else {
|
| + result._completeError(error, stackTrace);
|
| + }
|
| };
|
|
|
| return result;
|
| @@ -361,7 +368,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| if (_cancelOnError) {
|
| _state |= _STATE_WAIT_FOR_CANCEL;
|
| _cancel();
|
| - if (_cancelFuture is Future) {
|
| + if (_cancelFuture is Future &&
|
| + !identical(_cancelFuture, Future._nullFuture)) {
|
| _cancelFuture.whenComplete(sendError);
|
| } else {
|
| sendError();
|
| @@ -389,7 +397,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|
|
| _cancel();
|
| _state |= _STATE_WAIT_FOR_CANCEL;
|
| - if (_cancelFuture is Future) {
|
| + if (_cancelFuture is Future &&
|
| + !identical(_cancelFuture, Future._nullFuture)) {
|
| _cancelFuture.whenComplete(sendDone);
|
| } else {
|
| sendDone();
|
| @@ -778,7 +787,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
|
| }
|
| }
|
|
|
| - Future cancel() => null;
|
| + Future cancel() => Future._nullFuture;
|
|
|
| Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| _Future/*<E>*/ result = new _Future/*<E>*/();
|
| @@ -916,7 +925,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
|
|
|
| Future cancel() {
|
| _stream._cancelSubscription();
|
| - return null;
|
| + return Future._nullFuture;
|
| }
|
|
|
| bool get isPaused {
|
| @@ -1032,7 +1041,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
|
|
| Future cancel() {
|
| StreamSubscription subscription = _subscription;
|
| - if (subscription == null) return null;
|
| + if (subscription == null) return Future._nullFuture;
|
| if (_state == _STATE_MOVING) {
|
| _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| _clear();
|
|
|