OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // Test the basic StreamController and StreamController.singleSubscription. | 5 // Test the basic StreamController and StreamController.singleSubscription. |
6 library stream_controller_async_test; | 6 library stream_controller_async_test; |
7 | 7 |
8 import "package:expect/expect.dart"; | 8 import "package:expect/expect.dart"; |
9 import 'dart:async'; | 9 import 'dart:async'; |
10 import 'dart:isolate'; | 10 import 'dart:isolate'; |
11 import '../../../pkg/unittest/lib/unittest.dart'; | 11 import '../../../pkg/unittest/lib/unittest.dart'; |
12 import 'event_helper.dart'; | 12 import 'event_helper.dart'; |
| 13 import 'stream_state_helper.dart'; |
13 | 14 |
14 testController() { | 15 testController() { |
15 // Test fold | 16 // Test fold |
16 test("StreamController.fold", () { | 17 test("StreamController.fold", () { |
17 StreamController c = new StreamController(); | 18 StreamController c = new StreamController(); |
18 Stream stream = c.stream.asBroadcastStream(); | 19 Stream stream = c.stream.asBroadcastStream(); |
19 stream.fold(0, (a,b) => a + b) | 20 stream.fold(0, (a,b) => a + b) |
20 .then(expectAsync1((int v) { | 21 .then(expectAsync1((int v) { |
21 Expect.equals(42, v); | 22 Expect.equals(42, v); |
22 })); | 23 })); |
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
259 Future f = c.stream.drain(); | 260 Future f = c.stream.drain(); |
260 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); | 261 f.catchError(expectAsync1((error) { Expect.equals("error", error); })); |
261 Events errorEvents = new Events()..error("error")..error("error2")..close(); | 262 Events errorEvents = new Events()..error("error")..error("error2")..close(); |
262 errorEvents.replay(c); | 263 errorEvents.replay(c); |
263 }); | 264 }); |
264 | 265 |
265 } | 266 } |
266 | 267 |
267 testPause() { | 268 testPause() { |
268 test("pause event-unpause", () { | 269 test("pause event-unpause", () { |
269 StreamController c = new StreamController(); | 270 StreamProtocolTest test = new StreamProtocolTest(); |
270 Events actualEvents = new Events.capture(c.stream); | |
271 Events expectedEvents = new Events(); | |
272 expectedEvents.add(42); | |
273 c.add(42); | |
274 Expect.listEquals(expectedEvents.events, actualEvents.events); | |
275 Completer completer = new Completer(); | 271 Completer completer = new Completer(); |
276 actualEvents.pause(completer.future); | 272 test..expectListen() |
277 c..add(43)..add(44)..close(); | 273 ..expectData(42, () { test.pause(completer.future); }) |
278 Expect.listEquals(expectedEvents.events, actualEvents.events); | 274 ..expectPause(() { |
279 completer.complete(); | 275 completer.complete(null); |
280 expectedEvents..add(43)..add(44)..close(); | 276 }) |
281 actualEvents.onDone(expectAsync0(() { | 277 ..expectData(43) |
282 Expect.listEquals(expectedEvents.events, actualEvents.events); | 278 ..expectData(44) |
283 })); | 279 ..expectDone() |
| 280 ..expectCancel(); |
| 281 test.listen(); |
| 282 test.add(42); |
| 283 test.add(43); |
| 284 test.add(44); |
| 285 test.close(); |
284 }); | 286 }); |
285 | 287 |
286 test("pause twice event-unpause", () { | 288 test("pause twice event-unpause", () { |
287 StreamController c = new StreamController(); | 289 StreamProtocolTest test = new StreamProtocolTest(); |
288 Events actualEvents = new Events.capture(c.stream); | |
289 Events expectedEvents = new Events(); | |
290 expectedEvents.add(42); | |
291 c.add(42); | |
292 Expect.listEquals(expectedEvents.events, actualEvents.events); | |
293 Completer completer = new Completer(); | 290 Completer completer = new Completer(); |
294 Completer completer2 = new Completer(); | 291 Completer completer2 = new Completer(); |
295 actualEvents.pause(completer.future); | 292 test..expectListen() |
296 actualEvents.pause(completer2.future); | 293 ..expectData(42, () { |
297 c..add(43)..add(44)..close(); | 294 test.pause(completer.future); |
298 Expect.listEquals(expectedEvents.events, actualEvents.events); | 295 test.pause(completer2.future); |
299 completer.complete(); | 296 }) |
300 Expect.listEquals(expectedEvents.events, actualEvents.events); | 297 ..expectPause(() { |
301 completer2.complete(); | 298 completer.future.then(completer2.complete); |
302 expectedEvents..add(43)..add(44)..close(); | 299 completer.complete(null); |
303 actualEvents.onDone(expectAsync0((){ | 300 }) |
304 Expect.listEquals(expectedEvents.events, actualEvents.events); | 301 ..expectData(43) |
305 })); | 302 ..expectData(44) |
| 303 ..expectDone() |
| 304 ..expectCancel(); |
| 305 test..listen() |
| 306 ..add(42) |
| 307 ..add(43) |
| 308 ..add(44) |
| 309 ..close(); |
306 }); | 310 }); |
307 | 311 |
308 test("pause twice direct-unpause", () { | 312 test("pause twice direct-unpause", () { |
309 StreamController c = new StreamController(); | 313 StreamProtocolTest test = new StreamProtocolTest(); |
310 Events actualEvents = new Events.capture(c.stream); | 314 test..expectListen() |
311 Events expectedEvents = new Events(); | 315 ..expectData(42, () { |
312 expectedEvents.add(42); | 316 test.pause(); |
313 c.add(42); | 317 test.pause(); |
314 Expect.listEquals(expectedEvents.events, actualEvents.events); | 318 }) |
315 actualEvents.pause(); | 319 ..expectPause(() { |
316 actualEvents.pause(); | 320 test.resume(); |
317 c.add(43); | 321 test.resume(); |
318 c.add(44); | 322 }) |
319 c.close(); | 323 ..expectData(43) |
320 Expect.listEquals(expectedEvents.events, actualEvents.events); | 324 ..expectData(44) |
321 actualEvents.resume(); | 325 ..expectDone() |
322 Expect.listEquals(expectedEvents.events, actualEvents.events); | 326 ..expectCancel(); |
323 expectedEvents..add(43)..add(44)..close(); | 327 test..listen() |
324 actualEvents.onDone(expectAsync0(() { | 328 ..add(42) |
325 Expect.listEquals(expectedEvents.events, actualEvents.events); | 329 ..add(43) |
326 })); | 330 ..add(44) |
327 actualEvents.resume(); | 331 ..close(); |
328 }); | 332 }); |
329 | 333 |
330 test("pause twice direct-event-unpause", () { | 334 test("pause twice direct-event-unpause", () { |
331 StreamController c = new StreamController(); | 335 StreamProtocolTest test = new StreamProtocolTest(); |
332 Events actualEvents = new Events.capture(c.stream); | |
333 Events expectedEvents = new Events(); | |
334 expectedEvents.add(42); | |
335 c.add(42); | |
336 Expect.listEquals(expectedEvents.events, actualEvents.events); | |
337 Completer completer = new Completer(); | 336 Completer completer = new Completer(); |
338 actualEvents.pause(completer.future); | 337 test..expectListen() |
339 actualEvents.pause(); | 338 ..expectData(42, () { |
340 c.add(43); | 339 test.pause(); |
341 c.add(44); | 340 test.pause(completer.future); |
342 c.close(); | 341 test.add(43); |
343 Expect.listEquals(expectedEvents.events, actualEvents.events); | 342 test.add(44); |
344 actualEvents.resume(); | 343 test.close(); |
345 Expect.listEquals(expectedEvents.events, actualEvents.events); | 344 }) |
346 expectedEvents..add(43)..add(44)..close(); | 345 ..expectPause(() { |
347 actualEvents.onDone(expectAsync0(() { | 346 completer.future.then((v) => test.resume()); |
348 Expect.listEquals(expectedEvents.events, actualEvents.events); | 347 completer.complete(null); |
349 })); | 348 }) |
350 completer.complete(); | 349 ..expectData(43) |
351 }); | 350 ..expectData(44) |
352 | 351 ..expectDone() |
353 test("pause twice direct-unpause", () { | 352 ..expectCancel(); |
354 StreamController c = new StreamController(); | 353 test..listen() |
355 Events actualEvents = new Events.capture(c.stream); | 354 ..add(42); |
356 Events expectedEvents = new Events(); | |
357 expectedEvents.add(42); | |
358 c.add(42); | |
359 Expect.listEquals(expectedEvents.events, actualEvents.events); | |
360 Completer completer = new Completer(); | |
361 actualEvents.pause(completer.future); | |
362 actualEvents.pause(); | |
363 c.add(43); | |
364 c.add(44); | |
365 c.close(); | |
366 Expect.listEquals(expectedEvents.events, actualEvents.events); | |
367 completer.complete(); | |
368 Expect.listEquals(expectedEvents.events, actualEvents.events); | |
369 expectedEvents..add(43)..add(44)..close(); | |
370 actualEvents.onDone(expectAsync0(() { | |
371 Expect.listEquals(expectedEvents.events, actualEvents.events); | |
372 })); | |
373 actualEvents.resume(); | |
374 }); | 355 }); |
375 } | 356 } |
376 | 357 |
377 class TestError { const TestError(); } | 358 class TestError { const TestError(); } |
378 | 359 |
379 testRethrow() { | 360 testRethrow() { |
380 TestError error = const TestError(); | 361 TestError error = const TestError(); |
381 | 362 |
382 | 363 |
383 testStream(name, streamValueTransform) { | 364 testStream(name, streamValueTransform) { |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
424 testFuture("forEach", (s, act) => s.forEach(act)); | 405 testFuture("forEach", (s, act) => s.forEach(act)); |
425 testFuture("every", (s, act) => s.every(act)); | 406 testFuture("every", (s, act) => s.every(act)); |
426 testFuture("any", (s, act) => s.any(act)); | 407 testFuture("any", (s, act) => s.any(act)); |
427 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); | 408 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); |
428 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); | 409 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); |
429 testFuture("drain", (s, act) => s.drain().then(act)); | 410 testFuture("drain", (s, act) => s.drain().then(act)); |
430 } | 411 } |
431 | 412 |
432 void testBroadcastController() { | 413 void testBroadcastController() { |
433 test("broadcast-controller-basic", () { | 414 test("broadcast-controller-basic", () { |
434 StreamController<int> c = new StreamController.broadcast( | 415 StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
435 onListen: expectAsync0(() {}), | 416 test..expectListen() |
436 onCancel: expectAsync0(() {}) | 417 ..expectData(42) |
437 ); | 418 ..expectDone() |
438 Stream<int> s = c.stream; | 419 ..expectCancel(test.terminate); |
439 s.listen(expectAsync1((x) { expect(x, equals(42)); })); | 420 test..listen() |
440 c.add(42); | 421 ..add(42) |
441 c.close(); | 422 ..close(); |
442 }); | 423 }); |
443 | 424 |
444 test("broadcast-controller-listen-twice", () { | 425 test("broadcast-controller-listen-twice", () { |
445 StreamController<int> c = new StreamController.broadcast( | 426 StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
446 onListen: expectAsync0(() {}), | 427 test..expectListen() |
447 onCancel: expectAsync0(() {}) | 428 ..expectData(42, () { |
448 ); | 429 test.listen(); |
449 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, count: 2)); | 430 test.add(37); |
450 c.add(42); | 431 test.close(); |
451 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 432 }) |
452 c.add(42); | 433 // Order is not guaranteed between subscriptions if not sync. |
453 c.close(); | 434 ..expectData(37) |
| 435 ..expectData(37) |
| 436 ..expectDone() |
| 437 ..expectDone() |
| 438 ..expectCancel(test.terminate); |
| 439 test.listen(); |
| 440 test.add(42); |
454 }); | 441 }); |
455 | 442 |
456 test("broadcast-controller-listen-twice-non-overlap", () { | 443 test("broadcast-controller-listen-twice-non-overlap", () { |
457 StreamController<int> c = new StreamController.broadcast( | 444 StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
458 onListen: expectAsync0(() {}, count: 2), | 445 test |
459 onCancel: expectAsync0(() {}, count: 2) | 446 ..expectListen(() { |
460 ); | 447 test.add(42); |
461 var sub = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 448 }) |
462 c.add(42); | 449 ..expectData(42, () { |
463 sub.cancel(); | 450 test.cancel(); |
464 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 451 }) |
465 c.add(42); | 452 ..expectCancel(() { |
466 c.close(); | 453 test.listen(); |
| 454 })..expectListen(() { |
| 455 test.add(37); |
| 456 }) |
| 457 ..expectData(37, () { |
| 458 test.close(); |
| 459 }) |
| 460 ..expectDone() |
| 461 ..expectCancel(test.terminate); |
| 462 test.listen(); |
467 }); | 463 }); |
468 | 464 |
469 test("broadcast-controller-individual-pause", () { | 465 test("broadcast-controller-individual-pause", () { |
470 StreamController<int> c = new StreamController.broadcast( | 466 StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
471 onListen: expectAsync0(() {}), | 467 test.trace = true; |
472 onCancel: expectAsync0(() {}) | 468 var sub1; |
473 ); | 469 test..expectListen() |
474 var sub1 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 470 ..expectData(42) |
475 var sub2 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, | 471 ..expectData(42, () { sub1.pause(); }) |
476 count: 3)); | 472 ..expectData(43, () { |
477 c.add(42); | 473 sub1.cancel(); |
478 sub1.pause(); | 474 test.listen(); |
479 c.add(42); | 475 test.add(44); |
480 sub1.cancel(); | 476 test.expectData(44); |
481 var sub3 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | 477 test.expectData(44, test.terminate); |
482 c.add(42); | 478 }); |
483 c.close(); | 479 sub1 = test.listen(); |
| 480 test.listen(); |
| 481 test.add(42); |
| 482 test.add(43); |
484 }); | 483 }); |
485 | 484 |
486 test("broadcast-controller-add-in-callback", () { | 485 test("broadcast-controller-add-in-callback", () { |
487 StreamController<int> c; | 486 StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
488 c = new StreamController( | 487 test.expectListen(); |
489 onListen: expectAsync0(() {}), | 488 var sub = test.listen(); |
490 onCancel: expectAsync0(() { | 489 test.add(42); |
491 c.add(42); | 490 sub.expectData(42, () { |
492 }) | 491 test.add(87); |
493 ); | |
494 var sub; | |
495 sub = c.stream.asBroadcastStream().listen(expectAsync1((v) { | |
496 Expect.equals(37, v); | |
497 c.add(21); | |
498 sub.cancel(); | 492 sub.cancel(); |
499 })); | 493 }); |
500 c.add(37); // Triggers listener, which adds 21 and removes itself. | 494 test.expectCancel(() { |
501 // Removing listener triggers onCancel which adds another 42. | 495 test.add(37); |
502 // Both 21 and 42 are lost because there are no listeners. | 496 test.terminate(); |
| 497 }); |
503 }); | 498 }); |
504 } | 499 } |
505 | 500 |
506 main() { | 501 main() { |
507 testController(); | 502 testController(); |
508 testSingleController(); | 503 testSingleController(); |
509 testExtraMethods(); | 504 testExtraMethods(); |
510 testPause(); | 505 testPause(); |
511 testRethrow(); | 506 testRethrow(); |
512 testBroadcastController(); | 507 testBroadcastController(); |
513 } | 508 } |
OLD | NEW |