Index: test/subscription_stream_test.dart |
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..390c95a14d987238a2318290533d91895844983b |
--- /dev/null |
+++ b/test/subscription_stream_test.dart |
@@ -0,0 +1,215 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import "dart:async"; |
+ |
+import "package:async/async.dart" show SubscriptionStream; |
+import "package:test/test.dart"; |
+ |
+main() { |
+ test("subscription stream of an entire subscription", () async { |
+ var stream = createStream(); |
+ var subscription = stream.listen(null); |
+ var subscriptionStream = new SubscriptionStream<int>(subscription); |
+ await flushMicrotasks(); |
+ expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); |
+ }); |
+ |
+ test("subscription stream after two events", () async { |
+ var stream = createStream(); |
+ int skips = 0; |
+ var c = new Completer(); |
nweiz
2015/06/18 23:44:28
"c" -> "completer"
|
+ var sub; |
+ sub = stream.listen((v) { |
+ ++skips; |
+ expect(v, skips); |
+ if (skips == 2) c.complete(new SubscriptionStream<int>(sub)); |
+ }); |
+ Stream<int> ss = await c.future; |
nweiz
2015/06/18 23:44:28
"ss" -> "stream" or "subscriptionStream"
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
|
+ await flushMicrotasks(); |
+ expect(ss.toList(), completion([3, 4])); |
+ }); |
+ |
+ test("listening twice fails", () async { |
+ var stream = createStream(); |
+ var sourceSubscription = stream.listen(null); |
+ var subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
+ var subscription = subscriptionStream.listen(null); |
+ expect(() => subscriptionStream.listen(null), throws); |
+ await subscription.cancel(); |
+ }); |
+ |
+ test("pause and cancel passed through to original stream", () async { |
+ var controller = new StreamController(onCancel: () async => 42); |
+ var sourceSubscription = controller.stream.listen(null); |
+ var subscriptionStream = new SubscriptionStream(sourceSubscription); |
+ expect(controller.isPaused, isTrue); |
+ var lastEvent; |
+ var subscription = subscriptionStream.listen((v) { lastEvent = v; }); |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ expect(lastEvent, 1); |
+ expect(controller.isPaused, isFalse); |
+ subscription.pause(); |
+ expect(controller.isPaused, isTrue); |
+ subscription.resume(); |
+ expect(controller.isPaused, isFalse); |
+ expect(await subscription.cancel(), 42); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ group("cancelOnError behavior", () { |
+ for (var original in [false, true]) { |
+ group("source/new subscription: ${original ? "yes" : "no"}", () { |
+ test("no", () async { |
+ var stream = createErrorStream(); |
+ var sourceSubscription = stream.listen(null, cancelOnError: original); |
+ var subscriptionStream = |
+ new SubscriptionStream(sourceSubscription, |
+ isCancelOnError: original); |
+ var done = new Completer(); |
+ var events = []; |
+ var subscription = subscriptionStream.listen(events.add, |
+ onError: events.add, |
+ onDone: done.complete); |
+ await done.future; |
+ var expected = [1, 2, "To err is divine!"]; |
+ // If neither subscription is cancelOnError, the fourth event |
+ // goes through. |
+ if (!original) expected.add(4); |
+ expect(events, expected); |
+ }); |
+ |
+ test("yes", () async { |
+ var stream = createErrorStream(); |
+ var sourceSubscription = stream.listen(null, cancelOnError: original); |
+ var subscriptionStream = |
+ new SubscriptionStream(sourceSubscription, |
+ isCancelOnError: original); |
+ var completer = new Completer(); |
+ var events = []; |
+ subscriptionStream.listen(events.add, |
+ onError: (v) { |
+ events.add(v); |
+ completer.complete(); |
+ }, |
+ onDone: () => throw "should not happen", |
+ cancelOnError: true); |
+ await completer.future; |
+ await flushMicrotasks(); |
+ expect(events, [1, 2, "To err is divine!"]); |
+ }); |
+ }); |
+ |
+ for (var cancelOnError in [false, true]) { |
+ group(cancelOnError ? "yes" : "no", () { |
+ test("- no error, value goes to asFuture", () async { |
+ var stream = createStream(); |
+ var sourceSubscription = |
+ stream.listen(null, cancelOnError: original); |
+ var subscriptionStream = |
+ new SubscriptionStream(sourceSubscription, |
+ isCancelOnError: original); |
+ var subscription = |
+ subscriptionStream.listen(null, cancelOnError: cancelOnError); |
+ expect(subscription.asFuture(42), completion(42)); |
+ }); |
+ |
+ test("- error goes to asFuture", () async { |
+ var stream = createErrorStream(); |
+ var sourceSubscription = stream.listen(null, |
+ cancelOnError: original); |
+ var subscriptionStream = |
+ new SubscriptionStream(sourceSubscription, |
+ isCancelOnError: original); |
+ var subscription = |
+ subscriptionStream.listen(null, cancelOnError: cancelOnError); |
+ expect(subscription.asFuture(), throws); |
+ }); |
+ }); |
+ } |
+ } |
+ |
+ test("mislabeled cancelOnError:false source", () async { |
+ var controller = new StreamController(); |
+ var sourceSubscription = |
+ controller.stream.listen(null, cancelOnError: false); |
+ var subscriptionStream = |
+ new SubscriptionStream(sourceSubscription, |
+ isCancelOnError: true); |
+ var lastEvent; |
+ var subscription = subscriptionStream.listen( |
+ (v) { lastEvent = v; }, |
+ onError: (v) { lastEvent = v; }, |
+ onDone: () { throw "unreachable"; }, |
+ cancelOnError: true); |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ expect(lastEvent, 1); |
+ controller.addError(2); |
+ await flushMicrotasks(); |
+ expect(lastEvent, 2); |
+ expect(controller.hasListener, true); // Not canceled! |
+ controller.addError(3); |
+ await flushMicrotasks(); |
+ // This is the badness you get when passing `true` incorrectly! |
+ expect(lastEvent, 3); |
+ }); |
+ |
+ test("mislabeled cancelOnError:true source", () async { |
+ var controller = new StreamController(); |
+ var sourceSubscription = |
+ controller.stream.listen(null, cancelOnError: true); |
+ var subscriptionStream = |
+ new SubscriptionStream(sourceSubscription, |
+ isCancelOnError: false); |
+ var lastEvent; |
+ var subscription = subscriptionStream.listen( |
+ (v) { lastEvent = v; }, |
+ onError: (v) { lastEvent = v; }, |
+ onDone: () { throw "unreachable"; }, |
+ cancelOnError: false); |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ expect(lastEvent, 1); |
+ controller.addError(2); |
+ await flushMicrotasks(); |
+ expect(lastEvent, 2); |
+ expect(controller.hasListener, false); // Canceled! |
+ // This is slightly wrong, but safe. |
+ }); |
+ }); |
+} |
+ |
+Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
+ |
+Stream<int> createStream() async* { |
+ yield 1; |
+ await flushMicrotasks(); |
+ yield 2; |
+ await flushMicrotasks(); |
+ yield 3; |
+ await flushMicrotasks(); |
+ yield 4; |
+} |
+ |
+Stream<int> createErrorStream() { |
+ StreamController controller = new StreamController<int>(); |
+ () async { |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ controller.add(2); |
+ await flushMicrotasks(); |
+ controller.addError("To err is divine!"); |
+ await flushMicrotasks(); |
+ controller.add(4); |
+ await flushMicrotasks(); |
+ controller.close(); |
+ }(); |
+ return controller.stream; |
+} |
+ |
+Stream<int> createLongStream() async* { |
+ for (int i = 0; i < 200; i++) yield i; |
+} |