Index: packages/async/test/stream_group_test.dart |
diff --git a/packages/async/test/stream_group_test.dart b/packages/async/test/stream_group_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..db6c9387136cc4f39c1a94710baf9d80133c376d |
--- /dev/null |
+++ b/packages/async/test/stream_group_test.dart |
@@ -0,0 +1,723 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.test.stream_group_test; |
+ |
+import 'dart:async'; |
+ |
+import 'package:async/async.dart'; |
+import 'package:test/test.dart'; |
+ |
+main() { |
+ group("single-subscription", () { |
+ var streamGroup; |
+ setUp(() { |
+ streamGroup = new StreamGroup<String>(); |
+ }); |
+ |
+ test("buffers events from multiple sources", () async { |
+ var controller1 = new StreamController<String>(); |
+ streamGroup.add(controller1.stream); |
+ controller1.add("first"); |
+ controller1.close(); |
+ |
+ var controller2 = new StreamController<String>(); |
+ streamGroup.add(controller2.stream); |
+ controller2.add("second"); |
+ controller2.close(); |
+ |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.close(), completes); |
+ |
+ expect(streamGroup.stream.toList(), |
+ completion(unorderedEquals(["first", "second"]))); |
+ }); |
+ |
+ test("buffers errors from multiple sources", () async { |
+ var controller1 = new StreamController<String>(); |
+ streamGroup.add(controller1.stream); |
+ controller1.addError("first"); |
+ controller1.close(); |
+ |
+ var controller2 = new StreamController<String>(); |
+ streamGroup.add(controller2.stream); |
+ controller2.addError("second"); |
+ controller2.close(); |
+ |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.close(), completes); |
+ |
+ var transformed = streamGroup.stream.transform( |
+ new StreamTransformer.fromHandlers( |
+ handleError: (error, _, sink) => sink.add("error: $error"))); |
+ expect(transformed.toList(), |
+ completion(equals(["error: first", "error: second"]))); |
+ }); |
+ |
+ test("buffers events and errors together", () async { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ |
+ controller.add("first"); |
+ controller.addError("second"); |
+ controller.add("third"); |
+ controller.addError("fourth"); |
+ controller.addError("fifth"); |
+ controller.add("sixth"); |
+ controller.close(); |
+ |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.close(), completes); |
+ |
+ var transformed = streamGroup.stream.transform( |
+ new StreamTransformer.fromHandlers( |
+ handleData: (data, sink) => sink.add("data: $data"), |
+ handleError: (error, _, sink) => sink.add("error: $error"))); |
+ expect(transformed.toList(), completion(equals([ |
+ "data: first", |
+ "error: second", |
+ "data: third", |
+ "error: fourth", |
+ "error: fifth", |
+ "data: sixth" |
+ ]))); |
+ }); |
+ |
+ test("emits events once there's a listener", () { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ |
+ expect(streamGroup.stream.toList(), |
+ completion(equals(["first", "second"]))); |
+ |
+ controller.add("first"); |
+ controller.add("second"); |
+ controller.close(); |
+ |
+ expect(streamGroup.close(), completes); |
+ }); |
+ |
+ test("doesn't buffer events from a broadcast stream", () async { |
+ var controller = new StreamController<String>.broadcast(); |
+ streamGroup.add(controller.stream); |
+ |
+ controller.add("first"); |
+ controller.add("second"); |
+ controller.close(); |
+ |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.close(), completes); |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ }); |
+ |
+ test("when paused, buffers events from a broadcast stream", () async { |
+ var controller = new StreamController<String>.broadcast(); |
+ streamGroup.add(controller.stream); |
+ |
+ var events = []; |
+ var subscription = streamGroup.stream.listen(events.add); |
+ subscription.pause(); |
+ |
+ controller.add("first"); |
+ controller.add("second"); |
+ controller.close(); |
+ await flushMicrotasks(); |
+ |
+ subscription.resume(); |
+ expect(streamGroup.close(), completes); |
+ await flushMicrotasks(); |
+ |
+ expect(events, equals(["first", "second"])); |
+ }); |
+ |
+ test("emits events from a broadcast stream once there's a listener", () { |
+ var controller = new StreamController<String>.broadcast(); |
+ streamGroup.add(controller.stream); |
+ |
+ expect(streamGroup.stream.toList(), |
+ completion(equals(["first", "second"]))); |
+ |
+ controller.add("first"); |
+ controller.add("second"); |
+ controller.close(); |
+ |
+ expect(streamGroup.close(), completes); |
+ }); |
+ |
+ test("forwards cancel errors", () async { |
+ var subscription = streamGroup.stream.listen(null); |
+ |
+ var controller = new StreamController<String>( |
+ onCancel: () => throw "error"); |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ |
+ expect(subscription.cancel(), throwsA("error")); |
+ }); |
+ |
+ test("forwards a cancel future", () async { |
+ var subscription = streamGroup.stream.listen(null); |
+ |
+ var completer = new Completer(); |
+ var controller = new StreamController<String>( |
+ onCancel: () => completer.future); |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ |
+ var fired = false; |
+ subscription.cancel().then((_) => fired = true); |
+ |
+ await flushMicrotasks(); |
+ expect(fired, isFalse); |
+ |
+ completer.complete(); |
+ await flushMicrotasks(); |
+ expect(fired, isTrue); |
+ }); |
+ |
+ test("add() while active pauses the stream if the group is paused, then " |
+ "resumes once the group resumes", () async { |
+ var subscription = streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ |
+ var paused = false; |
+ var controller = new StreamController<String>( |
+ onPause: () => paused = true, |
+ onResume: () => paused = false); |
+ |
+ subscription.pause(); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ expect(paused, isTrue); |
+ |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(paused, isFalse); |
+ }); |
+ |
+ group("add() while canceled", () { |
+ setUp(() async { |
+ streamGroup.stream.listen(null).cancel(); |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("immediately listens to and cancels the stream", () async { |
+ var listened = false; |
+ var canceled = false; |
+ var controller = new StreamController<String>(onListen: () { |
+ listened = true; |
+ }, onCancel: expectAsync(() { |
+ expect(listened, isTrue); |
+ canceled = true; |
+ })); |
+ |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ expect(listened, isTrue); |
+ expect(canceled, isTrue); |
+ }); |
+ |
+ test("forwards cancel errors", () { |
+ var controller = new StreamController<String>( |
+ onCancel: () => throw "error"); |
+ |
+ expect(streamGroup.add(controller.stream), throwsA("error")); |
+ }); |
+ |
+ test("forwards a cancel future", () async { |
+ var completer = new Completer(); |
+ var controller = new StreamController<String>( |
+ onCancel: () => completer.future); |
+ |
+ var fired = false; |
+ streamGroup.add(controller.stream).then((_) => fired = true); |
+ |
+ await flushMicrotasks(); |
+ expect(fired, isFalse); |
+ |
+ completer.complete(); |
+ await flushMicrotasks(); |
+ expect(fired, isTrue); |
+ }); |
+ }); |
+ }); |
+ |
+ group("broadcast", () { |
+ var streamGroup; |
+ setUp(() { |
+ streamGroup = new StreamGroup<String>.broadcast(); |
+ }); |
+ |
+ test("buffers events from multiple sources", () async { |
+ var controller1 = new StreamController<String>(); |
+ streamGroup.add(controller1.stream); |
+ controller1.add("first"); |
+ controller1.close(); |
+ |
+ var controller2 = new StreamController<String>(); |
+ streamGroup.add(controller2.stream); |
+ controller2.add("second"); |
+ controller2.close(); |
+ |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.close(), completes); |
+ |
+ expect(streamGroup.stream.toList(), |
+ completion(equals(["first", "second"]))); |
+ }); |
+ |
+ test("emits events from multiple sources once there's a listener", () { |
+ var controller1 = new StreamController<String>(); |
+ streamGroup.add(controller1.stream); |
+ |
+ var controller2 = new StreamController<String>(); |
+ streamGroup.add(controller2.stream); |
+ |
+ expect(streamGroup.stream.toList(), |
+ completion(equals(["first", "second"]))); |
+ |
+ controller1.add("first"); |
+ controller2.add("second"); |
+ controller1.close(); |
+ controller2.close(); |
+ |
+ expect(streamGroup.close(), completes); |
+ }); |
+ |
+ test("doesn't buffer events once a listener has been added and removed", |
+ () async { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ |
+ streamGroup.stream.listen(null).cancel(); |
+ await flushMicrotasks(); |
+ |
+ controller.add("first"); |
+ controller.addError("second"); |
+ controller.close(); |
+ |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.close(), completes); |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ }); |
+ |
+ test("doesn't buffer events from a broadcast stream", () async { |
+ var controller = new StreamController<String>.broadcast(); |
+ streamGroup.add(controller.stream); |
+ controller.add("first"); |
+ controller.addError("second"); |
+ controller.close(); |
+ |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.close(), completes); |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ }); |
+ |
+ test("emits events from a broadcast stream once there's a listener", () { |
+ var controller = new StreamController<String>.broadcast(); |
+ streamGroup.add(controller.stream); |
+ |
+ expect(streamGroup.stream.toList(), |
+ completion(equals(["first", "second"]))); |
+ |
+ controller.add("first"); |
+ controller.add("second"); |
+ controller.close(); |
+ |
+ expect(streamGroup.close(), completes); |
+ }); |
+ |
+ test("cancels and re-listens broadcast streams", () async { |
+ var subscription = streamGroup.stream.listen(null); |
+ |
+ var controller = new StreamController<String>.broadcast(); |
+ |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ subscription.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ }); |
+ |
+ test("never cancels single-subscription streams", () async { |
+ var subscription = streamGroup.stream.listen(null); |
+ |
+ var controller = new StreamController<String>( |
+ onCancel: expectAsync(() {}, count: 0)); |
+ |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ |
+ subscription.cancel(); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("drops events from a single-subscription stream while dormant", |
+ () async { |
+ var events = []; |
+ var subscription = streamGroup.stream.listen(events.add); |
+ |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ |
+ controller.add("first"); |
+ await flushMicrotasks(); |
+ expect(events, equals(["first"])); |
+ |
+ subscription.cancel(); |
+ controller.add("second"); |
+ await flushMicrotasks(); |
+ expect(events, equals(["first"])); |
+ |
+ streamGroup.stream.listen(events.add); |
+ controller.add("third"); |
+ await flushMicrotasks(); |
+ expect(events, equals(["first", "third"])); |
+ }); |
+ |
+ test("a single-subscription stream can be removed while dormant", () async { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.stream.listen(null).cancel(); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.remove(controller.stream); |
+ expect(controller.hasListener, isFalse); |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ controller.add("first"); |
+ expect(streamGroup.close(), completes); |
+ }); |
+ }); |
+ |
+ group("regardless of type", () { |
+ group("single-subscription", () { |
+ regardlessOfType(() => new StreamGroup<String>()); |
+ }); |
+ |
+ group("broadcast", () { |
+ regardlessOfType(() => new StreamGroup<String>.broadcast()); |
+ }); |
+ }); |
+ |
+ test("merge() emits events from all components streams", () { |
+ var controller1 = new StreamController<String>(); |
+ var controller2 = new StreamController<String>(); |
+ |
+ var merged = StreamGroup.merge([controller1.stream, controller2.stream]); |
+ |
+ controller1.add("first"); |
+ controller1.close(); |
+ controller2.add("second"); |
+ controller2.close(); |
+ |
+ expect(merged.toList(), completion(unorderedEquals(["first", "second"]))); |
+ }); |
+} |
+ |
+void regardlessOfType(StreamGroup<String> newStreamGroup()) { |
+ var streamGroup; |
+ setUp(() { |
+ streamGroup = newStreamGroup(); |
+ }); |
+ |
+ group("add()", () { |
+ group("while dormant", () { |
+ test("doesn't listen to the stream until the group is listened to", |
+ () async { |
+ var controller = new StreamController<String>(); |
+ |
+ expect(streamGroup.add(controller.stream), isNull); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ }); |
+ |
+ test("is a no-op if the stream is already in the group", () { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ streamGroup.add(controller.stream); |
+ streamGroup.add(controller.stream); |
+ |
+ // If the stream was actually listened to multiple times, this would |
+ // throw a StateError. |
+ streamGroup.stream.listen(null); |
+ }); |
+ }); |
+ |
+ group("while active", () { |
+ setUp(() async { |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("listens to the stream immediately", () async { |
+ var controller = new StreamController<String>(); |
+ |
+ expect(streamGroup.add(controller.stream), isNull); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ }); |
+ |
+ test("is a no-op if the stream is already in the group", () async { |
+ var controller = new StreamController<String>(); |
+ |
+ // If the stream were actually listened to more than once, future |
+ // calls to [add] would throw [StateError]s. |
+ streamGroup.add(controller.stream); |
+ streamGroup.add(controller.stream); |
+ streamGroup.add(controller.stream); |
+ }); |
+ }); |
+ }); |
+ |
+ group("remove()", () { |
+ group("while dormant", () { |
+ test("stops emitting events for a stream that's removed", () async { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ |
+ expect(streamGroup.stream.toList(), completion(equals(["first"]))); |
+ |
+ controller.add("first"); |
+ await flushMicrotasks(); |
+ controller.add("second"); |
+ |
+ expect(streamGroup.remove(controller.stream), isNull); |
+ expect(streamGroup.close(), completes); |
+ }); |
+ |
+ test("is a no-op for an unknown stream", () { |
+ var controller = new StreamController<String>(); |
+ expect(streamGroup.remove(controller.stream), isNull); |
+ }); |
+ |
+ test("and closed closes the group when the last stream is removed", |
+ () async { |
+ var controller1 = new StreamController<String>(); |
+ var controller2 = new StreamController<String>(); |
+ |
+ streamGroup.add(controller1.stream); |
+ streamGroup.add(controller2.stream); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.close(); |
+ |
+ streamGroup.remove(controller1.stream); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.remove(controller2.stream); |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ }); |
+ }); |
+ |
+ group("while listening", () { |
+ test("doesn't emit events from a removed stream", () { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ |
+ // The subscription to [controller.stream] is canceled synchronously, so |
+ // the first event is dropped even though it was added before the |
+ // removal. This is documented in [StreamGroup.remove]. |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ |
+ controller.add("first"); |
+ expect(streamGroup.remove(controller.stream), isNull); |
+ controller.add("second"); |
+ |
+ expect(streamGroup.close(), completes); |
+ }); |
+ |
+ test("cancels the stream's subscription", () async { |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ streamGroup.remove(controller.stream); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ test("forwards cancel errors", () async { |
+ var controller = new StreamController<String>( |
+ onCancel: () => throw "error"); |
+ streamGroup.add(controller.stream); |
+ |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.remove(controller.stream), throwsA("error")); |
+ }); |
+ |
+ test("forwards cancel futures", () async { |
+ var completer = new Completer(); |
+ var controller = new StreamController<String>( |
+ onCancel: () => completer.future); |
+ |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ |
+ var fired = false; |
+ streamGroup.remove(controller.stream).then((_) => fired = true); |
+ |
+ await flushMicrotasks(); |
+ expect(fired, isFalse); |
+ |
+ completer.complete(); |
+ await flushMicrotasks(); |
+ expect(fired, isTrue); |
+ }); |
+ |
+ test("is a no-op for an unknown stream", () async { |
+ var controller = new StreamController<String>(); |
+ streamGroup.stream.listen(null); |
+ await flushMicrotasks(); |
+ |
+ expect(streamGroup.remove(controller.stream), isNull); |
+ }); |
+ |
+ test("and closed closes the group when the last stream is removed", |
+ () async { |
+ var done = false; |
+ streamGroup.stream.listen(null, onDone: () => done = true); |
+ await flushMicrotasks(); |
+ |
+ var controller1 = new StreamController<String>(); |
+ var controller2 = new StreamController<String>(); |
+ |
+ streamGroup.add(controller1.stream); |
+ streamGroup.add(controller2.stream); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.close(); |
+ |
+ streamGroup.remove(controller1.stream); |
+ await flushMicrotasks(); |
+ expect(done, isFalse); |
+ |
+ streamGroup.remove(controller2.stream); |
+ await flushMicrotasks(); |
+ expect(done, isTrue); |
+ }); |
+ }); |
+ }); |
+ |
+ group("close()", () { |
+ group("while dormant", () { |
+ test("if there are no streams, closes the group", () { |
+ expect(streamGroup.close(), completes); |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ }); |
+ |
+ test("if there are streams, closes the group once those streams close " |
+ "and there's a listener", () async { |
+ var controller1 = new StreamController<String>(); |
+ var controller2 = new StreamController<String>(); |
+ |
+ streamGroup.add(controller1.stream); |
+ streamGroup.add(controller2.stream); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.close(); |
+ |
+ controller1.close(); |
+ controller2.close(); |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ }); |
+ }); |
+ |
+ group("while active", () { |
+ test("if there are no streams, closes the group", () { |
+ expect(streamGroup.stream.toList(), completion(isEmpty)); |
+ expect(streamGroup.close(), completes); |
+ }); |
+ |
+ test("if there are streams, closes the group once those streams close", |
+ () async { |
+ var done = false; |
+ streamGroup.stream.listen(null, onDone: () => done = true); |
+ await flushMicrotasks(); |
+ |
+ var controller1 = new StreamController<String>(); |
+ var controller2 = new StreamController<String>(); |
+ |
+ streamGroup.add(controller1.stream); |
+ streamGroup.add(controller2.stream); |
+ await flushMicrotasks(); |
+ |
+ streamGroup.close(); |
+ await flushMicrotasks(); |
+ expect(done, isFalse); |
+ |
+ controller1.close(); |
+ await flushMicrotasks(); |
+ expect(done, isFalse); |
+ |
+ controller2.close(); |
+ await flushMicrotasks(); |
+ expect(done, isTrue); |
+ }); |
+ }); |
+ |
+ test("returns a Future that completes once all events are dispatched", |
+ () async { |
+ var events = []; |
+ streamGroup.stream.listen(events.add); |
+ |
+ var controller = new StreamController<String>(); |
+ streamGroup.add(controller.stream); |
+ await flushMicrotasks(); |
+ |
+ // Add a bunch of events. Each of these will get dispatched in a |
+ // separate microtask, so we can test that [close] only completes once |
+ // all of them have dispatched. |
+ controller.add("one"); |
+ controller.add("two"); |
+ controller.add("three"); |
+ controller.add("four"); |
+ controller.add("five"); |
+ controller.add("six"); |
+ controller.close(); |
+ |
+ await streamGroup.close(); |
+ expect(events, equals(["one", "two", "three", "four", "five", "six"])); |
+ }); |
+ }); |
+} |
+ |
+/// Wait for all microtasks to complete. |
+Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |