OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE filevents. | |
4 | |
5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ | |
6 // lands. | |
7 | |
8 import "dart:async"; | |
9 | |
10 import "package:test/src/util/stream_queue.dart"; | |
11 import "package:test/test.dart"; | |
12 | |
13 main() { | |
14 group("source stream", () { | |
15 test("is listened to on first request, paused between requests", () async { | |
16 var controller = new StreamController(); | |
17 var events = new StreamQueue<int>(controller.stream); | |
18 await flushMicrotasks(); | |
19 expect(controller.hasListener, isFalse); | |
20 | |
21 var next = events.next; | |
22 expect(controller.hasListener, isTrue); | |
23 expect(controller.isPaused, isFalse); | |
24 | |
25 controller.add(1); | |
26 | |
27 expect(await next, 1); | |
28 expect(controller.hasListener, isTrue); | |
29 expect(controller.isPaused, isTrue); | |
30 | |
31 next = events.next; | |
32 expect(controller.hasListener, isTrue); | |
33 expect(controller.isPaused, isFalse); | |
34 | |
35 controller.add(2); | |
36 | |
37 expect(await next, 2); | |
38 expect(controller.hasListener, isTrue); | |
39 expect(controller.isPaused, isTrue); | |
40 | |
41 events.cancel(); | |
42 expect(controller.hasListener, isFalse); | |
43 }); | |
44 }); | |
45 | |
46 group("next operation", () { | |
47 test("simple sequence of requests", () async { | |
48 var events = new StreamQueue<int>(createStream()); | |
49 for (int i = 1; i <= 4; i++) { | |
50 expect(await events.next, i); | |
51 } | |
52 expect(events.next, throwsStateError); | |
53 }); | |
54 | |
55 test("multiple requests at the same time", () async { | |
56 var events = new StreamQueue<int>(createStream()); | |
57 var result = await Future.wait( | |
58 [events.next, events.next, events.next, events.next]); | |
59 expect(result, [1, 2, 3, 4]); | |
60 await events.cancel(); | |
61 }); | |
62 | |
63 test("sequence of requests with error", () async { | |
64 var events = new StreamQueue<int>(createErrorStream()); | |
65 expect(await events.next, 1); | |
66 expect(await events.next, 2); | |
67 expect(events.next, throwsA("To err is divine!")); | |
68 expect(await events.next, 4); | |
69 await events.cancel(); | |
70 }); | |
71 }); | |
72 | |
73 group("skip operation", () { | |
74 test("of two elements in the middle of sequence", () async { | |
75 var events = new StreamQueue<int>(createStream()); | |
76 expect(await events.next, 1); | |
77 expect(await events.skip(2), 0); | |
78 expect(await events.next, 4); | |
79 await events.cancel(); | |
80 }); | |
81 | |
82 test("with negative/bad arguments throws", () async { | |
83 var events = new StreamQueue<int>(createStream()); | |
84 expect(() => events.skip(-1), throwsArgumentError); | |
85 // A non-int throws either a type error or an argument error, | |
86 // depending on whether it's checked mode or not. | |
87 expect(await events.next, 1); // Did not consume event. | |
88 expect(() => events.skip(-1), throwsArgumentError); | |
89 expect(await events.next, 2); // Did not consume event. | |
90 await events.cancel(); | |
91 }); | |
92 | |
93 test("of 0 elements works", () async { | |
94 var events = new StreamQueue<int>(createStream()); | |
95 expect(events.skip(0), completion(0)); | |
96 expect(events.next, completion(1)); | |
97 expect(events.skip(0), completion(0)); | |
98 expect(events.next, completion(2)); | |
99 expect(events.skip(0), completion(0)); | |
100 expect(events.next, completion(3)); | |
101 expect(events.skip(0), completion(0)); | |
102 expect(events.next, completion(4)); | |
103 expect(events.skip(0), completion(0)); | |
104 expect(events.skip(5), completion(5)); | |
105 expect(events.next, throwsStateError); | |
106 await events.cancel(); | |
107 }); | |
108 | |
109 test("of too many events ends at stream start", () async { | |
110 var events = new StreamQueue<int>(createStream()); | |
111 expect(await events.skip(6), 2); | |
112 await events.cancel(); | |
113 }); | |
114 | |
115 test("of too many events after some events", () async { | |
116 var events = new StreamQueue<int>(createStream()); | |
117 expect(await events.next, 1); | |
118 expect(await events.next, 2); | |
119 expect(await events.skip(6), 4); | |
120 await events.cancel(); | |
121 }); | |
122 | |
123 test("of too many events ends at stream end", () async { | |
124 var events = new StreamQueue<int>(createStream()); | |
125 expect(await events.next, 1); | |
126 expect(await events.next, 2); | |
127 expect(await events.next, 3); | |
128 expect(await events.next, 4); | |
129 expect(await events.skip(2), 2); | |
130 await events.cancel(); | |
131 }); | |
132 | |
133 test("of events with error", () async { | |
134 var events = new StreamQueue<int>(createErrorStream()); | |
135 expect(events.skip(4), throwsA("To err is divine!")); | |
136 expect(await events.next, 4); | |
137 await events.cancel(); | |
138 }); | |
139 | |
140 test("of events with error, and skip again after", () async { | |
141 var events = new StreamQueue<int>(createErrorStream()); | |
142 expect(events.skip(4), throwsA("To err is divine!")); | |
143 expect(events.skip(2), completion(1)); | |
144 await events.cancel(); | |
145 }); | |
146 test("multiple skips at same time complete in order.", () async { | |
147 var events = new StreamQueue<int>(createStream()); | |
148 var skip1 = events.skip(1); | |
149 var skip2 = events.skip(0); | |
150 var skip3 = events.skip(4); | |
151 var skip4 = events.skip(1); | |
152 var index = 0; | |
153 // Check that futures complete in order. | |
154 sequence(expectedValue, sequenceIndex) => (value) { | |
155 expect(value, expectedValue); | |
156 expect(index, sequenceIndex); | |
157 index++; | |
158 }; | |
159 await Future.wait([skip1.then(sequence(0, 0)), | |
160 skip2.then(sequence(0, 1)), | |
161 skip3.then(sequence(1, 2)), | |
162 skip4.then(sequence(1, 3))]); | |
163 await events.cancel(); | |
164 }); | |
165 }); | |
166 | |
167 group("take operation", () { | |
168 test("as simple take of events", () async { | |
169 var events = new StreamQueue<int>(createStream()); | |
170 expect(await events.next, 1); | |
171 expect(await events.take(2), [2, 3]); | |
172 expect(await events.next, 4); | |
173 await events.cancel(); | |
174 }); | |
175 | |
176 test("of 0 events", () async { | |
177 var events = new StreamQueue<int>(createStream()); | |
178 expect(events.take(0), completion([])); | |
179 expect(events.next, completion(1)); | |
180 expect(events.take(0), completion([])); | |
181 expect(events.next, completion(2)); | |
182 expect(events.take(0), completion([])); | |
183 expect(events.next, completion(3)); | |
184 expect(events.take(0), completion([])); | |
185 expect(events.next, completion(4)); | |
186 expect(events.take(0), completion([])); | |
187 expect(events.take(5), completion([])); | |
188 expect(events.next, throwsStateError); | |
189 await events.cancel(); | |
190 }); | |
191 | |
192 test("with bad arguments throws", () async { | |
193 var events = new StreamQueue<int>(createStream()); | |
194 expect(() => events.take(-1), throwsArgumentError); | |
195 expect(await events.next, 1); // Did not consume event. | |
196 expect(() => events.take(-1), throwsArgumentError); | |
197 expect(await events.next, 2); // Did not consume event. | |
198 await events.cancel(); | |
199 }); | |
200 | |
201 test("of too many arguments", () async { | |
202 var events = new StreamQueue<int>(createStream()); | |
203 expect(await events.take(6), [1, 2, 3, 4]); | |
204 await events.cancel(); | |
205 }); | |
206 | |
207 test("too large later", () async { | |
208 var events = new StreamQueue<int>(createStream()); | |
209 expect(await events.next, 1); | |
210 expect(await events.next, 2); | |
211 expect(await events.take(6), [3, 4]); | |
212 await events.cancel(); | |
213 }); | |
214 | |
215 test("error", () async { | |
216 var events = new StreamQueue<int>(createErrorStream()); | |
217 expect(events.take(4), throwsA("To err is divine!")); | |
218 expect(await events.next, 4); | |
219 await events.cancel(); | |
220 }); | |
221 }); | |
222 | |
223 group("rest operation", () { | |
224 test("after single next", () async { | |
225 var events = new StreamQueue<int>(createStream()); | |
226 expect(await events.next, 1); | |
227 expect(await events.rest.toList(), [2, 3, 4]); | |
228 }); | |
229 | |
230 test("at start", () async { | |
231 var events = new StreamQueue<int>(createStream()); | |
232 expect(await events.rest.toList(), [1, 2, 3, 4]); | |
233 }); | |
234 | |
235 test("at end", () async { | |
236 var events = new StreamQueue<int>(createStream()); | |
237 expect(await events.next, 1); | |
238 expect(await events.next, 2); | |
239 expect(await events.next, 3); | |
240 expect(await events.next, 4); | |
241 expect(await events.rest.toList(), isEmpty); | |
242 }); | |
243 | |
244 test("after end", () async { | |
245 var events = new StreamQueue<int>(createStream()); | |
246 expect(await events.next, 1); | |
247 expect(await events.next, 2); | |
248 expect(await events.next, 3); | |
249 expect(await events.next, 4); | |
250 expect(events.next, throwsStateError); | |
251 expect(await events.rest.toList(), isEmpty); | |
252 }); | |
253 | |
254 test("after receiving done requested before", () async { | |
255 var events = new StreamQueue<int>(createStream()); | |
256 var next1 = events.next; | |
257 var next2 = events.next; | |
258 var next3 = events.next; | |
259 var rest = events.rest; | |
260 for (int i = 0; i < 10; i++) { | |
261 await flushMicrotasks(); | |
262 } | |
263 expect(await next1, 1); | |
264 expect(await next2, 2); | |
265 expect(await next3, 3); | |
266 expect(await rest.toList(), [4]); | |
267 }); | |
268 | |
269 test("with an error event error", () async { | |
270 var events = new StreamQueue<int>(createErrorStream()); | |
271 expect(await events.next, 1); | |
272 var rest = events.rest; | |
273 var events2 = new StreamQueue(rest); | |
274 expect(await events2.next, 2); | |
275 expect(events2.next, throwsA("To err is divine!")); | |
276 expect(await events2.next, 4); | |
277 }); | |
278 | |
279 test("closes the events, prevents other operations", () async { | |
280 var events = new StreamQueue<int>(createStream()); | |
281 var stream = events.rest; | |
282 expect(() => events.next, throwsStateError); | |
283 expect(() => events.skip(1), throwsStateError); | |
284 expect(() => events.take(1), throwsStateError); | |
285 expect(() => events.rest, throwsStateError); | |
286 expect(() => events.cancel(), throwsStateError); | |
287 expect(stream.toList(), completion([1, 2, 3, 4])); | |
288 }); | |
289 | |
290 test("forwards to underlying stream", () async { | |
291 var cancel = new Completer(); | |
292 var controller = new StreamController(onCancel: () => cancel.future); | |
293 var events = new StreamQueue<int>(controller.stream); | |
294 expect(controller.hasListener, isFalse); | |
295 var next = events.next; | |
296 expect(controller.hasListener, isTrue); | |
297 expect(controller.isPaused, isFalse); | |
298 | |
299 controller.add(1); | |
300 expect(await next, 1); | |
301 expect(controller.isPaused, isTrue); | |
302 | |
303 var rest = events.rest; | |
304 var subscription = rest.listen(null); | |
305 expect(controller.hasListener, isTrue); | |
306 expect(controller.isPaused, isFalse); | |
307 | |
308 var lastEvent; | |
309 subscription.onData((value) => lastEvent = value); | |
310 | |
311 controller.add(2); | |
312 | |
313 await flushMicrotasks(); | |
314 expect(lastEvent, 2); | |
315 expect(controller.hasListener, isTrue); | |
316 expect(controller.isPaused, isFalse); | |
317 | |
318 subscription.pause(); | |
319 expect(controller.isPaused, isTrue); | |
320 | |
321 controller.add(3); | |
322 | |
323 await flushMicrotasks(); | |
324 expect(lastEvent, 2); | |
325 subscription.resume(); | |
326 | |
327 await flushMicrotasks(); | |
328 expect(lastEvent, 3); | |
329 | |
330 var cancelFuture = subscription.cancel(); | |
331 expect(controller.hasListener, isFalse); | |
332 cancel.complete(42); | |
333 expect(cancelFuture, completion(42)); | |
334 }); | |
335 }); | |
336 | |
337 group("cancel operation", () { | |
338 test("closes the events, prevents any other operation", () async { | |
339 var events = new StreamQueue<int>(createStream()); | |
340 await events.cancel(); | |
341 expect(() => events.next, throwsStateError); | |
342 expect(() => events.skip(1), throwsStateError); | |
343 expect(() => events.take(1), throwsStateError); | |
344 expect(() => events.rest, throwsStateError); | |
345 expect(() => events.cancel(), throwsStateError); | |
346 }); | |
347 | |
348 test("cancels underlying subscription when called before any event", | |
349 () async { | |
350 var cancelFuture = new Future.value(42); | |
351 var controller = new StreamController(onCancel: () => cancelFuture); | |
352 var events = new StreamQueue<int>(controller.stream); | |
353 expect(await events.cancel(), 42); | |
354 }); | |
355 | |
356 test("cancels underlying subscription, returns result", () async { | |
357 var cancelFuture = new Future.value(42); | |
358 var controller = new StreamController(onCancel: () => cancelFuture); | |
359 var events = new StreamQueue<int>(controller.stream); | |
360 controller.add(1); | |
361 expect(await events.next, 1); | |
362 expect(await events.cancel(), 42); | |
363 }); | |
364 | |
365 group("with immediate: true", () { | |
366 test("closes the events, prevents any other operation", () async { | |
367 var events = new StreamQueue<int>(createStream()); | |
368 await events.cancel(immediate: true); | |
369 expect(() => events.next, throwsStateError); | |
370 expect(() => events.skip(1), throwsStateError); | |
371 expect(() => events.take(1), throwsStateError); | |
372 expect(() => events.rest, throwsStateError); | |
373 expect(() => events.cancel(), throwsStateError); | |
374 }); | |
375 | |
376 test("cancels the underlying subscription immediately", () async { | |
377 var controller = new StreamController(); | |
378 controller.add(1); | |
379 | |
380 var events = new StreamQueue<int>(controller.stream); | |
381 expect(await events.next, 1); | |
382 expect(controller.hasListener, isTrue); | |
383 | |
384 events.cancel(immediate: true); | |
385 await expect(controller.hasListener, isFalse); | |
386 }); | |
387 | |
388 test("cancels the underlying subscription when called before any event", | |
389 () async { | |
390 var cancelFuture = new Future.value(42); | |
391 var controller = new StreamController(onCancel: () => cancelFuture); | |
392 | |
393 var events = new StreamQueue<int>(controller.stream); | |
394 expect(await events.cancel(immediate: true), 42); | |
395 }); | |
396 | |
397 test("closes pending requests", () async { | |
398 var events = new StreamQueue<int>(createStream()); | |
399 expect(await events.next, 1); | |
400 expect(events.next, throwsStateError); | |
401 expect(events.hasNext, completion(isFalse)); | |
402 | |
403 await events.cancel(immediate: true); | |
404 }); | |
405 | |
406 test("returns the result of closing the underlying subscription", | |
407 () async { | |
408 var controller = new StreamController( | |
409 onCancel: () => new Future.value(42)); | |
410 var events = new StreamQueue<int>(controller.stream); | |
411 expect(await events.cancel(immediate: true), 42); | |
412 }); | |
413 | |
414 test("listens and then cancels a stream that hasn't been listened to yet", | |
415 () async { | |
416 var wasListened = false; | |
417 var controller = new StreamController( | |
418 onListen: () => wasListened = true); | |
419 var events = new StreamQueue<int>(controller.stream); | |
420 expect(wasListened, isFalse); | |
421 expect(controller.hasListener, isFalse); | |
422 | |
423 await events.cancel(immediate: true); | |
424 expect(wasListened, isTrue); | |
425 expect(controller.hasListener, isFalse); | |
426 }); | |
427 }); | |
428 }); | |
429 | |
430 group("hasNext operation", () { | |
431 test("true at start", () async { | |
432 var events = new StreamQueue<int>(createStream()); | |
433 expect(await events.hasNext, isTrue); | |
434 }); | |
435 | |
436 test("true after start", () async { | |
437 var events = new StreamQueue<int>(createStream()); | |
438 expect(await events.next, 1); | |
439 expect(await events.hasNext, isTrue); | |
440 }); | |
441 | |
442 test("true at end", () async { | |
443 var events = new StreamQueue<int>(createStream()); | |
444 for (int i = 1; i <= 4; i++) { | |
445 expect(await events.next, i); | |
446 } | |
447 expect(await events.hasNext, isFalse); | |
448 }); | |
449 | |
450 test("true when enqueued", () async { | |
451 var events = new StreamQueue<int>(createStream()); | |
452 var values = []; | |
453 for (int i = 1; i <= 3; i++) { | |
454 events.next.then(values.add); | |
455 } | |
456 expect(values, isEmpty); | |
457 expect(await events.hasNext, isTrue); | |
458 expect(values, [1, 2, 3]); | |
459 }); | |
460 | |
461 test("false when enqueued", () async { | |
462 var events = new StreamQueue<int>(createStream()); | |
463 var values = []; | |
464 for (int i = 1; i <= 4; i++) { | |
465 events.next.then(values.add); | |
466 } | |
467 expect(values, isEmpty); | |
468 expect(await events.hasNext, isFalse); | |
469 expect(values, [1, 2, 3, 4]); | |
470 }); | |
471 | |
472 test("true when data event", () async { | |
473 var controller = new StreamController(); | |
474 var events = new StreamQueue<int>(controller.stream); | |
475 | |
476 var hasNext; | |
477 events.hasNext.then((result) { hasNext = result; }); | |
478 await flushMicrotasks(); | |
479 expect(hasNext, isNull); | |
480 controller.add(42); | |
481 expect(hasNext, isNull); | |
482 await flushMicrotasks(); | |
483 expect(hasNext, isTrue); | |
484 }); | |
485 | |
486 test("true when error event", () async { | |
487 var controller = new StreamController(); | |
488 var events = new StreamQueue<int>(controller.stream); | |
489 | |
490 var hasNext; | |
491 events.hasNext.then((result) { hasNext = result; }); | |
492 await flushMicrotasks(); | |
493 expect(hasNext, isNull); | |
494 controller.addError("BAD"); | |
495 expect(hasNext, isNull); | |
496 await flushMicrotasks(); | |
497 expect(hasNext, isTrue); | |
498 expect(events.next, throwsA("BAD")); | |
499 }); | |
500 | |
501 test("- hasNext after hasNext", () async { | |
502 var events = new StreamQueue<int>(createStream()); | |
503 expect(await events.hasNext, true); | |
504 expect(await events.hasNext, true); | |
505 expect(await events.next, 1); | |
506 expect(await events.hasNext, true); | |
507 expect(await events.hasNext, true); | |
508 expect(await events.next, 2); | |
509 expect(await events.hasNext, true); | |
510 expect(await events.hasNext, true); | |
511 expect(await events.next, 3); | |
512 expect(await events.hasNext, true); | |
513 expect(await events.hasNext, true); | |
514 expect(await events.next, 4); | |
515 expect(await events.hasNext, false); | |
516 expect(await events.hasNext, false); | |
517 }); | |
518 | |
519 test("- next after true", () async { | |
520 var events = new StreamQueue<int>(createStream()); | |
521 expect(await events.next, 1); | |
522 expect(await events.hasNext, true); | |
523 expect(await events.next, 2); | |
524 expect(await events.next, 3); | |
525 }); | |
526 | |
527 test("- next after true, enqueued", () async { | |
528 var events = new StreamQueue<int>(createStream()); | |
529 var responses = []; | |
530 events.next.then(responses.add); | |
531 events.hasNext.then(responses.add); | |
532 events.next.then(responses.add); | |
533 do { | |
534 await flushMicrotasks(); | |
535 } while (responses.length < 3); | |
536 expect(responses, [1, true, 2]); | |
537 }); | |
538 | |
539 test("- skip 0 after true", () async { | |
540 var events = new StreamQueue<int>(createStream()); | |
541 expect(await events.next, 1); | |
542 expect(await events.hasNext, true); | |
543 expect(await events.skip(0), 0); | |
544 expect(await events.next, 2); | |
545 }); | |
546 | |
547 test("- skip 1 after true", () async { | |
548 var events = new StreamQueue<int>(createStream()); | |
549 expect(await events.next, 1); | |
550 expect(await events.hasNext, true); | |
551 expect(await events.skip(1), 0); | |
552 expect(await events.next, 3); | |
553 }); | |
554 | |
555 test("- skip 2 after true", () async { | |
556 var events = new StreamQueue<int>(createStream()); | |
557 expect(await events.next, 1); | |
558 expect(await events.hasNext, true); | |
559 expect(await events.skip(2), 0); | |
560 expect(await events.next, 4); | |
561 }); | |
562 | |
563 test("- take 0 after true", () async { | |
564 var events = new StreamQueue<int>(createStream()); | |
565 expect(await events.next, 1); | |
566 expect(await events.hasNext, true); | |
567 expect(await events.take(0), isEmpty); | |
568 expect(await events.next, 2); | |
569 }); | |
570 | |
571 test("- take 1 after true", () async { | |
572 var events = new StreamQueue<int>(createStream()); | |
573 expect(await events.next, 1); | |
574 expect(await events.hasNext, true); | |
575 expect(await events.take(1), [2]); | |
576 expect(await events.next, 3); | |
577 }); | |
578 | |
579 test("- take 2 after true", () async { | |
580 var events = new StreamQueue<int>(createStream()); | |
581 expect(await events.next, 1); | |
582 expect(await events.hasNext, true); | |
583 expect(await events.take(2), [2, 3]); | |
584 expect(await events.next, 4); | |
585 }); | |
586 | |
587 test("- rest after true", () async { | |
588 var events = new StreamQueue<int>(createStream()); | |
589 expect(await events.next, 1); | |
590 expect(await events.hasNext, true); | |
591 var stream = events.rest; | |
592 expect(await stream.toList(), [2, 3, 4]); | |
593 }); | |
594 | |
595 test("- rest after true, at last", () async { | |
596 var events = new StreamQueue<int>(createStream()); | |
597 expect(await events.next, 1); | |
598 expect(await events.next, 2); | |
599 expect(await events.next, 3); | |
600 expect(await events.hasNext, true); | |
601 var stream = events.rest; | |
602 expect(await stream.toList(), [4]); | |
603 }); | |
604 | |
605 test("- rest after false", () async { | |
606 var events = new StreamQueue<int>(createStream()); | |
607 expect(await events.next, 1); | |
608 expect(await events.next, 2); | |
609 expect(await events.next, 3); | |
610 expect(await events.next, 4); | |
611 expect(await events.hasNext, false); | |
612 var stream = events.rest; | |
613 expect(await stream.toList(), isEmpty); | |
614 }); | |
615 | |
616 test("- cancel after true on data", () async { | |
617 var events = new StreamQueue<int>(createStream()); | |
618 expect(await events.next, 1); | |
619 expect(await events.next, 2); | |
620 expect(await events.hasNext, true); | |
621 expect(await events.cancel(), null); | |
622 }); | |
623 | |
624 test("- cancel after true on error", () async { | |
625 var events = new StreamQueue<int>(createErrorStream()); | |
626 expect(await events.next, 1); | |
627 expect(await events.next, 2); | |
628 expect(await events.hasNext, true); | |
629 expect(await events.cancel(), null); | |
630 }); | |
631 }); | |
632 | |
633 group("fork operation", () { | |
634 test("produces a stream queue with the same events", () async { | |
635 var queue1 = new StreamQueue<int>(createStream()); | |
636 var queue2 = queue1.fork(); | |
637 | |
638 expect(await queue1.next, 1); | |
639 expect(await queue1.next, 2); | |
640 expect(await queue1.next, 3); | |
641 expect(await queue1.next, 4); | |
642 expect(await queue1.hasNext, isFalse); | |
643 | |
644 expect(await queue2.next, 1); | |
645 expect(await queue2.next, 2); | |
646 expect(await queue2.next, 3); | |
647 expect(await queue2.next, 4); | |
648 expect(await queue2.hasNext, isFalse); | |
649 }); | |
650 | |
651 test("produces a stream queue with the same errors", () async { | |
652 var queue1 = new StreamQueue<int>(createErrorStream()); | |
653 var queue2 = queue1.fork(); | |
654 | |
655 expect(await queue1.next, 1); | |
656 expect(await queue1.next, 2); | |
657 expect(queue1.next, throwsA("To err is divine!")); | |
658 expect(await queue1.next, 4); | |
659 expect(await queue1.hasNext, isFalse); | |
660 | |
661 expect(await queue2.next, 1); | |
662 expect(await queue2.next, 2); | |
663 expect(queue2.next, throwsA("To err is divine!")); | |
664 expect(await queue2.next, 4); | |
665 expect(await queue2.hasNext, isFalse); | |
666 }); | |
667 | |
668 test("forks at the current point in the source queue", () { | |
669 var queue1 = new StreamQueue<int>(createStream()); | |
670 | |
671 expect(queue1.next, completion(1)); | |
672 expect(queue1.next, completion(2)); | |
673 | |
674 var queue2 = queue1.fork(); | |
675 | |
676 expect(queue1.next, completion(3)); | |
677 expect(queue1.next, completion(4)); | |
678 expect(queue1.hasNext, completion(isFalse)); | |
679 | |
680 expect(queue2.next, completion(3)); | |
681 expect(queue2.next, completion(4)); | |
682 expect(queue2.hasNext, completion(isFalse)); | |
683 }); | |
684 | |
685 test("can be created after there are pending values", () async { | |
686 var queue1 = new StreamQueue<int>(createStream()); | |
687 await flushMicrotasks(); | |
688 | |
689 var queue2 = queue1.fork(); | |
690 expect(await queue2.next, 1); | |
691 expect(await queue2.next, 2); | |
692 expect(await queue2.next, 3); | |
693 expect(await queue2.next, 4); | |
694 expect(await queue2.hasNext, isFalse); | |
695 }); | |
696 | |
697 test("multiple forks can be created at different points", () async { | |
698 var queue1 = new StreamQueue<int>(createStream()); | |
699 | |
700 var queue2 = queue1.fork(); | |
701 expect(await queue1.next, 1); | |
702 expect(await queue2.next, 1); | |
703 | |
704 var queue3 = queue1.fork(); | |
705 expect(await queue1.next, 2); | |
706 expect(await queue2.next, 2); | |
707 expect(await queue3.next, 2); | |
708 | |
709 var queue4 = queue1.fork(); | |
710 expect(await queue1.next, 3); | |
711 expect(await queue2.next, 3); | |
712 expect(await queue3.next, 3); | |
713 expect(await queue4.next, 3); | |
714 | |
715 var queue5 = queue1.fork(); | |
716 expect(await queue1.next, 4); | |
717 expect(await queue2.next, 4); | |
718 expect(await queue3.next, 4); | |
719 expect(await queue4.next, 4); | |
720 expect(await queue5.next, 4); | |
721 | |
722 var queue6 = queue1.fork(); | |
723 expect(await queue1.hasNext, isFalse); | |
724 expect(await queue2.hasNext, isFalse); | |
725 expect(await queue3.hasNext, isFalse); | |
726 expect(await queue4.hasNext, isFalse); | |
727 expect(await queue5.hasNext, isFalse); | |
728 expect(await queue6.hasNext, isFalse); | |
729 }); | |
730 | |
731 test("same-level forks receive data in the order they were created", | |
732 () async { | |
733 var queue1 = new StreamQueue<int>(createStream()); | |
734 var queue2 = queue1.fork(); | |
735 var queue3 = queue1.fork(); | |
736 var queue4 = queue1.fork(); | |
737 var queue5 = queue1.fork(); | |
738 | |
739 for (var i = 0; i < 4; i++) { | |
740 var queue1Fired = false; | |
741 var queue2Fired = false; | |
742 var queue3Fired = false; | |
743 var queue4Fired = false; | |
744 var queue5Fired = false; | |
745 | |
746 queue5.next.then(expectAsync((_) { | |
747 queue5Fired = true; | |
748 expect(queue1Fired, isTrue); | |
749 expect(queue2Fired, isTrue); | |
750 expect(queue3Fired, isTrue); | |
751 expect(queue4Fired, isTrue); | |
752 })); | |
753 | |
754 queue1.next.then(expectAsync((_) { | |
755 queue1Fired = true; | |
756 expect(queue2Fired, isFalse); | |
757 expect(queue3Fired, isFalse); | |
758 expect(queue4Fired, isFalse); | |
759 expect(queue5Fired, isFalse); | |
760 })); | |
761 | |
762 queue4.next.then(expectAsync((_) { | |
763 queue4Fired = true; | |
764 expect(queue1Fired, isTrue); | |
765 expect(queue2Fired, isTrue); | |
766 expect(queue3Fired, isTrue); | |
767 expect(queue5Fired, isFalse); | |
768 })); | |
769 | |
770 queue2.next.then(expectAsync((_) { | |
771 queue2Fired = true; | |
772 expect(queue1Fired, isTrue); | |
773 expect(queue3Fired, isFalse); | |
774 expect(queue4Fired, isFalse); | |
775 expect(queue5Fired, isFalse); | |
776 })); | |
777 | |
778 queue3.next.then(expectAsync((_) { | |
779 queue3Fired = true; | |
780 expect(queue1Fired, isTrue); | |
781 expect(queue2Fired, isTrue); | |
782 expect(queue4Fired, isFalse); | |
783 expect(queue5Fired, isFalse); | |
784 })); | |
785 } | |
786 }); | |
787 | |
788 test("forks can be created from forks", () async { | |
789 var queue1 = new StreamQueue<int>(createStream()); | |
790 | |
791 var queue2 = queue1.fork(); | |
792 expect(await queue1.next, 1); | |
793 expect(await queue2.next, 1); | |
794 | |
795 var queue3 = queue2.fork(); | |
796 expect(await queue1.next, 2); | |
797 expect(await queue2.next, 2); | |
798 expect(await queue3.next, 2); | |
799 | |
800 var queue4 = queue3.fork(); | |
801 expect(await queue1.next, 3); | |
802 expect(await queue2.next, 3); | |
803 expect(await queue3.next, 3); | |
804 expect(await queue4.next, 3); | |
805 | |
806 var queue5 = queue4.fork(); | |
807 expect(await queue1.next, 4); | |
808 expect(await queue2.next, 4); | |
809 expect(await queue3.next, 4); | |
810 expect(await queue4.next, 4); | |
811 expect(await queue5.next, 4); | |
812 | |
813 var queue6 = queue5.fork(); | |
814 expect(await queue1.hasNext, isFalse); | |
815 expect(await queue2.hasNext, isFalse); | |
816 expect(await queue3.hasNext, isFalse); | |
817 expect(await queue4.hasNext, isFalse); | |
818 expect(await queue5.hasNext, isFalse); | |
819 expect(await queue6.hasNext, isFalse); | |
820 }); | |
821 | |
822 group("canceling:", () { | |
823 test("cancelling a fork doesn't cancel its source", () async { | |
824 var queue1 = new StreamQueue<int>(createStream()); | |
825 var queue2 = queue1.fork(); | |
826 | |
827 queue2.cancel(); | |
828 expect(() => queue2.next, throwsStateError); | |
829 | |
830 expect(await queue1.next, 1); | |
831 expect(await queue1.next, 2); | |
832 expect(await queue1.next, 3); | |
833 expect(await queue1.next, 4); | |
834 expect(await queue1.hasNext, isFalse); | |
835 }); | |
836 | |
837 test("cancelling a source doesn't cancel its unmaterialized fork", | |
838 () async { | |
839 var queue1 = new StreamQueue<int>(createStream()); | |
840 var queue2 = queue1.fork(); | |
841 | |
842 queue1.cancel(); | |
843 expect(() => queue1.next, throwsStateError); | |
844 | |
845 expect(await queue2.next, 1); | |
846 expect(await queue2.next, 2); | |
847 expect(await queue2.next, 3); | |
848 expect(await queue2.next, 4); | |
849 expect(await queue2.hasNext, isFalse); | |
850 }); | |
851 | |
852 test("cancelling a source doesn't cancel its materialized fork", | |
853 () async { | |
854 var queue1 = new StreamQueue<int>(createStream()); | |
855 var queue2 = queue1.fork(); | |
856 | |
857 expect(await queue1.next, 1); | |
858 | |
859 queue1.cancel(); | |
860 expect(() => queue1.next, throwsStateError); | |
861 | |
862 expect(await queue2.next, 1); | |
863 expect(await queue2.next, 2); | |
864 expect(await queue2.next, 3); | |
865 expect(await queue2.next, 4); | |
866 expect(await queue2.hasNext, isFalse); | |
867 }); | |
868 | |
869 test("the underlying stream is only canceled once all forks are canceled", | |
870 () async { | |
871 var controller = new StreamController(); | |
872 var queue1 = new StreamQueue<int>(controller.stream); | |
873 var queue2 = queue1.fork(); | |
874 | |
875 await flushMicrotasks(); | |
876 expect(controller.hasListener, isFalse); | |
877 | |
878 expect(queue1.next, completion(1)); | |
879 await flushMicrotasks(); | |
880 expect(controller.hasListener, isTrue); | |
881 | |
882 queue2.cancel(); | |
883 await flushMicrotasks(); | |
884 expect(controller.hasListener, isTrue); | |
885 | |
886 controller.add(1); | |
887 queue1.cancel(); | |
888 await flushMicrotasks(); | |
889 expect(controller.hasListener, isFalse); | |
890 }); | |
891 | |
892 group("with immediate,", () { | |
893 test("cancelling a fork doesn't cancel its source", () async { | |
894 var queue1 = new StreamQueue<int>(createStream()); | |
895 var queue2 = queue1.fork(); | |
896 | |
897 queue2.cancel(immediate: true); | |
898 expect(() => queue2.next, throwsStateError); | |
899 | |
900 expect(await queue1.next, 1); | |
901 expect(await queue1.next, 2); | |
902 expect(await queue1.next, 3); | |
903 expect(await queue1.next, 4); | |
904 expect(await queue1.hasNext, isFalse); | |
905 }); | |
906 | |
907 test("cancelling a source doesn't cancel its unmaterialized fork", | |
908 () async { | |
909 var queue1 = new StreamQueue<int>(createStream()); | |
910 var queue2 = queue1.fork(); | |
911 | |
912 queue1.cancel(immediate: true); | |
913 expect(() => queue1.next, throwsStateError); | |
914 | |
915 expect(await queue2.next, 1); | |
916 expect(await queue2.next, 2); | |
917 expect(await queue2.next, 3); | |
918 expect(await queue2.next, 4); | |
919 expect(await queue2.hasNext, isFalse); | |
920 }); | |
921 | |
922 test("cancelling a source doesn't cancel its materialized fork", | |
923 () async { | |
924 var queue1 = new StreamQueue<int>(createStream()); | |
925 var queue2 = queue1.fork(); | |
926 | |
927 expect(await queue1.next, 1); | |
928 | |
929 queue1.cancel(immediate: true); | |
930 expect(() => queue1.next, throwsStateError); | |
931 | |
932 expect(await queue2.next, 1); | |
933 expect(await queue2.next, 2); | |
934 expect(await queue2.next, 3); | |
935 expect(await queue2.next, 4); | |
936 expect(await queue2.hasNext, isFalse); | |
937 }); | |
938 | |
939 test("the underlying stream is only canceled once all forks are " | |
940 "canceled", () async { | |
941 var controller = new StreamController(); | |
942 var queue1 = new StreamQueue<int>(controller.stream); | |
943 var queue2 = queue1.fork(); | |
944 | |
945 await flushMicrotasks(); | |
946 expect(controller.hasListener, isFalse); | |
947 | |
948 expect(queue1.next, throwsStateError); | |
949 await flushMicrotasks(); | |
950 expect(controller.hasListener, isTrue); | |
951 | |
952 queue2.cancel(immediate: true); | |
953 await flushMicrotasks(); | |
954 expect(controller.hasListener, isTrue); | |
955 | |
956 queue1.cancel(immediate: true); | |
957 await flushMicrotasks(); | |
958 expect(controller.hasListener, isFalse); | |
959 }); | |
960 }); | |
961 }); | |
962 | |
963 group("pausing:", () { | |
964 test("the underlying stream is only implicitly paused when no forks are " | |
965 "awaiting input", () async { | |
966 var controller = new StreamController(); | |
967 var queue1 = new StreamQueue<int>(controller.stream); | |
968 var queue2 = queue1.fork(); | |
969 | |
970 controller.add(1); | |
971 expect(await queue1.next, 1); | |
972 expect(controller.hasListener, isTrue); | |
973 expect(controller.isPaused, isTrue); | |
974 | |
975 expect(queue1.next, completion(2)); | |
976 await flushMicrotasks(); | |
977 expect(controller.isPaused, isFalse); | |
978 | |
979 controller.add(2); | |
980 await flushMicrotasks(); | |
981 expect(controller.isPaused, isTrue); | |
982 | |
983 expect(queue2.next, completion(1)); | |
984 expect(queue2.next, completion(2)); | |
985 expect(queue2.next, completion(3)); | |
986 await flushMicrotasks(); | |
987 expect(controller.isPaused, isFalse); | |
988 | |
989 controller.add(3); | |
990 await flushMicrotasks(); | |
991 expect(controller.isPaused, isTrue); | |
992 }); | |
993 | |
994 test("pausing a fork doesn't pause its source", () async { | |
995 var queue1 = new StreamQueue<int>(createStream()); | |
996 var queue2 = queue1.fork(); | |
997 | |
998 queue2.rest.listen(expectAsync((_) {}, count: 0)).pause(); | |
999 | |
1000 expect(await queue1.next, 1); | |
1001 expect(await queue1.next, 2); | |
1002 expect(await queue1.next, 3); | |
1003 expect(await queue1.next, 4); | |
1004 expect(await queue1.hasNext, isFalse); | |
1005 }); | |
1006 | |
1007 test("pausing a source doesn't pause its fork", () async { | |
1008 var queue1 = new StreamQueue<int>(createStream()); | |
1009 var queue2 = queue1.fork(); | |
1010 | |
1011 queue1.rest.listen(expectAsync((_) {}, count: 0)).pause(); | |
1012 | |
1013 expect(await queue2.next, 1); | |
1014 expect(await queue2.next, 2); | |
1015 expect(await queue2.next, 3); | |
1016 expect(await queue2.next, 4); | |
1017 expect(await queue2.hasNext, isFalse); | |
1018 }); | |
1019 | |
1020 test("the underlying stream is only paused when all forks are paused", | |
1021 () async { | |
1022 var controller = new StreamController(); | |
1023 var queue1 = new StreamQueue<int>(controller.stream); | |
1024 var queue2 = queue1.fork(); | |
1025 | |
1026 await flushMicrotasks(); | |
1027 expect(controller.hasListener, isFalse); | |
1028 | |
1029 var sub1 = queue1.rest.listen(null); | |
1030 await flushMicrotasks(); | |
1031 expect(controller.hasListener, isTrue); | |
1032 expect(controller.isPaused, isFalse); | |
1033 | |
1034 sub1.pause(); | |
1035 await flushMicrotasks(); | |
1036 expect(controller.isPaused, isTrue); | |
1037 | |
1038 expect(queue2.next, completion(1)); | |
1039 await flushMicrotasks(); | |
1040 expect(controller.isPaused, isFalse); | |
1041 | |
1042 controller.add(1); | |
1043 await flushMicrotasks(); | |
1044 expect(controller.isPaused, isTrue); | |
1045 | |
1046 var sub2 = queue2.rest.listen(null); | |
1047 await flushMicrotasks(); | |
1048 expect(controller.isPaused, isFalse); | |
1049 | |
1050 sub2.pause(); | |
1051 await flushMicrotasks(); | |
1052 expect(controller.isPaused, isTrue); | |
1053 | |
1054 sub1.resume(); | |
1055 await flushMicrotasks(); | |
1056 expect(controller.isPaused, isFalse); | |
1057 }); | |
1058 }); | |
1059 }); | |
1060 | |
1061 test("all combinations sequential skip/next/take operations", () async { | |
1062 // Takes all combinations of two of next, skip and take, then ends with | |
1063 // doing rest. Each of the first rounds do 10 events of each type, | |
1064 // the rest does 20 elements. | |
1065 var eventCount = 20 * (3 * 3 + 1); | |
1066 var events = new StreamQueue<int>(createLongStream(eventCount)); | |
1067 | |
1068 // Test expecting [startIndex .. startIndex + 9] as events using | |
1069 // `next`. | |
1070 nextTest(startIndex) { | |
1071 for (int i = 0; i < 10; i++) { | |
1072 expect(events.next, completion(startIndex + i)); | |
1073 } | |
1074 } | |
1075 | |
1076 // Test expecting 10 events to be skipped. | |
1077 skipTest(startIndex) { | |
1078 expect(events.skip(10), completion(0)); | |
1079 } | |
1080 | |
1081 // Test expecting [startIndex .. startIndex + 9] as events using | |
1082 // `take(10)`. | |
1083 takeTest(startIndex) { | |
1084 expect(events.take(10), | |
1085 completion(new List.generate(10, (i) => startIndex + i))); | |
1086 } | |
1087 var tests = [nextTest, skipTest, takeTest]; | |
1088 | |
1089 int counter = 0; | |
1090 // Run through all pairs of two tests and run them. | |
1091 for (int i = 0; i < tests.length; i++) { | |
1092 for (int j = 0; j < tests.length; j++) { | |
1093 tests[i](counter); | |
1094 tests[j](counter + 10); | |
1095 counter += 20; | |
1096 } | |
1097 } | |
1098 // Then expect 20 more events as a `rest` call. | |
1099 expect(events.rest.toList(), | |
1100 completion(new List.generate(20, (i) => counter + i))); | |
1101 }); | |
1102 } | |
1103 | |
1104 Stream<int> createStream() async* { | |
1105 yield 1; | |
1106 await flushMicrotasks(); | |
1107 yield 2; | |
1108 await flushMicrotasks(); | |
1109 yield 3; | |
1110 await flushMicrotasks(); | |
1111 yield 4; | |
1112 } | |
1113 | |
1114 Stream<int> createErrorStream() { | |
1115 StreamController controller = new StreamController<int>(); | |
1116 () async { | |
1117 controller.add(1); | |
1118 await flushMicrotasks(); | |
1119 controller.add(2); | |
1120 await flushMicrotasks(); | |
1121 controller.addError("To err is divine!"); | |
1122 await flushMicrotasks(); | |
1123 controller.add(4); | |
1124 await flushMicrotasks(); | |
1125 controller.close(); | |
1126 }(); | |
1127 return controller.stream; | |
1128 } | |
1129 | |
1130 Stream<int> createLongStream(int eventCount) async* { | |
1131 for (int i = 0; i < eventCount; i++) yield i; | |
1132 } | |
1133 | |
1134 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | |
OLD | NEW |