Index: packages/barback/test/stream_pool_test.dart |
diff --git a/packages/barback/test/stream_pool_test.dart b/packages/barback/test/stream_pool_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ed590f3a9f9617a587e52cb58998c1f0a9ad728b |
--- /dev/null |
+++ b/packages/barback/test/stream_pool_test.dart |
@@ -0,0 +1,219 @@ |
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library barback.test.stream_pool_test; |
+ |
+import 'dart:async'; |
+ |
+import 'package:barback/src/utils.dart'; |
+import 'package:barback/src/utils/stream_pool.dart'; |
+import 'package:scheduled_test/scheduled_test.dart'; |
+ |
+import 'utils.dart'; |
+ |
+main() { |
+ initConfig(); |
+ |
+ group("buffered", () { |
+ test("buffers events from multiple inputs", () { |
+ var pool = new StreamPool<String>(); |
+ |
+ var controller1 = new StreamController<String>(); |
+ pool.add(controller1.stream); |
+ controller1.add("first"); |
+ |
+ var controller2 = new StreamController<String>(); |
+ pool.add(controller2.stream); |
+ controller2.add("second"); |
+ |
+ // Call [toList] asynchronously to be sure that the events have been |
+ // buffered beforehand and aren't just being received unbuffered. |
+ expect(newFuture(() => pool.stream.toList()), |
+ completion(equals(["first", "second"]))); |
+ |
+ pumpEventQueue().then((_) => pool.close()); |
+ }); |
+ |
+ test("buffers errors from multiple inputs", () { |
+ var pool = new StreamPool<String>(); |
+ |
+ var controller1 = new StreamController<String>(); |
+ pool.add(controller1.stream); |
+ controller1.add("first"); |
+ |
+ var controller2 = new StreamController<String>(); |
+ pool.add(controller2.stream); |
+ controller2.add("second"); |
+ controller1.addError("third"); |
+ controller2.addError("fourth"); |
+ controller1.add("fifth"); |
+ |
+ expect(newFuture(() { |
+ return pool.stream.transform(new StreamTransformer.fromHandlers( |
+ handleData: (data, sink) => sink.add(["data", data]), |
+ handleError: (error, stackTrace, sink) { |
+ sink.add(["error", error]); |
+ })).toList(); |
+ }), completion(equals([ |
+ ["data", "first"], |
+ ["data", "second"], |
+ ["error", "third"], |
+ ["error", "fourth"], |
+ ["data", "fifth"] |
+ ]))); |
+ |
+ pumpEventQueue().then((_) => pool.close()); |
+ }); |
+ |
+ test("buffers inputs from a broadcast stream", () { |
+ var pool = new StreamPool<String>(); |
+ var controller = new StreamController<String>.broadcast(); |
+ pool.add(controller.stream); |
+ controller.add("first"); |
+ controller.add("second"); |
+ |
+ // Call [toList] asynchronously to be sure that the events have been |
+ // buffered beforehand and aren't just being received unbuffered. |
+ expect(newFuture(() => pool.stream.toList()), |
+ completion(equals(["first", "second"]))); |
+ |
+ pumpEventQueue().then((_) => pool.close()); |
+ }); |
+ }); |
+ |
+ group("broadcast", () { |
+ test("doesn't buffer inputs", () { |
+ var pool = new StreamPool<String>.broadcast(); |
+ |
+ var controller1 = new StreamController<String>.broadcast(); |
+ pool.add(controller1.stream); |
+ controller1.add("first"); |
+ |
+ var controller2 = new StreamController<String>.broadcast(); |
+ pool.add(controller2.stream); |
+ controller2.add("second"); |
+ |
+ // Call [toList] asynchronously to be sure that the events have been |
+ // buffered beforehand and aren't just being received unbuffered. |
+ expect(newFuture(() => pool.stream.toList()), completion(isEmpty)); |
+ |
+ pumpEventQueue().then((_) => pool.close()); |
+ }); |
+ |
+ test("doesn't buffer errors", () { |
+ var pool = new StreamPool<String>.broadcast(); |
+ |
+ var controller1 = new StreamController<String>.broadcast(); |
+ pool.add(controller1.stream); |
+ controller1.addError("first"); |
+ |
+ var controller2 = new StreamController<String>.broadcast(); |
+ pool.add(controller2.stream); |
+ controller2.addError("second"); |
+ |
+ expect(newFuture(() { |
+ return pool.stream.transform(new StreamTransformer.fromHandlers( |
+ handleData: (data, sink) => sink.add(data), |
+ handleError: (error, stackTrace, sink) { sink.add(error); })) |
+ .toList(); |
+ }), completion(isEmpty)); |
+ |
+ pumpEventQueue().then((_) => pool.close()); |
+ }); |
+ |
+ test("doesn't buffer inputs from a buffered stream", () { |
+ var pool = new StreamPool<String>.broadcast(); |
+ var controller = new StreamController<String>(); |
+ pool.add(controller.stream); |
+ controller.add("first"); |
+ controller.add("second"); |
+ |
+ expect(pumpEventQueue().then((_) => pool.stream.toList()), |
+ completion(isEmpty)); |
+ |
+ pumpEventQueue().then((_) => pool.close()); |
+ }); |
+ }); |
+ |
+ for (var type in ["buffered", "broadcast"]) { |
+ group(type, () { |
+ var pool; |
+ var bufferedController; |
+ var bufferedStream; |
+ var bufferedSyncController; |
+ var broadcastController; |
+ var broadcastStream; |
+ var broadcastSyncController; |
+ |
+ setUp(() { |
+ if (type == "buffered") { |
+ pool = new StreamPool<String>(); |
+ } else { |
+ pool = new StreamPool<String>.broadcast(); |
+ } |
+ |
+ bufferedController = new StreamController<String>(); |
+ pool.add(bufferedController.stream); |
+ |
+ bufferedSyncController = new StreamController<String>(sync: true); |
+ pool.add(bufferedSyncController.stream); |
+ |
+ broadcastController = new StreamController<String>.broadcast(); |
+ pool.add(broadcastController.stream); |
+ |
+ broadcastSyncController = |
+ new StreamController<String>.broadcast(sync: true); |
+ pool.add(broadcastSyncController.stream); |
+ }); |
+ |
+ test("emits events to a listener", () { |
+ expect(pool.stream.toList(), completion(equals(["first", "second"]))); |
+ |
+ bufferedController.add("first"); |
+ broadcastController.add("second"); |
+ pumpEventQueue().then((_) => pool.close()); |
+ }); |
+ |
+ test("emits sync events synchronously", () { |
+ var events = []; |
+ pool.stream.listen(events.add); |
+ |
+ bufferedSyncController.add("first"); |
+ expect(events, equals(["first"])); |
+ |
+ broadcastSyncController.add("second"); |
+ expect(events, equals(["first", "second"])); |
+ }); |
+ |
+ test("emits async events asynchronously", () { |
+ var events = []; |
+ pool.stream.listen(events.add); |
+ |
+ bufferedController.add("first"); |
+ broadcastController.add("second"); |
+ expect(events, isEmpty); |
+ |
+ expect(pumpEventQueue().then((_) => events), |
+ completion(equals(["first", "second"]))); |
+ }); |
+ |
+ test("doesn't emit events from removed streams", () { |
+ expect(pool.stream.toList(), completion(equals(["first", "third"]))); |
+ |
+ bufferedController.add("first"); |
+ expect(pumpEventQueue().then((_) { |
+ pool.remove(bufferedController.stream); |
+ bufferedController.add("second"); |
+ }).then((_) { |
+ broadcastController.add("third"); |
+ return pumpEventQueue(); |
+ }).then((_) { |
+ pool.remove(broadcastController.stream); |
+ broadcastController.add("fourth"); |
+ pool.close(); |
+ }), completes); |
+ }); |
+ }); |
+ } |
+} |