Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: test/stream_queue_test.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents.
4
5 import "dart:async";
6
7 import "package:async/async.dart" show StreamQueue;
8 import "package:test/test.dart";
9
10 import "utils.dart";
11
12 main() {
13 group("source stream", () {
14 test("is listened to on first request, paused between requests", () async {
15 var controller = new StreamController();
16 var events = new StreamQueue<int>(controller.stream);
17 await flushMicrotasks();
18 expect(controller.hasListener, isFalse);
19
20 var next = events.next;
21 expect(controller.hasListener, isTrue);
22 expect(controller.isPaused, isFalse);
23
24 controller.add(1);
25
26 expect(await next, 1);
27 expect(controller.hasListener, isTrue);
28 expect(controller.isPaused, isTrue);
29
30 next = events.next;
31 expect(controller.hasListener, isTrue);
32 expect(controller.isPaused, isFalse);
33
34 controller.add(2);
35
36 expect(await next, 2);
37 expect(controller.hasListener, isTrue);
38 expect(controller.isPaused, isTrue);
39
40 var cancel = events.cancel();
41 expect(controller.hasListener, isFalse);
42 });
43 });
44
45 group("next operation", () {
46 test("simple sequence of requests", () async {
47 var events = new StreamQueue<int>(createStream());
48 for (int i = 1; i <= 4; i++) {
49 expect(await events.next, i);
50 }
51 expect(events.next, throwsStateError);
52 });
53
54 test("multiple requests at the same time", () async {
55 var events = new StreamQueue<int>(createStream());
56 var result = await Future.wait(
57 [events.next, events.next, events.next, events.next]);
58 expect(result, [1, 2, 3, 4]);
59 await events.cancel();
60 });
61
62 test("sequence of requests with error", () async {
63 var events = new StreamQueue<int>(createErrorStream());
64 expect(await events.next, 1);
65 expect(await events.next, 2);
66 expect(events.next, throwsA("To err is divine!"));
67 expect(await events.next, 4);
68 await events.cancel();
69 });
70 });
71
72 group("skip operation", () {
73 test("of two elements in the middle of sequence", () async {
74 var events = new StreamQueue<int>(createStream());
75 expect(await events.next, 1);
76 expect(await events.skip(2), 0);
77 expect(await events.next, 4);
78 await events.cancel();
79 });
80
81 test("with negative/bad arguments throws", () async {
82 var events = new StreamQueue<int>(createStream());
83 expect(() => events.skip(-1), throwsArgumentError);
84 // A non-int throws either a type error or an argument error,
85 // depending on whether it's checked mode or not.
86 expect(await events.next, 1); // Did not consume event.
87 expect(() => events.skip(-1), throwsArgumentError);
88 expect(await events.next, 2); // Did not consume event.
89 await events.cancel();
90 });
91
92 test("of 0 elements works", () async {
93 var events = new StreamQueue<int>(createStream());
94 expect(events.skip(0), completion(0));
95 expect(events.next, completion(1));
96 expect(events.skip(0), completion(0));
97 expect(events.next, completion(2));
98 expect(events.skip(0), completion(0));
99 expect(events.next, completion(3));
100 expect(events.skip(0), completion(0));
101 expect(events.next, completion(4));
102 expect(events.skip(0), completion(0));
103 expect(events.skip(5), completion(5));
104 expect(events.next, throwsStateError);
105 await events.cancel();
106 });
107
108 test("of too many events ends at stream start", () async {
109 var events = new StreamQueue<int>(createStream());
110 expect(await events.skip(6), 2);
111 await events.cancel();
112 });
113
114 test("of too many events after some events", () async {
115 var events = new StreamQueue<int>(createStream());
116 expect(await events.next, 1);
117 expect(await events.next, 2);
118 expect(await events.skip(6), 4);
119 await events.cancel();
120 });
121
122 test("of too many events ends at stream end", () async {
123 var events = new StreamQueue<int>(createStream());
124 expect(await events.next, 1);
125 expect(await events.next, 2);
126 expect(await events.next, 3);
127 expect(await events.next, 4);
128 expect(await events.skip(2), 2);
129 await events.cancel();
130 });
131
132 test("of events with error", () async {
133 var events = new StreamQueue<int>(createErrorStream());
134 expect(events.skip(4), throwsA("To err is divine!"));
135 expect(await events.next, 4);
136 await events.cancel();
137 });
138
139 test("of events with error, and skip again after", () async {
140 var events = new StreamQueue<int>(createErrorStream());
141 expect(events.skip(4), throwsA("To err is divine!"));
142 expect(events.skip(2), completion(1));
143 await events.cancel();
144 });
145
146 test("multiple skips at same time complete in order.", () async {
147 var events = new StreamQueue<int>(createStream());
148 var skip1 = events.skip(1);
149 var skip2 = events.skip(0);
150 var skip3 = events.skip(4);
151 var skip4 = events.skip(1);
152 var index = 0;
153 // Check that futures complete in order.
154 sequence(expectedValue, sequenceIndex) => (value) {
155 expect(value, expectedValue);
156 expect(index, sequenceIndex);
157 index++;
158 }
159 await Future.wait([skip1.then(sequence(0, 0)),
160 skip2.then(sequence(0, 1)),
161 skip3.then(sequence(1, 2)),
162 skip4.then(sequence(1, 3))]);
163 await events.cancel();
164 });
165 });
166
167 group("take operation", () {
168 test("as simple take of events", () async {
169 var events = new StreamQueue<int>(createStream());
170 expect(await events.next, 1);
171 expect(await events.take(2), [2, 3]);
172 expect(await events.next, 4);
173 await events.cancel();
174 });
175
176 test("of 0 events", () async {
177 var events = new StreamQueue<int>(createStream());
178 expect(events.take(0), completion([]));
179 expect(events.next, completion(1));
180 expect(events.take(0), completion([]));
181 expect(events.next, completion(2));
182 expect(events.take(0), completion([]));
183 expect(events.next, completion(3));
184 expect(events.take(0), completion([]));
185 expect(events.next, completion(4));
186 expect(events.take(0), completion([]));
187 expect(events.take(5), completion([]));
188 expect(events.next, throwsStateError);
189 await events.cancel();
190 });
191
192 test("with bad arguments throws", () async {
193 var events = new StreamQueue<int>(createStream());
194 expect(() => events.take(-1), throwsArgumentError);
195 expect(await events.next, 1); // Did not consume event.
196 expect(() => events.take(-1), throwsArgumentError);
197 expect(await events.next, 2); // Did not consume event.
198 await events.cancel();
199 });
200
201 test("of too many arguments", () async {
202 var events = new StreamQueue<int>(createStream());
203 expect(await events.take(6), [1, 2, 3, 4]);
204 await events.cancel();
205 });
206
207 test("too large later", () async {
208 var events = new StreamQueue<int>(createStream());
209 expect(await events.next, 1);
210 expect(await events.next, 2);
211 expect(await events.take(6), [3, 4]);
212 await events.cancel();
213 });
214
215 test("error", () async {
216 var events = new StreamQueue<int>(createErrorStream());
217 expect(events.take(4), throwsA("To err is divine!"));
218 expect(await events.next, 4);
219 await events.cancel();
220 });
221 });
222
223 group("rest operation", () {
224 test("after single next", () async {
225 var events = new StreamQueue<int>(createStream());
226 expect(await events.next, 1);
227 expect(await events.rest.toList(), [2, 3, 4]);
228 });
229
230 test("at start", () async {
231 var events = new StreamQueue<int>(createStream());
232 expect(await events.rest.toList(), [1, 2, 3, 4]);
233 });
234
235 test("at end", () async {
236 var events = new StreamQueue<int>(createStream());
237 expect(await events.next, 1);
238 expect(await events.next, 2);
239 expect(await events.next, 3);
240 expect(await events.next, 4);
241 expect(await events.rest.toList(), isEmpty);
242 });
243
244 test("after end", () async {
245 var events = new StreamQueue<int>(createStream());
246 expect(await events.next, 1);
247 expect(await events.next, 2);
248 expect(await events.next, 3);
249 expect(await events.next, 4);
250 expect(events.next, throwsStateError);
251 expect(await events.rest.toList(), isEmpty);
252 });
253
254 test("with an error event error", () async {
255 var events = new StreamQueue<int>(createErrorStream());
256 expect(await events.next, 1);
257 var rest = events.rest;
258 var events2 = new StreamQueue(rest);
259 expect(await events2.next, 2);
260 expect(events2.next, throwsA("To err is divine!"));
261 expect(await events2.next, 4);
262 });
263
264 test("closes the events, prevents other operations", () async {
265 var events = new StreamQueue<int>(createStream());
266 var stream = events.rest;
267 expect(() => events.next, throwsStateError);
268 expect(() => events.skip(1), throwsStateError);
269 expect(() => events.take(1), throwsStateError);
270 expect(() => events.rest, throwsStateError);
271 expect(() => events.cancel(), throwsStateError);
272 });
273
274 test("forwards to underlying stream", () async {
275 var cancel = new Completer();
276 var controller = new StreamController(onCancel: () => cancel.future);
277 var events = new StreamQueue<int>(controller.stream);
278 expect(controller.hasListener, isFalse);
279 var next = events.next;
280 expect(controller.hasListener, isTrue);
281 expect(controller.isPaused, isFalse);
282
283 controller.add(1);
284 expect(await next, 1);
285 expect(controller.isPaused, isTrue);
286
287 var rest = events.rest;
288 var subscription = rest.listen(null);
289 expect(controller.hasListener, isTrue);
290 expect(controller.isPaused, isFalse);
291
292 var lastEvent;
293 subscription.onData((value) => lastEvent = value);
294
295 controller.add(2);
296
297 await flushMicrotasks();
298 expect(lastEvent, 2);
299 expect(controller.hasListener, isTrue);
300 expect(controller.isPaused, isFalse);
301
302 subscription.pause();
303 expect(controller.isPaused, isTrue);
304
305 controller.add(3);
306
307 await flushMicrotasks();
308 expect(lastEvent, 2);
309 subscription.resume();
310
311 await flushMicrotasks();
312 expect(lastEvent, 3);
313
314 var cancelFuture = subscription.cancel();
315 expect(controller.hasListener, isFalse);
316 cancel.complete(42);
317 expect(cancelFuture, completion(42));
318 });
319 });
320
321 group("close operation", () {
322 test("closes the events, prevents any other operation", () async {
323 var events = new StreamQueue<int>(createStream());
324 await events.cancel();
325 expect(() => events.next, throwsStateError);
326 expect(() => events.skip(1), throwsStateError);
327 expect(() => events.take(1), throwsStateError);
328 expect(() => events.rest, throwsStateError);
329 expect(() => events.cancel(), throwsStateError);
330 });
331
332 test("cancels underlying subscription, returns result", () async {
333 var cancelFuture = new Future.value(42);
334 var controller = new StreamController(onCancel: () => cancelFuture);
335 var events = new StreamQueue<int>(controller.stream);
336 controller.add(1);
337 expect(await events.next, 1);
338 expect(await events.cancel(), 42);
339 });
340 });
341
342
343 group("hasNext operation", () {
344 test("true at start", () async {
345 var events = new StreamQueue<int>(createStream());
346 expect(await events.hasNext, isTrue);
347 });
348
349 test("true after start", () async {
350 var events = new StreamQueue<int>(createStream());
351 expect(await events.next, 1);
352 expect(await events.hasNext, isTrue);
353 });
354
355 test("true at end", () async {
356 var events = new StreamQueue<int>(createStream());
357 for (int i = 1; i <= 4; i++) {
358 expect(await events.next, i);
359 }
360 expect(await events.hasNext, isFalse);
361 });
362
363 test("true when enqueued", () async {
364 var events = new StreamQueue<int>(createStream());
365 var values = [];
366 for (int i = 1; i <= 3; i++) {
367 events.next.then(values.add);
368 }
369 expect(values, isEmpty);
370 expect(await events.hasNext, isTrue);
371 expect(values, [1, 2, 3]);
372 });
373
374 test("false when enqueued", () async {
375 var events = new StreamQueue<int>(createStream());
376 var values = [];
377 for (int i = 1; i <= 4; i++) {
378 events.next.then(values.add);
379 }
380 expect(values, isEmpty);
381 expect(await events.hasNext, isFalse);
382 expect(values, [1, 2, 3, 4]);
383 });
384
385 test("true when data event", () async {
386 var controller = new StreamController();
387 var events = new StreamQueue<int>(controller.stream);
388
389 var hasNext;
390 events.hasNext.then((result) { hasNext = result; });
391 await flushMicrotasks();
392 expect(hasNext, isNull);
393 controller.add(42);
394 expect(hasNext, isNull);
395 await flushMicrotasks();
396 expect(hasNext, isTrue);
397 });
398
399 test("true when error event", () async {
400 var controller = new StreamController();
401 var events = new StreamQueue<int>(controller.stream);
402
403 var hasNext;
404 events.hasNext.then((result) { hasNext = result; });
405 await flushMicrotasks();
406 expect(hasNext, isNull);
407 controller.addError("BAD");
408 expect(hasNext, isNull);
409 await flushMicrotasks();
410 expect(hasNext, isTrue);
411 expect(events.next, throwsA("BAD"));
412 });
413
414 test("- hasNext after hasNext", () async {
415 var events = new StreamQueue<int>(createStream());
416 expect(await events.hasNext, true);
417 expect(await events.hasNext, true);
418 expect(await events.next, 1);
419 expect(await events.hasNext, true);
420 expect(await events.hasNext, true);
421 expect(await events.next, 2);
422 expect(await events.hasNext, true);
423 expect(await events.hasNext, true);
424 expect(await events.next, 3);
425 expect(await events.hasNext, true);
426 expect(await events.hasNext, true);
427 expect(await events.next, 4);
428 expect(await events.hasNext, false);
429 expect(await events.hasNext, false);
430 });
431
432 test("- next after true", () async {
433 var events = new StreamQueue<int>(createStream());
434 expect(await events.next, 1);
435 expect(await events.hasNext, true);
436 expect(await events.next, 2);
437 expect(await events.next, 3);
438 });
439
440 test("- next after true, enqueued", () async {
441 var events = new StreamQueue<int>(createStream());
442 var responses = [];
443 var first = events.next.then(responses.add);
444 var hasSecond = events.hasNext.then(responses.add);
445 var second = events.next.then(responses.add);
446 do {
447 await flushMicrotasks();
448 } while (responses.length < 3);
449 expect(responses, [1, true, 2]);
450 });
451
452 test("- skip 0 after true", () async {
453 var events = new StreamQueue<int>(createStream());
454 expect(await events.next, 1);
455 expect(await events.hasNext, true);
456 expect(await events.skip(0), 0);
457 expect(await events.next, 2);
458 });
459
460 test("- skip 1 after true", () async {
461 var events = new StreamQueue<int>(createStream());
462 expect(await events.next, 1);
463 expect(await events.hasNext, true);
464 expect(await events.skip(1), 0);
465 expect(await events.next, 3);
466 });
467
468 test("- skip 2 after true", () async {
469 var events = new StreamQueue<int>(createStream());
470 expect(await events.next, 1);
471 expect(await events.hasNext, true);
472 expect(await events.skip(2), 0);
473 expect(await events.next, 4);
474 });
475
476 test("- take 0 after true", () async {
477 var events = new StreamQueue<int>(createStream());
478 expect(await events.next, 1);
479 expect(await events.hasNext, true);
480 expect(await events.take(0), isEmpty);
481 expect(await events.next, 2);
482 });
483
484 test("- take 1 after true", () async {
485 var events = new StreamQueue<int>(createStream());
486 expect(await events.next, 1);
487 expect(await events.hasNext, true);
488 expect(await events.take(1), [2]);
489 expect(await events.next, 3);
490 });
491
492 test("- take 2 after true", () async {
493 var events = new StreamQueue<int>(createStream());
494 expect(await events.next, 1);
495 expect(await events.hasNext, true);
496 expect(await events.take(2), [2, 3]);
497 expect(await events.next, 4);
498 });
499
500 test("- rest after true", () async {
501 var events = new StreamQueue<int>(createStream());
502 expect(await events.next, 1);
503 expect(await events.hasNext, true);
504 var stream = events.rest;
505 expect(await stream.toList(), [2, 3, 4]);
506 });
507
508 test("- rest after true, at last", () async {
509 var events = new StreamQueue<int>(createStream());
510 expect(await events.next, 1);
511 expect(await events.next, 2);
512 expect(await events.next, 3);
513 expect(await events.hasNext, true);
514 var stream = events.rest;
515 expect(await stream.toList(), [4]);
516 });
517
518 test("- rest after false", () async {
519 var events = new StreamQueue<int>(createStream());
520 expect(await events.next, 1);
521 expect(await events.next, 2);
522 expect(await events.next, 3);
523 expect(await events.next, 4);
524 expect(await events.hasNext, false);
525 var stream = events.rest;
526 expect(await stream.toList(), isEmpty);
527 });
528
529 test("- cancel after true on data", () async {
530 var events = new StreamQueue<int>(createStream());
531 expect(await events.next, 1);
532 expect(await events.next, 2);
533 expect(await events.hasNext, true);
534 expect(await events.cancel(), null);
535 });
536
537 test("- cancel after true on error", () async {
538 var events = new StreamQueue<int>(createErrorStream());
539 expect(await events.next, 1);
540 expect(await events.next, 2);
541 expect(await events.hasNext, true);
542 expect(await events.cancel(), null);
543 });
544 });
545
546 test("all combinations sequential skip/next/take operations", () async {
547 // Takes all combinations of two of next, skip and take, then ends with
548 // doing rest. Each of the first rounds do 10 events of each type,
549 // the rest does 20 elements.
550 var eventCount = 20 * (3 * 3 + 1);
551 var events = new StreamQueue<int>(createLongStream(eventCount));
552
553 // Test expecting [startIndex .. startIndex + 9] as events using
554 // `next`.
555 nextTest(startIndex) {
556 for (int i = 0; i < 10; i++) {
557 expect(events.next, completion(startIndex + i));
558 }
559 }
560
561 // Test expecting 10 events to be skipped.
562 skipTest(startIndex) {
563 expect(events.skip(10), completion(0));
564 }
565
566 // Test expecting [startIndex .. startIndex + 9] as events using
567 // `take(10)`.
568 takeTest(startIndex) {
569 expect(events.take(10),
570 completion(new List.generate(10, (i) => startIndex + i)));
571 }
572 var tests = [nextTest, skipTest, takeTest];
573
574 int counter = 0;
575 // Run through all pairs of two tests and run them.
576 for (int i = 0; i < tests.length; i++) {
577 for (int j = 0; j < tests.length; j++) {
578 tests[i](counter);
579 tests[j](counter + 10);
580 counter += 20;
581 }
582 }
583 // Then expect 20 more events as a `rest` call.
584 expect(events.rest.toList(),
585 completion(new List.generate(20, (i) => counter + i)));
586 });
587 }
588
589 Stream<int> createStream() async* {
590 yield 1;
591 await flushMicrotasks();
592 yield 2;
593 await flushMicrotasks();
594 yield 3;
595 await flushMicrotasks();
596 yield 4;
597 }
598
599 Stream<int> createErrorStream() {
600 StreamController controller = new StreamController<int>();
601 () async {
602 controller.add(1);
603 await flushMicrotasks();
604 controller.add(2);
605 await flushMicrotasks();
606 controller.addError("To err is divine!");
607 await flushMicrotasks();
608 controller.add(4);
609 await flushMicrotasks();
610 controller.close();
611 }();
612 return controller.stream;
613 }
614
615 Stream<int> createLongStream(int eventCount) async* {
616 for (int i = 0; i < eventCount; i++) yield i;
617 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698