Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 8c9af64bb9e01fd4bfe9511e9024117bcce3e8e8..2dfc1048ac13f66634b2fae0997b9a70cbf888f7 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -188,9 +188,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| // error or done event pending (waiting for the cancel to be done) discard |
| // that event. |
| _state &= ~_STATE_WAIT_FOR_CANCEL; |
| - if (_isCanceled) return _cancelFuture; |
| - _cancel(); |
| - return _cancelFuture; |
| + if (!_isCanceled) { |
| + _cancel(); |
| + } |
| + return _cancelFuture ?? Future._nullFuture; |
| } |
| Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
| @@ -199,8 +200,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| // Overwrite the onDone and onError handlers. |
| _onDone = () { result._complete(futureValue); }; |
| _onError = (error, stackTrace) { |
| - cancel(); |
| - result._completeError(error, stackTrace); |
| + Future cancelFuture = cancel(); |
| + if (cancelFuture is Future) { |
|
Lasse Reichstein Nielsen
2016/08/02 01:25:39
cancel() can't return null any more, so there is n
floitsch
2016/08/02 12:03:09
done.
|
| + cancelFuture.then((_) { |
|
Lasse Reichstein Nielsen
2016/08/02 01:25:39
If the cancelFuture has an error, then the result
floitsch
2016/08/02 12:03:09
done.
|
| + result._completeError(error, stackTrace); |
| + }); |
| + } else { |
| + result._completeError(error, stackTrace); |
| + } |
| }; |
| return result; |
| @@ -361,7 +368,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| if (_cancelOnError) { |
| _state |= _STATE_WAIT_FOR_CANCEL; |
| _cancel(); |
| - if (_cancelFuture is Future) { |
| + if (_cancelFuture is Future && |
| + !identical(_cancelFuture, Future._nullFuture)) { |
| _cancelFuture.whenComplete(sendError); |
| } else { |
| sendError(); |
| @@ -389,7 +397,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| _cancel(); |
| _state |= _STATE_WAIT_FOR_CANCEL; |
| - if (_cancelFuture is Future) { |
| + if (_cancelFuture is Future && |
| + !identical(_cancelFuture, Future._nullFuture)) { |
| _cancelFuture.whenComplete(sendDone); |
| } else { |
| sendDone(); |
| @@ -778,7 +787,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
| } |
| } |
| - Future cancel() => null; |
| + Future cancel() => Future._nullFuture; |
| Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
| _Future/*<E>*/ result = new _Future/*<E>*/(); |
| @@ -916,7 +925,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
| Future cancel() { |
| _stream._cancelSubscription(); |
| - return null; |
| + return Future._nullFuture; |
| } |
| bool get isPaused { |
| @@ -1032,7 +1041,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| Future cancel() { |
| StreamSubscription subscription = _subscription; |
| - if (subscription == null) return null; |
| + if (subscription == null) return Future._nullFuture; |
| if (_state == _STATE_MOVING) { |
| _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| _clear(); |