Index: test/subscription_stream_test.dart |
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..29de52c0ecdf7e172a560446d13d59bf99d825ee |
--- /dev/null |
+++ b/test/subscription_stream_test.dart |
@@ -0,0 +1,134 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.subscription_stream_test; |
+ |
+import "dart:async"; |
+ |
+import "package:async/async.dart" show SubscriptionStream; |
+import "package:test/test.dart"; |
ahe
2015/06/08 16:55:16
This most likely makes this a really slow test to
Lasse Reichstein Nielsen
2015/06/09 07:33:53
If the test package is slow on dart2js, then it sh
|
+ |
+main() { |
+ test("simple", () async { |
+ var stream = createStream(); |
+ var sub = stream.listen(null); |
+ var ss = new SubscriptionStream<int>(sub); |
+ await sleep(10); |
+ expect(ss.toList(), completion([1, 2, 3, 4])); |
+ }); |
+ |
+ test("after 2", () async { |
+ var stream = createStream(); |
+ int skips = 0; |
+ var c = new Completer(); |
+ var sub; |
+ sub = stream.listen((v) { |
+ ++skips; |
+ expect(v, skips); |
+ if (skips == 2) c.complete(new SubscriptionStream<int>(sub)); |
+ }); |
+ Stream<int> ss = await c.future; |
+ await sleep(10); |
+ expect(ss.toList(), completion([3, 4])); |
+ }); |
+ |
+ group("cancelOnError", () { |
+ for (var original in [false, true]) { |
+ group(original ? "yes" : "no", () { |
+ test("no", () async { |
+ var stream = createErrorStream(); |
+ var sub = stream.listen(null, cancelOnError: original); |
+ var ss = new SubscriptionStream(sub, isCancelOnError: original); |
+ var c = new Completer(); |
+ var l = []; |
+ var sub2 = ss.listen(l.add, onError: l.add, onDone: c.complete); |
+ await c.future; |
+ var expected = [1, 2, "To err is divine!"]; |
+ if (!original) expected.add(4); |
+ expect(l, expected); |
+ }); |
+ |
+ test("yes", () async { |
+ var stream = createErrorStream(); |
+ var sub = stream.listen(null, cancelOnError: original); |
+ var ss = new SubscriptionStream(sub, isCancelOnError: original); |
+ var c = new Completer(); |
+ var l = []; |
+ var sub2 = ss.listen(l.add, |
+ onError: (v) { l.add(v); c.complete(); }, |
+ onDone: () => throw "should not happen", |
+ cancelOnError: true); |
+ await c.future; |
+ await sleep(10); |
+ expect(l, [1, 2, "To err is divine!"]); |
+ }); |
+ }); |
+ |
+ test("no-asFuture-noerr", () async { |
+ var stream = createStream(); |
+ var sub = stream.listen(null, cancelOnError: original); |
+ var ss = new SubscriptionStream(sub, isCancelOnError: original); |
+ var sub2 = ss.listen(null); |
+ expect(sub2.asFuture(42), completion(42)); |
+ }); |
+ |
+ test("no-asFuture-err", () async { |
+ var stream = createErrorStream(); |
+ var sub = stream.listen(null, cancelOnError: original); |
+ var ss = new SubscriptionStream(sub, isCancelOnError: original); |
+ var sub2 = ss.listen(null); |
+ expect(sub2.asFuture(), throws); |
+ }); |
+ |
+ test("yes-asFuture-noerr", () async { |
+ var stream = createStream(); |
+ var sub = stream.listen(null, cancelOnError: original); |
+ var ss = new SubscriptionStream(sub, isCancelOnError: original); |
+ var sub2 = ss.listen(null, cancelOnError: true); |
+ expect(sub2.asFuture(42), completion(42)); |
+ }); |
+ |
+ test("yes-asFuture-err", () async { |
+ var stream = createErrorStream(); |
+ var sub = stream.listen(null, cancelOnError: original); |
+ var ss = new SubscriptionStream(sub, isCancelOnError: original); |
+ var sub2 = ss.listen(null, cancelOnError: true); |
+ expect(sub2.asFuture(), throws); |
+ }); |
+ } |
+ }); |
+} |
+ |
+const MS = const Duration(milliseconds: 1); |
+Future sleep(int n) => new Future.delayed(MS * n); |
+ |
+Stream<int> createStream() async* { |
+ yield 1; |
+ await sleep(20); |
+ yield 2; |
+ await sleep(10); |
+ yield 3; |
+ await sleep(15); |
+ yield 4; |
+} |
+ |
+Stream<int> createErrorStream() { |
+ StreamController controller = new StreamController<int>(); |
+ () async { |
+ controller.add(1); |
+ await sleep(20); |
+ controller.add(2); |
+ await sleep(10); |
+ controller.addError("To err is divine!"); |
+ await sleep(15); |
+ controller.add(4); |
+ await sleep(5); |
+ controller.close(); |
+ }(); |
+ return controller.stream; |
+} |
+ |
+Stream<int> createLongStream() async* { |
+ for (int i = 0; i < 200; i++) yield i; |
+} |