Chromium Code Reviews

Side by Side Diff: packages/async/test/stream_queue_test.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff |
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents.
4
5 import "dart:async";
6
7 import "package:async/async.dart" show StreamQueue;
8 import "package:test/test.dart";
9
10 import "utils.dart";
11
12 main() {
13 group("source stream", () {
14 test("is listened to on first request, paused between requests", () async {
15 var controller = new StreamController();
16 var events = new StreamQueue<int>(controller.stream);
17 await flushMicrotasks();
18 expect(controller.hasListener, isFalse);
19
20 var next = events.next;
21 expect(controller.hasListener, isTrue);
22 expect(controller.isPaused, isFalse);
23
24 controller.add(1);
25
26 expect(await next, 1);
27 expect(controller.hasListener, isTrue);
28 expect(controller.isPaused, isTrue);
29
30 next = events.next;
31 expect(controller.hasListener, isTrue);
32 expect(controller.isPaused, isFalse);
33
34 controller.add(2);
35
36 expect(await next, 2);
37 expect(controller.hasListener, isTrue);
38 expect(controller.isPaused, isTrue);
39
40 events.cancel();
41 expect(controller.hasListener, isFalse);
42 });
43 });
44
45 group("next operation", () {
46 test("simple sequence of requests", () async {
47 var events = new StreamQueue<int>(createStream());
48 for (int i = 1; i <= 4; i++) {
49 expect(await events.next, i);
50 }
51 expect(events.next, throwsStateError);
52 });
53
54 test("multiple requests at the same time", () async {
55 var events = new StreamQueue<int>(createStream());
56 var result = await Future.wait(
57 [events.next, events.next, events.next, events.next]);
58 expect(result, [1, 2, 3, 4]);
59 await events.cancel();
60 });
61
62 test("sequence of requests with error", () async {
63 var events = new StreamQueue<int>(createErrorStream());
64 expect(await events.next, 1);
65 expect(await events.next, 2);
66 expect(events.next, throwsA("To err is divine!"));
67 expect(await events.next, 4);
68 await events.cancel();
69 });
70 });
71
72 group("skip operation", () {
73 test("of two elements in the middle of sequence", () async {
74 var events = new StreamQueue<int>(createStream());
75 expect(await events.next, 1);
76 expect(await events.skip(2), 0);
77 expect(await events.next, 4);
78 await events.cancel();
79 });
80
81 test("with negative/bad arguments throws", () async {
82 var events = new StreamQueue<int>(createStream());
83 expect(() => events.skip(-1), throwsArgumentError);
84 // A non-int throws either a type error or an argument error,
85 // depending on whether it's checked mode or not.
86 expect(await events.next, 1); // Did not consume event.
87 expect(() => events.skip(-1), throwsArgumentError);
88 expect(await events.next, 2); // Did not consume event.
89 await events.cancel();
90 });
91
92 test("of 0 elements works", () async {
93 var events = new StreamQueue<int>(createStream());
94 expect(events.skip(0), completion(0));
95 expect(events.next, completion(1));
96 expect(events.skip(0), completion(0));
97 expect(events.next, completion(2));
98 expect(events.skip(0), completion(0));
99 expect(events.next, completion(3));
100 expect(events.skip(0), completion(0));
101 expect(events.next, completion(4));
102 expect(events.skip(0), completion(0));
103 expect(events.skip(5), completion(5));
104 expect(events.next, throwsStateError);
105 await events.cancel();
106 });
107
108 test("of too many events ends at stream start", () async {
109 var events = new StreamQueue<int>(createStream());
110 expect(await events.skip(6), 2);
111 await events.cancel();
112 });
113
114 test("of too many events after some events", () async {
115 var events = new StreamQueue<int>(createStream());
116 expect(await events.next, 1);
117 expect(await events.next, 2);
118 expect(await events.skip(6), 4);
119 await events.cancel();
120 });
121
122 test("of too many events ends at stream end", () async {
123 var events = new StreamQueue<int>(createStream());
124 expect(await events.next, 1);
125 expect(await events.next, 2);
126 expect(await events.next, 3);
127 expect(await events.next, 4);
128 expect(await events.skip(2), 2);
129 await events.cancel();
130 });
131
132 test("of events with error", () async {
133 var events = new StreamQueue<int>(createErrorStream());
134 expect(events.skip(4), throwsA("To err is divine!"));
135 expect(await events.next, 4);
136 await events.cancel();
137 });
138
139 test("of events with error, and skip again after", () async {
140 var events = new StreamQueue<int>(createErrorStream());
141 expect(events.skip(4), throwsA("To err is divine!"));
142 expect(events.skip(2), completion(1));
143 await events.cancel();
144 });
145 test("multiple skips at same time complete in order.", () async {
146 var events = new StreamQueue<int>(createStream());
147 var skip1 = events.skip(1);
148 var skip2 = events.skip(0);
149 var skip3 = events.skip(4);
150 var skip4 = events.skip(1);
151 var index = 0;
152 // Check that futures complete in order.
153 sequence(expectedValue, sequenceIndex) => (value) {
154 expect(value, expectedValue);
155 expect(index, sequenceIndex);
156 index++;
157 };
158 await Future.wait([skip1.then(sequence(0, 0)),
159 skip2.then(sequence(0, 1)),
160 skip3.then(sequence(1, 2)),
161 skip4.then(sequence(1, 3))]);
162 await events.cancel();
163 });
164 });
165
166 group("take operation", () {
167 test("as simple take of events", () async {
168 var events = new StreamQueue<int>(createStream());
169 expect(await events.next, 1);
170 expect(await events.take(2), [2, 3]);
171 expect(await events.next, 4);
172 await events.cancel();
173 });
174
175 test("of 0 events", () async {
176 var events = new StreamQueue<int>(createStream());
177 expect(events.take(0), completion([]));
178 expect(events.next, completion(1));
179 expect(events.take(0), completion([]));
180 expect(events.next, completion(2));
181 expect(events.take(0), completion([]));
182 expect(events.next, completion(3));
183 expect(events.take(0), completion([]));
184 expect(events.next, completion(4));
185 expect(events.take(0), completion([]));
186 expect(events.take(5), completion([]));
187 expect(events.next, throwsStateError);
188 await events.cancel();
189 });
190
191 test("with bad arguments throws", () async {
192 var events = new StreamQueue<int>(createStream());
193 expect(() => events.take(-1), throwsArgumentError);
194 expect(await events.next, 1); // Did not consume event.
195 expect(() => events.take(-1), throwsArgumentError);
196 expect(await events.next, 2); // Did not consume event.
197 await events.cancel();
198 });
199
200 test("of too many arguments", () async {
201 var events = new StreamQueue<int>(createStream());
202 expect(await events.take(6), [1, 2, 3, 4]);
203 await events.cancel();
204 });
205
206 test("too large later", () async {
207 var events = new StreamQueue<int>(createStream());
208 expect(await events.next, 1);
209 expect(await events.next, 2);
210 expect(await events.take(6), [3, 4]);
211 await events.cancel();
212 });
213
214 test("error", () async {
215 var events = new StreamQueue<int>(createErrorStream());
216 expect(events.take(4), throwsA("To err is divine!"));
217 expect(await events.next, 4);
218 await events.cancel();
219 });
220 });
221
222 group("rest operation", () {
223 test("after single next", () async {
224 var events = new StreamQueue<int>(createStream());
225 expect(await events.next, 1);
226 expect(await events.rest.toList(), [2, 3, 4]);
227 });
228
229 test("at start", () async {
230 var events = new StreamQueue<int>(createStream());
231 expect(await events.rest.toList(), [1, 2, 3, 4]);
232 });
233
234 test("at end", () async {
235 var events = new StreamQueue<int>(createStream());
236 expect(await events.next, 1);
237 expect(await events.next, 2);
238 expect(await events.next, 3);
239 expect(await events.next, 4);
240 expect(await events.rest.toList(), isEmpty);
241 });
242
243 test("after end", () async {
244 var events = new StreamQueue<int>(createStream());
245 expect(await events.next, 1);
246 expect(await events.next, 2);
247 expect(await events.next, 3);
248 expect(await events.next, 4);
249 expect(events.next, throwsStateError);
250 expect(await events.rest.toList(), isEmpty);
251 });
252
253 test("after receiving done requested before", () async {
254 var events = new StreamQueue<int>(createStream());
255 var next1 = events.next;
256 var next2 = events.next;
257 var next3 = events.next;
258 var rest = events.rest;
259 for (int i = 0; i < 10; i++) {
260 await flushMicrotasks();
261 }
262 expect(await next1, 1);
263 expect(await next2, 2);
264 expect(await next3, 3);
265 expect(await rest.toList(), [4]);
266 });
267
268 test("with an error event error", () async {
269 var events = new StreamQueue<int>(createErrorStream());
270 expect(await events.next, 1);
271 var rest = events.rest;
272 var events2 = new StreamQueue(rest);
273 expect(await events2.next, 2);
274 expect(events2.next, throwsA("To err is divine!"));
275 expect(await events2.next, 4);
276 });
277
278 test("closes the events, prevents other operations", () async {
279 var events = new StreamQueue<int>(createStream());
280 var stream = events.rest;
281 expect(() => events.next, throwsStateError);
282 expect(() => events.skip(1), throwsStateError);
283 expect(() => events.take(1), throwsStateError);
284 expect(() => events.rest, throwsStateError);
285 expect(() => events.cancel(), throwsStateError);
286 expect(stream.toList(), completion([1, 2, 3, 4]));
287 });
288
289 test("forwards to underlying stream", () async {
290 var cancel = new Completer();
291 var controller = new StreamController(onCancel: () => cancel.future);
292 var events = new StreamQueue<int>(controller.stream);
293 expect(controller.hasListener, isFalse);
294 var next = events.next;
295 expect(controller.hasListener, isTrue);
296 expect(controller.isPaused, isFalse);
297
298 controller.add(1);
299 expect(await next, 1);
300 expect(controller.isPaused, isTrue);
301
302 var rest = events.rest;
303 var subscription = rest.listen(null);
304 expect(controller.hasListener, isTrue);
305 expect(controller.isPaused, isFalse);
306
307 var lastEvent;
308 subscription.onData((value) => lastEvent = value);
309
310 controller.add(2);
311
312 await flushMicrotasks();
313 expect(lastEvent, 2);
314 expect(controller.hasListener, isTrue);
315 expect(controller.isPaused, isFalse);
316
317 subscription.pause();
318 expect(controller.isPaused, isTrue);
319
320 controller.add(3);
321
322 await flushMicrotasks();
323 expect(lastEvent, 2);
324 subscription.resume();
325
326 await flushMicrotasks();
327 expect(lastEvent, 3);
328
329 var cancelFuture = subscription.cancel();
330 expect(controller.hasListener, isFalse);
331 cancel.complete(42);
332 expect(cancelFuture, completion(42));
333 });
334 });
335
336 group("cancel operation", () {
337 test("closes the events, prevents any other operation", () async {
338 var events = new StreamQueue<int>(createStream());
339 await events.cancel();
340 expect(() => events.next, throwsStateError);
341 expect(() => events.skip(1), throwsStateError);
342 expect(() => events.take(1), throwsStateError);
343 expect(() => events.rest, throwsStateError);
344 expect(() => events.cancel(), throwsStateError);
345 });
346
347 test("cancels underlying subscription when called before any event",
348 () async {
349 var cancelFuture = new Future.value(42);
350 var controller = new StreamController(onCancel: () => cancelFuture);
351 var events = new StreamQueue<int>(controller.stream);
352 expect(await events.cancel(), 42);
353 });
354
355 test("cancels underlying subscription, returns result", () async {
356 var cancelFuture = new Future.value(42);
357 var controller = new StreamController(onCancel: () => cancelFuture);
358 var events = new StreamQueue<int>(controller.stream);
359 controller.add(1);
360 expect(await events.next, 1);
361 expect(await events.cancel(), 42);
362 });
363
364 group("with immediate: true", () {
365 test("closes the events, prevents any other operation", () async {
366 var events = new StreamQueue<int>(createStream());
367 await events.cancel(immediate: true);
368 expect(() => events.next, throwsStateError);
369 expect(() => events.skip(1), throwsStateError);
370 expect(() => events.take(1), throwsStateError);
371 expect(() => events.rest, throwsStateError);
372 expect(() => events.cancel(), throwsStateError);
373 });
374
375 test("cancels the underlying subscription immediately", () async {
376 var controller = new StreamController();
377 controller.add(1);
378
379 var events = new StreamQueue<int>(controller.stream);
380 expect(await events.next, 1);
381 expect(controller.hasListener, isTrue);
382
383 events.cancel(immediate: true);
384 await expect(controller.hasListener, isFalse);
385 });
386
387 test("cancels the underlying subscription when called before any event",
388 () async {
389 var cancelFuture = new Future.value(42);
390 var controller = new StreamController(onCancel: () => cancelFuture);
391
392 var events = new StreamQueue<int>(controller.stream);
393 expect(await events.cancel(immediate: true), 42);
394 });
395
396 test("closes pending requests", () async {
397 var events = new StreamQueue<int>(createStream());
398 expect(await events.next, 1);
399 expect(events.next, throwsStateError);
400 expect(events.hasNext, completion(isFalse));
401
402 await events.cancel(immediate: true);
403 });
404
405 test("returns the result of closing the underlying subscription",
406 () async {
407 var controller = new StreamController(
408 onCancel: () => new Future.value(42));
409 var events = new StreamQueue<int>(controller.stream);
410 expect(await events.cancel(immediate: true), 42);
411 });
412
413 test("listens and then cancels a stream that hasn't been listened to yet",
414 () async {
415 var wasListened = false;
416 var controller = new StreamController(
417 onListen: () => wasListened = true);
418 var events = new StreamQueue<int>(controller.stream);
419 expect(wasListened, isFalse);
420 expect(controller.hasListener, isFalse);
421
422 await events.cancel(immediate: true);
423 expect(wasListened, isTrue);
424 expect(controller.hasListener, isFalse);
425 });
426 });
427 });
428
429 group("hasNext operation", () {
430 test("true at start", () async {
431 var events = new StreamQueue<int>(createStream());
432 expect(await events.hasNext, isTrue);
433 });
434
435 test("true after start", () async {
436 var events = new StreamQueue<int>(createStream());
437 expect(await events.next, 1);
438 expect(await events.hasNext, isTrue);
439 });
440
441 test("true at end", () async {
442 var events = new StreamQueue<int>(createStream());
443 for (int i = 1; i <= 4; i++) {
444 expect(await events.next, i);
445 }
446 expect(await events.hasNext, isFalse);
447 });
448
449 test("true when enqueued", () async {
450 var events = new StreamQueue<int>(createStream());
451 var values = [];
452 for (int i = 1; i <= 3; i++) {
453 events.next.then(values.add);
454 }
455 expect(values, isEmpty);
456 expect(await events.hasNext, isTrue);
457 expect(values, [1, 2, 3]);
458 });
459
460 test("false when enqueued", () async {
461 var events = new StreamQueue<int>(createStream());
462 var values = [];
463 for (int i = 1; i <= 4; i++) {
464 events.next.then(values.add);
465 }
466 expect(values, isEmpty);
467 expect(await events.hasNext, isFalse);
468 expect(values, [1, 2, 3, 4]);
469 });
470
471 test("true when data event", () async {
472 var controller = new StreamController();
473 var events = new StreamQueue<int>(controller.stream);
474
475 var hasNext;
476 events.hasNext.then((result) { hasNext = result; });
477 await flushMicrotasks();
478 expect(hasNext, isNull);
479 controller.add(42);
480 expect(hasNext, isNull);
481 await flushMicrotasks();
482 expect(hasNext, isTrue);
483 });
484
485 test("true when error event", () async {
486 var controller = new StreamController();
487 var events = new StreamQueue<int>(controller.stream);
488
489 var hasNext;
490 events.hasNext.then((result) { hasNext = result; });
491 await flushMicrotasks();
492 expect(hasNext, isNull);
493 controller.addError("BAD");
494 expect(hasNext, isNull);
495 await flushMicrotasks();
496 expect(hasNext, isTrue);
497 expect(events.next, throwsA("BAD"));
498 });
499
500 test("- hasNext after hasNext", () async {
501 var events = new StreamQueue<int>(createStream());
502 expect(await events.hasNext, true);
503 expect(await events.hasNext, true);
504 expect(await events.next, 1);
505 expect(await events.hasNext, true);
506 expect(await events.hasNext, true);
507 expect(await events.next, 2);
508 expect(await events.hasNext, true);
509 expect(await events.hasNext, true);
510 expect(await events.next, 3);
511 expect(await events.hasNext, true);
512 expect(await events.hasNext, true);
513 expect(await events.next, 4);
514 expect(await events.hasNext, false);
515 expect(await events.hasNext, false);
516 });
517
518 test("- next after true", () async {
519 var events = new StreamQueue<int>(createStream());
520 expect(await events.next, 1);
521 expect(await events.hasNext, true);
522 expect(await events.next, 2);
523 expect(await events.next, 3);
524 });
525
526 test("- next after true, enqueued", () async {
527 var events = new StreamQueue<int>(createStream());
528 var responses = [];
529 events.next.then(responses.add);
530 events.hasNext.then(responses.add);
531 events.next.then(responses.add);
532 do {
533 await flushMicrotasks();
534 } while (responses.length < 3);
535 expect(responses, [1, true, 2]);
536 });
537
538 test("- skip 0 after true", () async {
539 var events = new StreamQueue<int>(createStream());
540 expect(await events.next, 1);
541 expect(await events.hasNext, true);
542 expect(await events.skip(0), 0);
543 expect(await events.next, 2);
544 });
545
546 test("- skip 1 after true", () async {
547 var events = new StreamQueue<int>(createStream());
548 expect(await events.next, 1);
549 expect(await events.hasNext, true);
550 expect(await events.skip(1), 0);
551 expect(await events.next, 3);
552 });
553
554 test("- skip 2 after true", () async {
555 var events = new StreamQueue<int>(createStream());
556 expect(await events.next, 1);
557 expect(await events.hasNext, true);
558 expect(await events.skip(2), 0);
559 expect(await events.next, 4);
560 });
561
562 test("- take 0 after true", () async {
563 var events = new StreamQueue<int>(createStream());
564 expect(await events.next, 1);
565 expect(await events.hasNext, true);
566 expect(await events.take(0), isEmpty);
567 expect(await events.next, 2);
568 });
569
570 test("- take 1 after true", () async {
571 var events = new StreamQueue<int>(createStream());
572 expect(await events.next, 1);
573 expect(await events.hasNext, true);
574 expect(await events.take(1), [2]);
575 expect(await events.next, 3);
576 });
577
578 test("- take 2 after true", () async {
579 var events = new StreamQueue<int>(createStream());
580 expect(await events.next, 1);
581 expect(await events.hasNext, true);
582 expect(await events.take(2), [2, 3]);
583 expect(await events.next, 4);
584 });
585
586 test("- rest after true", () async {
587 var events = new StreamQueue<int>(createStream());
588 expect(await events.next, 1);
589 expect(await events.hasNext, true);
590 var stream = events.rest;
591 expect(await stream.toList(), [2, 3, 4]);
592 });
593
594 test("- rest after true, at last", () async {
595 var events = new StreamQueue<int>(createStream());
596 expect(await events.next, 1);
597 expect(await events.next, 2);
598 expect(await events.next, 3);
599 expect(await events.hasNext, true);
600 var stream = events.rest;
601 expect(await stream.toList(), [4]);
602 });
603
604 test("- rest after false", () async {
605 var events = new StreamQueue<int>(createStream());
606 expect(await events.next, 1);
607 expect(await events.next, 2);
608 expect(await events.next, 3);
609 expect(await events.next, 4);
610 expect(await events.hasNext, false);
611 var stream = events.rest;
612 expect(await stream.toList(), isEmpty);
613 });
614
615 test("- cancel after true on data", () async {
616 var events = new StreamQueue<int>(createStream());
617 expect(await events.next, 1);
618 expect(await events.next, 2);
619 expect(await events.hasNext, true);
620 expect(await events.cancel(), null);
621 });
622
623 test("- cancel after true on error", () async {
624 var events = new StreamQueue<int>(createErrorStream());
625 expect(await events.next, 1);
626 expect(await events.next, 2);
627 expect(await events.hasNext, true);
628 expect(await events.cancel(), null);
629 });
630 });
631
632 test("all combinations sequential skip/next/take operations", () async {
633 // Takes all combinations of two of next, skip and take, then ends with
634 // doing rest. Each of the first rounds do 10 events of each type,
635 // the rest does 20 elements.
636 var eventCount = 20 * (3 * 3 + 1);
637 var events = new StreamQueue<int>(createLongStream(eventCount));
638
639 // Test expecting [startIndex .. startIndex + 9] as events using
640 // `next`.
641 nextTest(startIndex) {
642 for (int i = 0; i < 10; i++) {
643 expect(events.next, completion(startIndex + i));
644 }
645 }
646
647 // Test expecting 10 events to be skipped.
648 skipTest(startIndex) {
649 expect(events.skip(10), completion(0));
650 }
651
652 // Test expecting [startIndex .. startIndex + 9] as events using
653 // `take(10)`.
654 takeTest(startIndex) {
655 expect(events.take(10),
656 completion(new List.generate(10, (i) => startIndex + i)));
657 }
658 var tests = [nextTest, skipTest, takeTest];
659
660 int counter = 0;
661 // Run through all pairs of two tests and run them.
662 for (int i = 0; i < tests.length; i++) {
663 for (int j = 0; j < tests.length; j++) {
664 tests[i](counter);
665 tests[j](counter + 10);
666 counter += 20;
667 }
668 }
669 // Then expect 20 more events as a `rest` call.
670 expect(events.rest.toList(),
671 completion(new List.generate(20, (i) => counter + i)));
672 });
673 }
674
675 Stream<int> createStream() async* {
676 yield 1;
677 await flushMicrotasks();
678 yield 2;
679 await flushMicrotasks();
680 yield 3;
681 await flushMicrotasks();
682 yield 4;
683 }
684
685 Stream<int> createErrorStream() {
686 StreamController controller = new StreamController<int>();
687 () async {
688 controller.add(1);
689 await flushMicrotasks();
690 controller.add(2);
691 await flushMicrotasks();
692 controller.addError("To err is divine!");
693 await flushMicrotasks();
694 controller.add(4);
695 await flushMicrotasks();
696 controller.close();
697 }();
698 return controller.stream;
699 }
700
701 Stream<int> createLongStream(int eventCount) async* {
702 for (int i = 0; i < eventCount; i++) yield i;
703 }
OLDNEW
« no previous file with comments | « packages/async/test/stream_group_test.dart ('k') | packages/async/test/stream_splitter_test.dart » ('j') | no next file with comments »

Powered by Google App Engine