OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ | |
6 // lands. | |
7 | |
8 import 'dart:async'; | |
9 | |
10 import 'package:test/test.dart'; | |
11 import 'package:test/src/util/forkable_stream.dart'; | |
12 import 'package:test/src/util/stream_queue.dart'; | |
13 | |
14 void main() { | |
15 var controller; | |
16 var stream; | |
17 setUp(() { | |
18 var cancelFuture = new Future.value(42); | |
19 controller = new StreamController<int>(onCancel: () => cancelFuture); | |
20 stream = new ForkableStream<int>(controller.stream); | |
21 }); | |
22 | |
23 group("with no forks", () { | |
24 test("forwards events, errors, and close", () async { | |
25 var queue = new StreamQueue(stream); | |
26 | |
27 controller.add(1); | |
28 expect(await queue.next, equals(1)); | |
29 | |
30 controller.add(2); | |
31 expect(await queue.next, equals(2)); | |
32 | |
33 controller.addError("error"); | |
34 expect(queue.next, throwsA("error")); | |
35 await flushMicrotasks(); | |
36 | |
37 controller.add(3); | |
38 expect(await queue.next, equals(3)); | |
39 | |
40 controller.close(); | |
41 expect(await queue.hasNext, isFalse); | |
42 }); | |
43 | |
44 test("listens to, pauses, and cancels the controller", () { | |
45 expect(controller.hasListener, isFalse); | |
46 | |
47 var sub = stream.listen(null); | |
48 expect(controller.hasListener, isTrue); | |
49 | |
50 sub.pause(); | |
51 expect(controller.isPaused, isTrue); | |
52 | |
53 sub.resume(); | |
54 expect(controller.isPaused, isFalse); | |
55 | |
56 sub.cancel(); | |
57 expect(controller.hasListener, isFalse); | |
58 }); | |
59 | |
60 test("unpauses the controller when a fork is listened", () { | |
61 stream.listen(null).pause(); | |
62 expect(controller.isPaused, isTrue); | |
63 | |
64 var fork = stream.fork(); | |
65 expect(controller.isPaused, isTrue); | |
66 | |
67 fork.listen(null); | |
68 expect(controller.isPaused, isFalse); | |
69 }); | |
70 }); | |
71 | |
72 group("with a fork created before the stream was listened", () { | |
73 var fork; | |
74 setUp(() { | |
75 fork = stream.fork(); | |
76 }); | |
77 | |
78 test("forwards events, errors, and close to both branches", () async { | |
79 var queue = new StreamQueue(stream); | |
80 var forkQueue = new StreamQueue(fork); | |
81 | |
82 controller.add(1); | |
83 expect(await queue.next, equals(1)); | |
84 expect(await forkQueue.next, equals(1)); | |
85 | |
86 controller.add(2); | |
87 expect(await queue.next, equals(2)); | |
88 expect(await forkQueue.next, equals(2)); | |
89 | |
90 controller.addError("error"); | |
91 expect(queue.next, throwsA("error")); | |
92 expect(forkQueue.next, throwsA("error")); | |
93 await flushMicrotasks(); | |
94 | |
95 controller.add(3); | |
96 expect(await queue.next, equals(3)); | |
97 expect(await forkQueue.next, equals(3)); | |
98 | |
99 controller.close(); | |
100 expect(await queue.hasNext, isFalse); | |
101 expect(await forkQueue.hasNext, isFalse); | |
102 }); | |
103 | |
104 test('listens to the source when the original is listened', () { | |
105 expect(controller.hasListener, isFalse); | |
106 stream.listen(null); | |
107 expect(controller.hasListener, isTrue); | |
108 }); | |
109 | |
110 test('listens to the source when the fork is listened', () { | |
111 expect(controller.hasListener, isFalse); | |
112 fork.listen(null); | |
113 expect(controller.hasListener, isTrue); | |
114 }); | |
115 }); | |
116 | |
117 test("with a fork created after the stream emitted a few events, forwards " | |
118 "future events, errors, and close to both branches", () async { | |
119 var queue = new StreamQueue(stream); | |
120 | |
121 controller.add(1); | |
122 expect(await queue.next, equals(1)); | |
123 | |
124 controller.add(2); | |
125 expect(await queue.next, equals(2)); | |
126 | |
127 var fork = stream.fork(); | |
128 var forkQueue = new StreamQueue(fork); | |
129 | |
130 controller.add(3); | |
131 expect(await queue.next, equals(3)); | |
132 expect(await forkQueue.next, equals(3)); | |
133 | |
134 controller.addError("error"); | |
135 expect(queue.next, throwsA("error")); | |
136 expect(forkQueue.next, throwsA("error")); | |
137 await flushMicrotasks(); | |
138 | |
139 controller.close(); | |
140 expect(await queue.hasNext, isFalse); | |
141 expect(await forkQueue.hasNext, isFalse); | |
142 }); | |
143 | |
144 group("with multiple forks", () { | |
145 var fork1; | |
146 var fork2; | |
147 var fork3; | |
148 var fork4; | |
149 setUp(() { | |
150 fork1 = stream.fork(); | |
151 fork2 = stream.fork(); | |
152 fork3 = stream.fork(); | |
153 fork4 = stream.fork(); | |
154 }); | |
155 | |
156 test("forwards events, errors, and close to all branches", () async { | |
157 var queue1 = new StreamQueue(stream); | |
158 var queue2 = new StreamQueue(fork1); | |
159 var queue3 = new StreamQueue(fork2); | |
160 var queue4 = new StreamQueue(fork3); | |
161 var queue5 = new StreamQueue(fork4); | |
162 | |
163 controller.add(1); | |
164 expect(await queue1.next, equals(1)); | |
165 expect(await queue2.next, equals(1)); | |
166 expect(await queue3.next, equals(1)); | |
167 expect(await queue4.next, equals(1)); | |
168 expect(await queue5.next, equals(1)); | |
169 | |
170 controller.add(2); | |
171 expect(await queue1.next, equals(2)); | |
172 expect(await queue2.next, equals(2)); | |
173 expect(await queue3.next, equals(2)); | |
174 expect(await queue4.next, equals(2)); | |
175 expect(await queue5.next, equals(2)); | |
176 | |
177 controller.addError("error"); | |
178 expect(queue1.next, throwsA("error")); | |
179 expect(queue2.next, throwsA("error")); | |
180 expect(queue3.next, throwsA("error")); | |
181 expect(queue4.next, throwsA("error")); | |
182 expect(queue5.next, throwsA("error")); | |
183 await flushMicrotasks(); | |
184 | |
185 controller.add(3); | |
186 expect(await queue1.next, equals(3)); | |
187 expect(await queue2.next, equals(3)); | |
188 expect(await queue3.next, equals(3)); | |
189 expect(await queue4.next, equals(3)); | |
190 expect(await queue5.next, equals(3)); | |
191 | |
192 controller.close(); | |
193 expect(await queue1.hasNext, isFalse); | |
194 expect(await queue2.hasNext, isFalse); | |
195 expect(await queue3.hasNext, isFalse); | |
196 expect(await queue4.hasNext, isFalse); | |
197 expect(await queue5.hasNext, isFalse); | |
198 }); | |
199 | |
200 test("forwards events in order of forking", () async { | |
201 var queue1 = new StreamQueue(stream); | |
202 var queue2 = new StreamQueue(fork1); | |
203 var queue3 = new StreamQueue(fork2); | |
204 var queue4 = new StreamQueue(fork3); | |
205 var queue5 = new StreamQueue(fork4); | |
206 | |
207 for (var i = 0; i < 4; i++) { | |
208 controller.add(i); | |
209 | |
210 var queue1Fired = false; | |
211 var queue2Fired = false; | |
212 var queue3Fired = false; | |
213 var queue4Fired = false; | |
214 var queue5Fired = false; | |
215 | |
216 queue5.next.then(expectAsync((_) { | |
217 queue5Fired = true; | |
218 expect(queue1Fired, isTrue); | |
219 expect(queue2Fired, isTrue); | |
220 expect(queue3Fired, isTrue); | |
221 expect(queue4Fired, isTrue); | |
222 })); | |
223 | |
224 queue1.next.then(expectAsync((_) { | |
225 queue1Fired = true; | |
226 expect(queue2Fired, isFalse); | |
227 expect(queue3Fired, isFalse); | |
228 expect(queue4Fired, isFalse); | |
229 expect(queue5Fired, isFalse); | |
230 })); | |
231 | |
232 queue4.next.then(expectAsync((_) { | |
233 queue4Fired = true; | |
234 expect(queue1Fired, isTrue); | |
235 expect(queue2Fired, isTrue); | |
236 expect(queue3Fired, isTrue); | |
237 expect(queue5Fired, isFalse); | |
238 })); | |
239 | |
240 queue2.next.then(expectAsync((_) { | |
241 queue2Fired = true; | |
242 expect(queue1Fired, isTrue); | |
243 expect(queue3Fired, isFalse); | |
244 expect(queue4Fired, isFalse); | |
245 expect(queue5Fired, isFalse); | |
246 })); | |
247 | |
248 queue3.next.then(expectAsync((_) { | |
249 queue3Fired = true; | |
250 expect(queue1Fired, isTrue); | |
251 expect(queue2Fired, isTrue); | |
252 expect(queue4Fired, isFalse); | |
253 expect(queue5Fired, isFalse); | |
254 })); | |
255 } | |
256 }); | |
257 | |
258 test("pauses the source when all forks are paused and/or not listening", | |
259 () { | |
260 var sub1 = stream.listen(null); | |
261 var sub2 = fork1.listen(null); | |
262 expect(controller.isPaused, isFalse); | |
263 | |
264 sub1.pause(); | |
265 expect(controller.isPaused, isFalse); | |
266 | |
267 sub2.pause(); | |
268 expect(controller.isPaused, isTrue); | |
269 | |
270 var sub3 = fork2.listen(null); | |
271 expect(controller.isPaused, isFalse); | |
272 | |
273 sub3.pause(); | |
274 expect(controller.isPaused, isTrue); | |
275 | |
276 sub2.resume(); | |
277 expect(controller.isPaused, isFalse); | |
278 | |
279 sub2.cancel(); | |
280 expect(controller.isPaused, isTrue); | |
281 }); | |
282 | |
283 test("cancels the source when all forks are canceled", () async { | |
284 var sub1 = stream.listen(null); | |
285 expect(controller.hasListener, isTrue); | |
286 | |
287 var sub2 = fork1.listen(null); | |
288 expect(controller.hasListener, isTrue); | |
289 | |
290 expect(sub1.cancel(), isNull); | |
291 await flushMicrotasks(); | |
292 expect(controller.hasListener, isTrue); | |
293 | |
294 expect(sub2.cancel(), isNull); | |
295 await flushMicrotasks(); | |
296 expect(controller.hasListener, isTrue); | |
297 | |
298 expect(fork2.listen(null).cancel(), isNull); | |
299 await flushMicrotasks(); | |
300 expect(controller.hasListener, isTrue); | |
301 | |
302 expect(fork3.listen(null).cancel(), isNull); | |
303 await flushMicrotasks(); | |
304 expect(controller.hasListener, isTrue); | |
305 | |
306 expect(fork4.listen(null).cancel(), completion(equals(42))); | |
307 await flushMicrotasks(); | |
308 expect(controller.hasListener, isFalse); | |
309 }); | |
310 }); | |
311 | |
312 group("modification during dispatch:", () { | |
313 test("forking during onCancel", () { | |
314 controller = new StreamController<int>(onCancel: expectAsync(() { | |
315 expect(stream.fork().toList(), completion(isEmpty)); | |
316 })); | |
317 stream = new ForkableStream<int>(controller.stream); | |
318 | |
319 stream.listen(null).cancel(); | |
320 }); | |
321 | |
322 test("forking during onPause", () { | |
323 controller = new StreamController<int>(onPause: expectAsync(() { | |
324 stream.fork().listen(null); | |
325 })); | |
326 stream = new ForkableStream<int>(controller.stream); | |
327 | |
328 stream.listen(null).pause(); | |
329 | |
330 // The fork created in onPause should have resumed the stream. | |
331 expect(controller.isPaused, isFalse); | |
332 }); | |
333 | |
334 test("forking during onData", () { | |
335 var sub; | |
336 sub = stream.listen(expectAsync((value1) { | |
337 expect(value1, equals(1)); | |
338 stream.fork().listen(expectAsync((value2) { | |
339 expect(value2, equals(2)); | |
340 })); | |
341 sub.cancel(); | |
342 })); | |
343 | |
344 controller.add(1); | |
345 controller.add(2); | |
346 }); | |
347 | |
348 test("canceling a fork during onData", () { | |
349 var fork = stream.fork(); | |
350 var forkSub = fork.listen(expectAsync((_) {}, count: 0)); | |
351 | |
352 stream.listen(expectAsync((_) => forkSub.cancel())); | |
353 controller.add(null); | |
354 }); | |
355 | |
356 test("forking during onError", () { | |
357 var sub; | |
358 sub = stream.listen(null, onError: expectAsync((error1) { | |
359 expect(error1, equals("error 1")); | |
360 stream.fork().listen(null, onError: expectAsync((error2) { | |
361 expect(error2, equals("error 2")); | |
362 })); | |
363 sub.cancel(); | |
364 })); | |
365 | |
366 controller.addError("error 1"); | |
367 controller.addError("error 2"); | |
368 }); | |
369 | |
370 test("canceling a fork during onError", () { | |
371 var fork = stream.fork(); | |
372 var forkSub = fork.listen(expectAsync((_) {}, count: 0)); | |
373 | |
374 stream.listen(null, onError: expectAsync((_) => forkSub.cancel())); | |
375 controller.addError("error"); | |
376 }); | |
377 | |
378 test("forking during onDone", () { | |
379 stream.listen(null, onDone: expectAsync(() { | |
380 expect(stream.fork().toList(), completion(isEmpty)); | |
381 })); | |
382 | |
383 controller.close(); | |
384 }); | |
385 | |
386 test("canceling a fork during onDone", () { | |
387 var fork = stream.fork(); | |
388 var forkSub = fork.listen(null, onDone: expectAsync(() {}, count: 0)); | |
389 | |
390 stream.listen(null, onDone: expectAsync(() => forkSub.cancel())); | |
391 controller.close(); | |
392 }); | |
393 }); | |
394 | |
395 group("throws an error when", () { | |
396 test("a cancelled stream is forked", () { | |
397 stream.listen(null).cancel(); | |
398 expect(stream.fork().toList(), completion(isEmpty)); | |
399 }); | |
400 | |
401 test("a cancelled stream is forked even when other forks are alive", () { | |
402 stream.fork().listen(null); | |
403 stream.listen(null).cancel(); | |
404 | |
405 expect(controller.hasListener, isTrue); | |
406 expect(stream.fork().toList(), completion(isEmpty)); | |
407 }); | |
408 | |
409 test("a closed stream is forked", () async { | |
410 controller.close(); | |
411 await stream.listen(null).asFuture(); | |
412 expect(stream.fork().toList(), completion(isEmpty)); | |
413 }); | |
414 }); | |
415 } | |
416 | |
417 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | |
OLD | NEW |