Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(173)

Side by Side Diff: packages/async/test/stream_queue_test.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents. 3 // BSD-style license that can be found in the LICENSE filevents.
4 4
5 import "dart:async"; 5 import "dart:async";
6 6
7 import "package:async/async.dart" show StreamQueue; 7 import "package:async/async.dart";
8 import "package:test/test.dart"; 8 import "package:test/test.dart";
9 9
10 import "utils.dart"; 10 import "utils.dart";
11 11
12 main() { 12 main() {
13 group("source stream", () { 13 group("source stream", () {
14 test("is listened to on first request, paused between requests", () async { 14 test("is listened to on first request, paused between requests", () async {
15 var controller = new StreamController(); 15 var controller = new StreamController<int>();
16 var events = new StreamQueue<int>(controller.stream); 16 var events = new StreamQueue<int>(controller.stream);
17 await flushMicrotasks(); 17 await flushMicrotasks();
18 expect(controller.hasListener, isFalse); 18 expect(controller.hasListener, isFalse);
19 19
20 var next = events.next; 20 var next = events.next;
21 expect(controller.hasListener, isTrue); 21 expect(controller.hasListener, isTrue);
22 expect(controller.isPaused, isFalse); 22 expect(controller.isPaused, isFalse);
23 23
24 controller.add(1); 24 controller.add(1);
25 25
26 expect(await next, 1); 26 expect(await next, 1);
27 expect(controller.hasListener, isTrue); 27 expect(controller.hasListener, isTrue);
28 expect(controller.isPaused, isTrue); 28 expect(controller.isPaused, isTrue);
29 29
30 next = events.next; 30 next = events.next;
31 expect(controller.hasListener, isTrue); 31 expect(controller.hasListener, isTrue);
32 expect(controller.isPaused, isFalse); 32 expect(controller.isPaused, isFalse);
33 33
34 controller.add(2); 34 controller.add(2);
35 35
36 expect(await next, 2); 36 expect(await next, 2);
37 expect(controller.hasListener, isTrue); 37 expect(controller.hasListener, isTrue);
38 expect(controller.isPaused, isTrue); 38 expect(controller.isPaused, isTrue);
39 39
40 events.cancel(); 40 events.cancel();
41 expect(controller.hasListener, isFalse); 41 expect(controller.hasListener, isFalse);
42 }); 42 });
43 }); 43 });
44 44
45 group("eventsDispatched", () {
46 test("increments after a next future completes", () async {
47 var events = new StreamQueue<int>(createStream());
48
49 expect(events.eventsDispatched, equals(0));
50 await flushMicrotasks();
51 expect(events.eventsDispatched, equals(0));
52
53 var next = events.next;
54 expect(events.eventsDispatched, equals(0));
55
56 await next;
57 expect(events.eventsDispatched, equals(1));
58
59 await events.next;
60 expect(events.eventsDispatched, equals(2));
61 });
62
63 test("increments multiple times for multi-value requests", () async {
64 var events = new StreamQueue<int>(createStream());
65 await events.take(3);
66 expect(events.eventsDispatched, equals(3));
67 });
68
69 test("increments multiple times for an accepted transaction", () async {
70 var events = new StreamQueue<int>(createStream());
71 await events.withTransaction((queue) async {
72 await queue.next;
73 await queue.next;
74 return true;
75 });
76 expect(events.eventsDispatched, equals(2));
77 });
78
79 test("doesn't increment for rest requests", () async {
80 var events = new StreamQueue<int>(createStream());
81 await events.rest.toList();
82 expect(events.eventsDispatched, equals(0));
83 });
84 });
85
86 group("lookAhead operation", () {
87 test("as simple list of events", () async {
88 var events = new StreamQueue<int>(createStream());
89 expect(await events.lookAhead(4), [1, 2, 3, 4]);
90 expect(await events.next, 1);
91 expect(await events.lookAhead(2), [2, 3]);
92 expect(await events.take(2), [2, 3]);
93 expect(await events.next, 4);
94 await events.cancel();
95 });
96
97 test("of 0 events", () async {
98 var events = new StreamQueue<int>(createStream());
99 expect(events.lookAhead(0), completion([]));
100 expect(events.next, completion(1));
101 expect(events.lookAhead(0), completion([]));
102 expect(events.next, completion(2));
103 expect(events.lookAhead(0), completion([]));
104 expect(events.next, completion(3));
105 expect(events.lookAhead(0), completion([]));
106 expect(events.next, completion(4));
107 expect(events.lookAhead(0), completion([]));
108 expect(events.lookAhead(5), completion([]));
109 expect(events.next, throwsStateError);
110 await events.cancel();
111 });
112
113 test("with bad arguments throws", () async {
114 var events = new StreamQueue<int>(createStream());
115 expect(() => events.lookAhead(-1), throwsArgumentError);
116 expect(await events.next, 1); // Did not consume event.
117 expect(() => events.lookAhead(-1), throwsArgumentError);
118 expect(await events.next, 2); // Did not consume event.
119 await events.cancel();
120 });
121
122 test("of too many arguments", () async {
123 var events = new StreamQueue<int>(createStream());
124 expect(await events.lookAhead(6), [1, 2, 3, 4]);
125 await events.cancel();
126 });
127
128 test("too large later", () async {
129 var events = new StreamQueue<int>(createStream());
130 expect(await events.next, 1);
131 expect(await events.next, 2);
132 expect(await events.lookAhead(6), [3, 4]);
133 await events.cancel();
134 });
135
136 test("error", () async {
137 var events = new StreamQueue<int>(createErrorStream());
138 expect(events.lookAhead(4), throwsA("To err is divine!"));
139 expect(events.take(4), throwsA("To err is divine!"));
140 expect(await events.next, 4);
141 await events.cancel();
142 });
143 });
144
45 group("next operation", () { 145 group("next operation", () {
46 test("simple sequence of requests", () async { 146 test("simple sequence of requests", () async {
47 var events = new StreamQueue<int>(createStream()); 147 var events = new StreamQueue<int>(createStream());
48 for (int i = 1; i <= 4; i++) { 148 for (int i = 1; i <= 4; i++) {
49 expect(await events.next, i); 149 expect(await events.next, i);
50 } 150 }
51 expect(events.next, throwsStateError); 151 expect(events.next, throwsStateError);
52 }); 152 });
53 153
54 test("multiple requests at the same time", () async { 154 test("multiple requests at the same time", () async {
55 var events = new StreamQueue<int>(createStream()); 155 var events = new StreamQueue<int>(createStream());
56 var result = await Future.wait( 156 var result = await Future
57 [events.next, events.next, events.next, events.next]); 157 .wait([events.next, events.next, events.next, events.next]);
58 expect(result, [1, 2, 3, 4]); 158 expect(result, [1, 2, 3, 4]);
59 await events.cancel(); 159 await events.cancel();
60 }); 160 });
61 161
62 test("sequence of requests with error", () async { 162 test("sequence of requests with error", () async {
63 var events = new StreamQueue<int>(createErrorStream()); 163 var events = new StreamQueue<int>(createErrorStream());
64 expect(await events.next, 1); 164 expect(await events.next, 1);
65 expect(await events.next, 2); 165 expect(await events.next, 2);
66 expect(events.next, throwsA("To err is divine!")); 166 expect(events.next, throwsA("To err is divine!"));
67 expect(await events.next, 4); 167 expect(await events.next, 4);
68 await events.cancel(); 168 await events.cancel();
69 }); 169 });
70 }); 170 });
71 171
72 group("skip operation", () { 172 group("skip operation", () {
73 test("of two elements in the middle of sequence", () async { 173 test("of two elements in the middle of sequence", () async {
74 var events = new StreamQueue<int>(createStream()); 174 var events = new StreamQueue<int>(createStream());
75 expect(await events.next, 1); 175 expect(await events.next, 1);
76 expect(await events.skip(2), 0); 176 expect(await events.skip(2), 0);
77 expect(await events.next, 4); 177 expect(await events.next, 4);
78 await events.cancel(); 178 await events.cancel();
79 }); 179 });
80 180
81 test("with negative/bad arguments throws", () async { 181 test("with negative/bad arguments throws", () async {
82 var events = new StreamQueue<int>(createStream()); 182 var events = new StreamQueue<int>(createStream());
83 expect(() => events.skip(-1), throwsArgumentError); 183 expect(() => events.skip(-1), throwsArgumentError);
84 // A non-int throws either a type error or an argument error, 184 // A non-int throws either a type error or an argument error,
85 // depending on whether it's checked mode or not. 185 // depending on whether it's checked mode or not.
86 expect(await events.next, 1); // Did not consume event. 186 expect(await events.next, 1); // Did not consume event.
87 expect(() => events.skip(-1), throwsArgumentError); 187 expect(() => events.skip(-1), throwsArgumentError);
88 expect(await events.next, 2); // Did not consume event. 188 expect(await events.next, 2); // Did not consume event.
89 await events.cancel(); 189 await events.cancel();
90 }); 190 });
91 191
92 test("of 0 elements works", () async { 192 test("of 0 elements works", () async {
93 var events = new StreamQueue<int>(createStream()); 193 var events = new StreamQueue<int>(createStream());
94 expect(events.skip(0), completion(0)); 194 expect(events.skip(0), completion(0));
95 expect(events.next, completion(1)); 195 expect(events.next, completion(1));
96 expect(events.skip(0), completion(0)); 196 expect(events.skip(0), completion(0));
97 expect(events.next, completion(2)); 197 expect(events.next, completion(2));
98 expect(events.skip(0), completion(0)); 198 expect(events.skip(0), completion(0));
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 await events.cancel(); 243 await events.cancel();
144 }); 244 });
145 test("multiple skips at same time complete in order.", () async { 245 test("multiple skips at same time complete in order.", () async {
146 var events = new StreamQueue<int>(createStream()); 246 var events = new StreamQueue<int>(createStream());
147 var skip1 = events.skip(1); 247 var skip1 = events.skip(1);
148 var skip2 = events.skip(0); 248 var skip2 = events.skip(0);
149 var skip3 = events.skip(4); 249 var skip3 = events.skip(4);
150 var skip4 = events.skip(1); 250 var skip4 = events.skip(1);
151 var index = 0; 251 var index = 0;
152 // Check that futures complete in order. 252 // Check that futures complete in order.
153 sequence(expectedValue, sequenceIndex) => (value) { 253 Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) {
154 expect(value, expectedValue); 254 expect(value, expectedValue);
155 expect(index, sequenceIndex); 255 expect(index, sequenceIndex);
156 index++; 256 index++;
157 }; 257 };
158 await Future.wait([skip1.then(sequence(0, 0)), 258 await Future.wait([
159 skip2.then(sequence(0, 1)), 259 skip1.then(sequence(0, 0)),
160 skip3.then(sequence(1, 2)), 260 skip2.then(sequence(0, 1)),
161 skip4.then(sequence(1, 3))]); 261 skip3.then(sequence(1, 2)),
262 skip4.then(sequence(1, 3))
263 ]);
162 await events.cancel(); 264 await events.cancel();
163 }); 265 });
164 }); 266 });
165 267
166 group("take operation", () { 268 group("take operation", () {
167 test("as simple take of events", () async { 269 test("as simple take of events", () async {
168 var events = new StreamQueue<int>(createStream()); 270 var events = new StreamQueue<int>(createStream());
169 expect(await events.next, 1); 271 expect(await events.next, 1);
170 expect(await events.take(2), [2, 3]); 272 expect(await events.take(2), [2, 3]);
171 expect(await events.next, 4); 273 expect(await events.next, 4);
(...skipping 12 matching lines...) Expand all
184 expect(events.next, completion(4)); 286 expect(events.next, completion(4));
185 expect(events.take(0), completion([])); 287 expect(events.take(0), completion([]));
186 expect(events.take(5), completion([])); 288 expect(events.take(5), completion([]));
187 expect(events.next, throwsStateError); 289 expect(events.next, throwsStateError);
188 await events.cancel(); 290 await events.cancel();
189 }); 291 });
190 292
191 test("with bad arguments throws", () async { 293 test("with bad arguments throws", () async {
192 var events = new StreamQueue<int>(createStream()); 294 var events = new StreamQueue<int>(createStream());
193 expect(() => events.take(-1), throwsArgumentError); 295 expect(() => events.take(-1), throwsArgumentError);
194 expect(await events.next, 1); // Did not consume event. 296 expect(await events.next, 1); // Did not consume event.
195 expect(() => events.take(-1), throwsArgumentError); 297 expect(() => events.take(-1), throwsArgumentError);
196 expect(await events.next, 2); // Did not consume event. 298 expect(await events.next, 2); // Did not consume event.
197 await events.cancel(); 299 await events.cancel();
198 }); 300 });
199 301
200 test("of too many arguments", () async { 302 test("of too many arguments", () async {
201 var events = new StreamQueue<int>(createStream()); 303 var events = new StreamQueue<int>(createStream());
202 expect(await events.take(6), [1, 2, 3, 4]); 304 expect(await events.take(6), [1, 2, 3, 4]);
203 await events.cancel(); 305 await events.cancel();
204 }); 306 });
205 307
206 test("too large later", () async { 308 test("too large later", () async {
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
281 expect(() => events.next, throwsStateError); 383 expect(() => events.next, throwsStateError);
282 expect(() => events.skip(1), throwsStateError); 384 expect(() => events.skip(1), throwsStateError);
283 expect(() => events.take(1), throwsStateError); 385 expect(() => events.take(1), throwsStateError);
284 expect(() => events.rest, throwsStateError); 386 expect(() => events.rest, throwsStateError);
285 expect(() => events.cancel(), throwsStateError); 387 expect(() => events.cancel(), throwsStateError);
286 expect(stream.toList(), completion([1, 2, 3, 4])); 388 expect(stream.toList(), completion([1, 2, 3, 4]));
287 }); 389 });
288 390
289 test("forwards to underlying stream", () async { 391 test("forwards to underlying stream", () async {
290 var cancel = new Completer(); 392 var cancel = new Completer();
291 var controller = new StreamController(onCancel: () => cancel.future); 393 var controller = new StreamController<int>(onCancel: () => cancel.future);
292 var events = new StreamQueue<int>(controller.stream); 394 var events = new StreamQueue<int>(controller.stream);
293 expect(controller.hasListener, isFalse); 395 expect(controller.hasListener, isFalse);
294 var next = events.next; 396 var next = events.next;
295 expect(controller.hasListener, isTrue); 397 expect(controller.hasListener, isTrue);
296 expect(controller.isPaused, isFalse); 398 expect(controller.isPaused, isFalse);
297 399
298 controller.add(1); 400 controller.add(1);
299 expect(await next, 1); 401 expect(await next, 1);
300 expect(controller.isPaused, isTrue); 402 expect(controller.isPaused, isTrue);
301 403
(...skipping 24 matching lines...) Expand all
326 await flushMicrotasks(); 428 await flushMicrotasks();
327 expect(lastEvent, 3); 429 expect(lastEvent, 3);
328 430
329 var cancelFuture = subscription.cancel(); 431 var cancelFuture = subscription.cancel();
330 expect(controller.hasListener, isFalse); 432 expect(controller.hasListener, isFalse);
331 cancel.complete(42); 433 cancel.complete(42);
332 expect(cancelFuture, completion(42)); 434 expect(cancelFuture, completion(42));
333 }); 435 });
334 }); 436 });
335 437
438 group("peek operation", () {
439 test("peeks one event", () async {
440 var events = new StreamQueue<int>(createStream());
441 expect(await events.peek, 1);
442 expect(await events.next, 1);
443 expect(await events.peek, 2);
444 expect(await events.take(2), [2, 3]);
445 expect(await events.peek, 4);
446 expect(await events.next, 4);
447 // Throws at end.
448 expect(events.peek, throws);
449 await events.cancel();
450 });
451 test("multiple requests at the same time", () async {
452 var events = new StreamQueue<int>(createStream());
453 var result = await Future.wait(
454 [events.peek, events.peek, events.next, events.peek, events.peek]);
455 expect(result, [1, 1, 1, 2, 2]);
456 await events.cancel();
457 });
458 test("sequence of requests with error", () async {
459 var events = new StreamQueue<int>(createErrorStream());
460 expect(await events.next, 1);
461 expect(await events.next, 2);
462 expect(events.peek, throwsA("To err is divine!"));
463 // Error stays in queue.
464 expect(events.peek, throwsA("To err is divine!"));
465 expect(events.next, throwsA("To err is divine!"));
466 expect(await events.next, 4);
467 await events.cancel();
468 });
469 });
470
336 group("cancel operation", () { 471 group("cancel operation", () {
337 test("closes the events, prevents any other operation", () async { 472 test("closes the events, prevents any other operation", () async {
338 var events = new StreamQueue<int>(createStream()); 473 var events = new StreamQueue<int>(createStream());
339 await events.cancel(); 474 await events.cancel();
475 expect(() => events.lookAhead(1), throwsStateError);
340 expect(() => events.next, throwsStateError); 476 expect(() => events.next, throwsStateError);
477 expect(() => events.peek, throwsStateError);
341 expect(() => events.skip(1), throwsStateError); 478 expect(() => events.skip(1), throwsStateError);
342 expect(() => events.take(1), throwsStateError); 479 expect(() => events.take(1), throwsStateError);
343 expect(() => events.rest, throwsStateError); 480 expect(() => events.rest, throwsStateError);
344 expect(() => events.cancel(), throwsStateError); 481 expect(() => events.cancel(), throwsStateError);
345 }); 482 });
346 483
347 test("cancels underlying subscription when called before any event", 484 test("cancels underlying subscription when called before any event",
348 () async { 485 () async {
349 var cancelFuture = new Future.value(42); 486 var cancelFuture = new Future.value(42);
350 var controller = new StreamController(onCancel: () => cancelFuture); 487 var controller = new StreamController<int>(onCancel: () => cancelFuture);
351 var events = new StreamQueue<int>(controller.stream); 488 var events = new StreamQueue<int>(controller.stream);
352 expect(await events.cancel(), 42); 489 expect(await events.cancel(), 42);
353 }); 490 });
354 491
355 test("cancels underlying subscription, returns result", () async { 492 test("cancels underlying subscription, returns result", () async {
356 var cancelFuture = new Future.value(42); 493 var cancelFuture = new Future.value(42);
357 var controller = new StreamController(onCancel: () => cancelFuture); 494 var controller = new StreamController<int>(onCancel: () => cancelFuture);
358 var events = new StreamQueue<int>(controller.stream); 495 var events = new StreamQueue<int>(controller.stream);
359 controller.add(1); 496 controller.add(1);
360 expect(await events.next, 1); 497 expect(await events.next, 1);
361 expect(await events.cancel(), 42); 498 expect(await events.cancel(), 42);
362 }); 499 });
363 500
364 group("with immediate: true", () { 501 group("with immediate: true", () {
365 test("closes the events, prevents any other operation", () async { 502 test("closes the events, prevents any other operation", () async {
366 var events = new StreamQueue<int>(createStream()); 503 var events = new StreamQueue<int>(createStream());
367 await events.cancel(immediate: true); 504 await events.cancel(immediate: true);
368 expect(() => events.next, throwsStateError); 505 expect(() => events.next, throwsStateError);
369 expect(() => events.skip(1), throwsStateError); 506 expect(() => events.skip(1), throwsStateError);
370 expect(() => events.take(1), throwsStateError); 507 expect(() => events.take(1), throwsStateError);
371 expect(() => events.rest, throwsStateError); 508 expect(() => events.rest, throwsStateError);
372 expect(() => events.cancel(), throwsStateError); 509 expect(() => events.cancel(), throwsStateError);
373 }); 510 });
374 511
375 test("cancels the underlying subscription immediately", () async { 512 test("cancels the underlying subscription immediately", () async {
376 var controller = new StreamController(); 513 var controller = new StreamController<int>();
377 controller.add(1); 514 controller.add(1);
378 515
379 var events = new StreamQueue<int>(controller.stream); 516 var events = new StreamQueue<int>(controller.stream);
380 expect(await events.next, 1); 517 expect(await events.next, 1);
381 expect(controller.hasListener, isTrue); 518 expect(controller.hasListener, isTrue);
382 519
383 events.cancel(immediate: true); 520 events.cancel(immediate: true);
384 await expect(controller.hasListener, isFalse); 521 await expect(controller.hasListener, isFalse);
385 }); 522 });
386 523
387 test("cancels the underlying subscription when called before any event", 524 test("cancels the underlying subscription when called before any event",
388 () async { 525 () async {
389 var cancelFuture = new Future.value(42); 526 var cancelFuture = new Future.value(42);
390 var controller = new StreamController(onCancel: () => cancelFuture); 527 var controller =
528 new StreamController<int>(onCancel: () => cancelFuture);
391 529
392 var events = new StreamQueue<int>(controller.stream); 530 var events = new StreamQueue<int>(controller.stream);
393 expect(await events.cancel(immediate: true), 42); 531 expect(await events.cancel(immediate: true), 42);
394 }); 532 });
395 533
396 test("closes pending requests", () async { 534 test("closes pending requests", () async {
397 var events = new StreamQueue<int>(createStream()); 535 var events = new StreamQueue<int>(createStream());
398 expect(await events.next, 1); 536 expect(await events.next, 1);
399 expect(events.next, throwsStateError); 537 expect(events.next, throwsStateError);
400 expect(events.hasNext, completion(isFalse)); 538 expect(events.hasNext, completion(isFalse));
401 539
402 await events.cancel(immediate: true); 540 await events.cancel(immediate: true);
403 }); 541 });
404 542
405 test("returns the result of closing the underlying subscription", 543 test("returns the result of closing the underlying subscription",
406 () async { 544 () async {
407 var controller = new StreamController( 545 var controller =
408 onCancel: () => new Future.value(42)); 546 new StreamController<int>(onCancel: () => new Future.value(42));
409 var events = new StreamQueue<int>(controller.stream); 547 var events = new StreamQueue<int>(controller.stream);
410 expect(await events.cancel(immediate: true), 42); 548 expect(await events.cancel(immediate: true), 42);
411 }); 549 });
412 550
413 test("listens and then cancels a stream that hasn't been listened to yet", 551 test("listens and then cancels a stream that hasn't been listened to yet",
414 () async { 552 () async {
415 var wasListened = false; 553 var wasListened = false;
416 var controller = new StreamController( 554 var controller =
417 onListen: () => wasListened = true); 555 new StreamController<int>(onListen: () => wasListened = true);
418 var events = new StreamQueue<int>(controller.stream); 556 var events = new StreamQueue<int>(controller.stream);
419 expect(wasListened, isFalse); 557 expect(wasListened, isFalse);
420 expect(controller.hasListener, isFalse); 558 expect(controller.hasListener, isFalse);
421 559
422 await events.cancel(immediate: true); 560 await events.cancel(immediate: true);
423 expect(wasListened, isTrue); 561 expect(wasListened, isTrue);
424 expect(controller.hasListener, isFalse); 562 expect(controller.hasListener, isFalse);
425 }); 563 });
426 }); 564 });
427 }); 565 });
(...skipping 13 matching lines...) Expand all
441 test("true at end", () async { 579 test("true at end", () async {
442 var events = new StreamQueue<int>(createStream()); 580 var events = new StreamQueue<int>(createStream());
443 for (int i = 1; i <= 4; i++) { 581 for (int i = 1; i <= 4; i++) {
444 expect(await events.next, i); 582 expect(await events.next, i);
445 } 583 }
446 expect(await events.hasNext, isFalse); 584 expect(await events.hasNext, isFalse);
447 }); 585 });
448 586
449 test("true when enqueued", () async { 587 test("true when enqueued", () async {
450 var events = new StreamQueue<int>(createStream()); 588 var events = new StreamQueue<int>(createStream());
451 var values = []; 589 var values = <int>[];
452 for (int i = 1; i <= 3; i++) { 590 for (int i = 1; i <= 3; i++) {
453 events.next.then(values.add); 591 events.next.then(values.add);
454 } 592 }
455 expect(values, isEmpty); 593 expect(values, isEmpty);
456 expect(await events.hasNext, isTrue); 594 expect(await events.hasNext, isTrue);
457 expect(values, [1, 2, 3]); 595 expect(values, [1, 2, 3]);
458 }); 596 });
459 597
460 test("false when enqueued", () async { 598 test("false when enqueued", () async {
461 var events = new StreamQueue<int>(createStream()); 599 var events = new StreamQueue<int>(createStream());
462 var values = []; 600 var values = <int>[];
463 for (int i = 1; i <= 4; i++) { 601 for (int i = 1; i <= 4; i++) {
464 events.next.then(values.add); 602 events.next.then(values.add);
465 } 603 }
466 expect(values, isEmpty); 604 expect(values, isEmpty);
467 expect(await events.hasNext, isFalse); 605 expect(await events.hasNext, isFalse);
468 expect(values, [1, 2, 3, 4]); 606 expect(values, [1, 2, 3, 4]);
469 }); 607 });
470 608
471 test("true when data event", () async { 609 test("true when data event", () async {
472 var controller = new StreamController(); 610 var controller = new StreamController<int>();
473 var events = new StreamQueue<int>(controller.stream); 611 var events = new StreamQueue<int>(controller.stream);
474 612
475 var hasNext; 613 var hasNext;
476 events.hasNext.then((result) { hasNext = result; }); 614 events.hasNext.then((result) {
615 hasNext = result;
616 });
477 await flushMicrotasks(); 617 await flushMicrotasks();
478 expect(hasNext, isNull); 618 expect(hasNext, isNull);
479 controller.add(42); 619 controller.add(42);
480 expect(hasNext, isNull); 620 expect(hasNext, isNull);
481 await flushMicrotasks(); 621 await flushMicrotasks();
482 expect(hasNext, isTrue); 622 expect(hasNext, isTrue);
483 }); 623 });
484 624
485 test("true when error event", () async { 625 test("true when error event", () async {
486 var controller = new StreamController(); 626 var controller = new StreamController<int>();
487 var events = new StreamQueue<int>(controller.stream); 627 var events = new StreamQueue<int>(controller.stream);
488 628
489 var hasNext; 629 var hasNext;
490 events.hasNext.then((result) { hasNext = result; }); 630 events.hasNext.then((result) {
631 hasNext = result;
632 });
491 await flushMicrotasks(); 633 await flushMicrotasks();
492 expect(hasNext, isNull); 634 expect(hasNext, isNull);
493 controller.addError("BAD"); 635 controller.addError("BAD");
494 expect(hasNext, isNull); 636 expect(hasNext, isNull);
495 await flushMicrotasks(); 637 await flushMicrotasks();
496 expect(hasNext, isTrue); 638 expect(hasNext, isTrue);
497 expect(events.next, throwsA("BAD")); 639 expect(events.next, throwsA("BAD"));
498 }); 640 });
499 641
500 test("- hasNext after hasNext", () async { 642 test("- hasNext after hasNext", () async {
(...skipping 17 matching lines...) Expand all
518 test("- next after true", () async { 660 test("- next after true", () async {
519 var events = new StreamQueue<int>(createStream()); 661 var events = new StreamQueue<int>(createStream());
520 expect(await events.next, 1); 662 expect(await events.next, 1);
521 expect(await events.hasNext, true); 663 expect(await events.hasNext, true);
522 expect(await events.next, 2); 664 expect(await events.next, 2);
523 expect(await events.next, 3); 665 expect(await events.next, 3);
524 }); 666 });
525 667
526 test("- next after true, enqueued", () async { 668 test("- next after true, enqueued", () async {
527 var events = new StreamQueue<int>(createStream()); 669 var events = new StreamQueue<int>(createStream());
528 var responses = []; 670 var responses = <Object>[];
529 events.next.then(responses.add); 671 events.next.then(responses.add);
530 events.hasNext.then(responses.add); 672 events.hasNext.then(responses.add);
531 events.next.then(responses.add); 673 events.next.then(responses.add);
532 do { 674 do {
533 await flushMicrotasks(); 675 await flushMicrotasks();
534 } while (responses.length < 3); 676 } while (responses.length < 3);
535 expect(responses, [1, true, 2]); 677 expect(responses, [1, true, 2]);
536 }); 678 });
537 679
538 test("- skip 0 after true", () async { 680 test("- skip 0 after true", () async {
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
622 764
623 test("- cancel after true on error", () async { 765 test("- cancel after true on error", () async {
624 var events = new StreamQueue<int>(createErrorStream()); 766 var events = new StreamQueue<int>(createErrorStream());
625 expect(await events.next, 1); 767 expect(await events.next, 1);
626 expect(await events.next, 2); 768 expect(await events.next, 2);
627 expect(await events.hasNext, true); 769 expect(await events.hasNext, true);
628 expect(await events.cancel(), null); 770 expect(await events.cancel(), null);
629 }); 771 });
630 }); 772 });
631 773
774 group("startTransaction operation produces a transaction that", () {
775 StreamQueue<int> events;
776 StreamQueueTransaction<int> transaction;
777 StreamQueue<int> queue1;
778 StreamQueue<int> queue2;
779 setUp(() async {
780 events = new StreamQueue(createStream());
781 expect(await events.next, 1);
782 transaction = events.startTransaction();
783 queue1 = transaction.newQueue();
784 queue2 = transaction.newQueue();
785 });
786
787 group("emits queues that", () {
788 test("independently emit events", () async {
789 expect(await queue1.next, 2);
790 expect(await queue2.next, 2);
791 expect(await queue2.next, 3);
792 expect(await queue1.next, 3);
793 expect(await queue1.next, 4);
794 expect(await queue2.next, 4);
795 expect(await queue1.hasNext, isFalse);
796 expect(await queue2.hasNext, isFalse);
797 });
798
799 test("queue requests for events", () async {
800 expect(queue1.next, completion(2));
801 expect(queue2.next, completion(2));
802 expect(queue2.next, completion(3));
803 expect(queue1.next, completion(3));
804 expect(queue1.next, completion(4));
805 expect(queue2.next, completion(4));
806 expect(queue1.hasNext, completion(isFalse));
807 expect(queue2.hasNext, completion(isFalse));
808 });
809
810 test("independently emit errors", () async {
811 events = new StreamQueue(createErrorStream());
812 expect(await events.next, 1);
813 transaction = events.startTransaction();
814 queue1 = transaction.newQueue();
815 queue2 = transaction.newQueue();
816
817 expect(queue1.next, completion(2));
818 expect(queue2.next, completion(2));
819 expect(queue2.next, throwsA("To err is divine!"));
820 expect(queue1.next, throwsA("To err is divine!"));
821 expect(queue1.next, completion(4));
822 expect(queue2.next, completion(4));
823 expect(queue1.hasNext, completion(isFalse));
824 expect(queue2.hasNext, completion(isFalse));
825 });
826 });
827
828 group("when rejected", () {
829 test("further original requests use the previous state", () async {
830 expect(await queue1.next, 2);
831 expect(await queue2.next, 2);
832 expect(await queue2.next, 3);
833
834 await flushMicrotasks();
835 transaction.reject();
836
837 expect(await events.next, 2);
838 expect(await events.next, 3);
839 expect(await events.next, 4);
840 expect(await events.hasNext, isFalse);
841 });
842
843 test("pending original requests use the previous state", () async {
844 expect(await queue1.next, 2);
845 expect(await queue2.next, 2);
846 expect(await queue2.next, 3);
847 expect(events.next, completion(2));
848 expect(events.next, completion(3));
849 expect(events.next, completion(4));
850 expect(events.hasNext, completion(isFalse));
851
852 await flushMicrotasks();
853 transaction.reject();
854 });
855
856 test("further child requests act as though the stream was closed",
857 () async {
858 expect(await queue1.next, 2);
859 transaction.reject();
860
861 expect(await queue1.hasNext, isFalse);
862 expect(queue1.next, throwsStateError);
863 });
864
865 test("pending child requests act as though the stream was closed",
866 () async {
867 expect(await queue1.next, 2);
868 expect(queue1.hasNext, completion(isFalse));
869 expect(queue1.next, throwsStateError);
870 transaction.reject();
871 });
872
873 // Regression test.
874 test("pending child rest requests emit no more events", () async {
875 var controller = new StreamController();
876 var events = new StreamQueue(controller.stream);
877 var transaction = events.startTransaction();
878 var queue = transaction.newQueue();
879
880 // This should emit no more events after the transaction is rejected.
881 queue.rest.listen(expectAsync1((_) {}, count: 3),
882 onDone: expectAsync0(() {}, count: 0));
883
884 controller.add(1);
885 controller.add(2);
886 controller.add(3);
887 await flushMicrotasks();
888
889 transaction.reject();
890 await flushMicrotasks();
891
892 // These shouldn't affect the result of `queue.rest.toList()`.
893 controller.add(4);
894 controller.add(5);
895 });
896
897 test("child requests' cancel() may still be called explicitly", () async {
898 transaction.reject();
899 await queue1.cancel();
900 });
901
902 test("calls to commit() or reject() fail", () async {
903 transaction.reject();
904 expect(transaction.reject, throwsStateError);
905 expect(() => transaction.commit(queue1), throwsStateError);
906 });
907 });
908
909 group("when committed,", () {
910 test("further original requests use the committed state", () async {
911 expect(await queue1.next, 2);
912 await flushMicrotasks();
913 transaction.commit(queue1);
914 expect(await events.next, 3);
915 });
916
917 test("pending original requests use the committed state", () async {
918 expect(await queue1.next, 2);
919 expect(events.next, completion(3));
920 await flushMicrotasks();
921 transaction.commit(queue1);
922 });
923
924 test("further child requests act as though the stream was closed",
925 () async {
926 expect(await queue2.next, 2);
927 transaction.commit(queue2);
928
929 expect(await queue1.hasNext, isFalse);
930 expect(queue1.next, throwsStateError);
931 });
932
933 test("pending child requests act as though the stream was closed",
934 () async {
935 expect(await queue2.next, 2);
936 expect(queue1.hasNext, completion(isFalse));
937 expect(queue1.next, throwsStateError);
938 transaction.commit(queue2);
939 });
940
941 test("further requests act as though the stream was closed", () async {
942 expect(await queue1.next, 2);
943 transaction.commit(queue1);
944
945 expect(await queue1.hasNext, isFalse);
946 expect(queue1.next, throwsStateError);
947 });
948
949 test("cancel() may still be called explicitly", () async {
950 expect(await queue1.next, 2);
951 transaction.commit(queue1);
952 await queue1.cancel();
953 });
954
955 test("throws if there are pending requests", () async {
956 expect(await queue1.next, 2);
957 expect(queue1.hasNext, completion(isTrue));
958 expect(() => transaction.commit(queue1), throwsStateError);
959 });
960
961 test("calls to commit() or reject() fail", () async {
962 transaction.commit(queue1);
963 expect(transaction.reject, throwsStateError);
964 expect(() => transaction.commit(queue1), throwsStateError);
965 });
966 });
967 });
968
969 group("withTransaction operation", () {
970 StreamQueue<int> events;
971 setUp(() async {
972 events = new StreamQueue(createStream());
973 expect(await events.next, 1);
974 });
975
976 test("passes a copy of the parent queue", () async {
977 await events.withTransaction(expectAsync1((queue) async {
978 expect(await queue.next, 2);
979 expect(await queue.next, 3);
980 expect(await queue.next, 4);
981 expect(await queue.hasNext, isFalse);
982 return true;
983 }));
984 });
985
986 test(
987 "the parent queue continues from the child position if it returns "
988 "true", () async {
989 await events.withTransaction(expectAsync1((queue) async {
990 expect(await queue.next, 2);
991 return true;
992 }));
993
994 expect(await events.next, 3);
995 });
996
997 test(
998 "the parent queue continues from its original position if it returns "
999 "false", () async {
1000 await events.withTransaction(expectAsync1((queue) async {
1001 expect(await queue.next, 2);
1002 return false;
1003 }));
1004
1005 expect(await events.next, 2);
1006 });
1007
1008 test("the parent queue continues from the child position if it throws", () {
1009 expect(events.withTransaction(expectAsync1((queue) async {
1010 expect(await queue.next, 2);
1011 throw "oh no";
1012 })), throwsA("oh no"));
1013
1014 expect(events.next, completion(3));
1015 });
1016
1017 test("returns whether the transaction succeeded", () {
1018 expect(events.withTransaction((_) async => true), completion(isTrue));
1019 expect(events.withTransaction((_) async => false), completion(isFalse));
1020 });
1021 });
1022
1023 group("cancelable operation", () {
1024 StreamQueue<int> events;
1025 setUp(() async {
1026 events = new StreamQueue(createStream());
1027 expect(await events.next, 1);
1028 });
1029
1030 test("passes a copy of the parent queue", () async {
1031 await events.cancelable(expectAsync1((queue) async {
1032 expect(await queue.next, 2);
1033 expect(await queue.next, 3);
1034 expect(await queue.next, 4);
1035 expect(await queue.hasNext, isFalse);
1036 })).value;
1037 });
1038
1039 test("the parent queue continues from the child position by default",
1040 () async {
1041 await events.cancelable(expectAsync1((queue) async {
1042 expect(await queue.next, 2);
1043 })).value;
1044
1045 expect(await events.next, 3);
1046 });
1047
1048 test(
1049 "the parent queue continues from the child position if an error is "
1050 "thrown", () async {
1051 expect(
1052 events.cancelable(expectAsync1((queue) async {
1053 expect(await queue.next, 2);
1054 throw "oh no";
1055 })).value,
1056 throwsA("oh no"));
1057
1058 expect(events.next, completion(3));
1059 });
1060
1061 test("the parent queue continues from the original position if canceled",
1062 () async {
1063 var operation = events.cancelable(expectAsync1((queue) async {
1064 expect(await queue.next, 2);
1065 }));
1066 operation.cancel();
1067
1068 expect(await events.next, 2);
1069 });
1070
1071 test("forwards the value from the callback", () async {
1072 expect(
1073 await events.cancelable(expectAsync1((queue) async {
1074 expect(await queue.next, 2);
1075 return "value";
1076 })).value,
1077 "value");
1078 });
1079 });
1080
632 test("all combinations sequential skip/next/take operations", () async { 1081 test("all combinations sequential skip/next/take operations", () async {
633 // Takes all combinations of two of next, skip and take, then ends with 1082 // Takes all combinations of two of next, skip and take, then ends with
634 // doing rest. Each of the first rounds do 10 events of each type, 1083 // doing rest. Each of the first rounds do 10 events of each type,
635 // the rest does 20 elements. 1084 // the rest does 20 elements.
636 var eventCount = 20 * (3 * 3 + 1); 1085 var eventCount = 20 * (3 * 3 + 1);
637 var events = new StreamQueue<int>(createLongStream(eventCount)); 1086 var events = new StreamQueue<int>(createLongStream(eventCount));
638 1087
639 // Test expecting [startIndex .. startIndex + 9] as events using 1088 // Test expecting [startIndex .. startIndex + 9] as events using
640 // `next`. 1089 // `next`.
641 nextTest(startIndex) { 1090 nextTest(startIndex) {
642 for (int i = 0; i < 10; i++) { 1091 for (int i = 0; i < 10; i++) {
643 expect(events.next, completion(startIndex + i)); 1092 expect(events.next, completion(startIndex + i));
644 } 1093 }
645 } 1094 }
646 1095
647 // Test expecting 10 events to be skipped. 1096 // Test expecting 10 events to be skipped.
648 skipTest(startIndex) { 1097 skipTest(startIndex) {
649 expect(events.skip(10), completion(0)); 1098 expect(events.skip(10), completion(0));
650 } 1099 }
651 1100
652 // Test expecting [startIndex .. startIndex + 9] as events using 1101 // Test expecting [startIndex .. startIndex + 9] as events using
653 // `take(10)`. 1102 // `take(10)`.
654 takeTest(startIndex) { 1103 takeTest(startIndex) {
655 expect(events.take(10), 1104 expect(events.take(10),
656 completion(new List.generate(10, (i) => startIndex + i))); 1105 completion(new List.generate(10, (i) => startIndex + i)));
657 } 1106 }
1107
658 var tests = [nextTest, skipTest, takeTest]; 1108 var tests = [nextTest, skipTest, takeTest];
659 1109
660 int counter = 0; 1110 int counter = 0;
661 // Run through all pairs of two tests and run them. 1111 // Run through all pairs of two tests and run them.
662 for (int i = 0; i < tests.length; i++) { 1112 for (int i = 0; i < tests.length; i++) {
663 for (int j = 0; j < tests.length; j++) { 1113 for (int j = 0; j < tests.length; j++) {
664 tests[i](counter); 1114 tests[i](counter);
665 tests[j](counter + 10); 1115 tests[j](counter + 10);
666 counter += 20; 1116 counter += 20;
667 } 1117 }
668 } 1118 }
669 // Then expect 20 more events as a `rest` call. 1119 // Then expect 20 more events as a `rest` call.
670 expect(events.rest.toList(), 1120 expect(events.rest.toList(),
671 completion(new List.generate(20, (i) => counter + i))); 1121 completion(new List.generate(20, (i) => counter + i)));
672 }); 1122 });
673 } 1123 }
674 1124
1125 typedef T Func1Required<T>(T value);
1126
675 Stream<int> createStream() async* { 1127 Stream<int> createStream() async* {
676 yield 1; 1128 yield 1;
677 await flushMicrotasks(); 1129 await flushMicrotasks();
678 yield 2; 1130 yield 2;
679 await flushMicrotasks(); 1131 await flushMicrotasks();
680 yield 3; 1132 yield 3;
681 await flushMicrotasks(); 1133 await flushMicrotasks();
682 yield 4; 1134 yield 4;
683 } 1135 }
684 1136
685 Stream<int> createErrorStream() { 1137 Stream<int> createErrorStream() {
686 StreamController controller = new StreamController<int>(); 1138 var controller = new StreamController<int>();
687 () async { 1139 () async {
688 controller.add(1); 1140 controller.add(1);
689 await flushMicrotasks(); 1141 await flushMicrotasks();
690 controller.add(2); 1142 controller.add(2);
691 await flushMicrotasks(); 1143 await flushMicrotasks();
692 controller.addError("To err is divine!"); 1144 controller.addError("To err is divine!");
693 await flushMicrotasks(); 1145 await flushMicrotasks();
694 controller.add(4); 1146 controller.add(4);
695 await flushMicrotasks(); 1147 await flushMicrotasks();
696 controller.close(); 1148 controller.close();
697 }(); 1149 }();
698 return controller.stream; 1150 return controller.stream;
699 } 1151 }
700 1152
701 Stream<int> createLongStream(int eventCount) async* { 1153 Stream<int> createLongStream(int eventCount) async* {
702 for (int i = 0; i < eventCount; i++) yield i; 1154 for (int i = 0; i < eventCount; i++) yield i;
703 } 1155 }
OLDNEW
« no previous file with comments | « packages/async/test/stream_group_test.dart ('k') | packages/async/test/stream_sink_completer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698