OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import "dart:async"; | 5 import "dart:async"; |
6 import "package:async/stream_zip.dart"; | 6 |
| 7 import "package:async/async.dart"; |
7 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
8 | 9 |
9 /// Create an error with the same values as [base], except that it throwsA | 10 /// Create an error with the same values as [base], except that it throwsA |
10 /// when seeing the value [errorValue]. | 11 /// when seeing the value [errorValue]. |
11 Stream streamError(Stream base, int errorValue, error) { | 12 Stream streamError(Stream base, int errorValue, error) { |
12 return base.map((x) => (x == errorValue) ? throw error : x); | 13 return base.map((x) => (x == errorValue) ? throw error : x); |
13 } | 14 } |
14 | 15 |
15 /// Make a [Stream] from an [Iterable] by adding events to a stream controller | 16 /// Make a [Stream] from an [Iterable] by adding events to a stream controller |
16 /// at periodic intervals. | 17 /// at periodic intervals. |
(...skipping 11 matching lines...) Expand all Loading... |
28 } | 29 } |
29 }); | 30 }); |
30 return controller.stream; | 31 return controller.stream; |
31 } | 32 } |
32 | 33 |
33 /// Counter used to give varying delays for streams. | 34 /// Counter used to give varying delays for streams. |
34 int ctr = 0; | 35 int ctr = 0; |
35 | 36 |
36 main() { | 37 main() { |
37 // Test that zipping [streams] gives the results iterated by [expectedData]. | 38 // Test that zipping [streams] gives the results iterated by [expectedData]. |
38 testZip(Iterable streams, Iterable expectedData) { | 39 testZip(Iterable<Stream> streams, Iterable expectedData) { |
39 List data = []; | 40 List data = []; |
40 Stream zip = new StreamZip(streams); | 41 Stream zip = new StreamZip(streams); |
41 zip.listen(data.add, onDone: expectAsync(() { | 42 zip.listen(data.add, onDone: expectAsync0(() { |
42 expect(data, equals(expectedData)); | 43 expect(data, equals(expectedData)); |
43 })); | 44 })); |
44 } | 45 } |
45 | 46 |
46 test("Basic", () { | 47 test("Basic", () { |
47 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9])], | 48 testZip([ |
48 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 49 mks([1, 2, 3]), |
| 50 mks([4, 5, 6]), |
| 51 mks([7, 8, 9]) |
| 52 ], [ |
| 53 [1, 4, 7], |
| 54 [2, 5, 8], |
| 55 [3, 6, 9] |
| 56 ]); |
49 }); | 57 }); |
50 | 58 |
51 test("Uneven length 1", () { | 59 test("Uneven length 1", () { |
52 testZip([mks([1, 2, 3, 99, 100]), mks([4, 5, 6]), mks([7, 8, 9])], | 60 testZip([ |
53 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 61 mks([1, 2, 3, 99, 100]), |
| 62 mks([4, 5, 6]), |
| 63 mks([7, 8, 9]) |
| 64 ], [ |
| 65 [1, 4, 7], |
| 66 [2, 5, 8], |
| 67 [3, 6, 9] |
| 68 ]); |
54 }); | 69 }); |
55 | 70 |
56 test("Uneven length 2", () { | 71 test("Uneven length 2", () { |
57 testZip([mks([1, 2, 3]), mks([4, 5, 6, 99, 100]), mks([7, 8, 9])], | 72 testZip([ |
58 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 73 mks([1, 2, 3]), |
| 74 mks([4, 5, 6, 99, 100]), |
| 75 mks([7, 8, 9]) |
| 76 ], [ |
| 77 [1, 4, 7], |
| 78 [2, 5, 8], |
| 79 [3, 6, 9] |
| 80 ]); |
59 }); | 81 }); |
60 | 82 |
61 test("Uneven length 3", () { | 83 test("Uneven length 3", () { |
62 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], | 84 testZip([ |
63 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 85 mks([1, 2, 3]), |
| 86 mks([4, 5, 6]), |
| 87 mks([7, 8, 9, 99, 100]) |
| 88 ], [ |
| 89 [1, 4, 7], |
| 90 [2, 5, 8], |
| 91 [3, 6, 9] |
| 92 ]); |
64 }); | 93 }); |
65 | 94 |
66 test("Uneven length 4", () { | 95 test("Uneven length 4", () { |
67 testZip([mks([1, 2, 3, 98]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], | 96 testZip([ |
68 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 97 mks([1, 2, 3, 98]), |
| 98 mks([4, 5, 6]), |
| 99 mks([7, 8, 9, 99, 100]) |
| 100 ], [ |
| 101 [1, 4, 7], |
| 102 [2, 5, 8], |
| 103 [3, 6, 9] |
| 104 ]); |
69 }); | 105 }); |
70 | 106 |
71 test("Empty 1", () { | 107 test("Empty 1", () { |
72 testZip([mks([]), mks([4, 5, 6]), mks([7, 8, 9])], []); | 108 testZip([ |
| 109 mks([]), |
| 110 mks([4, 5, 6]), |
| 111 mks([7, 8, 9]) |
| 112 ], []); |
73 }); | 113 }); |
74 | 114 |
75 test("Empty 2", () { | 115 test("Empty 2", () { |
76 testZip([mks([1, 2, 3]), mks([]), mks([7, 8, 9])], []); | 116 testZip([ |
| 117 mks([1, 2, 3]), |
| 118 mks([]), |
| 119 mks([7, 8, 9]) |
| 120 ], []); |
77 }); | 121 }); |
78 | 122 |
79 test("Empty 3", () { | 123 test("Empty 3", () { |
80 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([])], []); | 124 testZip([ |
| 125 mks([1, 2, 3]), |
| 126 mks([4, 5, 6]), |
| 127 mks([]) |
| 128 ], []); |
81 }); | 129 }); |
82 | 130 |
83 test("Empty source", () { | 131 test("Empty source", () { |
84 testZip([], []); | 132 testZip([], []); |
85 }); | 133 }); |
86 | 134 |
87 test("Single Source", () { | 135 test("Single Source", () { |
88 testZip([mks([1, 2, 3])], [[1], [2], [3]]); | 136 testZip([ |
| 137 mks([1, 2, 3]) |
| 138 ], [ |
| 139 [1], |
| 140 [2], |
| 141 [3] |
| 142 ]); |
89 }); | 143 }); |
90 | 144 |
91 test("Other-streams", () { | 145 test("Other-streams", () { |
92 Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4); | 146 Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4); |
93 Stream st2 = new Stream.periodic(const Duration(milliseconds: 5), | 147 Stream st2 = |
94 (x) => x + 4).take(3); | 148 new Stream.periodic(const Duration(milliseconds: 5), (x) => x + 4) |
| 149 .take(3); |
95 StreamController c = new StreamController.broadcast(); | 150 StreamController c = new StreamController.broadcast(); |
96 Stream st3 = c.stream; | 151 Stream st3 = c.stream; |
97 testZip([st1, st2, st3], | 152 testZip([ |
98 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 153 st1, |
99 c..add(7)..add(8)..add(9)..close(); | 154 st2, |
| 155 st3 |
| 156 ], [ |
| 157 [1, 4, 7], |
| 158 [2, 5, 8], |
| 159 [3, 6, 9] |
| 160 ]); |
| 161 c |
| 162 ..add(7) |
| 163 ..add(8) |
| 164 ..add(9) |
| 165 ..close(); |
100 }); | 166 }); |
101 | 167 |
102 test("Error 1", () { | 168 test("Error 1", () { |
103 expect(new StreamZip([streamError(mks([1, 2, 3]), 2, "BAD-1"), | 169 expect( |
104 mks([4, 5, 6]), | 170 new StreamZip([ |
105 mks([7, 8, 9])]).toList(), | 171 streamError(mks([1, 2, 3]), 2, "BAD-1"), |
106 throwsA(equals("BAD-1"))); | 172 mks([4, 5, 6]), |
| 173 mks([7, 8, 9]) |
| 174 ]).toList(), |
| 175 throwsA(equals("BAD-1"))); |
107 }); | 176 }); |
108 | 177 |
109 test("Error 2", () { | 178 test("Error 2", () { |
110 expect(new StreamZip([mks([1, 2, 3]), | 179 expect( |
111 streamError(mks([4, 5, 6]), 5, "BAD-2"), | 180 new StreamZip([ |
112 mks([7, 8, 9])]).toList(), | 181 mks([1, 2, 3]), |
113 throwsA(equals("BAD-2"))); | 182 streamError(mks([4, 5, 6]), 5, "BAD-2"), |
| 183 mks([7, 8, 9]) |
| 184 ]).toList(), |
| 185 throwsA(equals("BAD-2"))); |
114 }); | 186 }); |
115 | 187 |
116 test("Error 3", () { | 188 test("Error 3", () { |
117 expect(new StreamZip([mks([1, 2, 3]), | 189 expect( |
118 mks([4, 5, 6]), | 190 new StreamZip([ |
119 streamError(mks([7, 8, 9]), 8, "BAD-3")]).toList(), | 191 mks([1, 2, 3]), |
120 throwsA(equals("BAD-3"))); | 192 mks([4, 5, 6]), |
| 193 streamError(mks([7, 8, 9]), 8, "BAD-3") |
| 194 ]).toList(), |
| 195 throwsA(equals("BAD-3"))); |
121 }); | 196 }); |
122 | 197 |
123 test("Error at end", () { | 198 test("Error at end", () { |
124 expect(new StreamZip([mks([1, 2, 3]), | 199 expect( |
125 streamError(mks([4, 5, 6]), 6, "BAD-4"), | 200 new StreamZip([ |
126 mks([7, 8, 9])]).toList(), | 201 mks([1, 2, 3]), |
127 throwsA(equals("BAD-4"))); | 202 streamError(mks([4, 5, 6]), 6, "BAD-4"), |
| 203 mks([7, 8, 9]) |
| 204 ]).toList(), |
| 205 throwsA(equals("BAD-4"))); |
128 }); | 206 }); |
129 | 207 |
130 test("Error before first end", () { | 208 test("Error before first end", () { |
131 // StreamControllers' streams with no "close" called will never be done, | 209 // StreamControllers' streams with no "close" called will never be done, |
132 // so the fourth event of the first stream is guaranteed to come first. | 210 // so the fourth event of the first stream is guaranteed to come first. |
133 expect(new StreamZip( | 211 expect( |
134 [streamError(mks([1, 2, 3, 4]), 4, "BAD-5"), | 212 new StreamZip([ |
135 (new StreamController()..add(4)..add(5)..add(6)).stream, | 213 streamError(mks([1, 2, 3, 4]), 4, "BAD-5"), |
136 (new StreamController()..add(7)..add(8)..add(9)).stream] | 214 (new StreamController()..add(4)..add(5)..add(6)).stream, |
137 ).toList(), | 215 (new StreamController()..add(7)..add(8)..add(9)).stream |
138 throwsA(equals("BAD-5"))); | 216 ]).toList(), |
| 217 throwsA(equals("BAD-5"))); |
139 }); | 218 }); |
140 | 219 |
141 test("Error after first end", () { | 220 test("Error after first end", () { |
142 StreamController controller = new StreamController(); | 221 StreamController controller = new StreamController(); |
143 controller..add(7)..add(8)..add(9); | 222 controller..add(7)..add(8)..add(9); |
144 // Transformer that puts error into controller when one of the first two | 223 // Transformer that puts error into controller when one of the first two |
145 // streams have sent a done event. | 224 // streams have sent a done event. |
146 StreamTransformer trans = new StreamTransformer.fromHandlers( | 225 StreamTransformer trans = |
147 handleDone: (EventSink s) { | 226 new StreamTransformer.fromHandlers(handleDone: (EventSink s) { |
148 Timer.run(() { controller.addError("BAD-6"); }); | 227 Timer.run(() { |
| 228 controller.addError("BAD-6"); |
| 229 }); |
149 s.close(); | 230 s.close(); |
150 }); | 231 }); |
151 testZip([mks([1, 2, 3]).transform(trans), | 232 testZip([ |
152 mks([4, 5, 6]).transform(trans), | 233 mks([1, 2, 3]).transform(trans), |
153 controller.stream], | 234 mks([4, 5, 6]).transform(trans), |
154 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 235 controller.stream |
| 236 ], [ |
| 237 [1, 4, 7], |
| 238 [2, 5, 8], |
| 239 [3, 6, 9] |
| 240 ]); |
155 }); | 241 }); |
156 | 242 |
157 test("Pause/Resume", () { | 243 test("Pause/Resume", () { |
158 int sc1p = 0; | 244 int sc1p = 0; |
159 StreamController c1 = new StreamController( | 245 StreamController c1 = new StreamController(onPause: () { |
160 onPause: () { | 246 sc1p++; |
161 sc1p++; | 247 }, onResume: () { |
162 }, | 248 sc1p--; |
163 onResume: () { | 249 }); |
164 sc1p--; | |
165 }); | |
166 | 250 |
167 int sc2p = 0; | 251 int sc2p = 0; |
168 StreamController c2 = new StreamController( | 252 StreamController c2 = new StreamController(onPause: () { |
169 onPause: () { | 253 sc2p++; |
170 sc2p++; | 254 }, onResume: () { |
171 }, | 255 sc2p--; |
172 onResume: () { | 256 }); |
173 sc2p--; | |
174 }); | |
175 | 257 |
176 var done = expectAsync((){ | 258 var done = expectAsync0(() { |
177 expect(sc1p, equals(1)); | 259 expect(sc1p, equals(1)); |
178 expect(sc2p, equals(0)); | 260 expect(sc2p, equals(0)); |
179 }); // Call to complete test. | 261 }); // Call to complete test. |
180 | 262 |
181 Stream zip = new StreamZip([c1.stream, c2.stream]); | 263 Stream zip = new StreamZip([c1.stream, c2.stream]); |
182 | 264 |
183 const ms25 = const Duration(milliseconds: 25); | 265 const ms25 = const Duration(milliseconds: 25); |
184 | 266 |
185 // StreamIterator uses pause and resume to control flow. | 267 // StreamIterator uses pause and resume to control flow. |
186 StreamIterator it = new StreamIterator(zip); | 268 StreamIterator it = new StreamIterator(zip); |
187 | 269 |
188 it.moveNext().then((hasMore) { | 270 it.moveNext().then((hasMore) { |
189 expect(hasMore, isTrue); | 271 expect(hasMore, isTrue); |
190 expect(it.current, equals([1, 2])); | 272 expect(it.current, equals([1, 2])); |
191 return it.moveNext(); | 273 return it.moveNext(); |
192 }).then((hasMore) { | 274 }).then((hasMore) { |
193 expect(hasMore, isTrue); | 275 expect(hasMore, isTrue); |
194 expect(it.current, equals([3, 4])); | 276 expect(it.current, equals([3, 4])); |
195 c2.add(6); | 277 c2.add(6); |
196 return it.moveNext(); | 278 return it.moveNext(); |
197 }).then((hasMore) { | 279 }).then((hasMore) { |
198 expect(hasMore, isTrue); | 280 expect(hasMore, isTrue); |
199 expect(it.current, equals([5, 6])); | 281 expect(it.current, equals([5, 6])); |
200 new Future.delayed(ms25).then((_) { c2.add(8); }); | 282 new Future.delayed(ms25).then((_) { |
| 283 c2.add(8); |
| 284 }); |
201 return it.moveNext(); | 285 return it.moveNext(); |
202 }).then((hasMore) { | 286 }).then((hasMore) { |
203 expect(hasMore, isTrue); | 287 expect(hasMore, isTrue); |
204 expect(it.current, equals([7, 8])); | 288 expect(it.current, equals([7, 8])); |
205 c2.add(9); | 289 c2.add(9); |
206 return it.moveNext(); | 290 return it.moveNext(); |
207 }).then((hasMore) { | 291 }).then((hasMore) { |
208 expect(hasMore, isFalse); | 292 expect(hasMore, isFalse); |
209 done(); | 293 done(); |
210 }); | 294 }); |
211 | 295 |
212 c1..add(1)..add(3)..add(5)..add(7)..close(); | 296 c1 |
| 297 ..add(1) |
| 298 ..add(3) |
| 299 ..add(5) |
| 300 ..add(7) |
| 301 ..close(); |
213 c2..add(2)..add(4); | 302 c2..add(2)..add(4); |
214 }); | 303 }); |
215 | 304 |
216 test("pause-resume2", () { | 305 test("pause-resume2", () { |
217 var s1 = new Stream.fromIterable([0, 2, 4, 6, 8]); | 306 var s1 = new Stream.fromIterable([0, 2, 4, 6, 8]); |
218 var s2 = new Stream.fromIterable([1, 3, 5, 7]); | 307 var s2 = new Stream.fromIterable([1, 3, 5, 7]); |
219 var sz = new StreamZip([s1, s2]); | 308 var sz = new StreamZip([s1, s2]); |
220 int ctr = 0; | 309 int ctr = 0; |
221 var sub; | 310 var sub; |
222 sub = sz.listen(expectAsync((v) { | 311 sub = sz.listen(expectAsync1((v) { |
223 expect(v, equals([ctr * 2, ctr * 2 + 1])); | 312 expect(v, equals([ctr * 2, ctr * 2 + 1])); |
224 if (ctr == 1) { | 313 if (ctr == 1) { |
225 sub.pause(new Future.delayed(const Duration(milliseconds: 25))); | 314 sub.pause(new Future.delayed(const Duration(milliseconds: 25))); |
226 } else if (ctr == 2) { | 315 } else if (ctr == 2) { |
227 sub.pause(); | 316 sub.pause(); |
228 new Future.delayed(const Duration(milliseconds: 25)).then((_) { | 317 new Future.delayed(const Duration(milliseconds: 25)).then((_) { |
229 sub.resume(); | 318 sub.resume(); |
230 }); | 319 }); |
231 } | 320 } |
232 ctr++; | 321 ctr++; |
233 }, count: 4)); | 322 }, count: 4)); |
234 }); | 323 }); |
235 } | 324 } |
OLD | NEW |