OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // Test the basic StreamController and StreamController.singleSubscription. | 5 // Test the basic StreamController and StreamController.singleSubscription. |
6 library stream_controller_async_test; | 6 library stream_controller_async_test; |
7 | 7 |
8 import "package:expect/expect.dart"; | 8 import "package:expect/expect.dart"; |
9 import 'dart:async'; | 9 import 'dart:async'; |
10 import 'dart:isolate'; | 10 import 'dart:isolate'; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
46 }); | 46 }); |
47 | 47 |
48 test("Single-subscription StreamController.fold throws", () { | 48 test("Single-subscription StreamController.fold throws", () { |
49 StreamController c = new StreamController(); | 49 StreamController c = new StreamController(); |
50 Stream stream = c.stream; | 50 Stream stream = c.stream; |
51 stream.fold(0, (a,b) { throw "Fnyf!"; }) | 51 stream.fold(0, (a,b) { throw "Fnyf!"; }) |
52 .catchError(expectAsync1((e) { Expect.equals("Fnyf!", e); })); | 52 .catchError(expectAsync1((e) { Expect.equals("Fnyf!", e); })); |
53 c.add(42); | 53 c.add(42); |
54 }); | 54 }); |
55 | 55 |
56 test("Single-subscription StreamController subscription changes", () { | |
57 StreamController c = new StreamController(); | |
58 EventSink sink = c.sink; | |
59 Stream stream = c.stream; | |
60 int counter = 0; | |
61 var subscription; | |
62 subscription = stream.listen((data) { | |
63 counter += data; | |
64 Expect.throws(() => stream.listen(null), (e) => e is StateError); | |
65 subscription.cancel(); | |
66 stream.listen((data) { | |
67 counter += data * 10; | |
68 }, | |
69 onDone: expectAsync0(() { | |
70 Expect.equals(1 + 20, counter); | |
71 })); | |
72 }); | |
73 sink.add(1); | |
74 sink.add(2); | |
75 sink.close(); | |
76 }); | |
77 | |
78 test("Single-subscription StreamController events are buffered when" | 56 test("Single-subscription StreamController events are buffered when" |
79 " there is no subscriber", | 57 " there is no subscriber", |
80 () { | 58 () { |
81 StreamController c = new StreamController(); | 59 StreamController c = new StreamController(); |
82 EventSink sink = c.sink; | 60 EventSink sink = c.sink; |
83 Stream stream = c.stream; | 61 Stream stream = c.stream; |
84 int counter = 0; | 62 int counter = 0; |
85 sink.add(1); | 63 sink.add(1); |
86 sink.add(2); | 64 sink.add(2); |
87 sink.close(); | 65 sink.close(); |
88 stream.listen( | 66 stream.listen( |
89 (data) { | 67 (data) { |
90 counter += data; | 68 counter += data; |
91 }, | 69 }, |
92 onDone: expectAsync0(() { | 70 onDone: expectAsync0(() { |
93 Expect.equals(3, counter); | 71 Expect.equals(3, counter); |
94 })); | 72 })); |
95 }); | 73 }); |
96 | |
97 // Test subscription changes while firing. | |
98 test("Single-subscription StreamController subscription changes while firing", | |
99 () { | |
100 StreamController c = new StreamController(); | |
101 EventSink sink = c.sink; | |
102 Stream stream = c.stream; | |
103 int counter = 0; | |
104 var subscription = stream.listen(null); | |
105 subscription.onData(expectAsync1((data) { | |
106 counter += data; | |
107 subscription.cancel(); | |
108 stream.listen((data) { | |
109 counter += 10 * data; | |
110 }, | |
111 onDone: expectAsync0(() { | |
112 Expect.equals(1 + 20 + 30 + 40 + 50, counter); | |
113 })); | |
114 Expect.throws(() => stream.listen(null), (e) => e is StateError); | |
115 })); | |
116 sink.add(1); // seen by stream 1 | |
117 sink.add(2); // seen by stream 10 and 100 | |
118 sink.add(3); // -"- | |
119 sink.add(4); // -"- | |
120 sink.add(5); // seen by stream 10 | |
121 sink.close(); | |
122 }); | |
123 } | 74 } |
124 | 75 |
125 testExtraMethods() { | 76 testExtraMethods() { |
126 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); | 77 Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); |
127 | 78 |
128 test("forEach", () { | 79 test("forEach", () { |
129 StreamController c = new StreamController(); | 80 StreamController c = new StreamController(); |
130 Events actualEvents = new Events(); | 81 Events actualEvents = new Events(); |
131 Future f = c.stream.forEach(actualEvents.add); | 82 Future f = c.stream.forEach(actualEvents.add); |
132 f.then(expectAsync1((_) { | 83 f.then(expectAsync1((_) { |
(...skipping 338 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
471 testStreamError("handleError", (s, act) => s.handleError(act)); | 422 testStreamError("handleError", (s, act) => s.handleError(act)); |
472 testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act)); | 423 testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act)); |
473 testFuture("forEach", (s, act) => s.forEach(act)); | 424 testFuture("forEach", (s, act) => s.forEach(act)); |
474 testFuture("every", (s, act) => s.every(act)); | 425 testFuture("every", (s, act) => s.every(act)); |
475 testFuture("any", (s, act) => s.any(act)); | 426 testFuture("any", (s, act) => s.any(act)); |
476 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); | 427 testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); |
477 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); | 428 testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); |
478 testFuture("drain", (s, act) => s.drain().then(act)); | 429 testFuture("drain", (s, act) => s.drain().then(act)); |
479 } | 430 } |
480 | 431 |
| 432 void testMultiplex() { |
| 433 test("multiplex-basic", () { |
| 434 StreamController<int> c = new StreamController.multiplex( |
| 435 onListen: expectAsync0(() {}), |
| 436 onCancel: expectAsync0(() {}) |
| 437 ); |
| 438 Stream<int> s = c.stream; |
| 439 s.listen(expectAsync1((x) { expect(x, equals(42)); })); |
| 440 c.add(42); |
| 441 c.close(); |
| 442 }); |
| 443 |
| 444 test("multiplex-listen-twice", () { |
| 445 StreamController<int> c = new StreamController.multiplex( |
| 446 onListen: expectAsync0(() {}), |
| 447 onCancel: expectAsync0(() {}) |
| 448 ); |
| 449 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, count: 2)); |
| 450 c.add(42); |
| 451 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); |
| 452 c.add(42); |
| 453 c.close(); |
| 454 }); |
| 455 |
| 456 test("multiplex-listen-twice-non-overlap", () { |
| 457 StreamController<int> c = new StreamController.multiplex( |
| 458 onListen: expectAsync0(() {}, count: 2), |
| 459 onCancel: expectAsync0(() {}, count: 2) |
| 460 ); |
| 461 var sub = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); |
| 462 c.add(42); |
| 463 sub.cancel(); |
| 464 c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); |
| 465 c.add(42); |
| 466 c.close(); |
| 467 }); |
| 468 |
| 469 test("multiplex-individual-pause", () { |
| 470 StreamController<int> c = new StreamController.multiplex( |
| 471 onListen: expectAsync0(() {}), |
| 472 onCancel: expectAsync0(() {}) |
| 473 ); |
| 474 var sub1 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); |
| 475 var sub2 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); }, |
| 476 count: 3)); |
| 477 c.add(42); |
| 478 sub1.pause(); |
| 479 c.add(42); |
| 480 sub1.cancel(); |
| 481 var sub3 = c.stream.listen(expectAsync1((x) { expect(x, equals(42)); })); |
| 482 c.add(42); |
| 483 c.close(); |
| 484 }); |
| 485 } |
| 486 |
481 main() { | 487 main() { |
482 testController(); | 488 testController(); |
483 testSingleController(); | 489 testSingleController(); |
484 testExtraMethods(); | 490 testExtraMethods(); |
485 testPause(); | 491 testPause(); |
486 testRethrow(); | 492 testRethrow(); |
| 493 testMultiplex(); |
487 } | 494 } |
OLD | NEW |