Index: utils/pub/error_group.dart |
diff --git a/utils/pub/error_group.dart b/utils/pub/error_group.dart |
index 90edfa5179678e276142788b61472519e1f06280..16b96b19c36dd3c1ddcb70c54e7b535e1db6c865 100644 |
--- a/utils/pub/error_group.dart |
+++ b/utils/pub/error_group.dart |
@@ -232,19 +232,27 @@ class _ErrorGroupStream extends Stream { |
/// Whether [this] has any listeners. |
bool get _hasListeners => _controller.hasSubscribers; |
+ // TODO(nweiz): Remove this when issue 8512 is fixed. |
+ /// Whether the subscription has been cancelled. |
+ bool _cancelled = false; |
+ |
/// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
/// [inner]. |
_ErrorGroupStream(this._group, Stream inner) |
: _controller = inner.isBroadcast ? |
new StreamController.broadcast() : |
new StreamController() { |
- _subscription = inner.listen(_controller.add, |
- onError: (e) => _group._signalError(e), |
- onDone: () { |
- _isDone = true; |
- _group._signalStreamComplete(this); |
- _controller.close(); |
- }); |
+ _subscription = inner.listen((v) { |
+ if (!_cancelled) _controller.add(v); |
+ }, onError: (e) { |
+ if (!_cancelled) _group._signalError(e); |
+ }, onDone: () { |
+ if (!_cancelled) { |
+ _isDone = true; |
+ _group._signalStreamComplete(this); |
+ _controller.close(); |
+ } |
+ }); |
} |
StreamSubscription listen(void onData(value), |
@@ -260,6 +268,7 @@ class _ErrorGroupStream extends Stream { |
/// unless it's already complete. |
void _signalError(AsyncError e) { |
if (_isDone) return; |
+ _cancelled = true; |
_subscription.cancel(); |
// Call these asynchronously to work around issue 7913. |
defer(() { |