Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(406)

Unified Diff: packages/barback/test/stream_pool_test.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/barback/test/static_provider_test.dart ('k') | packages/barback/test/stream_replayer_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/barback/test/stream_pool_test.dart
diff --git a/packages/barback/test/stream_pool_test.dart b/packages/barback/test/stream_pool_test.dart
new file mode 100644
index 0000000000000000000000000000000000000000..ed590f3a9f9617a587e52cb58998c1f0a9ad728b
--- /dev/null
+++ b/packages/barback/test/stream_pool_test.dart
@@ -0,0 +1,219 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library barback.test.stream_pool_test;
+
+import 'dart:async';
+
+import 'package:barback/src/utils.dart';
+import 'package:barback/src/utils/stream_pool.dart';
+import 'package:scheduled_test/scheduled_test.dart';
+
+import 'utils.dart';
+
+main() {
+ initConfig();
+
+ group("buffered", () {
+ test("buffers events from multiple inputs", () {
+ var pool = new StreamPool<String>();
+
+ var controller1 = new StreamController<String>();
+ pool.add(controller1.stream);
+ controller1.add("first");
+
+ var controller2 = new StreamController<String>();
+ pool.add(controller2.stream);
+ controller2.add("second");
+
+ // Call [toList] asynchronously to be sure that the events have been
+ // buffered beforehand and aren't just being received unbuffered.
+ expect(newFuture(() => pool.stream.toList()),
+ completion(equals(["first", "second"])));
+
+ pumpEventQueue().then((_) => pool.close());
+ });
+
+ test("buffers errors from multiple inputs", () {
+ var pool = new StreamPool<String>();
+
+ var controller1 = new StreamController<String>();
+ pool.add(controller1.stream);
+ controller1.add("first");
+
+ var controller2 = new StreamController<String>();
+ pool.add(controller2.stream);
+ controller2.add("second");
+ controller1.addError("third");
+ controller2.addError("fourth");
+ controller1.add("fifth");
+
+ expect(newFuture(() {
+ return pool.stream.transform(new StreamTransformer.fromHandlers(
+ handleData: (data, sink) => sink.add(["data", data]),
+ handleError: (error, stackTrace, sink) {
+ sink.add(["error", error]);
+ })).toList();
+ }), completion(equals([
+ ["data", "first"],
+ ["data", "second"],
+ ["error", "third"],
+ ["error", "fourth"],
+ ["data", "fifth"]
+ ])));
+
+ pumpEventQueue().then((_) => pool.close());
+ });
+
+ test("buffers inputs from a broadcast stream", () {
+ var pool = new StreamPool<String>();
+ var controller = new StreamController<String>.broadcast();
+ pool.add(controller.stream);
+ controller.add("first");
+ controller.add("second");
+
+ // Call [toList] asynchronously to be sure that the events have been
+ // buffered beforehand and aren't just being received unbuffered.
+ expect(newFuture(() => pool.stream.toList()),
+ completion(equals(["first", "second"])));
+
+ pumpEventQueue().then((_) => pool.close());
+ });
+ });
+
+ group("broadcast", () {
+ test("doesn't buffer inputs", () {
+ var pool = new StreamPool<String>.broadcast();
+
+ var controller1 = new StreamController<String>.broadcast();
+ pool.add(controller1.stream);
+ controller1.add("first");
+
+ var controller2 = new StreamController<String>.broadcast();
+ pool.add(controller2.stream);
+ controller2.add("second");
+
+ // Call [toList] asynchronously to be sure that the events have been
+ // buffered beforehand and aren't just being received unbuffered.
+ expect(newFuture(() => pool.stream.toList()), completion(isEmpty));
+
+ pumpEventQueue().then((_) => pool.close());
+ });
+
+ test("doesn't buffer errors", () {
+ var pool = new StreamPool<String>.broadcast();
+
+ var controller1 = new StreamController<String>.broadcast();
+ pool.add(controller1.stream);
+ controller1.addError("first");
+
+ var controller2 = new StreamController<String>.broadcast();
+ pool.add(controller2.stream);
+ controller2.addError("second");
+
+ expect(newFuture(() {
+ return pool.stream.transform(new StreamTransformer.fromHandlers(
+ handleData: (data, sink) => sink.add(data),
+ handleError: (error, stackTrace, sink) { sink.add(error); }))
+ .toList();
+ }), completion(isEmpty));
+
+ pumpEventQueue().then((_) => pool.close());
+ });
+
+ test("doesn't buffer inputs from a buffered stream", () {
+ var pool = new StreamPool<String>.broadcast();
+ var controller = new StreamController<String>();
+ pool.add(controller.stream);
+ controller.add("first");
+ controller.add("second");
+
+ expect(pumpEventQueue().then((_) => pool.stream.toList()),
+ completion(isEmpty));
+
+ pumpEventQueue().then((_) => pool.close());
+ });
+ });
+
+ for (var type in ["buffered", "broadcast"]) {
+ group(type, () {
+ var pool;
+ var bufferedController;
+ var bufferedStream;
+ var bufferedSyncController;
+ var broadcastController;
+ var broadcastStream;
+ var broadcastSyncController;
+
+ setUp(() {
+ if (type == "buffered") {
+ pool = new StreamPool<String>();
+ } else {
+ pool = new StreamPool<String>.broadcast();
+ }
+
+ bufferedController = new StreamController<String>();
+ pool.add(bufferedController.stream);
+
+ bufferedSyncController = new StreamController<String>(sync: true);
+ pool.add(bufferedSyncController.stream);
+
+ broadcastController = new StreamController<String>.broadcast();
+ pool.add(broadcastController.stream);
+
+ broadcastSyncController =
+ new StreamController<String>.broadcast(sync: true);
+ pool.add(broadcastSyncController.stream);
+ });
+
+ test("emits events to a listener", () {
+ expect(pool.stream.toList(), completion(equals(["first", "second"])));
+
+ bufferedController.add("first");
+ broadcastController.add("second");
+ pumpEventQueue().then((_) => pool.close());
+ });
+
+ test("emits sync events synchronously", () {
+ var events = [];
+ pool.stream.listen(events.add);
+
+ bufferedSyncController.add("first");
+ expect(events, equals(["first"]));
+
+ broadcastSyncController.add("second");
+ expect(events, equals(["first", "second"]));
+ });
+
+ test("emits async events asynchronously", () {
+ var events = [];
+ pool.stream.listen(events.add);
+
+ bufferedController.add("first");
+ broadcastController.add("second");
+ expect(events, isEmpty);
+
+ expect(pumpEventQueue().then((_) => events),
+ completion(equals(["first", "second"])));
+ });
+
+ test("doesn't emit events from removed streams", () {
+ expect(pool.stream.toList(), completion(equals(["first", "third"])));
+
+ bufferedController.add("first");
+ expect(pumpEventQueue().then((_) {
+ pool.remove(bufferedController.stream);
+ bufferedController.add("second");
+ }).then((_) {
+ broadcastController.add("third");
+ return pumpEventQueue();
+ }).then((_) {
+ pool.remove(broadcastController.stream);
+ broadcastController.add("fourth");
+ pool.close();
+ }), completes);
+ });
+ });
+ }
+}
« no previous file with comments | « packages/barback/test/static_provider_test.dart ('k') | packages/barback/test/stream_replayer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698