OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import "dart:async"; |
| 6 |
| 7 import "package:async/async.dart" show SubscriptionStream; |
| 8 import "package:test/test.dart"; |
| 9 |
| 10 import "utils.dart"; |
| 11 |
| 12 main() { |
| 13 test("subscription stream of an entire subscription", () async { |
| 14 var stream = createStream(); |
| 15 var subscription = stream.listen(null); |
| 16 var subscriptionStream = new SubscriptionStream<int>(subscription); |
| 17 await flushMicrotasks(); |
| 18 expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); |
| 19 }); |
| 20 |
| 21 test("subscription stream after two events", () async { |
| 22 var stream = createStream(); |
| 23 var skips = 0; |
| 24 var completer = new Completer(); |
| 25 var subscription; |
| 26 subscription = stream.listen((value) { |
| 27 ++skips; |
| 28 expect(value, skips); |
| 29 if (skips == 2) { |
| 30 completer.complete(new SubscriptionStream<int>(subscription)); |
| 31 } |
| 32 }); |
| 33 var subscriptionStream = await completer.future; |
| 34 await flushMicrotasks(); |
| 35 expect(subscriptionStream.toList(), completion([3, 4])); |
| 36 }); |
| 37 |
| 38 test("listening twice fails", () async { |
| 39 var stream = createStream(); |
| 40 var sourceSubscription = stream.listen(null); |
| 41 var subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
| 42 var subscription = subscriptionStream.listen(null); |
| 43 expect(() => subscriptionStream.listen(null), throws); |
| 44 await subscription.cancel(); |
| 45 }); |
| 46 |
| 47 test("pause and cancel passed through to original stream", () async { |
| 48 var controller = new StreamController(onCancel: () async => 42); |
| 49 var sourceSubscription = controller.stream.listen(null); |
| 50 var subscriptionStream = new SubscriptionStream(sourceSubscription); |
| 51 expect(controller.isPaused, isTrue); |
| 52 var lastEvent; |
| 53 var subscription = subscriptionStream.listen((value) { |
| 54 lastEvent = value; |
| 55 }); |
| 56 controller.add(1); |
| 57 |
| 58 await flushMicrotasks(); |
| 59 expect(lastEvent, 1); |
| 60 expect(controller.isPaused, isFalse); |
| 61 |
| 62 subscription.pause(); |
| 63 expect(controller.isPaused, isTrue); |
| 64 |
| 65 subscription.resume(); |
| 66 expect(controller.isPaused, isFalse); |
| 67 |
| 68 expect(await subscription.cancel(), 42); |
| 69 expect(controller.hasListener, isFalse); |
| 70 }); |
| 71 |
| 72 group("cancelOnError source:", () { |
| 73 for (var sourceCancels in [false, true]) { |
| 74 group("${sourceCancels ? "yes" : "no"}:", () { |
| 75 var subscriptionStream; |
| 76 setUp(() { |
| 77 var source = createErrorStream(); |
| 78 var sourceSubscription = source.listen(null, |
| 79 cancelOnError: sourceCancels); |
| 80 subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
| 81 }); |
| 82 |
| 83 test("- subscriptionStream: no", () async { |
| 84 var done = new Completer(); |
| 85 var events = []; |
| 86 subscriptionStream.listen(events.add, |
| 87 onError: events.add, |
| 88 onDone: done.complete, |
| 89 cancelOnError: false); |
| 90 var expected = [1, 2, "To err is divine!"]; |
| 91 if (sourceCancels) { |
| 92 var timeout = done.future.timeout(const Duration(milliseconds: 5), |
| 93 onTimeout: () => true); |
| 94 expect(await timeout, true); |
| 95 } else { |
| 96 expected.add(4); |
| 97 await done.future; |
| 98 } |
| 99 expect(events, expected); |
| 100 }); |
| 101 |
| 102 test("- subscriptionStream: yes", () async { |
| 103 var completer = new Completer(); |
| 104 var events = []; |
| 105 subscriptionStream.listen(events.add, |
| 106 onError: (value) { |
| 107 events.add(value); |
| 108 completer.complete(); |
| 109 }, |
| 110 onDone: () => throw "should not happen", |
| 111 cancelOnError: true); |
| 112 await completer.future; |
| 113 await flushMicrotasks(); |
| 114 expect(events, [1, 2, "To err is divine!"]); |
| 115 }); |
| 116 }); |
| 117 } |
| 118 |
| 119 for (var cancelOnError in [false, true]) { |
| 120 group(cancelOnError ? "yes" : "no", () { |
| 121 test("- no error, value goes to asFuture", () async { |
| 122 var stream = createStream(); |
| 123 var sourceSubscription = |
| 124 stream.listen(null, cancelOnError: cancelOnError); |
| 125 var subscriptionStream = |
| 126 new SubscriptionStream(sourceSubscription); |
| 127 var subscription = |
| 128 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| 129 expect(subscription.asFuture(42), completion(42)); |
| 130 }); |
| 131 |
| 132 test("- error goes to asFuture", () async { |
| 133 var stream = createErrorStream(); |
| 134 var sourceSubscription = stream.listen(null, |
| 135 cancelOnError: cancelOnError); |
| 136 var subscriptionStream = |
| 137 new SubscriptionStream(sourceSubscription); |
| 138 |
| 139 var subscription = |
| 140 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| 141 expect(subscription.asFuture(), throws); |
| 142 }); |
| 143 }); |
| 144 } |
| 145 }); |
| 146 } |
| 147 |
| 148 Stream<int> createStream() async* { |
| 149 yield 1; |
| 150 await flushMicrotasks(); |
| 151 yield 2; |
| 152 await flushMicrotasks(); |
| 153 yield 3; |
| 154 await flushMicrotasks(); |
| 155 yield 4; |
| 156 } |
| 157 |
| 158 Stream<int> createErrorStream() { |
| 159 StreamController controller = new StreamController<int>(); |
| 160 () async { |
| 161 controller.add(1); |
| 162 await flushMicrotasks(); |
| 163 controller.add(2); |
| 164 await flushMicrotasks(); |
| 165 controller.addError("To err is divine!"); |
| 166 await flushMicrotasks(); |
| 167 controller.add(4); |
| 168 await flushMicrotasks(); |
| 169 controller.close(); |
| 170 }(); |
| 171 return controller.stream; |
| 172 } |
| 173 |
| 174 Stream<int> createLongStream() async* { |
| 175 for (int i = 0; i < 200; i++) yield i; |
| 176 } |
OLD | NEW |