| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index 759f9c7474391c0dbc13aadc3c52b158223e0429..2ff9b9235a790bc2755a739e94ee50b4bb93d9f7 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -617,8 +617,8 @@ abstract class Stream<T> {
|
| // TODO(ahe): Restore type when feature is implemented in dart2js
|
| // checked mode. http://dartbug.com/7733
|
| (/*T*/ value) {
|
| - future._setValue(value);
|
| subscription.cancel();
|
| + future._setValue(value);
|
| return;
|
| },
|
| onError: future._setError,
|
| @@ -673,10 +673,10 @@ abstract class Stream<T> {
|
| // checked mode. http://dartbug.com/7733
|
| (/*T*/ value) {
|
| if (foundResult) {
|
| + subscription.cancel();
|
| // This is the second element we get.
|
| Error error = new StateError("More than one element");
|
| future._setError(new AsyncError(error));
|
| - subscription.cancel();
|
| return;
|
| }
|
| foundResult = true;
|
| @@ -845,8 +845,8 @@ abstract class Stream<T> {
|
| // checked mode. http://dartbug.com/7733
|
| (/*T*/ value) {
|
| if (index == 0) {
|
| - future._setValue(value);
|
| subscription.cancel();
|
| + future._setValue(value);
|
| return;
|
| }
|
| index -= 1;
|
| @@ -1131,16 +1131,19 @@ class _EventTransformStreamSubscription<S, T>
|
| onDone: _handleDone);
|
| }
|
|
|
| + /** Whether this subscription is still subscribed to its source. */
|
| + bool get _isSubscribed => _subscription != null;
|
| +
|
| void pause([Future pauseSignal]) {
|
| - if (_subscription != null) _subscription.pause(pauseSignal);
|
| + if (_isSubscribed) _subscription.pause(pauseSignal);
|
| }
|
|
|
| void resume() {
|
| - if (_subscription != null) _subscription.resume();
|
| + if (_isSubscribed) _subscription.resume();
|
| }
|
|
|
| void cancel() {
|
| - if (_subscription != null) {
|
| + if (_isSubscribed) {
|
| _subscription.cancel();
|
| _subscription = null;
|
| }
|
| @@ -1163,7 +1166,6 @@ class _EventTransformStreamSubscription<S, T>
|
| }
|
|
|
| void _handleDone() {
|
| - _subscription = null;
|
| try {
|
| _transformer.handleDone(_sink);
|
| } catch (e, s) {
|
| @@ -1173,10 +1175,12 @@ class _EventTransformStreamSubscription<S, T>
|
|
|
| // EventOutputSink interface.
|
| void _sendData(T data) {
|
| + if (!_isSubscribed) return;
|
| _onData(data);
|
| }
|
|
|
| void _sendError(AsyncError error) {
|
| + if (!_isSubscribed) return;
|
| _onError(error);
|
| if (_unsubscribeOnError) {
|
| cancel();
|
| @@ -1184,8 +1188,9 @@ class _EventTransformStreamSubscription<S, T>
|
| }
|
|
|
| void _sendDone() {
|
| - // It's ok to cancel even if we have been unsubscribed already.
|
| - cancel();
|
| + if (!_isSubscribed) return;
|
| + _subscription.cancel();
|
| + _subscription = null;
|
| _onDone();
|
| }
|
| }
|
|
|