Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 759f9c7474391c0dbc13aadc3c52b158223e0429..2ff9b9235a790bc2755a739e94ee50b4bb93d9f7 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -617,8 +617,8 @@ abstract class Stream<T> { |
// TODO(ahe): Restore type when feature is implemented in dart2js |
// checked mode. http://dartbug.com/7733 |
(/*T*/ value) { |
- future._setValue(value); |
subscription.cancel(); |
+ future._setValue(value); |
return; |
}, |
onError: future._setError, |
@@ -673,10 +673,10 @@ abstract class Stream<T> { |
// checked mode. http://dartbug.com/7733 |
(/*T*/ value) { |
if (foundResult) { |
+ subscription.cancel(); |
// This is the second element we get. |
Error error = new StateError("More than one element"); |
future._setError(new AsyncError(error)); |
- subscription.cancel(); |
return; |
} |
foundResult = true; |
@@ -845,8 +845,8 @@ abstract class Stream<T> { |
// checked mode. http://dartbug.com/7733 |
(/*T*/ value) { |
if (index == 0) { |
- future._setValue(value); |
subscription.cancel(); |
+ future._setValue(value); |
return; |
} |
index -= 1; |
@@ -1131,16 +1131,19 @@ class _EventTransformStreamSubscription<S, T> |
onDone: _handleDone); |
} |
+ /** Whether this subscription is still subscribed to its source. */ |
+ bool get _isSubscribed => _subscription != null; |
+ |
void pause([Future pauseSignal]) { |
- if (_subscription != null) _subscription.pause(pauseSignal); |
+ if (_isSubscribed) _subscription.pause(pauseSignal); |
} |
void resume() { |
- if (_subscription != null) _subscription.resume(); |
+ if (_isSubscribed) _subscription.resume(); |
} |
void cancel() { |
- if (_subscription != null) { |
+ if (_isSubscribed) { |
_subscription.cancel(); |
_subscription = null; |
} |
@@ -1163,7 +1166,6 @@ class _EventTransformStreamSubscription<S, T> |
} |
void _handleDone() { |
- _subscription = null; |
try { |
_transformer.handleDone(_sink); |
} catch (e, s) { |
@@ -1173,10 +1175,12 @@ class _EventTransformStreamSubscription<S, T> |
// EventOutputSink interface. |
void _sendData(T data) { |
+ if (!_isSubscribed) return; |
_onData(data); |
} |
void _sendError(AsyncError error) { |
+ if (!_isSubscribed) return; |
_onError(error); |
if (_unsubscribeOnError) { |
cancel(); |
@@ -1184,8 +1188,9 @@ class _EventTransformStreamSubscription<S, T> |
} |
void _sendDone() { |
- // It's ok to cancel even if we have been unsubscribed already. |
- cancel(); |
+ if (!_isSubscribed) return; |
+ _subscription.cancel(); |
+ _subscription = null; |
_onDone(); |
} |
} |