OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import "dart:async"; | |
6 | |
7 import "package:async/async.dart" show SubscriptionStream; | |
8 import "package:test/test.dart"; | |
9 | |
10 main() { | |
11 test("subscription stream of an entire subscription", () async { | |
12 var stream = createStream(); | |
13 var subscription = stream.listen(null); | |
14 var subscriptionStream = new SubscriptionStream<int>(subscription); | |
15 await flushMicrotasks(); | |
16 expect(subscriptionStream.toList(), completion([1, 2, 3, 4])); | |
17 }); | |
18 | |
19 test("subscription stream after two events", () async { | |
20 var stream = createStream(); | |
21 int skips = 0; | |
22 var c = new Completer(); | |
nweiz
2015/06/18 23:44:28
"c" -> "completer"
| |
23 var sub; | |
24 sub = stream.listen((v) { | |
25 ++skips; | |
26 expect(v, skips); | |
27 if (skips == 2) c.complete(new SubscriptionStream<int>(sub)); | |
28 }); | |
29 Stream<int> ss = await c.future; | |
nweiz
2015/06/18 23:44:28
"ss" -> "stream" or "subscriptionStream"
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
| |
30 await flushMicrotasks(); | |
31 expect(ss.toList(), completion([3, 4])); | |
32 }); | |
33 | |
34 test("listening twice fails", () async { | |
35 var stream = createStream(); | |
36 var sourceSubscription = stream.listen(null); | |
37 var subscriptionStream = new SubscriptionStream<int>(sourceSubscription); | |
38 var subscription = subscriptionStream.listen(null); | |
39 expect(() => subscriptionStream.listen(null), throws); | |
40 await subscription.cancel(); | |
41 }); | |
42 | |
43 test("pause and cancel passed through to original stream", () async { | |
44 var controller = new StreamController(onCancel: () async => 42); | |
45 var sourceSubscription = controller.stream.listen(null); | |
46 var subscriptionStream = new SubscriptionStream(sourceSubscription); | |
47 expect(controller.isPaused, isTrue); | |
48 var lastEvent; | |
49 var subscription = subscriptionStream.listen((v) { lastEvent = v; }); | |
50 controller.add(1); | |
51 await flushMicrotasks(); | |
52 expect(lastEvent, 1); | |
53 expect(controller.isPaused, isFalse); | |
54 subscription.pause(); | |
55 expect(controller.isPaused, isTrue); | |
56 subscription.resume(); | |
57 expect(controller.isPaused, isFalse); | |
58 expect(await subscription.cancel(), 42); | |
59 expect(controller.hasListener, isFalse); | |
60 }); | |
61 | |
62 group("cancelOnError behavior", () { | |
63 for (var original in [false, true]) { | |
64 group("source/new subscription: ${original ? "yes" : "no"}", () { | |
65 test("no", () async { | |
66 var stream = createErrorStream(); | |
67 var sourceSubscription = stream.listen(null, cancelOnError: original); | |
68 var subscriptionStream = | |
69 new SubscriptionStream(sourceSubscription, | |
70 isCancelOnError: original); | |
71 var done = new Completer(); | |
72 var events = []; | |
73 var subscription = subscriptionStream.listen(events.add, | |
74 onError: events.add, | |
75 onDone: done.complete); | |
76 await done.future; | |
77 var expected = [1, 2, "To err is divine!"]; | |
78 // If neither subscription is cancelOnError, the fourth event | |
79 // goes through. | |
80 if (!original) expected.add(4); | |
81 expect(events, expected); | |
82 }); | |
83 | |
84 test("yes", () async { | |
85 var stream = createErrorStream(); | |
86 var sourceSubscription = stream.listen(null, cancelOnError: original); | |
87 var subscriptionStream = | |
88 new SubscriptionStream(sourceSubscription, | |
89 isCancelOnError: original); | |
90 var completer = new Completer(); | |
91 var events = []; | |
92 subscriptionStream.listen(events.add, | |
93 onError: (v) { | |
94 events.add(v); | |
95 completer.complete(); | |
96 }, | |
97 onDone: () => throw "should not happen", | |
98 cancelOnError: true); | |
99 await completer.future; | |
100 await flushMicrotasks(); | |
101 expect(events, [1, 2, "To err is divine!"]); | |
102 }); | |
103 }); | |
104 | |
105 for (var cancelOnError in [false, true]) { | |
106 group(cancelOnError ? "yes" : "no", () { | |
107 test("- no error, value goes to asFuture", () async { | |
108 var stream = createStream(); | |
109 var sourceSubscription = | |
110 stream.listen(null, cancelOnError: original); | |
111 var subscriptionStream = | |
112 new SubscriptionStream(sourceSubscription, | |
113 isCancelOnError: original); | |
114 var subscription = | |
115 subscriptionStream.listen(null, cancelOnError: cancelOnError); | |
116 expect(subscription.asFuture(42), completion(42)); | |
117 }); | |
118 | |
119 test("- error goes to asFuture", () async { | |
120 var stream = createErrorStream(); | |
121 var sourceSubscription = stream.listen(null, | |
122 cancelOnError: original); | |
123 var subscriptionStream = | |
124 new SubscriptionStream(sourceSubscription, | |
125 isCancelOnError: original); | |
126 var subscription = | |
127 subscriptionStream.listen(null, cancelOnError: cancelOnError); | |
128 expect(subscription.asFuture(), throws); | |
129 }); | |
130 }); | |
131 } | |
132 } | |
133 | |
134 test("mislabeled cancelOnError:false source", () async { | |
135 var controller = new StreamController(); | |
136 var sourceSubscription = | |
137 controller.stream.listen(null, cancelOnError: false); | |
138 var subscriptionStream = | |
139 new SubscriptionStream(sourceSubscription, | |
140 isCancelOnError: true); | |
141 var lastEvent; | |
142 var subscription = subscriptionStream.listen( | |
143 (v) { lastEvent = v; }, | |
144 onError: (v) { lastEvent = v; }, | |
145 onDone: () { throw "unreachable"; }, | |
146 cancelOnError: true); | |
147 controller.add(1); | |
148 await flushMicrotasks(); | |
149 expect(lastEvent, 1); | |
150 controller.addError(2); | |
151 await flushMicrotasks(); | |
152 expect(lastEvent, 2); | |
153 expect(controller.hasListener, true); // Not canceled! | |
154 controller.addError(3); | |
155 await flushMicrotasks(); | |
156 // This is the badness you get when passing `true` incorrectly! | |
157 expect(lastEvent, 3); | |
158 }); | |
159 | |
160 test("mislabeled cancelOnError:true source", () async { | |
161 var controller = new StreamController(); | |
162 var sourceSubscription = | |
163 controller.stream.listen(null, cancelOnError: true); | |
164 var subscriptionStream = | |
165 new SubscriptionStream(sourceSubscription, | |
166 isCancelOnError: false); | |
167 var lastEvent; | |
168 var subscription = subscriptionStream.listen( | |
169 (v) { lastEvent = v; }, | |
170 onError: (v) { lastEvent = v; }, | |
171 onDone: () { throw "unreachable"; }, | |
172 cancelOnError: false); | |
173 controller.add(1); | |
174 await flushMicrotasks(); | |
175 expect(lastEvent, 1); | |
176 controller.addError(2); | |
177 await flushMicrotasks(); | |
178 expect(lastEvent, 2); | |
179 expect(controller.hasListener, false); // Canceled! | |
180 // This is slightly wrong, but safe. | |
181 }); | |
182 }); | |
183 } | |
184 | |
185 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | |
186 | |
187 Stream<int> createStream() async* { | |
188 yield 1; | |
189 await flushMicrotasks(); | |
190 yield 2; | |
191 await flushMicrotasks(); | |
192 yield 3; | |
193 await flushMicrotasks(); | |
194 yield 4; | |
195 } | |
196 | |
197 Stream<int> createErrorStream() { | |
198 StreamController controller = new StreamController<int>(); | |
199 () async { | |
200 controller.add(1); | |
201 await flushMicrotasks(); | |
202 controller.add(2); | |
203 await flushMicrotasks(); | |
204 controller.addError("To err is divine!"); | |
205 await flushMicrotasks(); | |
206 controller.add(4); | |
207 await flushMicrotasks(); | |
208 controller.close(); | |
209 }(); | |
210 return controller.stream; | |
211 } | |
212 | |
213 Stream<int> createLongStream() async* { | |
214 for (int i = 0; i < 200; i++) yield i; | |
215 } | |
OLD | NEW |