OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // Test merging streams. | 5 // Test merging streams. |
6 library merge_stream_test; | 6 library merge_stream_test; |
7 | 7 |
8 import "dart:async"; | 8 import "dart:async"; |
9 import '../../../pkg/unittest/lib/unittest.dart'; | 9 import '../../../pkg/unittest/lib/unittest.dart'; |
10 import 'event_helper.dart'; | 10 import 'event_helper.dart'; |
11 | 11 |
12 testSupercedeStream() { | 12 testSupercedeStream() { |
13 { // Simple case of superceding lower priority streams. | 13 { // Simple case of superceding lower priority streams. |
14 StreamController s1 = new StreamController(); | 14 StreamController s1 = new StreamController.multiSubscription(); |
15 StreamController s2 = new StreamController(); | 15 StreamController s2 = new StreamController.multiSubscription(); |
16 StreamController s3 = new StreamController(); | 16 StreamController s3 = new StreamController.multiSubscription(); |
17 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 17 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
18 Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close(); | 18 Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close(); |
19 Events actual = new Events.capture(merge); | 19 Events actual = new Events.capture(merge); |
20 s1.add(1); | 20 s1.add(1); |
21 s2.add(2); | 21 s2.add(2); |
22 s1.add(1); // Ignored. | 22 s1.add(1); // Ignored. |
23 s2.add(3); | 23 s2.add(3); |
24 s3.add(4); | 24 s3.add(4); |
25 s2.add(3); // Ignored. | 25 s2.add(3); // Ignored. |
26 s3.close(); | 26 s3.close(); |
27 Expect.listEquals(expected.events, actual.events); | 27 Expect.listEquals(expected.events, actual.events); |
28 } | 28 } |
29 | 29 |
30 { // Superceding more than one stream at a time. | 30 { // Superceding more than one stream at a time. |
31 StreamController s1 = new StreamController(); | 31 StreamController s1 = new StreamController.multiSubscription(); |
32 StreamController s2 = new StreamController(); | 32 StreamController s2 = new StreamController.multiSubscription(); |
33 StreamController s3 = new StreamController(); | 33 StreamController s3 = new StreamController.multiSubscription(); |
34 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 34 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
35 Events expected = new Events()..add(1)..add(2)..close(); | 35 Events expected = new Events()..add(1)..add(2)..close(); |
36 Events actual = new Events.capture(merge); | 36 Events actual = new Events.capture(merge); |
37 s1.add(1); | 37 s1.add(1); |
38 s3.add(2); | 38 s3.add(2); |
39 s1.add(1); // Ignored. | 39 s1.add(1); // Ignored. |
40 s2.add(1); // Ignored. | 40 s2.add(1); // Ignored. |
41 s3.close(); | 41 s3.close(); |
42 Expect.listEquals(expected.events, actual.events); | 42 Expect.listEquals(expected.events, actual.events); |
43 } | 43 } |
44 | 44 |
45 { // Closing a stream before superceding it. | 45 { // Closing a stream before superceding it. |
46 StreamController s1 = new StreamController(); | 46 StreamController s1 = new StreamController.multiSubscription(); |
47 StreamController s2 = new StreamController(); | 47 StreamController s2 = new StreamController.multiSubscription(); |
48 StreamController s3 = new StreamController(); | 48 StreamController s3 = new StreamController.multiSubscription(); |
49 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 49 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
50 Events expected = new Events()..add(1)..add(2)..add(3)..close(); | 50 Events expected = new Events()..add(1)..add(2)..add(3)..close(); |
51 Events actual = new Events.capture(merge); | 51 Events actual = new Events.capture(merge); |
52 s1.add(1); | 52 s1.add(1); |
53 s1.close(); | 53 s1.close(); |
54 s3.close(); | 54 s3.close(); |
55 s2.add(2); | 55 s2.add(2); |
56 s2.add(3); | 56 s2.add(3); |
57 s2.close(); | 57 s2.close(); |
58 Expect.listEquals(expected.events, actual.events); | 58 Expect.listEquals(expected.events, actual.events); |
59 } | 59 } |
60 | 60 |
61 { // Errors from all non-superceded streams are forwarded. | 61 { // Errors from all non-superceded streams are forwarded. |
62 StreamController s1 = new StreamController(); | 62 StreamController s1 = new StreamController.multiSubscription(); |
63 StreamController s2 = new StreamController(); | 63 StreamController s2 = new StreamController.multiSubscription(); |
64 StreamController s3 = new StreamController(); | 64 StreamController s3 = new StreamController.multiSubscription(); |
65 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 65 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
66 Events expected = | 66 Events expected = |
67 new Events()..add(1)..error("1")..error("2")..error("3") | 67 new Events()..add(1)..error("1")..error("2")..error("3") |
68 ..add(3)..error("6")..add(4)..close(); | 68 ..add(3)..error("6")..add(4)..close(); |
69 Events actual = new Events.capture(merge); | 69 Events actual = new Events.capture(merge); |
70 s1.add(1); | 70 s1.add(1); |
71 s1.signalError(new AsyncError("1")); | 71 s1.signalError(new AsyncError("1")); |
72 s2.signalError(new AsyncError("2")); | 72 s2.signalError(new AsyncError("2")); |
73 s3.signalError(new AsyncError("3")); | 73 s3.signalError(new AsyncError("3")); |
74 s3.add(3); | 74 s3.add(3); |
75 s1.signalError(new AsyncError("4")); | 75 s1.signalError(new AsyncError("4")); |
76 s2.signalError(new AsyncError("5")); | 76 s2.signalError(new AsyncError("5")); |
77 s3.signalError(new AsyncError("6")); | 77 s3.signalError(new AsyncError("6")); |
78 s1.close(); | 78 s1.close(); |
79 s2.close(); | 79 s2.close(); |
80 s3.add(4); | 80 s3.add(4); |
81 s3.close(); | 81 s3.close(); |
82 Expect.listEquals(expected.events, actual.events); | 82 Expect.listEquals(expected.events, actual.events); |
83 } | 83 } |
84 | 84 |
85 test("Pausing on a superceding stream", () { | 85 test("Pausing on a superceding stream", () { |
86 StreamController s1 = new StreamController(); | 86 StreamController s1 = new StreamController.multiSubscription(); |
87 StreamController s2 = new StreamController(); | 87 StreamController s2 = new StreamController.multiSubscription(); |
88 StreamController s3 = new StreamController(); | 88 StreamController s3 = new StreamController.multiSubscription(); |
89 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 89 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
90 Events expected = new Events()..add(1)..add(2)..add(3); | 90 Events expected = new Events()..add(1)..add(2)..add(3); |
91 Events actual = new Events.capture(merge); | 91 Events actual = new Events.capture(merge); |
92 s1.add(1); | 92 s1.add(1); |
93 s2.add(2); | 93 s2.add(2); |
94 s2.add(3); | 94 s2.add(3); |
95 Expect.listEquals(expected.events, actual.events); | 95 Expect.listEquals(expected.events, actual.events); |
96 actual.pause(); // Pauses the stream that feeds the actual Events. | 96 actual.pause(); // Pauses the stream that feeds the actual Events. |
97 Events expected2 = expected.copy(); | 97 Events expected2 = expected.copy(); |
98 expected..add(5)..add(6)..close(); | 98 expected..add(5)..add(6)..close(); |
99 expected2..add(6)..close(); | 99 expected2..add(6)..close(); |
100 s1.add(4); | 100 s1.add(4); |
101 s2.add(5); // May or may not arrive before '6' when resuming. | 101 s2.add(5); // May or may not arrive before '6' when resuming. |
102 s3.add(6); | 102 s3.add(6); |
103 s3.close(); | 103 s3.close(); |
104 actual.onDone(expectAsync0(() { | 104 actual.onDone(expectAsync0(() { |
105 if (expected.events.length == actual.events.length) { | 105 if (expected.events.length == actual.events.length) { |
106 Expect.listEquals(expected.events, actual.events); | 106 Expect.listEquals(expected.events, actual.events); |
107 } else { | 107 } else { |
108 Expect.listEquals(expected2.events, actual.events); | 108 Expect.listEquals(expected2.events, actual.events); |
109 } | 109 } |
110 })); | 110 })); |
111 actual.resume(); | 111 actual.resume(); |
112 }); | 112 }); |
113 } | 113 } |
114 | 114 |
115 void testCyclicStream() { | 115 void testCyclicStream() { |
116 test("Simple case of superceding lower priority streams", () { | 116 test("Simple case of superceding lower priority streams", () { |
117 StreamController s1 = new StreamController(); | 117 StreamController s1 = new StreamController.multiSubscription(); |
118 StreamController s2 = new StreamController(); | 118 StreamController s2 = new StreamController.multiSubscription(); |
119 StreamController s3 = new StreamController(); | 119 StreamController s3 = new StreamController.multiSubscription(); |
120 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); | 120 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); |
121 Events expected = | 121 Events expected = |
122 new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close(); | 122 new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close(); |
123 Events actual = new Events.capture(merge); | 123 Events actual = new Events.capture(merge); |
124 Expect.isFalse(s1.isPaused); | 124 Expect.isFalse(s1.isPaused); |
125 Expect.isTrue(s2.isPaused); | 125 Expect.isTrue(s2.isPaused); |
126 Expect.isTrue(s3.isPaused); | 126 Expect.isTrue(s3.isPaused); |
127 s3.add(3); | 127 s3.add(3); |
128 s1.add(1); | 128 s1.add(1); |
129 s1.add(4); | 129 s1.add(4); |
130 s1.add(6); | 130 s1.add(6); |
131 s1.close(); | 131 s1.close(); |
132 s2.add(2); | 132 s2.add(2); |
133 s2.add(5); | 133 s2.add(5); |
134 s2.close(); | 134 s2.close(); |
135 s3.close(); | 135 s3.close(); |
136 actual.onDone(expectAsync0(() { | 136 actual.onDone(expectAsync0(() { |
137 Expect.listEquals(expected.events, actual.events); | 137 Expect.listEquals(expected.events, actual.events); |
138 })); | 138 })); |
139 }); | 139 }); |
140 | 140 |
141 test("Cyclic merge with errors", () { | 141 test("Cyclic merge with errors", () { |
142 StreamController s1 = new StreamController(); | 142 StreamController s1 = new StreamController.multiSubscription(); |
143 StreamController s2 = new StreamController(); | 143 StreamController s2 = new StreamController.multiSubscription(); |
144 StreamController s3 = new StreamController(); | 144 StreamController s3 = new StreamController.multiSubscription(); |
145 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); | 145 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); |
146 Events expected = | 146 Events expected = |
147 new Events()..add(1)..error("1")..add(2)..add(3)..error("2") | 147 new Events()..add(1)..error("1")..add(2)..add(3)..error("2") |
148 ..add(4)..add(5)..error("3")..add(6)..close(); | 148 ..add(4)..add(5)..error("3")..add(6)..close(); |
149 Events actual = new Events.capture(merge); | 149 Events actual = new Events.capture(merge); |
150 Expect.isFalse(s1.isPaused); | 150 Expect.isFalse(s1.isPaused); |
151 Expect.isTrue(s2.isPaused); | 151 Expect.isTrue(s2.isPaused); |
152 Expect.isTrue(s3.isPaused); | 152 Expect.isTrue(s3.isPaused); |
153 s3.add(3); | 153 s3.add(3); |
154 s3.signalError(new AsyncError("3")); // Error just before a "done". | 154 s3.signalError(new AsyncError("3")); // Error just before a "done". |
(...skipping 10 matching lines...) Expand all Loading... |
165 actual.onDone(expectAsync0(() { | 165 actual.onDone(expectAsync0(() { |
166 Expect.listEquals(expected.events, actual.events); | 166 Expect.listEquals(expected.events, actual.events); |
167 })); | 167 })); |
168 }); | 168 }); |
169 } | 169 } |
170 | 170 |
171 main() { | 171 main() { |
172 testSupercedeStream(); | 172 testSupercedeStream(); |
173 testCyclicStream(); | 173 testCyclicStream(); |
174 } | 174 } |
OLD | NEW |