Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 5c1ff6586db7e289246bcb052c7d67b9c43cf7f7..c1699f0d636b1a22025551fcd36f9d5ccc4f3674 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -1135,6 +1135,8 @@ class _EventTransformStreamSubscription<S, T> |
final StreamEventTransformer<S, T> _transformer; |
/** Whether to unsubscribe when emitting an error. */ |
final bool _unsubscribeOnError; |
+ /** Whether this stream has sent a done event. */ |
+ bool _isClosed = false; |
/** Source of incoming events. */ |
StreamSubscription<S> _subscription; |
/** Cached EventSink wrapper for this class. */ |
@@ -1166,9 +1168,11 @@ class _EventTransformStreamSubscription<S, T> |
void cancel() { |
if (_isSubscribed) { |
- _subscription.cancel(); |
+ StreamSubscription subscription = _subscription; |
_subscription = null; |
+ subscription.cancel(); |
} |
+ _isClosed = true; |
} |
void _handleData(S data) { |
@@ -1189,6 +1193,7 @@ class _EventTransformStreamSubscription<S, T> |
void _handleDone() { |
try { |
+ _subscription = null; |
_transformer.handleDone(_sink); |
} catch (e, s) { |
_sendError(_asyncError(e, s)); |
@@ -1197,12 +1202,12 @@ class _EventTransformStreamSubscription<S, T> |
// EventOutputSink interface. |
void _sendData(T data) { |
- if (!_isSubscribed) return; |
+ if (_isClosed) return; |
_onData(data); |
} |
void _sendError(AsyncError error) { |
- if (!_isSubscribed) return; |
+ if (_isClosed) return; |
_onError(error); |
if (_unsubscribeOnError) { |
cancel(); |
@@ -1210,9 +1215,12 @@ class _EventTransformStreamSubscription<S, T> |
} |
void _sendDone() { |
- if (!_isSubscribed) return; |
- _subscription.cancel(); |
- _subscription = null; |
+ if (_isClosed) throw new StateError("Already closed."); |
+ _isClosed = true; |
+ if (_isSubscribed) { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
_onDone(); |
} |
} |