Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(48)

Side by Side Diff: test/stream_queue_test.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Add all.dart to test. Apparently people like that. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents.
4
5 import "dart:async";
6
7 import "package:async/async.dart" show StreamQueue;
8 import "package:test/test.dart";
9
10 main() {
11 group("source stream", () {
12 test("is listened to on first request, paused between requests", () async {
13 var controller = new StreamController();
14 var events = new StreamQueue<int>(controller.stream);
15 await flushMicrotasks();
16 expect(controller.hasListener, isFalse);
nweiz 2015/06/18 23:44:28 I didn't realize these getters existed! Consider u
Lasse Reichstein Nielsen 2015/06/30 10:34:15 Will try. It's not always possible to see the diff
17 var next = events.next;
18 expect(controller.hasListener, isTrue);
19 expect(controller.isPaused, isFalse);
20 controller.add(1);
21 expect(await next, 1);
22 expect(controller.hasListener, isTrue);
23 expect(controller.isPaused, isTrue);
24 next = events.next;
25 expect(controller.hasListener, isTrue);
26 expect(controller.isPaused, isFalse);
27 controller.add(2);
28 expect(await next, 2);
29 expect(controller.hasListener, isTrue);
30 expect(controller.isPaused, isTrue);
31 var cancel = events.cancel();
32 expect(controller.hasListener, isFalse);
nweiz 2015/06/18 23:44:28 A few blank lines would go a long way in making th
Lasse Reichstein Nielsen 2015/06/30 10:34:15 Done.
33 });
34 });
35
36 group("next operation", () {
37 test("simple sequence of requests", () async {
38 var events = new StreamQueue<int>(createStream());
39 for (int i = 1; i <= 4; i++) {
40 expect(await events.next, i);
41 }
42 expect(events.next, throwsStateError);
43 });
44
45 test("multiple requests at the same time", () async {
46 var events = new StreamQueue<int>(createStream());
47 var result = await Future.wait(
48 [events.next, events.next, events.next, events.next]);
49 expect(result, [1, 2, 3, 4]);
50 await events.cancel();
51 });
52
53 test("sequence of requests with error", () async {
54 var events = new StreamQueue<int>(createErrorStream());
55 expect(await events.next, 1);
56 expect(await events.next, 2);
57 expect(events.next, throws);
nweiz 2015/06/18 23:44:28 Check the error's value. It would be awkward if th
Lasse Reichstein Nielsen 2015/06/30 10:34:15 Done.
58 expect(await events.next, 4);
59 await events.cancel();
60 });
61 });
62
63 group("skip operation", () {
64 test("of two elements in the middle of sequence", () async {
65 var events = new StreamQueue<int>(createStream());
66 expect(await events.next, 1);
67 expect(await events.skip(2), 0);
68 expect(await events.next, 4);
69 await events.cancel();
70 });
71
72 test("with negative/bad arguments throws", () async {
73 var events = new StreamQueue<int>(createStream());
74 expect(() => events.skip(-1), throwsArgumentError);
75 // A non-int throws either a type error or an argument error,
76 // depending on whether it's checked mode or not.
77 expect(await events.next, 1); // Did not consume event.
78 expect(() => events.skip(-1), throwsArgumentError);
79 expect(await events.next, 2); // Did not consume event.
80 await events.cancel();
81 });
82
83 test("of 0 elements works", () async {
84 var events = new StreamQueue<int>(createStream());
85 expect(events.skip(0), completion(0));
86 expect(events.next, completion(1));
87 expect(events.skip(0), completion(0));
88 expect(events.next, completion(2));
89 expect(events.skip(0), completion(0));
90 expect(events.next, completion(3));
91 expect(events.skip(0), completion(0));
92 expect(events.next, completion(4));
93 expect(events.skip(0), completion(0));
94 expect(events.skip(5), completion(5));
95 expect(events.next, throwsStateError);
96 await events.cancel();
97 });
98
99 test("of too many events ends at stream start", () async {
100 var events = new StreamQueue<int>(createStream());
101 expect(await events.skip(6), 2);
102 await events.cancel();
103 });
104
105 test("of too many events after some events", () async {
106 var events = new StreamQueue<int>(createStream());
107 expect(await events.next, 1);
108 expect(await events.next, 2);
109 expect(await events.skip(6), 4);
110 await events.cancel();
111 });
112
113 test("of too many events ends at stream end", () async {
114 var events = new StreamQueue<int>(createStream());
115 expect(await events.next, 1);
116 expect(await events.next, 2);
117 expect(await events.next, 3);
118 expect(await events.next, 4);
119 expect(await events.skip(2), 2);
120 await events.cancel();
121 });
122
123 test("of events with error", () async {
124 var events = new StreamQueue<int>(createErrorStream());
125 expect(events.skip(4), throws);
126 expect(await events.next, 4);
127 await events.cancel();
128 });
129
130 test("of events with error, and skip again after", () async {
131 var events = new StreamQueue<int>(createErrorStream());
132 expect(events.skip(4), throws);
133 expect(events.skip(2), completion(1));
134 await events.cancel();
135 });
136
137 test("multiple skips at same time complete in order.", () async {
138 var events = new StreamQueue<int>(createStream());
139 var skip1 = events.skip(1);
140 var skip2 = events.skip(0);
141 var skip3 = events.skip(4);
142 var skip4 = events.skip(1);
143 var index = 0;
144 // Check that futures complete in order.
145 sequence(expectedValue, sequenceIndex) => (v) {
nweiz 2015/06/18 23:44:28 "v" -> "value"
Lasse Reichstein Nielsen 2015/06/30 10:34:15 Done.
146 expect(v, expectedValue);
147 expect(index, sequenceIndex);
148 index++;
149 }
150 await Future.wait([skip1.then(sequence(0, 0)),
151 skip2.then(sequence(0, 1)),
152 skip3.then(sequence(1, 2)),
153 skip4.then(sequence(1, 3))]);
154 // Complete when they are all done.
155 await Future.wait([skip1, skip2, skip3, skip4]);
nweiz 2015/06/18 23:44:28 Won't the previous await wait for all the futures
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Yes, this line is redundant. The previous line was
156 await events.cancel();
157 });
158 });
159
160 group("take operation", () {
161 test("as simple take of events", () async {
162 var events = new StreamQueue<int>(createStream());
163 expect(await events.next, 1);
164 expect(await events.take(2), [2, 3]);
165 expect(await events.next, 4);
166 await events.cancel();
167 });
168
169 test("of 0 events", () async {
170 var events = new StreamQueue<int>(createStream());
171 expect(events.take(0), completion([]));
nweiz 2015/06/18 23:44:28 I tend to prefer "isEmpty" to "equals([])". It pro
Lasse Reichstein Nielsen 2015/06/30 10:34:15 Done.
172 expect(events.next, completion(1));
173 expect(events.take(0), completion([]));
174 expect(events.next, completion(2));
175 expect(events.take(0), completion([]));
176 expect(events.next, completion(3));
177 expect(events.take(0), completion([]));
178 expect(events.next, completion(4));
179 expect(events.take(0), completion([]));
180 expect(events.take(5), completion([]));
181 expect(events.next, throwsStateError);
182 await events.cancel();
183 });
184
185 test("with bad arguments throws", () async {
186 var events = new StreamQueue<int>(createStream());
187 expect(() => events.take(-1), throwsArgumentError);
188 expect(await events.next, 1); // Did not consume event.
189 expect(() => events.take(-1), throwsArgumentError);
190 expect(await events.next, 2); // Did not consume event.
191 await events.cancel();
192 });
193
194 test("of too many arguments", () async {
195 var events = new StreamQueue<int>(createStream());
196 expect(await events.take(6), [1, 2, 3, 4]);
197 await events.cancel();
198 });
199
200 test("too large later", () async {
201 var events = new StreamQueue<int>(createStream());
202 expect(await events.next, 1);
203 expect(await events.next, 2);
204 expect(await events.take(6), [3, 4]);
205 await events.cancel();
206 });
207
208 test("error", () async {
209 var events = new StreamQueue<int>(createErrorStream());
210 expect(events.take(4), throws);
211 expect(await events.next, 4);
212 await events.cancel();
213 });
214 });
215
216 group("rest operation", () {
217 test("after single next", () async {
218 var events = new StreamQueue<int>(createStream());
219 expect(await events.next, 1);
220 expect(await events.rest.toList(), [2, 3, 4]);
221 });
222
223 test("at start", () async {
224 var events = new StreamQueue<int>(createStream());
225 expect(await events.rest.toList(), [1, 2, 3, 4]);
226 });
227
228 test("at end", () async {
229 var events = new StreamQueue<int>(createStream());
230 expect(await events.next, 1);
231 expect(await events.next, 2);
232 expect(await events.next, 3);
233 expect(await events.next, 4);
234 expect(await events.rest.toList(), []);
235 });
236
237 test("after end", () async {
238 var events = new StreamQueue<int>(createStream());
239 expect(await events.next, 1);
240 expect(await events.next, 2);
241 expect(await events.next, 3);
242 expect(await events.next, 4);
243 expect(events.next, throwsStateError);
244 expect(await events.rest.toList(), []);
245 });
246
247 test("with an error event error", () async {
248 var events = new StreamQueue<int>(createErrorStream());
249 expect(await events.next, 1);
250 var rest = events.rest;
251 var events2 = new StreamQueue(rest);
252 expect(await events2.next, 2);
253 expect(events2.next, throws);
254 expect(await events2.next, 4);
255 });
256
257 test("closes the events, prevents other operations", () async {
258 var events = new StreamQueue<int>(createStream());
259 var stream = events.rest;
260 expect(() => events.next, throwsStateError);
261 expect(() => events.skip(1), throwsStateError);
262 expect(() => events.take(1), throwsStateError);
263 expect(() => events.rest, throwsStateError);
264 expect(() => events.cancel(), throwsStateError);
265 });
266
267 test("forwards to underlying stream", () async {
268 var cancel = new Completer();
269 var controller = new StreamController(onCancel: () => cancel.future);
270 var events = new StreamQueue<int>(controller.stream);
271 expect(controller.hasListener, isFalse);
272 var next = events.next;
273 expect(controller.hasListener, isTrue);
274 expect(controller.isPaused, isFalse);
275 controller.add(1);
276 expect(await next, 1);
277 expect(controller.isPaused, isTrue);
278 var rest = events.rest;
279 var subscription = rest.listen(null);
280 expect(controller.hasListener, isTrue);
281 expect(controller.isPaused, isFalse);
282 var lastEvent;
283 subscription.onData((v) { lastEvent = v; });
284 controller.add(2);
285 await flushMicrotasks();
286 expect(lastEvent, 2);
287 expect(controller.hasListener, isTrue);
288 expect(controller.isPaused, isFalse);
289 subscription.pause();
290 expect(controller.isPaused, isTrue);
291 controller.add(3);
292 await flushMicrotasks();
293 expect(lastEvent, 2);
294 subscription.resume();
295 await flushMicrotasks();
296 expect(lastEvent, 3);
297 var cancelFuture = subscription.cancel();
298 expect(controller.hasListener, isFalse);
299 cancel.complete(42);
300 expect(cancelFuture, completion(42));
301 });
nweiz 2015/06/18 23:44:28 Another test that could use some blank lines.
Lasse Reichstein Nielsen 2015/06/30 10:34:15 Done.
302 });
303
304 group("close operation", () {
305 test("closes the events, prevents any other operation", () async {
306 var events = new StreamQueue<int>(createStream());
307 await events.cancel();
308 expect(() => events.next, throwsStateError);
309 expect(() => events.skip(1), throwsStateError);
310 expect(() => events.take(1), throwsStateError);
311 expect(() => events.rest, throwsStateError);
312 expect(() => events.cancel(), throwsStateError);
313 });
314
315 test("cancels underlying subscription, returns result", () async {
316 var cancelFuture = new Future.value(42);
317 var controller = new StreamController(onCancel: () => cancelFuture);
318 var events = new StreamQueue<int>(controller.stream);
319 controller.add(1);
320 expect(await events.next, 1);
321 expect(await events.cancel(), 42);
322 });
323 });
324
325
326 group("hasNext operation", () {
327 test("true at start", () async {
328 var events = new StreamQueue<int>(createStream());
329 expect(await events.hasNext, isTrue);
330 });
331
332 test("true after start", () async {
333 var events = new StreamQueue<int>(createStream());
334 expect(await events.next, 1);
335 expect(await events.hasNext, isTrue);
336 });
337
338 test("true false at end", () async {
nweiz 2015/06/18 23:44:28 "true false" -> "false"
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
339 var events = new StreamQueue<int>(createStream());
340 for (int i = 1; i <= 4; i++) {
341 expect(await events.next, i);
342 }
343 expect(await events.hasNext, isFalse);
344 });
345
346 test("true when enqueued", () async {
347 var events = new StreamQueue<int>(createStream());
348 var values = [];
349 for (int i = 1; i <= 3; i++) {
350 events.next.then(values.add);
351 }
352 expect(values, []);
353 expect(await events.hasNext, isTrue);
354 expect(values, [1, 2, 3]);
355 });
356
357 test("false when enqueued", () async {
358 var events = new StreamQueue<int>(createStream());
359 var values = [];
360 for (int i = 1; i <= 4; i++) {
361 events.next.then(values.add);
362 }
363 expect(values, []);
364 expect(await events.hasNext, isFalse);
365 expect(values, [1, 2, 3, 4]);
366 });
367
368 test("true when data event", () async {
369 var controller = new StreamController();
370 var events = new StreamQueue<int>(controller.stream);
371
372 var hasNext;
373 events.hasNext.then((result) { hasNext = result; });
374 await flushMicrotasks();
375 expect(hasNext, isNull);
376 controller.add(42);
377 expect(hasNext, isNull);
378 await flushMicrotasks();
379 expect(hasNext, isTrue);
380 });
381
382 test("true when error event", () async {
383 var controller = new StreamController();
384 var events = new StreamQueue<int>(controller.stream);
385
386 var hasNext;
387 events.hasNext.then((result) { hasNext = result; });
388 await flushMicrotasks();
389 expect(hasNext, isNull);
390 controller.addError("BAD");
391 expect(hasNext, isNull);
392 await flushMicrotasks();
393 expect(hasNext, isTrue);
394 expect(events.next, throwsA("BAD"));
395 });
396
397 test("- hasNext after hasNext", () async {
398 var events = new StreamQueue<int>(createStream());
399 expect(await events.hasNext, true);
400 expect(await events.hasNext, true);
401 expect(await events.next, 1);
402 expect(await events.hasNext, true);
403 expect(await events.hasNext, true);
404 expect(await events.next, 2);
405 expect(await events.hasNext, true);
406 expect(await events.hasNext, true);
407 expect(await events.next, 3);
408 expect(await events.hasNext, true);
409 expect(await events.hasNext, true);
410 expect(await events.next, 4);
411 expect(await events.hasNext, false);
412 expect(await events.hasNext, false);
413 });
414
415 test("- next after true", () async {
416 var events = new StreamQueue<int>(createStream());
417 expect(await events.next, 1);
418 expect(await events.hasNext, true);
419 expect(await events.next, 2);
420 expect(await events.next, 3);
421 });
422
423 test("- next after true, enqueued", () async {
424 var events = new StreamQueue<int>(createStream());
425 var responses = [];
426 var first = events.next.then(responses.add);
427 var hasSecond = events.hasNext.then(responses.add);
428 var second = events.next.then(responses.add);
429 do {
430 await flushMicrotasks();
431 } while (responses.length < 3);
432 expect(responses, [1, true, 2]);
433 });
434
435 test("- skip 0 after true", () async {
436 var events = new StreamQueue<int>(createStream());
437 expect(await events.next, 1);
438 expect(await events.hasNext, true);
439 expect(await events.skip(0), 0);
440 expect(await events.next, 2);
441 });
442
443 test("- skip 1 after true", () async {
444 var events = new StreamQueue<int>(createStream());
445 expect(await events.next, 1);
446 expect(await events.hasNext, true);
447 expect(await events.skip(1), 0);
448 expect(await events.next, 3);
449 });
450
451 test("- skip 2 after true", () async {
452 var events = new StreamQueue<int>(createStream());
453 expect(await events.next, 1);
454 expect(await events.hasNext, true);
455 expect(await events.skip(2), 0);
456 expect(await events.next, 4);
457 });
458
459 test("- take 0 after true", () async {
460 var events = new StreamQueue<int>(createStream());
461 expect(await events.next, 1);
462 expect(await events.hasNext, true);
463 expect(await events.take(0), []);
464 expect(await events.next, 2);
465 });
466
467 test("- take 1 after true", () async {
468 var events = new StreamQueue<int>(createStream());
469 expect(await events.next, 1);
470 expect(await events.hasNext, true);
471 expect(await events.take(1), [2]);
472 expect(await events.next, 3);
473 });
474
475 test("- take 2 after true", () async {
476 var events = new StreamQueue<int>(createStream());
477 expect(await events.next, 1);
478 expect(await events.hasNext, true);
479 expect(await events.take(2), [2, 3]);
480 expect(await events.next, 4);
481 });
482
483 test("- rest after true", () async {
484 var events = new StreamQueue<int>(createStream());
485 expect(await events.next, 1);
486 expect(await events.hasNext, true);
487 var stream = events.rest;
488 expect(await stream.toList(), [2, 3, 4]);
489 });
490
491 test("- rest after true, at last", () async {
492 var events = new StreamQueue<int>(createStream());
493 expect(await events.next, 1);
494 expect(await events.next, 2);
495 expect(await events.next, 3);
496 expect(await events.hasNext, true);
497 var stream = events.rest;
498 expect(await stream.toList(), [4]);
499 });
500
501 test("- rest after false", () async {
502 var events = new StreamQueue<int>(createStream());
503 expect(await events.next, 1);
504 expect(await events.next, 2);
505 expect(await events.next, 3);
506 expect(await events.next, 4);
507 expect(await events.hasNext, false);
508 var stream = events.rest;
509 expect(await stream.toList(), []);
510 });
511
512 test("- cancel after true on data", () async {
513 var events = new StreamQueue<int>(createStream());
514 expect(await events.next, 1);
515 expect(await events.next, 2);
516 expect(await events.hasNext, true);
517 expect(await events.cancel(), null);
518 });
519
520 test("- cancel after true on error", () async {
521 var events = new StreamQueue<int>(createErrorStream());
522 expect(await events.next, 1);
523 expect(await events.next, 2);
524 expect(await events.hasNext, true);
525 expect(await events.cancel(), null);
526 });
527 });
528
529 test("all combinations sequential skip/next/take operations", () async {
530 // Takes all combinations of two of next, skip and take, then ends with
531 // doing rest. Each of the first rounds do 10 events of each type,
532 // the rest does 20 elements.
533 var eventCount = 20 * (3 * 3 + 1);
534 var events = new StreamQueue<int>(createLongStream(eventCount));
535
536 // Test expecting [startIndex .. startIndex + 9] as events using
537 // `next`.
538 nextTest(startIndex) {
539 for (int i = 0; i < 10; i++) {
540 expect(events.next, completion(startIndex + i));
541 }
542 }
543
544 // Test expecting 10 events to be skipped.
545 skipTest(startIndex) {
546 expect(events.skip(10), completion(0));
547 }
548
549 // Test expecting [startIndex .. startIndex + 9] as events using
550 // `take(10)`.
551 takeTest(startIndex) {
552 expect(events.take(10),
553 completion(new List.generate(10, (i) => startIndex + i)));
554 }
555 var tests = [nextTest, skipTest, takeTest];
556
557 int counter = 0;
558 // Run through all pairs of two tests and run them.
559 for (int i = 0; i < tests.length; i++) {
560 for (int j = 0; j < tests.length; j++) {
561 tests[i](counter);
562 tests[j](counter + 10);
563 counter += 20;
564 }
565 }
566 // Then expect 20 more events as a `rest` call.
567 expect(events.rest.toList(),
568 completion(new List.generate(20, (i) => counter + i)));
569 });
570 }
nweiz 2015/06/18 23:44:28 Also test passing in "prefetch".
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Prefetch was removed.
571
572 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
573
574 Stream<int> createStream() async* {
575 yield 1;
576 await flushMicrotasks();
577 yield 2;
578 await flushMicrotasks();
579 yield 3;
580 await flushMicrotasks();
581 yield 4;
582 }
583
584 Stream<int> createErrorStream() {
585 StreamController controller = new StreamController<int>();
586 () async {
587 controller.add(1);
588 await flushMicrotasks();
589 controller.add(2);
590 await flushMicrotasks();
591 controller.addError("To err is divine!");
592 await flushMicrotasks();
593 controller.add(4);
594 await flushMicrotasks();
595 controller.close();
596 }();
597 return controller.stream;
598 }
599
600 Stream<int> createLongStream(int eventCount) async* {
601 for (int i = 0; i < eventCount; i++) yield i;
602 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698