Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(243)

Side by Side Diff: test/stream_queue_test.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « test/stream_completer_test.dart ('k') | test/subscription_stream_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents.
4
5 import "dart:async";
6
7 import "package:async/async.dart" show StreamQueue;
8 import "package:test/test.dart";
9
10 import "utils.dart";
11
12 main() {
13 group("source stream", () {
14 test("is listened to on first request, paused between requests", () async {
15 var controller = new StreamController();
16 var events = new StreamQueue<int>(controller.stream);
17 await flushMicrotasks();
18 expect(controller.hasListener, isFalse);
19
20 var next = events.next;
21 expect(controller.hasListener, isTrue);
22 expect(controller.isPaused, isFalse);
23
24 controller.add(1);
25
26 expect(await next, 1);
27 expect(controller.hasListener, isTrue);
28 expect(controller.isPaused, isTrue);
29
30 next = events.next;
31 expect(controller.hasListener, isTrue);
32 expect(controller.isPaused, isFalse);
33
34 controller.add(2);
35
36 expect(await next, 2);
37 expect(controller.hasListener, isTrue);
38 expect(controller.isPaused, isTrue);
39
40 var cancel = events.cancel();
41 expect(controller.hasListener, isFalse);
42 });
43 });
44
45 group("next operation", () {
46 test("simple sequence of requests", () async {
47 var events = new StreamQueue<int>(createStream());
48 for (int i = 1; i <= 4; i++) {
49 expect(await events.next, i);
50 }
51 expect(events.next, throwsStateError);
52 });
53
54 test("multiple requests at the same time", () async {
55 var events = new StreamQueue<int>(createStream());
56 var result = await Future.wait(
57 [events.next, events.next, events.next, events.next]);
58 expect(result, [1, 2, 3, 4]);
59 await events.cancel();
60 });
61
62 test("sequence of requests with error", () async {
63 var events = new StreamQueue<int>(createErrorStream());
64 expect(await events.next, 1);
65 expect(await events.next, 2);
66 expect(events.next, throwsA("To err is divine!"));
67 expect(await events.next, 4);
68 await events.cancel();
69 });
70 });
71
72 group("skip operation", () {
73 test("of two elements in the middle of sequence", () async {
74 var events = new StreamQueue<int>(createStream());
75 expect(await events.next, 1);
76 expect(await events.skip(2), 0);
77 expect(await events.next, 4);
78 await events.cancel();
79 });
80
81 test("with negative/bad arguments throws", () async {
82 var events = new StreamQueue<int>(createStream());
83 expect(() => events.skip(-1), throwsArgumentError);
84 // A non-int throws either a type error or an argument error,
85 // depending on whether it's checked mode or not.
86 expect(await events.next, 1); // Did not consume event.
87 expect(() => events.skip(-1), throwsArgumentError);
88 expect(await events.next, 2); // Did not consume event.
89 await events.cancel();
90 });
91
92 test("of 0 elements works", () async {
93 var events = new StreamQueue<int>(createStream());
94 expect(events.skip(0), completion(0));
95 expect(events.next, completion(1));
96 expect(events.skip(0), completion(0));
97 expect(events.next, completion(2));
98 expect(events.skip(0), completion(0));
99 expect(events.next, completion(3));
100 expect(events.skip(0), completion(0));
101 expect(events.next, completion(4));
102 expect(events.skip(0), completion(0));
103 expect(events.skip(5), completion(5));
104 expect(events.next, throwsStateError);
105 await events.cancel();
106 });
107
108 test("of too many events ends at stream start", () async {
109 var events = new StreamQueue<int>(createStream());
110 expect(await events.skip(6), 2);
111 await events.cancel();
112 });
113
114 test("of too many events after some events", () async {
115 var events = new StreamQueue<int>(createStream());
116 expect(await events.next, 1);
117 expect(await events.next, 2);
118 expect(await events.skip(6), 4);
119 await events.cancel();
120 });
121
122 test("of too many events ends at stream end", () async {
123 var events = new StreamQueue<int>(createStream());
124 expect(await events.next, 1);
125 expect(await events.next, 2);
126 expect(await events.next, 3);
127 expect(await events.next, 4);
128 expect(await events.skip(2), 2);
129 await events.cancel();
130 });
131
132 test("of events with error", () async {
133 var events = new StreamQueue<int>(createErrorStream());
134 expect(events.skip(4), throwsA("To err is divine!"));
135 expect(await events.next, 4);
136 await events.cancel();
137 });
138
139 test("of events with error, and skip again after", () async {
140 var events = new StreamQueue<int>(createErrorStream());
141 expect(events.skip(4), throwsA("To err is divine!"));
142 expect(events.skip(2), completion(1));
143 await events.cancel();
144 });
145
146 test("multiple skips at same time complete in order.", () async {
147 var events = new StreamQueue<int>(createStream());
148 var skip1 = events.skip(1);
149 var skip2 = events.skip(0);
150 var skip3 = events.skip(4);
151 var skip4 = events.skip(1);
152 var index = 0;
153 // Check that futures complete in order.
154 sequence(expectedValue, sequenceIndex) => (value) {
155 expect(value, expectedValue);
156 expect(index, sequenceIndex);
157 index++;
158 }
159 await Future.wait([skip1.then(sequence(0, 0)),
160 skip2.then(sequence(0, 1)),
161 skip3.then(sequence(1, 2)),
162 skip4.then(sequence(1, 3))]);
163 await events.cancel();
164 });
165 });
166
167 group("take operation", () {
168 test("as simple take of events", () async {
169 var events = new StreamQueue<int>(createStream());
170 expect(await events.next, 1);
171 expect(await events.take(2), [2, 3]);
172 expect(await events.next, 4);
173 await events.cancel();
174 });
175
176 test("of 0 events", () async {
177 var events = new StreamQueue<int>(createStream());
178 expect(events.take(0), completion([]));
179 expect(events.next, completion(1));
180 expect(events.take(0), completion([]));
181 expect(events.next, completion(2));
182 expect(events.take(0), completion([]));
183 expect(events.next, completion(3));
184 expect(events.take(0), completion([]));
185 expect(events.next, completion(4));
186 expect(events.take(0), completion([]));
187 expect(events.take(5), completion([]));
188 expect(events.next, throwsStateError);
189 await events.cancel();
190 });
191
192 test("with bad arguments throws", () async {
193 var events = new StreamQueue<int>(createStream());
194 expect(() => events.take(-1), throwsArgumentError);
195 expect(await events.next, 1); // Did not consume event.
196 expect(() => events.take(-1), throwsArgumentError);
197 expect(await events.next, 2); // Did not consume event.
198 await events.cancel();
199 });
200
201 test("of too many arguments", () async {
202 var events = new StreamQueue<int>(createStream());
203 expect(await events.take(6), [1, 2, 3, 4]);
204 await events.cancel();
205 });
206
207 test("too large later", () async {
208 var events = new StreamQueue<int>(createStream());
209 expect(await events.next, 1);
210 expect(await events.next, 2);
211 expect(await events.take(6), [3, 4]);
212 await events.cancel();
213 });
214
215 test("error", () async {
216 var events = new StreamQueue<int>(createErrorStream());
217 expect(events.take(4), throwsA("To err is divine!"));
218 expect(await events.next, 4);
219 await events.cancel();
220 });
221 });
222
223 group("rest operation", () {
224 test("after single next", () async {
225 var events = new StreamQueue<int>(createStream());
226 expect(await events.next, 1);
227 expect(await events.rest.toList(), [2, 3, 4]);
228 });
229
230 test("at start", () async {
231 var events = new StreamQueue<int>(createStream());
232 expect(await events.rest.toList(), [1, 2, 3, 4]);
233 });
234
235 test("at end", () async {
236 var events = new StreamQueue<int>(createStream());
237 expect(await events.next, 1);
238 expect(await events.next, 2);
239 expect(await events.next, 3);
240 expect(await events.next, 4);
241 expect(await events.rest.toList(), isEmpty);
242 });
243
244 test("after end", () async {
245 var events = new StreamQueue<int>(createStream());
246 expect(await events.next, 1);
247 expect(await events.next, 2);
248 expect(await events.next, 3);
249 expect(await events.next, 4);
250 expect(events.next, throwsStateError);
251 expect(await events.rest.toList(), isEmpty);
252 });
253
254 test("after receiving done requested before", () async {
255 var events = new StreamQueue<int>(createStream());
256 var next1 = events.next;
257 var next2 = events.next;
258 var next3 = events.next;
259 var rest = events.rest;
260 for (int i = 0; i < 10; i++) {
261 await flushMicrotasks();
262 }
263 expect(await next1, 1);
264 expect(await next2, 2);
265 expect(await next3, 3);
266 expect(await rest.toList(), [4]);
267 });
268
269 test("with an error event error", () async {
270 var events = new StreamQueue<int>(createErrorStream());
271 expect(await events.next, 1);
272 var rest = events.rest;
273 var events2 = new StreamQueue(rest);
274 expect(await events2.next, 2);
275 expect(events2.next, throwsA("To err is divine!"));
276 expect(await events2.next, 4);
277 });
278
279 test("closes the events, prevents other operations", () async {
280 var events = new StreamQueue<int>(createStream());
281 var stream = events.rest;
282 expect(() => events.next, throwsStateError);
283 expect(() => events.skip(1), throwsStateError);
284 expect(() => events.take(1), throwsStateError);
285 expect(() => events.rest, throwsStateError);
286 expect(() => events.cancel(), throwsStateError);
287 });
288
289 test("forwards to underlying stream", () async {
290 var cancel = new Completer();
291 var controller = new StreamController(onCancel: () => cancel.future);
292 var events = new StreamQueue<int>(controller.stream);
293 expect(controller.hasListener, isFalse);
294 var next = events.next;
295 expect(controller.hasListener, isTrue);
296 expect(controller.isPaused, isFalse);
297
298 controller.add(1);
299 expect(await next, 1);
300 expect(controller.isPaused, isTrue);
301
302 var rest = events.rest;
303 var subscription = rest.listen(null);
304 expect(controller.hasListener, isTrue);
305 expect(controller.isPaused, isFalse);
306
307 var lastEvent;
308 subscription.onData((value) => lastEvent = value);
309
310 controller.add(2);
311
312 await flushMicrotasks();
313 expect(lastEvent, 2);
314 expect(controller.hasListener, isTrue);
315 expect(controller.isPaused, isFalse);
316
317 subscription.pause();
318 expect(controller.isPaused, isTrue);
319
320 controller.add(3);
321
322 await flushMicrotasks();
323 expect(lastEvent, 2);
324 subscription.resume();
325
326 await flushMicrotasks();
327 expect(lastEvent, 3);
328
329 var cancelFuture = subscription.cancel();
330 expect(controller.hasListener, isFalse);
331 cancel.complete(42);
332 expect(cancelFuture, completion(42));
333 });
334 });
335
336 group("close operation", () {
337 test("closes the events, prevents any other operation", () async {
338 var events = new StreamQueue<int>(createStream());
339 await events.cancel();
340 expect(() => events.next, throwsStateError);
341 expect(() => events.skip(1), throwsStateError);
342 expect(() => events.take(1), throwsStateError);
343 expect(() => events.rest, throwsStateError);
344 expect(() => events.cancel(), throwsStateError);
345 });
346
347 test("cancels underlying subscription, returns result", () async {
348 var cancelFuture = new Future.value(42);
349 var controller = new StreamController(onCancel: () => cancelFuture);
350 var events = new StreamQueue<int>(controller.stream);
351 controller.add(1);
352 expect(await events.next, 1);
353 expect(await events.cancel(), 42);
354 });
355 });
356
357
358 group("hasNext operation", () {
359 test("true at start", () async {
360 var events = new StreamQueue<int>(createStream());
361 expect(await events.hasNext, isTrue);
362 });
363
364 test("true after start", () async {
365 var events = new StreamQueue<int>(createStream());
366 expect(await events.next, 1);
367 expect(await events.hasNext, isTrue);
368 });
369
370 test("true at end", () async {
371 var events = new StreamQueue<int>(createStream());
372 for (int i = 1; i <= 4; i++) {
373 expect(await events.next, i);
374 }
375 expect(await events.hasNext, isFalse);
376 });
377
378 test("true when enqueued", () async {
379 var events = new StreamQueue<int>(createStream());
380 var values = [];
381 for (int i = 1; i <= 3; i++) {
382 events.next.then(values.add);
383 }
384 expect(values, isEmpty);
385 expect(await events.hasNext, isTrue);
386 expect(values, [1, 2, 3]);
387 });
388
389 test("false when enqueued", () async {
390 var events = new StreamQueue<int>(createStream());
391 var values = [];
392 for (int i = 1; i <= 4; i++) {
393 events.next.then(values.add);
394 }
395 expect(values, isEmpty);
396 expect(await events.hasNext, isFalse);
397 expect(values, [1, 2, 3, 4]);
398 });
399
400 test("true when data event", () async {
401 var controller = new StreamController();
402 var events = new StreamQueue<int>(controller.stream);
403
404 var hasNext;
405 events.hasNext.then((result) { hasNext = result; });
406 await flushMicrotasks();
407 expect(hasNext, isNull);
408 controller.add(42);
409 expect(hasNext, isNull);
410 await flushMicrotasks();
411 expect(hasNext, isTrue);
412 });
413
414 test("true when error event", () async {
415 var controller = new StreamController();
416 var events = new StreamQueue<int>(controller.stream);
417
418 var hasNext;
419 events.hasNext.then((result) { hasNext = result; });
420 await flushMicrotasks();
421 expect(hasNext, isNull);
422 controller.addError("BAD");
423 expect(hasNext, isNull);
424 await flushMicrotasks();
425 expect(hasNext, isTrue);
426 expect(events.next, throwsA("BAD"));
427 });
428
429 test("- hasNext after hasNext", () async {
430 var events = new StreamQueue<int>(createStream());
431 expect(await events.hasNext, true);
432 expect(await events.hasNext, true);
433 expect(await events.next, 1);
434 expect(await events.hasNext, true);
435 expect(await events.hasNext, true);
436 expect(await events.next, 2);
437 expect(await events.hasNext, true);
438 expect(await events.hasNext, true);
439 expect(await events.next, 3);
440 expect(await events.hasNext, true);
441 expect(await events.hasNext, true);
442 expect(await events.next, 4);
443 expect(await events.hasNext, false);
444 expect(await events.hasNext, false);
445 });
446
447 test("- next after true", () async {
448 var events = new StreamQueue<int>(createStream());
449 expect(await events.next, 1);
450 expect(await events.hasNext, true);
451 expect(await events.next, 2);
452 expect(await events.next, 3);
453 });
454
455 test("- next after true, enqueued", () async {
456 var events = new StreamQueue<int>(createStream());
457 var responses = [];
458 var first = events.next.then(responses.add);
459 var hasSecond = events.hasNext.then(responses.add);
460 var second = events.next.then(responses.add);
461 do {
462 await flushMicrotasks();
463 } while (responses.length < 3);
464 expect(responses, [1, true, 2]);
465 });
466
467 test("- skip 0 after true", () async {
468 var events = new StreamQueue<int>(createStream());
469 expect(await events.next, 1);
470 expect(await events.hasNext, true);
471 expect(await events.skip(0), 0);
472 expect(await events.next, 2);
473 });
474
475 test("- skip 1 after true", () async {
476 var events = new StreamQueue<int>(createStream());
477 expect(await events.next, 1);
478 expect(await events.hasNext, true);
479 expect(await events.skip(1), 0);
480 expect(await events.next, 3);
481 });
482
483 test("- skip 2 after true", () async {
484 var events = new StreamQueue<int>(createStream());
485 expect(await events.next, 1);
486 expect(await events.hasNext, true);
487 expect(await events.skip(2), 0);
488 expect(await events.next, 4);
489 });
490
491 test("- take 0 after true", () async {
492 var events = new StreamQueue<int>(createStream());
493 expect(await events.next, 1);
494 expect(await events.hasNext, true);
495 expect(await events.take(0), isEmpty);
496 expect(await events.next, 2);
497 });
498
499 test("- take 1 after true", () async {
500 var events = new StreamQueue<int>(createStream());
501 expect(await events.next, 1);
502 expect(await events.hasNext, true);
503 expect(await events.take(1), [2]);
504 expect(await events.next, 3);
505 });
506
507 test("- take 2 after true", () async {
508 var events = new StreamQueue<int>(createStream());
509 expect(await events.next, 1);
510 expect(await events.hasNext, true);
511 expect(await events.take(2), [2, 3]);
512 expect(await events.next, 4);
513 });
514
515 test("- rest after true", () async {
516 var events = new StreamQueue<int>(createStream());
517 expect(await events.next, 1);
518 expect(await events.hasNext, true);
519 var stream = events.rest;
520 expect(await stream.toList(), [2, 3, 4]);
521 });
522
523 test("- rest after true, at last", () async {
524 var events = new StreamQueue<int>(createStream());
525 expect(await events.next, 1);
526 expect(await events.next, 2);
527 expect(await events.next, 3);
528 expect(await events.hasNext, true);
529 var stream = events.rest;
530 expect(await stream.toList(), [4]);
531 });
532
533 test("- rest after false", () async {
534 var events = new StreamQueue<int>(createStream());
535 expect(await events.next, 1);
536 expect(await events.next, 2);
537 expect(await events.next, 3);
538 expect(await events.next, 4);
539 expect(await events.hasNext, false);
540 var stream = events.rest;
541 expect(await stream.toList(), isEmpty);
542 });
543
544 test("- cancel after true on data", () async {
545 var events = new StreamQueue<int>(createStream());
546 expect(await events.next, 1);
547 expect(await events.next, 2);
548 expect(await events.hasNext, true);
549 expect(await events.cancel(), null);
550 });
551
552 test("- cancel after true on error", () async {
553 var events = new StreamQueue<int>(createErrorStream());
554 expect(await events.next, 1);
555 expect(await events.next, 2);
556 expect(await events.hasNext, true);
557 expect(await events.cancel(), null);
558 });
559 });
560
561 test("all combinations sequential skip/next/take operations", () async {
562 // Takes all combinations of two of next, skip and take, then ends with
563 // doing rest. Each of the first rounds do 10 events of each type,
564 // the rest does 20 elements.
565 var eventCount = 20 * (3 * 3 + 1);
566 var events = new StreamQueue<int>(createLongStream(eventCount));
567
568 // Test expecting [startIndex .. startIndex + 9] as events using
569 // `next`.
570 nextTest(startIndex) {
571 for (int i = 0; i < 10; i++) {
572 expect(events.next, completion(startIndex + i));
573 }
574 }
575
576 // Test expecting 10 events to be skipped.
577 skipTest(startIndex) {
578 expect(events.skip(10), completion(0));
579 }
580
581 // Test expecting [startIndex .. startIndex + 9] as events using
582 // `take(10)`.
583 takeTest(startIndex) {
584 expect(events.take(10),
585 completion(new List.generate(10, (i) => startIndex + i)));
586 }
587 var tests = [nextTest, skipTest, takeTest];
588
589 int counter = 0;
590 // Run through all pairs of two tests and run them.
591 for (int i = 0; i < tests.length; i++) {
592 for (int j = 0; j < tests.length; j++) {
593 tests[i](counter);
594 tests[j](counter + 10);
595 counter += 20;
596 }
597 }
598 // Then expect 20 more events as a `rest` call.
599 expect(events.rest.toList(),
600 completion(new List.generate(20, (i) => counter + i)));
601 });
602 }
603
604 Stream<int> createStream() async* {
605 yield 1;
606 await flushMicrotasks();
607 yield 2;
608 await flushMicrotasks();
609 yield 3;
610 await flushMicrotasks();
611 yield 4;
612 }
613
614 Stream<int> createErrorStream() {
615 StreamController controller = new StreamController<int>();
616 () async {
617 controller.add(1);
618 await flushMicrotasks();
619 controller.add(2);
620 await flushMicrotasks();
621 controller.addError("To err is divine!");
622 await flushMicrotasks();
623 controller.add(4);
624 await flushMicrotasks();
625 controller.close();
626 }();
627 return controller.stream;
628 }
629
630 Stream<int> createLongStream(int eventCount) async* {
631 for (int i = 0; i < eventCount; i++) yield i;
632 }
OLDNEW
« no previous file with comments | « test/stream_completer_test.dart ('k') | test/subscription_stream_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698