Index: sdk/lib/_internal/pub/lib/src/error_group.dart |
diff --git a/sdk/lib/_internal/pub/lib/src/error_group.dart b/sdk/lib/_internal/pub/lib/src/error_group.dart |
index cc7b0071b3d82b4eda21a1fe63faecee17ea10ed..b49b45a3702b3ffbdaeeec7f15140c071bcf1279 100644 |
--- a/sdk/lib/_internal/pub/lib/src/error_group.dart |
+++ b/sdk/lib/_internal/pub/lib/src/error_group.dart |
@@ -225,6 +225,10 @@ class _ErrorGroupStream extends Stream { |
/// The underlying [StreamController] for [this]. |
final StreamController _controller; |
+ /// The controller's [Stream]. May be different than `_controller.stream` if |
+ /// the wrapped stream is a broadcasting stream. |
+ Stream _stream; |
+ |
/// The [StreamSubscription] that connects the wrapped [Stream] to |
/// [_controller]. |
StreamSubscription _subscription; |
@@ -235,9 +239,12 @@ class _ErrorGroupStream extends Stream { |
/// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
/// [inner]. |
_ErrorGroupStream(this._group, Stream inner) |
- : _controller = |
- inner.isBroadcast ? new StreamController.broadcast(sync: true) |
- : new StreamController(sync: true) { |
+ : _controller = new StreamController(sync: true) { |
+ // Use old-style asBroadcastStream behavior - cancel source _subscription |
+ // the first time the stream has no listeners. |
+ _stream = inner.isBroadcast |
+ ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) |
+ : _controller.stream; |
_subscription = inner.listen((v) { |
_controller.add(v); |
}, onError: (e) { |
@@ -252,7 +259,7 @@ class _ErrorGroupStream extends Stream { |
StreamSubscription listen(void onData(value), |
{void onError(var error), void onDone(), |
bool cancelOnError}) { |
- return _controller.stream.listen(onData, |
+ return _stream.listen(onData, |
onError: onError, |
onDone: onDone, |
cancelOnError: true); |