Chromium Code Reviews| Index: test/subscription_stream_test.dart |
| diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..29de52c0ecdf7e172a560446d13d59bf99d825ee |
| --- /dev/null |
| +++ b/test/subscription_stream_test.dart |
| @@ -0,0 +1,134 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.subscription_stream_test; |
| + |
| +import "dart:async"; |
| + |
| +import "package:async/async.dart" show SubscriptionStream; |
| +import "package:test/test.dart"; |
| + |
| +main() { |
| + test("simple", () async { |
| + var stream = createStream(); |
| + var sub = stream.listen(null); |
| + var ss = new SubscriptionStream<int>(sub); |
| + await sleep(10); |
| + expect(ss.toList(), completion([1, 2, 3, 4])); |
| + }); |
| + |
| + test("after 2", () async { |
| + var stream = createStream(); |
| + int skips = 0; |
| + var c = new Completer(); |
| + var sub; |
| + sub = stream.listen((v) { |
| + ++skips; |
| + expect(v, skips); |
| + if (skips == 2) c.complete(new SubscriptionStream<int>(sub)); |
| + }); |
| + Stream<int> ss = await c.future; |
| + await sleep(10); |
| + expect(ss.toList(), completion([3, 4])); |
| + }); |
|
nweiz
2015/06/12 01:24:28
Also test that pausing and canceling forwards prop
Lasse Reichstein Nielsen
2015/06/15 15:46:25
Done.
|
| + |
| + group("cancelOnError", () { |
|
nweiz
2015/06/12 01:24:28
The behavior when the [cancelOnError] values don't
Lasse Reichstein Nielsen
2015/06/15 15:46:25
The onDone w/ both cancelOnErrors true is handled
|
| + for (var original in [false, true]) { |
| + group(original ? "yes" : "no", () { |
| + test("no", () async { |
| + var stream = createErrorStream(); |
| + var sub = stream.listen(null, cancelOnError: original); |
| + var ss = new SubscriptionStream(sub, isCancelOnError: original); |
| + var c = new Completer(); |
| + var l = []; |
| + var sub2 = ss.listen(l.add, onError: l.add, onDone: c.complete); |
| + await c.future; |
| + var expected = [1, 2, "To err is divine!"]; |
| + if (!original) expected.add(4); |
| + expect(l, expected); |
| + }); |
| + |
| + test("yes", () async { |
| + var stream = createErrorStream(); |
| + var sub = stream.listen(null, cancelOnError: original); |
| + var ss = new SubscriptionStream(sub, isCancelOnError: original); |
| + var c = new Completer(); |
| + var l = []; |
| + var sub2 = ss.listen(l.add, |
| + onError: (v) { l.add(v); c.complete(); }, |
| + onDone: () => throw "should not happen", |
| + cancelOnError: true); |
| + await c.future; |
| + await sleep(10); |
| + expect(l, [1, 2, "To err is divine!"]); |
| + }); |
| + }); |
| + |
| + test("no-asFuture-noerr", () async { |
| + var stream = createStream(); |
| + var sub = stream.listen(null, cancelOnError: original); |
| + var ss = new SubscriptionStream(sub, isCancelOnError: original); |
| + var sub2 = ss.listen(null); |
| + expect(sub2.asFuture(42), completion(42)); |
| + }); |
| + |
| + test("no-asFuture-err", () async { |
| + var stream = createErrorStream(); |
| + var sub = stream.listen(null, cancelOnError: original); |
| + var ss = new SubscriptionStream(sub, isCancelOnError: original); |
| + var sub2 = ss.listen(null); |
| + expect(sub2.asFuture(), throws); |
| + }); |
| + |
| + test("yes-asFuture-noerr", () async { |
| + var stream = createStream(); |
| + var sub = stream.listen(null, cancelOnError: original); |
| + var ss = new SubscriptionStream(sub, isCancelOnError: original); |
| + var sub2 = ss.listen(null, cancelOnError: true); |
| + expect(sub2.asFuture(42), completion(42)); |
| + }); |
| + |
| + test("yes-asFuture-err", () async { |
| + var stream = createErrorStream(); |
| + var sub = stream.listen(null, cancelOnError: original); |
| + var ss = new SubscriptionStream(sub, isCancelOnError: original); |
| + var sub2 = ss.listen(null, cancelOnError: true); |
| + expect(sub2.asFuture(), throws); |
| + }); |
| + } |
| + }); |
| +} |
| + |
| +const MS = const Duration(milliseconds: 1); |
| +Future sleep(int n) => new Future.delayed(MS * n); |
| + |
| +Stream<int> createStream() async* { |
| + yield 1; |
| + await sleep(20); |
| + yield 2; |
| + await sleep(10); |
| + yield 3; |
| + await sleep(15); |
| + yield 4; |
| +} |
| + |
| +Stream<int> createErrorStream() { |
| + StreamController controller = new StreamController<int>(); |
| + () async { |
| + controller.add(1); |
| + await sleep(20); |
| + controller.add(2); |
| + await sleep(10); |
| + controller.addError("To err is divine!"); |
| + await sleep(15); |
| + controller.add(4); |
| + await sleep(5); |
| + controller.close(); |
| + }(); |
| + return controller.stream; |
| +} |
| + |
| +Stream<int> createLongStream() async* { |
| + for (int i = 0; i < 200; i++) yield i; |
| +} |