OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE filevents. | |
4 | |
5 import "dart:async"; | |
6 | |
7 import "package:async/async.dart" show StreamQueue; | |
8 import "package:test/test.dart"; | |
9 | |
10 main() { | |
11 group("source stream", () { | |
12 test("is listened to on first request, paused between requests", () async { | |
13 var controller = new StreamController(); | |
14 var events = new StreamQueue<int>(controller.stream); | |
15 await flushMicrotasks(); | |
16 expect(controller.hasListener, isFalse); | |
nweiz
2015/06/18 23:44:28
I didn't realize these getters existed! Consider u
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Will try. It's not always possible to see the diff
| |
17 var next = events.next; | |
18 expect(controller.hasListener, isTrue); | |
19 expect(controller.isPaused, isFalse); | |
20 controller.add(1); | |
21 expect(await next, 1); | |
22 expect(controller.hasListener, isTrue); | |
23 expect(controller.isPaused, isTrue); | |
24 next = events.next; | |
25 expect(controller.hasListener, isTrue); | |
26 expect(controller.isPaused, isFalse); | |
27 controller.add(2); | |
28 expect(await next, 2); | |
29 expect(controller.hasListener, isTrue); | |
30 expect(controller.isPaused, isTrue); | |
31 var cancel = events.cancel(); | |
32 expect(controller.hasListener, isFalse); | |
nweiz
2015/06/18 23:44:28
A few blank lines would go a long way in making th
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
| |
33 }); | |
34 }); | |
35 | |
36 group("next operation", () { | |
37 test("simple sequence of requests", () async { | |
38 var events = new StreamQueue<int>(createStream()); | |
39 for (int i = 1; i <= 4; i++) { | |
40 expect(await events.next, i); | |
41 } | |
42 expect(events.next, throwsStateError); | |
43 }); | |
44 | |
45 test("multiple requests at the same time", () async { | |
46 var events = new StreamQueue<int>(createStream()); | |
47 var result = await Future.wait( | |
48 [events.next, events.next, events.next, events.next]); | |
49 expect(result, [1, 2, 3, 4]); | |
50 await events.cancel(); | |
51 }); | |
52 | |
53 test("sequence of requests with error", () async { | |
54 var events = new StreamQueue<int>(createErrorStream()); | |
55 expect(await events.next, 1); | |
56 expect(await events.next, 2); | |
57 expect(events.next, throws); | |
nweiz
2015/06/18 23:44:28
Check the error's value. It would be awkward if th
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
| |
58 expect(await events.next, 4); | |
59 await events.cancel(); | |
60 }); | |
61 }); | |
62 | |
63 group("skip operation", () { | |
64 test("of two elements in the middle of sequence", () async { | |
65 var events = new StreamQueue<int>(createStream()); | |
66 expect(await events.next, 1); | |
67 expect(await events.skip(2), 0); | |
68 expect(await events.next, 4); | |
69 await events.cancel(); | |
70 }); | |
71 | |
72 test("with negative/bad arguments throws", () async { | |
73 var events = new StreamQueue<int>(createStream()); | |
74 expect(() => events.skip(-1), throwsArgumentError); | |
75 // A non-int throws either a type error or an argument error, | |
76 // depending on whether it's checked mode or not. | |
77 expect(await events.next, 1); // Did not consume event. | |
78 expect(() => events.skip(-1), throwsArgumentError); | |
79 expect(await events.next, 2); // Did not consume event. | |
80 await events.cancel(); | |
81 }); | |
82 | |
83 test("of 0 elements works", () async { | |
84 var events = new StreamQueue<int>(createStream()); | |
85 expect(events.skip(0), completion(0)); | |
86 expect(events.next, completion(1)); | |
87 expect(events.skip(0), completion(0)); | |
88 expect(events.next, completion(2)); | |
89 expect(events.skip(0), completion(0)); | |
90 expect(events.next, completion(3)); | |
91 expect(events.skip(0), completion(0)); | |
92 expect(events.next, completion(4)); | |
93 expect(events.skip(0), completion(0)); | |
94 expect(events.skip(5), completion(5)); | |
95 expect(events.next, throwsStateError); | |
96 await events.cancel(); | |
97 }); | |
98 | |
99 test("of too many events ends at stream start", () async { | |
100 var events = new StreamQueue<int>(createStream()); | |
101 expect(await events.skip(6), 2); | |
102 await events.cancel(); | |
103 }); | |
104 | |
105 test("of too many events after some events", () async { | |
106 var events = new StreamQueue<int>(createStream()); | |
107 expect(await events.next, 1); | |
108 expect(await events.next, 2); | |
109 expect(await events.skip(6), 4); | |
110 await events.cancel(); | |
111 }); | |
112 | |
113 test("of too many events ends at stream end", () async { | |
114 var events = new StreamQueue<int>(createStream()); | |
115 expect(await events.next, 1); | |
116 expect(await events.next, 2); | |
117 expect(await events.next, 3); | |
118 expect(await events.next, 4); | |
119 expect(await events.skip(2), 2); | |
120 await events.cancel(); | |
121 }); | |
122 | |
123 test("of events with error", () async { | |
124 var events = new StreamQueue<int>(createErrorStream()); | |
125 expect(events.skip(4), throws); | |
126 expect(await events.next, 4); | |
127 await events.cancel(); | |
128 }); | |
129 | |
130 test("of events with error, and skip again after", () async { | |
131 var events = new StreamQueue<int>(createErrorStream()); | |
132 expect(events.skip(4), throws); | |
133 expect(events.skip(2), completion(1)); | |
134 await events.cancel(); | |
135 }); | |
136 | |
137 test("multiple skips at same time complete in order.", () async { | |
138 var events = new StreamQueue<int>(createStream()); | |
139 var skip1 = events.skip(1); | |
140 var skip2 = events.skip(0); | |
141 var skip3 = events.skip(4); | |
142 var skip4 = events.skip(1); | |
143 var index = 0; | |
144 // Check that futures complete in order. | |
145 sequence(expectedValue, sequenceIndex) => (v) { | |
nweiz
2015/06/18 23:44:28
"v" -> "value"
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
| |
146 expect(v, expectedValue); | |
147 expect(index, sequenceIndex); | |
148 index++; | |
149 } | |
150 await Future.wait([skip1.then(sequence(0, 0)), | |
151 skip2.then(sequence(0, 1)), | |
152 skip3.then(sequence(1, 2)), | |
153 skip4.then(sequence(1, 3))]); | |
154 // Complete when they are all done. | |
155 await Future.wait([skip1, skip2, skip3, skip4]); | |
nweiz
2015/06/18 23:44:28
Won't the previous await wait for all the futures
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Yes, this line is redundant. The previous line was
| |
156 await events.cancel(); | |
157 }); | |
158 }); | |
159 | |
160 group("take operation", () { | |
161 test("as simple take of events", () async { | |
162 var events = new StreamQueue<int>(createStream()); | |
163 expect(await events.next, 1); | |
164 expect(await events.take(2), [2, 3]); | |
165 expect(await events.next, 4); | |
166 await events.cancel(); | |
167 }); | |
168 | |
169 test("of 0 events", () async { | |
170 var events = new StreamQueue<int>(createStream()); | |
171 expect(events.take(0), completion([])); | |
nweiz
2015/06/18 23:44:28
I tend to prefer "isEmpty" to "equals([])". It pro
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
| |
172 expect(events.next, completion(1)); | |
173 expect(events.take(0), completion([])); | |
174 expect(events.next, completion(2)); | |
175 expect(events.take(0), completion([])); | |
176 expect(events.next, completion(3)); | |
177 expect(events.take(0), completion([])); | |
178 expect(events.next, completion(4)); | |
179 expect(events.take(0), completion([])); | |
180 expect(events.take(5), completion([])); | |
181 expect(events.next, throwsStateError); | |
182 await events.cancel(); | |
183 }); | |
184 | |
185 test("with bad arguments throws", () async { | |
186 var events = new StreamQueue<int>(createStream()); | |
187 expect(() => events.take(-1), throwsArgumentError); | |
188 expect(await events.next, 1); // Did not consume event. | |
189 expect(() => events.take(-1), throwsArgumentError); | |
190 expect(await events.next, 2); // Did not consume event. | |
191 await events.cancel(); | |
192 }); | |
193 | |
194 test("of too many arguments", () async { | |
195 var events = new StreamQueue<int>(createStream()); | |
196 expect(await events.take(6), [1, 2, 3, 4]); | |
197 await events.cancel(); | |
198 }); | |
199 | |
200 test("too large later", () async { | |
201 var events = new StreamQueue<int>(createStream()); | |
202 expect(await events.next, 1); | |
203 expect(await events.next, 2); | |
204 expect(await events.take(6), [3, 4]); | |
205 await events.cancel(); | |
206 }); | |
207 | |
208 test("error", () async { | |
209 var events = new StreamQueue<int>(createErrorStream()); | |
210 expect(events.take(4), throws); | |
211 expect(await events.next, 4); | |
212 await events.cancel(); | |
213 }); | |
214 }); | |
215 | |
216 group("rest operation", () { | |
217 test("after single next", () async { | |
218 var events = new StreamQueue<int>(createStream()); | |
219 expect(await events.next, 1); | |
220 expect(await events.rest.toList(), [2, 3, 4]); | |
221 }); | |
222 | |
223 test("at start", () async { | |
224 var events = new StreamQueue<int>(createStream()); | |
225 expect(await events.rest.toList(), [1, 2, 3, 4]); | |
226 }); | |
227 | |
228 test("at end", () async { | |
229 var events = new StreamQueue<int>(createStream()); | |
230 expect(await events.next, 1); | |
231 expect(await events.next, 2); | |
232 expect(await events.next, 3); | |
233 expect(await events.next, 4); | |
234 expect(await events.rest.toList(), []); | |
235 }); | |
236 | |
237 test("after end", () async { | |
238 var events = new StreamQueue<int>(createStream()); | |
239 expect(await events.next, 1); | |
240 expect(await events.next, 2); | |
241 expect(await events.next, 3); | |
242 expect(await events.next, 4); | |
243 expect(events.next, throwsStateError); | |
244 expect(await events.rest.toList(), []); | |
245 }); | |
246 | |
247 test("with an error event error", () async { | |
248 var events = new StreamQueue<int>(createErrorStream()); | |
249 expect(await events.next, 1); | |
250 var rest = events.rest; | |
251 var events2 = new StreamQueue(rest); | |
252 expect(await events2.next, 2); | |
253 expect(events2.next, throws); | |
254 expect(await events2.next, 4); | |
255 }); | |
256 | |
257 test("closes the events, prevents other operations", () async { | |
258 var events = new StreamQueue<int>(createStream()); | |
259 var stream = events.rest; | |
260 expect(() => events.next, throwsStateError); | |
261 expect(() => events.skip(1), throwsStateError); | |
262 expect(() => events.take(1), throwsStateError); | |
263 expect(() => events.rest, throwsStateError); | |
264 expect(() => events.cancel(), throwsStateError); | |
265 }); | |
266 | |
267 test("forwards to underlying stream", () async { | |
268 var cancel = new Completer(); | |
269 var controller = new StreamController(onCancel: () => cancel.future); | |
270 var events = new StreamQueue<int>(controller.stream); | |
271 expect(controller.hasListener, isFalse); | |
272 var next = events.next; | |
273 expect(controller.hasListener, isTrue); | |
274 expect(controller.isPaused, isFalse); | |
275 controller.add(1); | |
276 expect(await next, 1); | |
277 expect(controller.isPaused, isTrue); | |
278 var rest = events.rest; | |
279 var subscription = rest.listen(null); | |
280 expect(controller.hasListener, isTrue); | |
281 expect(controller.isPaused, isFalse); | |
282 var lastEvent; | |
283 subscription.onData((v) { lastEvent = v; }); | |
284 controller.add(2); | |
285 await flushMicrotasks(); | |
286 expect(lastEvent, 2); | |
287 expect(controller.hasListener, isTrue); | |
288 expect(controller.isPaused, isFalse); | |
289 subscription.pause(); | |
290 expect(controller.isPaused, isTrue); | |
291 controller.add(3); | |
292 await flushMicrotasks(); | |
293 expect(lastEvent, 2); | |
294 subscription.resume(); | |
295 await flushMicrotasks(); | |
296 expect(lastEvent, 3); | |
297 var cancelFuture = subscription.cancel(); | |
298 expect(controller.hasListener, isFalse); | |
299 cancel.complete(42); | |
300 expect(cancelFuture, completion(42)); | |
301 }); | |
nweiz
2015/06/18 23:44:28
Another test that could use some blank lines.
Lasse Reichstein Nielsen
2015/06/30 10:34:15
Done.
| |
302 }); | |
303 | |
304 group("close operation", () { | |
305 test("closes the events, prevents any other operation", () async { | |
306 var events = new StreamQueue<int>(createStream()); | |
307 await events.cancel(); | |
308 expect(() => events.next, throwsStateError); | |
309 expect(() => events.skip(1), throwsStateError); | |
310 expect(() => events.take(1), throwsStateError); | |
311 expect(() => events.rest, throwsStateError); | |
312 expect(() => events.cancel(), throwsStateError); | |
313 }); | |
314 | |
315 test("cancels underlying subscription, returns result", () async { | |
316 var cancelFuture = new Future.value(42); | |
317 var controller = new StreamController(onCancel: () => cancelFuture); | |
318 var events = new StreamQueue<int>(controller.stream); | |
319 controller.add(1); | |
320 expect(await events.next, 1); | |
321 expect(await events.cancel(), 42); | |
322 }); | |
323 }); | |
324 | |
325 | |
326 group("hasNext operation", () { | |
327 test("true at start", () async { | |
328 var events = new StreamQueue<int>(createStream()); | |
329 expect(await events.hasNext, isTrue); | |
330 }); | |
331 | |
332 test("true after start", () async { | |
333 var events = new StreamQueue<int>(createStream()); | |
334 expect(await events.next, 1); | |
335 expect(await events.hasNext, isTrue); | |
336 }); | |
337 | |
338 test("true false at end", () async { | |
nweiz
2015/06/18 23:44:28
"true false" -> "false"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
339 var events = new StreamQueue<int>(createStream()); | |
340 for (int i = 1; i <= 4; i++) { | |
341 expect(await events.next, i); | |
342 } | |
343 expect(await events.hasNext, isFalse); | |
344 }); | |
345 | |
346 test("true when enqueued", () async { | |
347 var events = new StreamQueue<int>(createStream()); | |
348 var values = []; | |
349 for (int i = 1; i <= 3; i++) { | |
350 events.next.then(values.add); | |
351 } | |
352 expect(values, []); | |
353 expect(await events.hasNext, isTrue); | |
354 expect(values, [1, 2, 3]); | |
355 }); | |
356 | |
357 test("false when enqueued", () async { | |
358 var events = new StreamQueue<int>(createStream()); | |
359 var values = []; | |
360 for (int i = 1; i <= 4; i++) { | |
361 events.next.then(values.add); | |
362 } | |
363 expect(values, []); | |
364 expect(await events.hasNext, isFalse); | |
365 expect(values, [1, 2, 3, 4]); | |
366 }); | |
367 | |
368 test("true when data event", () async { | |
369 var controller = new StreamController(); | |
370 var events = new StreamQueue<int>(controller.stream); | |
371 | |
372 var hasNext; | |
373 events.hasNext.then((result) { hasNext = result; }); | |
374 await flushMicrotasks(); | |
375 expect(hasNext, isNull); | |
376 controller.add(42); | |
377 expect(hasNext, isNull); | |
378 await flushMicrotasks(); | |
379 expect(hasNext, isTrue); | |
380 }); | |
381 | |
382 test("true when error event", () async { | |
383 var controller = new StreamController(); | |
384 var events = new StreamQueue<int>(controller.stream); | |
385 | |
386 var hasNext; | |
387 events.hasNext.then((result) { hasNext = result; }); | |
388 await flushMicrotasks(); | |
389 expect(hasNext, isNull); | |
390 controller.addError("BAD"); | |
391 expect(hasNext, isNull); | |
392 await flushMicrotasks(); | |
393 expect(hasNext, isTrue); | |
394 expect(events.next, throwsA("BAD")); | |
395 }); | |
396 | |
397 test("- hasNext after hasNext", () async { | |
398 var events = new StreamQueue<int>(createStream()); | |
399 expect(await events.hasNext, true); | |
400 expect(await events.hasNext, true); | |
401 expect(await events.next, 1); | |
402 expect(await events.hasNext, true); | |
403 expect(await events.hasNext, true); | |
404 expect(await events.next, 2); | |
405 expect(await events.hasNext, true); | |
406 expect(await events.hasNext, true); | |
407 expect(await events.next, 3); | |
408 expect(await events.hasNext, true); | |
409 expect(await events.hasNext, true); | |
410 expect(await events.next, 4); | |
411 expect(await events.hasNext, false); | |
412 expect(await events.hasNext, false); | |
413 }); | |
414 | |
415 test("- next after true", () async { | |
416 var events = new StreamQueue<int>(createStream()); | |
417 expect(await events.next, 1); | |
418 expect(await events.hasNext, true); | |
419 expect(await events.next, 2); | |
420 expect(await events.next, 3); | |
421 }); | |
422 | |
423 test("- next after true, enqueued", () async { | |
424 var events = new StreamQueue<int>(createStream()); | |
425 var responses = []; | |
426 var first = events.next.then(responses.add); | |
427 var hasSecond = events.hasNext.then(responses.add); | |
428 var second = events.next.then(responses.add); | |
429 do { | |
430 await flushMicrotasks(); | |
431 } while (responses.length < 3); | |
432 expect(responses, [1, true, 2]); | |
433 }); | |
434 | |
435 test("- skip 0 after true", () async { | |
436 var events = new StreamQueue<int>(createStream()); | |
437 expect(await events.next, 1); | |
438 expect(await events.hasNext, true); | |
439 expect(await events.skip(0), 0); | |
440 expect(await events.next, 2); | |
441 }); | |
442 | |
443 test("- skip 1 after true", () async { | |
444 var events = new StreamQueue<int>(createStream()); | |
445 expect(await events.next, 1); | |
446 expect(await events.hasNext, true); | |
447 expect(await events.skip(1), 0); | |
448 expect(await events.next, 3); | |
449 }); | |
450 | |
451 test("- skip 2 after true", () async { | |
452 var events = new StreamQueue<int>(createStream()); | |
453 expect(await events.next, 1); | |
454 expect(await events.hasNext, true); | |
455 expect(await events.skip(2), 0); | |
456 expect(await events.next, 4); | |
457 }); | |
458 | |
459 test("- take 0 after true", () async { | |
460 var events = new StreamQueue<int>(createStream()); | |
461 expect(await events.next, 1); | |
462 expect(await events.hasNext, true); | |
463 expect(await events.take(0), []); | |
464 expect(await events.next, 2); | |
465 }); | |
466 | |
467 test("- take 1 after true", () async { | |
468 var events = new StreamQueue<int>(createStream()); | |
469 expect(await events.next, 1); | |
470 expect(await events.hasNext, true); | |
471 expect(await events.take(1), [2]); | |
472 expect(await events.next, 3); | |
473 }); | |
474 | |
475 test("- take 2 after true", () async { | |
476 var events = new StreamQueue<int>(createStream()); | |
477 expect(await events.next, 1); | |
478 expect(await events.hasNext, true); | |
479 expect(await events.take(2), [2, 3]); | |
480 expect(await events.next, 4); | |
481 }); | |
482 | |
483 test("- rest after true", () async { | |
484 var events = new StreamQueue<int>(createStream()); | |
485 expect(await events.next, 1); | |
486 expect(await events.hasNext, true); | |
487 var stream = events.rest; | |
488 expect(await stream.toList(), [2, 3, 4]); | |
489 }); | |
490 | |
491 test("- rest after true, at last", () async { | |
492 var events = new StreamQueue<int>(createStream()); | |
493 expect(await events.next, 1); | |
494 expect(await events.next, 2); | |
495 expect(await events.next, 3); | |
496 expect(await events.hasNext, true); | |
497 var stream = events.rest; | |
498 expect(await stream.toList(), [4]); | |
499 }); | |
500 | |
501 test("- rest after false", () async { | |
502 var events = new StreamQueue<int>(createStream()); | |
503 expect(await events.next, 1); | |
504 expect(await events.next, 2); | |
505 expect(await events.next, 3); | |
506 expect(await events.next, 4); | |
507 expect(await events.hasNext, false); | |
508 var stream = events.rest; | |
509 expect(await stream.toList(), []); | |
510 }); | |
511 | |
512 test("- cancel after true on data", () async { | |
513 var events = new StreamQueue<int>(createStream()); | |
514 expect(await events.next, 1); | |
515 expect(await events.next, 2); | |
516 expect(await events.hasNext, true); | |
517 expect(await events.cancel(), null); | |
518 }); | |
519 | |
520 test("- cancel after true on error", () async { | |
521 var events = new StreamQueue<int>(createErrorStream()); | |
522 expect(await events.next, 1); | |
523 expect(await events.next, 2); | |
524 expect(await events.hasNext, true); | |
525 expect(await events.cancel(), null); | |
526 }); | |
527 }); | |
528 | |
529 test("all combinations sequential skip/next/take operations", () async { | |
530 // Takes all combinations of two of next, skip and take, then ends with | |
531 // doing rest. Each of the first rounds do 10 events of each type, | |
532 // the rest does 20 elements. | |
533 var eventCount = 20 * (3 * 3 + 1); | |
534 var events = new StreamQueue<int>(createLongStream(eventCount)); | |
535 | |
536 // Test expecting [startIndex .. startIndex + 9] as events using | |
537 // `next`. | |
538 nextTest(startIndex) { | |
539 for (int i = 0; i < 10; i++) { | |
540 expect(events.next, completion(startIndex + i)); | |
541 } | |
542 } | |
543 | |
544 // Test expecting 10 events to be skipped. | |
545 skipTest(startIndex) { | |
546 expect(events.skip(10), completion(0)); | |
547 } | |
548 | |
549 // Test expecting [startIndex .. startIndex + 9] as events using | |
550 // `take(10)`. | |
551 takeTest(startIndex) { | |
552 expect(events.take(10), | |
553 completion(new List.generate(10, (i) => startIndex + i))); | |
554 } | |
555 var tests = [nextTest, skipTest, takeTest]; | |
556 | |
557 int counter = 0; | |
558 // Run through all pairs of two tests and run them. | |
559 for (int i = 0; i < tests.length; i++) { | |
560 for (int j = 0; j < tests.length; j++) { | |
561 tests[i](counter); | |
562 tests[j](counter + 10); | |
563 counter += 20; | |
564 } | |
565 } | |
566 // Then expect 20 more events as a `rest` call. | |
567 expect(events.rest.toList(), | |
568 completion(new List.generate(20, (i) => counter + i))); | |
569 }); | |
570 } | |
nweiz
2015/06/18 23:44:28
Also test passing in "prefetch".
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Prefetch was removed.
| |
571 | |
572 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | |
573 | |
574 Stream<int> createStream() async* { | |
575 yield 1; | |
576 await flushMicrotasks(); | |
577 yield 2; | |
578 await flushMicrotasks(); | |
579 yield 3; | |
580 await flushMicrotasks(); | |
581 yield 4; | |
582 } | |
583 | |
584 Stream<int> createErrorStream() { | |
585 StreamController controller = new StreamController<int>(); | |
586 () async { | |
587 controller.add(1); | |
588 await flushMicrotasks(); | |
589 controller.add(2); | |
590 await flushMicrotasks(); | |
591 controller.addError("To err is divine!"); | |
592 await flushMicrotasks(); | |
593 controller.add(4); | |
594 await flushMicrotasks(); | |
595 controller.close(); | |
596 }(); | |
597 return controller.stream; | |
598 } | |
599 | |
600 Stream<int> createLongStream(int eventCount) async* { | |
601 for (int i = 0; i < eventCount; i++) yield i; | |
602 } | |
OLD | NEW |