OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import "dart:async"; |
| 6 |
| 7 import "package:async/async.dart" show SubscriptionStream; |
| 8 import "package:test/test.dart"; |
| 9 |
| 10 main() { |
| 11 test("subscription stream of an entire subscription", () async { |
| 12 var stream = createStream(); |
| 13 var subscription = stream.listen(null); |
| 14 var subscriptionStream = new SubscriptionStream<int>(subscription); |
| 15 await flushMicrotasks(); |
| 16 expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); |
| 17 }); |
| 18 |
| 19 test("subscription stream after two events", () async { |
| 20 var stream = createStream(); |
| 21 int skips = 0; |
| 22 var c = new Completer(); |
| 23 var sub; |
| 24 sub = stream.listen((v) { |
| 25 ++skips; |
| 26 expect(v, skips); |
| 27 if (skips == 2) c.complete(new SubscriptionStream<int>(sub)); |
| 28 }); |
| 29 Stream<int> ss = await c.future; |
| 30 await flushMicrotasks(); |
| 31 expect(ss.toList(), completion([3, 4])); |
| 32 }); |
| 33 |
| 34 test("listening twice fails", () async { |
| 35 var stream = createStream(); |
| 36 var sourceSubscription = stream.listen(null); |
| 37 var subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
| 38 var subscription = subscriptionStream.listen(null); |
| 39 expect(() => subscriptionStream.listen(null), throws); |
| 40 await subscription.cancel(); |
| 41 }); |
| 42 |
| 43 test("pause and cancel passed through to original stream", () async { |
| 44 var controller = new StreamController(onCancel: () async => 42); |
| 45 var sourceSubscription = controller.stream.listen(null); |
| 46 var subscriptionStream = new SubscriptionStream(sourceSubscription); |
| 47 expect(controller.isPaused, isTrue); |
| 48 var lastEvent; |
| 49 var subscription = subscriptionStream.listen((v) { lastEvent = v; }); |
| 50 controller.add(1); |
| 51 await flushMicrotasks(); |
| 52 expect(lastEvent, 1); |
| 53 expect(controller.isPaused, isFalse); |
| 54 subscription.pause(); |
| 55 expect(controller.isPaused, isTrue); |
| 56 subscription.resume(); |
| 57 expect(controller.isPaused, isFalse); |
| 58 expect(await subscription.cancel(), 42); |
| 59 expect(controller.hasListener, isFalse); |
| 60 }); |
| 61 |
| 62 group("cancelOnError behavior", () { |
| 63 for (var original in [false, true]) { |
| 64 group("source/new subscription: ${original ? "yes" : "no"}", () { |
| 65 test("no", () async { |
| 66 var stream = createErrorStream(); |
| 67 var sourceSubscription = stream.listen(null, cancelOnError: original); |
| 68 var subscriptionStream = |
| 69 new SubscriptionStream(sourceSubscription, |
| 70 isCancelOnError: original); |
| 71 var done = new Completer(); |
| 72 var events = []; |
| 73 var subscription = subscriptionStream.listen(events.add, |
| 74 onError: events.add, |
| 75 onDone: done.complete); |
| 76 await done.future; |
| 77 var expected = [1, 2, "To err is divine!"]; |
| 78 // If neither subscription is cancelOnError, the fourth event |
| 79 // goes through. |
| 80 if (!original) expected.add(4); |
| 81 expect(events, expected); |
| 82 }); |
| 83 |
| 84 test("yes", () async { |
| 85 var stream = createErrorStream(); |
| 86 var sourceSubscription = stream.listen(null, cancelOnError: original); |
| 87 var subscriptionStream = |
| 88 new SubscriptionStream(sourceSubscription, |
| 89 isCancelOnError: original); |
| 90 var completer = new Completer(); |
| 91 var events = []; |
| 92 subscriptionStream.listen(events.add, |
| 93 onError: (v) { |
| 94 events.add(v); |
| 95 completer.complete(); |
| 96 }, |
| 97 onDone: () => throw "should not happen", |
| 98 cancelOnError: true); |
| 99 await completer.future; |
| 100 await flushMicrotasks(); |
| 101 expect(events, [1, 2, "To err is divine!"]); |
| 102 }); |
| 103 }); |
| 104 |
| 105 for (var cancelOnError in [false, true]) { |
| 106 group(cancelOnError ? "yes" : "no", () { |
| 107 test("- no error, value goes to asFuture", () async { |
| 108 var stream = createStream(); |
| 109 var sourceSubscription = |
| 110 stream.listen(null, cancelOnError: original); |
| 111 var subscriptionStream = |
| 112 new SubscriptionStream(sourceSubscription, |
| 113 isCancelOnError: original); |
| 114 var subscription = |
| 115 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| 116 expect(subscription.asFuture(42), completion(42)); |
| 117 }); |
| 118 |
| 119 test("- error goes to asFuture", () async { |
| 120 var stream = createErrorStream(); |
| 121 var sourceSubscription = stream.listen(null, |
| 122 cancelOnError: original); |
| 123 var subscriptionStream = |
| 124 new SubscriptionStream(sourceSubscription, |
| 125 isCancelOnError: original); |
| 126 var subscription = |
| 127 subscriptionStream.listen(null, cancelOnError: cancelOnError); |
| 128 expect(subscription.asFuture(), throws); |
| 129 }); |
| 130 }); |
| 131 } |
| 132 } |
| 133 |
| 134 test("mislabeled cancelOnError:false source", () async { |
| 135 var controller = new StreamController(); |
| 136 var sourceSubscription = |
| 137 controller.stream.listen(null, cancelOnError: false); |
| 138 var subscriptionStream = |
| 139 new SubscriptionStream(sourceSubscription, |
| 140 isCancelOnError: true); |
| 141 var lastEvent; |
| 142 var subscription = subscriptionStream.listen( |
| 143 (v) { lastEvent = v; }, |
| 144 onError: (v) { lastEvent = v; }, |
| 145 onDone: () { throw "unreachable"; }, |
| 146 cancelOnError: true); |
| 147 controller.add(1); |
| 148 await flushMicrotasks(); |
| 149 expect(lastEvent, 1); |
| 150 controller.addError(2); |
| 151 await flushMicrotasks(); |
| 152 expect(lastEvent, 2); |
| 153 expect(controller.hasListener, true); // Not canceled! |
| 154 controller.addError(3); |
| 155 await flushMicrotasks(); |
| 156 // This is the badness you get when passing `true` incorrectly! |
| 157 expect(lastEvent, 3); |
| 158 }); |
| 159 |
| 160 test("mislabeled cancelOnError:true source", () async { |
| 161 var controller = new StreamController(); |
| 162 var sourceSubscription = |
| 163 controller.stream.listen(null, cancelOnError: true); |
| 164 var subscriptionStream = |
| 165 new SubscriptionStream(sourceSubscription, |
| 166 isCancelOnError: false); |
| 167 var lastEvent; |
| 168 var subscription = subscriptionStream.listen( |
| 169 (v) { lastEvent = v; }, |
| 170 onError: (v) { lastEvent = v; }, |
| 171 onDone: () { throw "unreachable"; }, |
| 172 cancelOnError: false); |
| 173 controller.add(1); |
| 174 await flushMicrotasks(); |
| 175 expect(lastEvent, 1); |
| 176 controller.addError(2); |
| 177 await flushMicrotasks(); |
| 178 expect(lastEvent, 2); |
| 179 expect(controller.hasListener, false); // Canceled! |
| 180 // This is slightly wrong, but safe. |
| 181 }); |
| 182 }); |
| 183 } |
| 184 |
| 185 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
| 186 |
| 187 Stream<int> createStream() async* { |
| 188 yield 1; |
| 189 await flushMicrotasks(); |
| 190 yield 2; |
| 191 await flushMicrotasks(); |
| 192 yield 3; |
| 193 await flushMicrotasks(); |
| 194 yield 4; |
| 195 } |
| 196 |
| 197 Stream<int> createErrorStream() { |
| 198 StreamController controller = new StreamController<int>(); |
| 199 () async { |
| 200 controller.add(1); |
| 201 await flushMicrotasks(); |
| 202 controller.add(2); |
| 203 await flushMicrotasks(); |
| 204 controller.addError("To err is divine!"); |
| 205 await flushMicrotasks(); |
| 206 controller.add(4); |
| 207 await flushMicrotasks(); |
| 208 controller.close(); |
| 209 }(); |
| 210 return controller.stream; |
| 211 } |
| 212 |
| 213 Stream<int> createLongStream() async* { |
| 214 for (int i = 0; i < 200; i++) yield i; |
| 215 } |
OLD | NEW |