Index: pkg/scheduled_test/lib/src/utils.dart |
diff --git a/pkg/scheduled_test/lib/src/utils.dart b/pkg/scheduled_test/lib/src/utils.dart |
index 80d8bc467d0aa6a7ffc71b80c04efaf7e14b7638..9a940cca8dd65b6d4a3f2b8c7e118f17c961be6d 100644 |
--- a/pkg/scheduled_test/lib/src/utils.dart |
+++ b/pkg/scheduled_test/lib/src/utils.dart |
@@ -78,19 +78,48 @@ bool orderedIterableEquals(Iterable iterable1, Iterable iterable2) { |
Stream errorStream(error) => new Future.error(error).asStream(); |
/// Returns a buffered stream that will emit the same values as the stream |
-/// returned by [future] once [future] completes. If [future] completes to an |
-/// error, the return value will emit that error and then close. |
-Stream futureStream(Future<Stream> future) { |
- var controller = new StreamController(sync: true); |
- future.then((stream) { |
- stream.listen( |
- controller.add, |
- onError: controller.addError, |
- onDone: controller.close); |
- }).catchError((e) { |
- controller.addError(e); |
+/// returned by [future] once [future] completes. |
+/// |
+/// If [future] completes to an error, the return value will emit that error and |
+/// then close. |
+/// |
+/// If [broadcast] is true, a broadcast stream is returned. This assumes that |
+/// the stream returned by [future] will be a broadcast stream as well. |
+/// [broadcast] defaults to false. |
+Stream futureStream(Future<Stream> future, {bool broadcast: false}) { |
+ var subscription; |
+ var controller; |
+ |
+ future = future.catchError((e, stackTrace) { |
+ if (controller == null) return; |
+ controller.addError(e, stackTrace); |
controller.close(); |
+ controller = null; |
}); |
+ |
+ onListen() { |
+ future.then((stream) { |
+ if (controller == null) return; |
+ subscription = stream.listen( |
+ controller.add, |
+ onError: controller.addError, |
+ onDone: controller.close); |
+ }); |
+ } |
+ |
+ onCancel() { |
+ if (subscription != null) subscription.cancel(); |
+ subscription = null; |
+ controller = null; |
+ } |
+ |
+ if (broadcast) { |
+ controller = new StreamController.broadcast( |
+ sync: true, onListen: onListen, onCancel: onCancel); |
+ } else { |
+ controller = new StreamController( |
+ sync: true, onListen: onListen, onCancel: onCancel); |
+ } |
return controller.stream; |
} |