OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import "dart:async"; | |
6 | |
7 import "package:async/async.dart" show StreamCompleter; | |
8 import "package:test/test.dart"; | |
9 | |
10 main() { | |
11 test("a stream is linked before listening", () async { | |
12 var completer = new StreamCompleter(); | |
13 completer.setSourceStream(createStream()); | |
14 expect(completer.stream.toList(), completion([1, 2, 3, 4])); | |
15 }); | |
16 | |
17 test("listened to before a stream is linked", () async { | |
18 var completer = new StreamCompleter(); | |
19 Future done = completer.stream.toList(); | |
nweiz
2015/06/18 23:44:27
Nit: "var"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
I have actually found full typing to catch bugs in
| |
20 await flushMicrotasks(); | |
21 completer.setSourceStream(createStream()); | |
22 expect(done, completion([1, 2, 3, 4])); | |
23 }); | |
24 | |
25 test("cancel before linking a stream doesn't listen on stream", () async { | |
26 var completer = new StreamCompleter(); | |
27 var subscription = completer.stream.listen(null); | |
28 subscription.pause(); // Should be ignored. | |
29 subscription.cancel(); | |
30 completer.setSourceStream(new UnusableStream()); // Doesn't throw. | |
31 }); | |
32 | |
33 test("listen and pause before linking stream", () async { | |
nweiz
2015/06/18 23:44:27
Also test that this pause triggers "onPause" in th
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
34 var controller = new StreamCompleter(); | |
35 var events = []; | |
36 var subscription = controller.stream.listen(events.add); | |
37 var done = subscription.asFuture(); | |
38 subscription.pause(); | |
39 controller.setSourceStream(createStream()); | |
40 await flushMicrotasks(); | |
41 expect(events, []); | |
42 subscription.resume(); | |
43 await done; | |
44 expect(events, [1, 2, 3, 4]); | |
45 }); | |
46 | |
47 test("pause more than once", () async { | |
48 var completer = new StreamCompleter(); | |
49 var events = []; | |
50 var subscription = completer.stream.listen(events.add); | |
51 Future done = subscription.asFuture(); | |
52 subscription.pause(); | |
53 subscription.pause(); | |
54 subscription.pause(); | |
55 completer.setSourceStream(createStream()); | |
56 for (int i = 0; i < 3; i++) { | |
57 await flushMicrotasks(); | |
58 expect(events, []); | |
59 subscription.resume(); | |
60 } | |
61 await done; | |
62 expect(events, [1, 2, 3, 4]); | |
63 }); | |
64 | |
65 test("cancel new stream before source is done.", () async { | |
66 var completer = new StreamCompleter(); | |
67 var listened = false; | |
68 var canceled = false; | |
69 var lastEvent = -1; | |
70 var controller = new StreamController(onListen: () { listened = true; }, | |
nweiz
2015/06/18 23:44:27
It's not actually in the style guide right now, bu
Lasse Reichstein Nielsen
2015/06/30 10:34:14
That's silly.
I'll rewrite it to
() => listened
| |
71 onCancel: () { canceled = true; }); | |
72 var subscription; | |
73 subscription = completer.stream.listen( | |
74 (v) { | |
nweiz
2015/06/18 23:44:27
"v" -> "value"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
75 expect(v, lessThan(3)); | |
76 lastEvent = v; | |
77 if (v == 2) { | |
78 subscription.cancel(); | |
79 } | |
80 }, | |
81 onError: unreachable("error"), | |
82 onDone: unreachable("done"), | |
83 cancelOnError: true); | |
84 completer.setSourceStream(controller.stream); | |
85 expect(listened, isTrue); | |
86 | |
87 await flushMicrotasks(); | |
88 expect(canceled, isFalse); | |
89 controller.add(1); | |
90 | |
91 await flushMicrotasks(); | |
92 expect(lastEvent, 1); | |
93 expect(canceled, isFalse); | |
94 controller.add(2); | |
95 | |
96 await flushMicrotasks(); | |
97 expect(lastEvent, 2); | |
98 expect(canceled, isTrue); | |
99 }); | |
100 | |
101 test("complete with setEmpty before listening", () async { | |
102 var completer = new StreamCompleter(); | |
103 completer.setEmpty(); | |
104 var done = new Completer(); | |
105 completer.stream.listen( | |
106 unreachable("data"), | |
107 onError: unreachable("error"), | |
108 onDone: done.complete); | |
109 await done.future; | |
110 }); | |
111 | |
112 test("complete with setEmpty after listening", () async { | |
113 var completer = new StreamCompleter(); | |
114 var done = new Completer(); | |
115 completer.stream.listen( | |
116 unreachable("data"), | |
117 onError: unreachable("error"), | |
118 onDone: done.complete); | |
119 completer.setEmpty(); | |
120 await done.future; | |
121 }); | |
122 | |
123 test("source stream isn't listened to until completer stream is.", () async { | |
nweiz
2015/06/18 23:44:27
Nit: no period
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
124 var completer = new StreamCompleter(); | |
125 bool listened = false; | |
nweiz
2015/06/18 23:44:27
Nit: "var"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
126 var controller; | |
127 controller = new StreamController(onListen: () { | |
128 listened = true; | |
129 () async { controller.close(); } (); // In later microtask. | |
nweiz
2015/06/18 23:44:27
I think "scheduleMicrotask" is a little more expli
Lasse Reichstein Nielsen
2015/06/30 10:34:14
And it has exactly the same length. Scary.
| |
130 }); | |
131 | |
132 completer.setSourceStream(controller.stream); | |
133 await flushMicrotasks(); | |
134 expect(listened, isFalse); | |
135 var subscription = completer.stream.listen(null); | |
136 expect(listened, isTrue); | |
137 await subscription.asFuture(); | |
138 }); | |
139 | |
140 test("cancelOnError true when listening before linking stream", () async { | |
141 var completer = new StreamCompleter(); | |
142 var listened = false; | |
143 var canceled = false; | |
144 var lastEvent = -1; | |
145 var controller = new StreamController(onListen: () { listened = true; }, | |
146 onCancel: () { canceled = true; }); | |
147 var subscription = completer.stream.listen( | |
148 (v) { | |
149 expect(v, lessThan(3)); | |
150 lastEvent = v; | |
151 }, | |
152 onError: (v) { | |
153 expect(v, "3"); | |
154 lastEvent = v; | |
155 }, | |
156 onDone: unreachable("done"), | |
157 cancelOnError: true); | |
158 completer.setSourceStream(controller.stream); | |
159 expect(listened, isTrue); | |
160 | |
161 await flushMicrotasks(); | |
162 expect(canceled, isFalse); | |
163 controller.add(1); | |
164 | |
165 await flushMicrotasks(); | |
166 expect(lastEvent, 1); | |
167 expect(canceled, isFalse); | |
168 controller.add(2); | |
169 | |
170 await flushMicrotasks(); | |
171 expect(lastEvent, 2); | |
172 expect(canceled, isFalse); | |
173 controller.addError("3"); | |
174 | |
175 await flushMicrotasks(); | |
176 expect(lastEvent, "3"); | |
177 expect(canceled, isTrue); | |
178 }); | |
179 | |
180 test("cancelOnError true when listening after linking stream", () async { | |
181 var completer = new StreamCompleter(); | |
182 var listened = false; | |
183 var canceled = false; | |
184 var lastEvent = -1; | |
185 var controller = new StreamController(onListen: () { listened = true; }, | |
186 onCancel: () { canceled = true; }); | |
187 completer.setSourceStream(controller.stream); | |
188 controller.add(1); | |
189 expect(listened, isFalse); | |
190 | |
191 var subscription = completer.stream.listen( | |
192 (v) { | |
193 expect(v, lessThan(3)); | |
194 lastEvent = v; | |
195 }, | |
196 onError: (v) { | |
197 expect(v, "3"); | |
198 lastEvent = v; | |
199 }, | |
200 onDone: unreachable("done"), | |
201 cancelOnError: true); | |
202 | |
203 expect(listened, isTrue); | |
204 | |
205 await flushMicrotasks(); | |
206 expect(lastEvent, 1); | |
207 expect(canceled, isFalse); | |
208 controller.add(2); | |
209 | |
210 await flushMicrotasks(); | |
211 expect(lastEvent, 2); | |
212 expect(canceled, isFalse); | |
213 controller.addError("3"); | |
214 | |
215 await flushMicrotasks(); | |
216 expect(canceled, isTrue); | |
217 }); | |
218 | |
219 test("linking a stream after setSourceStream before listen", () async { | |
220 var completer = new StreamCompleter(); | |
221 completer.setSourceStream(createStream()); | |
222 expect(() { completer.setSourceStream(createStream()); }, throws); | |
223 expect(() { completer.setEmpty(createStream()); }, throws); | |
224 await completer.stream.toList(); | |
225 // Still fails after source is done | |
226 expect(() { completer.setSourceStream(createStream()); }, throws); | |
227 expect(() { completer.setEmpty(createStream()); }, throws); | |
228 }); | |
229 | |
230 test("linking a stream after setSourceStream after listen", () async { | |
231 var completer = new StreamCompleter(); | |
232 var list = completer.stream.toList(); | |
233 completer.setSourceStream(createStream()); | |
234 expect(() { completer.setSourceStream(createStream()); }, throws); | |
235 expect(() { completer.setEmpty(createStream()); }, throws); | |
236 await list; | |
237 // Still fails after source is done. | |
238 expect(() { completer.setSourceStream(createStream()); }, throws); | |
239 expect(() { completer.setEmpty(createStream()); }, throws); | |
240 }); | |
241 | |
242 test("linking a stream after setEmpty before listen", () async { | |
243 var completer = new StreamCompleter(); | |
244 completer.setEmpty(); | |
245 expect(() { completer.setSourceStream(createStream()); }, throws); | |
246 expect(() { completer.setEmpty(createStream()); }, throws); | |
247 await completer.stream.toList(); | |
248 // Still fails after source is done | |
249 expect(() { completer.setSourceStream(createStream()); }, throws); | |
250 expect(() { completer.setEmpty(createStream()); }, throws); | |
251 }); | |
252 | |
253 test("linking a stream after setEmpty() after listen", () async { | |
254 var completer = new StreamCompleter(); | |
255 var list = completer.stream.toList(); | |
256 completer.setEmpty(); | |
257 expect(() { completer.setSourceStream(createStream()); }, throws); | |
258 expect(() { completer.setEmpty(createStream()); }, throws); | |
259 await list; | |
260 // Still fails after source is done. | |
261 expect(() { completer.setSourceStream(createStream()); }, throws); | |
262 expect(() { completer.setEmpty(createStream()); }, throws); | |
263 }); | |
264 | |
265 test("Listening more than once after setting stream", () async { | |
nweiz
2015/06/18 23:44:27
Nit: "listening" (also below)
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
266 var completer = new StreamCompleter(); | |
267 completer.setSourceStream(createStream()); | |
268 var list = completer.stream.toList(); | |
269 expect(() { completer.stream.toList(); }, throws); | |
270 await list; | |
271 expect(() { completer.stream.toList(); }, throws); | |
272 }); | |
273 | |
274 test("Listening more than once before setting stream", () async { | |
275 var completer = new StreamCompleter(); | |
276 var list = completer.stream.toList(); | |
277 expect(() { completer.stream.toList(); }, throws); | |
278 }); | |
279 | |
280 test("setting onData etc. before and after setting stream", () async { | |
281 var completer = new StreamCompleter(); | |
282 var controller = new StreamController(); | |
283 var subscription = completer.stream.listen(null); | |
284 var lastEvent = 0; | |
285 subscription.onData((v) { lastEvent = v; }); | |
286 subscription.onError((v) { lastEvent = "$v"; }); | |
287 subscription.onDone(() { lastEvent = -1; }); | |
288 completer.setSourceStream(controller.stream); | |
289 await flushMicrotasks(); | |
290 controller.add(1); | |
291 await flushMicrotasks(); | |
292 expect(lastEvent, 1); | |
293 controller.addError(2); | |
294 await flushMicrotasks(); | |
295 expect(lastEvent, "2"); | |
296 subscription.onData((v) { lastEvent = -v; }); | |
297 subscription.onError((v) { lastEvent = "${-v}"; }); | |
298 controller.add(1); | |
299 await flushMicrotasks(); | |
300 expect(lastEvent, -1); | |
301 controller.addError(2); | |
302 await flushMicrotasks(); | |
303 expect(lastEvent, "-2"); | |
304 controller.close(); | |
305 await flushMicrotasks(); | |
306 expect(lastEvent, -1); | |
307 }); | |
308 | |
309 test("pause w/ resume future accross setting stream", () async { | |
310 var completer = new StreamCompleter(); | |
311 var resume = new Completer(); | |
312 var subscription = completer.stream.listen(unreachable("data")); | |
313 var lastEvent = 0; | |
314 subscription.pause(resume.future); | |
315 await flushMicrotasks(); | |
316 completer.setSourceStream(createStream()); | |
317 await flushMicrotasks(); | |
318 resume.complete(); | |
319 var events = []; | |
320 subscription.onData(events.add); | |
321 await subscription.asFuture(); | |
322 expect(events, [1, 2, 3, 4]); | |
323 }); | |
324 | |
325 test("asFuture with error accross setting stream", () async { | |
326 var completer = new StreamCompleter(); | |
327 var controller = new StreamController(); | |
328 var subscription = completer.stream.listen(unreachable("data"), | |
329 cancelOnError: false); | |
330 var done = subscription.asFuture(); | |
331 expect(controller.hasListener, isFalse); | |
332 completer.setSourceStream(controller.stream); | |
333 await flushMicrotasks(); | |
334 expect(controller.hasListener, isTrue); | |
335 controller.addError(42); | |
336 await done.then(unreachable("data"), onError: (error) { | |
337 expect(error, 42); | |
338 }); | |
339 expect(controller.hasListener, isFalse); | |
340 }); | |
341 } | |
342 | |
343 /// A zero-millisecond timer should wait until after all microtasks. | |
344 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | |
nweiz
2015/06/18 23:44:27
Consider moving this (and maybe [unreachable] and
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
345 | |
346 Stream<int> createStream() async* { | |
347 yield 1; | |
348 await flushMicrotasks(); | |
349 yield 2; | |
350 await flushMicrotasks(); | |
351 yield 3; | |
352 await flushMicrotasks(); | |
353 yield 4; | |
354 } | |
355 | |
356 unreachable(String name) => ([a, b]) { fail("Unreachable: $name"); }; | |
357 | |
358 class UnusableStream extends Stream { | |
359 listen(onData, {onError, onDone, cancelOnError}) { | |
360 throw new UnimplementedError("Gotcha!"); | |
361 } | |
362 } | |
OLD | NEW |