Chromium Code Reviews| Index: test/subscription_stream_test.dart |
| diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..390c95a14d987238a2318290533d91895844983b |
| --- /dev/null |
| +++ b/test/subscription_stream_test.dart |
| @@ -0,0 +1,215 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import "dart:async"; |
| + |
| +import "package:async/async.dart" show SubscriptionStream; |
| +import "package:test/test.dart"; |
| + |
| +main() { |
| + test("subscription stream of an entire subscription", () async { |
| + var stream = createStream(); |
| + var subscription = stream.listen(null); |
| + var subscriptionStream = new SubscriptionStream<int>(subscription); |
| + await flushMicrotasks(); |
| + expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); |
| + }); |
| + |
| + test("subscription stream after two events", () async { |
| + var stream = createStream(); |
| + int skips = 0; |
| + var c = new Completer(); |
|
nweiz
2015/06/18 23:44:28
"c" -> "completer"
|
| + var sub; |
| + sub = stream.listen((v) { |
| + ++skips; |
| + expect(v, skips); |
| + if (skips == 2) c.complete(new SubscriptionStream<int>(sub)); |
| + }); |
| + Stream<int> ss = await c.future; |
|
nweiz
2015/06/18 23:44:28
"ss" -> "stream" or "subscriptionStream"
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
|
| + await flushMicrotasks(); |
| + expect(ss.toList(), completion([3, 4])); |
| + }); |
| + |
| + test("listening twice fails", () async { |
| + var stream = createStream(); |
| + var sourceSubscription = stream.listen(null); |
| + var subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
| + var subscription = subscriptionStream.listen(null); |
| + expect(() => subscriptionStream.listen(null), throws); |
| + await subscription.cancel(); |
| + }); |
| + |
| + test("pause and cancel passed through to original stream", () async { |
| + var controller = new StreamController(onCancel: () async => 42); |
| + var sourceSubscription = controller.stream.listen(null); |
| + var subscriptionStream = new SubscriptionStream(sourceSubscription); |
| + expect(controller.isPaused, isTrue); |
| + var lastEvent; |
| + var subscription = subscriptionStream.listen((v) { lastEvent = v; }); |
| + controller.add(1); |
| + await flushMicrotasks(); |
| + expect(lastEvent, 1); |
| + expect(controller.isPaused, isFalse); |
| + subscription.pause(); |
| + expect(controller.isPaused, isTrue); |
| + subscription.resume(); |
| + expect(controller.isPaused, isFalse); |
| + expect(await subscription.cancel(), 42); |
| + expect(controller.hasListener, isFalse); |
| + }); |
| + |
| + group("cancelOnError behavior", () { |
| + for (var original in [false, true]) { |
| + group("source/new subscription: ${original ? "yes" : "no"}", () { |
| + test("no", () async { |
| + var stream = createErrorStream(); |
| + var sourceSubscription = stream.listen(null, cancelOnError: original); |
| + var subscriptionStream = |
| + new SubscriptionStream(sourceSubscription, |
| + isCancelOnError: original); |
| + var done = new Completer(); |
| + var events = []; |
| + var subscription = subscriptionStream.listen(events.add, |
| + onError: events.add, |
| + onDone: done.complete); |
| + await done.future; |
| + var expected = [1, 2, "To err is divine!"]; |
| + // If neither subscription is cancelOnError, the fourth event |
| + // goes through. |
| + if (!original) expected.add(4); |
| + expect(events, expected); |
| + }); |
| + |
| + test("yes", () async { |
| + var stream = createErrorStream(); |
| + var sourceSubscription = stream.listen(null, cancelOnError: original); |
| + var subscriptionStream = |
| + new SubscriptionStream(sourceSubscription, |
| + isCancelOnError: original); |
| + var completer = new Completer(); |
| + var events = []; |
| + subscriptionStream.listen(events.add, |
| + onError: (v) { |
| + events.add(v); |
| + completer.complete(); |
| + }, |
| + onDone: () => throw "should not happen", |
| + cancelOnError: true); |
| + await completer.future; |
| + await flushMicrotasks(); |
| + expect(events, [1, 2, "To err is divine!"]); |
| + }); |
| + }); |
| + |
| + for (var cancelOnError in [false, true]) { |
| + group(cancelOnError ? "yes" : "no", () { |
| + test("- no error, value goes to asFuture", () async { |
| + var stream = createStream(); |
| + var sourceSubscription = |
| + stream.listen(null, cancelOnError: original); |
| + var subscriptionStream = |
| + new SubscriptionStream(sourceSubscription, |
| + isCancelOnError: original); |
| + var subscription = |
| + subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| + expect(subscription.asFuture(42), completion(42)); |
| + }); |
| + |
| + test("- error goes to asFuture", () async { |
| + var stream = createErrorStream(); |
| + var sourceSubscription = stream.listen(null, |
| + cancelOnError: original); |
| + var subscriptionStream = |
| + new SubscriptionStream(sourceSubscription, |
| + isCancelOnError: original); |
| + var subscription = |
| + subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| + expect(subscription.asFuture(), throws); |
| + }); |
| + }); |
| + } |
| + } |
| + |
| + test("mislabeled cancelOnError:false source", () async { |
| + var controller = new StreamController(); |
| + var sourceSubscription = |
| + controller.stream.listen(null, cancelOnError: false); |
| + var subscriptionStream = |
| + new SubscriptionStream(sourceSubscription, |
| + isCancelOnError: true); |
| + var lastEvent; |
| + var subscription = subscriptionStream.listen( |
| + (v) { lastEvent = v; }, |
| + onError: (v) { lastEvent = v; }, |
| + onDone: () { throw "unreachable"; }, |
| + cancelOnError: true); |
| + controller.add(1); |
| + await flushMicrotasks(); |
| + expect(lastEvent, 1); |
| + controller.addError(2); |
| + await flushMicrotasks(); |
| + expect(lastEvent, 2); |
| + expect(controller.hasListener, true); // Not canceled! |
| + controller.addError(3); |
| + await flushMicrotasks(); |
| + // This is the badness you get when passing `true` incorrectly! |
| + expect(lastEvent, 3); |
| + }); |
| + |
| + test("mislabeled cancelOnError:true source", () async { |
| + var controller = new StreamController(); |
| + var sourceSubscription = |
| + controller.stream.listen(null, cancelOnError: true); |
| + var subscriptionStream = |
| + new SubscriptionStream(sourceSubscription, |
| + isCancelOnError: false); |
| + var lastEvent; |
| + var subscription = subscriptionStream.listen( |
| + (v) { lastEvent = v; }, |
| + onError: (v) { lastEvent = v; }, |
| + onDone: () { throw "unreachable"; }, |
| + cancelOnError: false); |
| + controller.add(1); |
| + await flushMicrotasks(); |
| + expect(lastEvent, 1); |
| + controller.addError(2); |
| + await flushMicrotasks(); |
| + expect(lastEvent, 2); |
| + expect(controller.hasListener, false); // Canceled! |
| + // This is slightly wrong, but safe. |
| + }); |
| + }); |
| +} |
| + |
| +Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
| + |
| +Stream<int> createStream() async* { |
| + yield 1; |
| + await flushMicrotasks(); |
| + yield 2; |
| + await flushMicrotasks(); |
| + yield 3; |
| + await flushMicrotasks(); |
| + yield 4; |
| +} |
| + |
| +Stream<int> createErrorStream() { |
| + StreamController controller = new StreamController<int>(); |
| + () async { |
| + controller.add(1); |
| + await flushMicrotasks(); |
| + controller.add(2); |
| + await flushMicrotasks(); |
| + controller.addError("To err is divine!"); |
| + await flushMicrotasks(); |
| + controller.add(4); |
| + await flushMicrotasks(); |
| + controller.close(); |
| + }(); |
| + return controller.stream; |
| +} |
| + |
| +Stream<int> createLongStream() async* { |
| + for (int i = 0; i < 200; i++) yield i; |
| +} |