OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // Test the basic StreamController and StreamController.singleSubscription. | 5 // Test the basic StreamController and StreamController.singleSubscription. |
6 library stream_controller_async_test; | 6 library stream_controller_async_test; |
7 | 7 |
8 import "package:expect/expect.dart"; | 8 import "package:expect/expect.dart"; |
9 import 'dart:async'; | 9 import 'dart:async'; |
10 import 'dart:isolate'; | 10 import 'dart:isolate'; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
46 }); | 46 }); |
47 | 47 |
48 test("Single-subscription StreamController.fold throws", () { | 48 test("Single-subscription StreamController.fold throws", () { |
49 StreamController c = new StreamController(); | 49 StreamController c = new StreamController(); |
50 Stream stream = c.stream; | 50 Stream stream = c.stream; |
51 stream.fold(0, (a,b) { throw "Fnyf!"; }) | 51 stream.fold(0, (a,b) { throw "Fnyf!"; }) |
52 .catchError(expectAsync1((e) { Expect.equals("Fnyf!", e); })); | 52 .catchError(expectAsync1((e) { Expect.equals("Fnyf!", e); })); |
53 c.add(42); | 53 c.add(42); |
54 }); | 54 }); |
55 | 55 |
| 56 test("Single-subscription StreamController subscription changes", () { |
| 57 StreamController c = new StreamController(); |
| 58 EventSink sink = c.sink; |
| 59 Stream stream = c.stream; |
| 60 int counter = 0; |
| 61 var subscription; |
| 62 subscription = stream.listen((data) { |
| 63 counter += data; |
| 64 Expect.throws(() => stream.listen(null), (e) => e is StateError); |
| 65 subscription.cancel(); |
| 66 stream.listen((data) { |
| 67 counter += data * 10; |
| 68 }, |
| 69 onDone: expectAsync0(() { |
| 70 Expect.equals(1 + 20, counter); |
| 71 })); |
| 72 }); |
| 73 sink.add(1); |
| 74 sink.add(2); |
| 75 sink.close(); |
| 76 }); |
| 77 |
56 test("Single-subscription StreamController events are buffered when" | 78 test("Single-subscription StreamController events are buffered when" |
57 " there is no subscriber", | 79 " there is no subscriber", |
58 () { | 80 () { |
59 StreamController c = new StreamController(); | 81 StreamController c = new StreamController(); |
60 EventSink sink = c.sink; | 82 EventSink sink = c.sink; |
61 Stream stream = c.stream; | 83 Stream stream = c.stream; |
62 int counter = 0; | 84 int counter = 0; |
63 sink.add(1); | 85 sink.add(1); |
64 sink.add(2); | 86 sink.add(2); |
65 sink.close(); | 87 sink.close(); |
66 stream.listen( | 88 stream.listen( |
67 (data) { | 89 (data) { |
68 counter += data; | 90 counter += data; |
69 }, | 91 }, |
70 onDone: expectAsync0(() { | 92 onDone: expectAsync0(() { |
71 Expect.equals(3, counter); | 93 Expect.equals(3, counter); |
72 })); | 94 })); |
73 }); | 95 }); |
| 96 |
| 97 // Test subscription changes while firing. |
| 98 test("Single-subscription StreamController subscription changes while firing", |
| 99 () { |
| 100 StreamController c = new StreamController(); |
| 101 EventSink sink = c.sink; |
| 102 Stream stream = c.stream; |
| 103 int counter = 0; |
| 104 var subscription = stream.listen(null); |
| 105 subscription.onData(expectAsync1((data) { |
| 106 counter += data; |
| 107 subscription.cancel(); |
| 108 stream.listen((data) { |
| 109 counter += 10 * data; |
| 110 }, |
| 111 onDone: expectAsync0(() { |
| 112 Expect.equals(1 + 20 + 30 + 40 + 50, counter); |
| 113 })); |
| 114 Expect.throws(() => stream.listen(null), (e) => e is StateError); |
| 115 })); |
| 116 sink.add(1); // seen by stream 1 |
| 117 sink.add(2); // seen by stream 10 and 100 |
| 118 sink.add(3); // -"- |
| 119 sink.add(4); // -"- |
| 120 sink.add(5); // seen by stream 10 |
| 121 sink.close(); |
| 122 }); |
74 } | 123 } |
75 | 124 |
76 testExtraMethods() { | 125 testExtraMethods() { |
77 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); | 126 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); |
78 | 127 |
79 test("forEach", () { | 128 test("forEach", () { |
80 StreamController c = new StreamController(); | 129 StreamController c = new StreamController(); |
81 Events actualEvents = new Events(); | 130 Events actualEvents = new Events(); |
82 Future f = c.stream.forEach(actualEvents.add); | 131 Future f = c.stream.forEach(actualEvents.add); |
83 f.then(expectAsync1((_) { | 132 f.then(expectAsync1((_) { |
(...skipping 338 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
422 testStreamError("handleError", (s, act) => s.handleError(act)); | 471 testStreamError("handleError", (s, act) => s.handleError(act)); |
423 testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act)); | 472 testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act)); |
424 testFuture("forEach", (s, act) => s.forEach(act)); | 473 testFuture("forEach", (s, act) => s.forEach(act)); |
425 testFuture("every", (s, act) => s.every(act)); | 474 testFuture("every", (s, act) => s.every(act)); |
426 testFuture("any", (s, act) => s.any(act)); | 475 testFuture("any", (s, act) => s.any(act)); |
427 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); | 476 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); |
428 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); | 477 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); |
429 testFuture("drain", (s, act) => s.drain().then(act)); | 478 testFuture("drain", (s, act) => s.drain().then(act)); |
430 } | 479 } |
431 | 480 |
432 void testMultiplex() { | |
433 test("multiplex-basic", () { | |
434 StreamController<int> c = new StreamController.multiplex( | |
435 onListen: expectAsync0(() {}), | |
436 onCancel: expectAsync0(() {}) | |
437 ); | |
438 Stream<int> s = c.stream; | |
439 s.listen(expectAsync1((x) { expect(x, equals(42)); })); | |
440 c.add(42); | |
441 c.close(); | |
442 }); | |
443 | |
444 test("multiplex-listen-twice", () { | |
445 StreamController<int> c = new StreamController.multiplex( | |
446 onListen: expectAsync0(() {}), | |
447 onCancel: expectAsync0(() {}) | |
448 ); | |
449 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, count: 2)); | |
450 c.add(42); | |
451 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | |
452 c.add(42); | |
453 c.close(); | |
454 }); | |
455 | |
456 test("multiplex-listen-twice-non-overlap", () { | |
457 StreamController<int> c = new StreamController.multiplex( | |
458 onListen: expectAsync0(() {}, count: 2), | |
459 onCancel: expectAsync0(() {}, count: 2) | |
460 ); | |
461 var sub = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | |
462 c.add(42); | |
463 sub.cancel(); | |
464 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | |
465 c.add(42); | |
466 c.close(); | |
467 }); | |
468 | |
469 test("multiplex-individual-pause", () { | |
470 StreamController<int> c = new StreamController.multiplex( | |
471 onListen: expectAsync0(() {}), | |
472 onCancel: expectAsync0(() {}) | |
473 ); | |
474 var sub1 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | |
475 var sub2 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, | |
476 count: 3)); | |
477 c.add(42); | |
478 sub1.pause(); | |
479 c.add(42); | |
480 sub1.cancel(); | |
481 var sub3 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); | |
482 c.add(42); | |
483 c.close(); | |
484 }); | |
485 } | |
486 | |
487 main() { | 481 main() { |
488 testController(); | 482 testController(); |
489 testSingleController(); | 483 testSingleController(); |
490 testExtraMethods(); | 484 testExtraMethods(); |
491 testPause(); | 485 testPause(); |
492 testRethrow(); | 486 testRethrow(); |
493 testMultiplex(); | |
494 } | 487 } |
OLD | NEW |