| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import "dart:async"; | 5 import "dart:async"; |
| 6 import "package:async/stream_zip.dart"; | 6 |
| 7 import "package:async/async.dart"; |
| 7 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
| 8 | 9 |
| 9 /// Create an error with the same values as [base], except that it throwsA | 10 /// Create an error with the same values as [base], except that it throwsA |
| 10 /// when seeing the value [errorValue]. | 11 /// when seeing the value [errorValue]. |
| 11 Stream streamError(Stream base, int errorValue, error) { | 12 Stream streamError(Stream base, int errorValue, error) { |
| 12 return base.map((x) => (x == errorValue) ? throw error : x); | 13 return base.map((x) => (x == errorValue) ? throw error : x); |
| 13 } | 14 } |
| 14 | 15 |
| 15 /// Make a [Stream] from an [Iterable] by adding events to a stream controller | 16 /// Make a [Stream] from an [Iterable] by adding events to a stream controller |
| 16 /// at periodic intervals. | 17 /// at periodic intervals. |
| (...skipping 11 matching lines...) Expand all Loading... |
| 28 } | 29 } |
| 29 }); | 30 }); |
| 30 return controller.stream; | 31 return controller.stream; |
| 31 } | 32 } |
| 32 | 33 |
| 33 /// Counter used to give varying delays for streams. | 34 /// Counter used to give varying delays for streams. |
| 34 int ctr = 0; | 35 int ctr = 0; |
| 35 | 36 |
| 36 main() { | 37 main() { |
| 37 // Test that zipping [streams] gives the results iterated by [expectedData]. | 38 // Test that zipping [streams] gives the results iterated by [expectedData]. |
| 38 testZip(Iterable streams, Iterable expectedData) { | 39 testZip(Iterable<Stream> streams, Iterable expectedData) { |
| 39 List data = []; | 40 List data = []; |
| 40 Stream zip = new StreamZip(streams); | 41 Stream zip = new StreamZip(streams); |
| 41 zip.listen(data.add, onDone: expectAsync(() { | 42 zip.listen(data.add, onDone: expectAsync0(() { |
| 42 expect(data, equals(expectedData)); | 43 expect(data, equals(expectedData)); |
| 43 })); | 44 })); |
| 44 } | 45 } |
| 45 | 46 |
| 46 test("Basic", () { | 47 test("Basic", () { |
| 47 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9])], | 48 testZip([ |
| 48 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 49 mks([1, 2, 3]), |
| 50 mks([4, 5, 6]), |
| 51 mks([7, 8, 9]) |
| 52 ], [ |
| 53 [1, 4, 7], |
| 54 [2, 5, 8], |
| 55 [3, 6, 9] |
| 56 ]); |
| 49 }); | 57 }); |
| 50 | 58 |
| 51 test("Uneven length 1", () { | 59 test("Uneven length 1", () { |
| 52 testZip([mks([1, 2, 3, 99, 100]), mks([4, 5, 6]), mks([7, 8, 9])], | 60 testZip([ |
| 53 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 61 mks([1, 2, 3, 99, 100]), |
| 62 mks([4, 5, 6]), |
| 63 mks([7, 8, 9]) |
| 64 ], [ |
| 65 [1, 4, 7], |
| 66 [2, 5, 8], |
| 67 [3, 6, 9] |
| 68 ]); |
| 54 }); | 69 }); |
| 55 | 70 |
| 56 test("Uneven length 2", () { | 71 test("Uneven length 2", () { |
| 57 testZip([mks([1, 2, 3]), mks([4, 5, 6, 99, 100]), mks([7, 8, 9])], | 72 testZip([ |
| 58 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 73 mks([1, 2, 3]), |
| 74 mks([4, 5, 6, 99, 100]), |
| 75 mks([7, 8, 9]) |
| 76 ], [ |
| 77 [1, 4, 7], |
| 78 [2, 5, 8], |
| 79 [3, 6, 9] |
| 80 ]); |
| 59 }); | 81 }); |
| 60 | 82 |
| 61 test("Uneven length 3", () { | 83 test("Uneven length 3", () { |
| 62 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], | 84 testZip([ |
| 63 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 85 mks([1, 2, 3]), |
| 86 mks([4, 5, 6]), |
| 87 mks([7, 8, 9, 99, 100]) |
| 88 ], [ |
| 89 [1, 4, 7], |
| 90 [2, 5, 8], |
| 91 [3, 6, 9] |
| 92 ]); |
| 64 }); | 93 }); |
| 65 | 94 |
| 66 test("Uneven length 4", () { | 95 test("Uneven length 4", () { |
| 67 testZip([mks([1, 2, 3, 98]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], | 96 testZip([ |
| 68 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 97 mks([1, 2, 3, 98]), |
| 98 mks([4, 5, 6]), |
| 99 mks([7, 8, 9, 99, 100]) |
| 100 ], [ |
| 101 [1, 4, 7], |
| 102 [2, 5, 8], |
| 103 [3, 6, 9] |
| 104 ]); |
| 69 }); | 105 }); |
| 70 | 106 |
| 71 test("Empty 1", () { | 107 test("Empty 1", () { |
| 72 testZip([mks([]), mks([4, 5, 6]), mks([7, 8, 9])], []); | 108 testZip([ |
| 109 mks([]), |
| 110 mks([4, 5, 6]), |
| 111 mks([7, 8, 9]) |
| 112 ], []); |
| 73 }); | 113 }); |
| 74 | 114 |
| 75 test("Empty 2", () { | 115 test("Empty 2", () { |
| 76 testZip([mks([1, 2, 3]), mks([]), mks([7, 8, 9])], []); | 116 testZip([ |
| 117 mks([1, 2, 3]), |
| 118 mks([]), |
| 119 mks([7, 8, 9]) |
| 120 ], []); |
| 77 }); | 121 }); |
| 78 | 122 |
| 79 test("Empty 3", () { | 123 test("Empty 3", () { |
| 80 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([])], []); | 124 testZip([ |
| 125 mks([1, 2, 3]), |
| 126 mks([4, 5, 6]), |
| 127 mks([]) |
| 128 ], []); |
| 81 }); | 129 }); |
| 82 | 130 |
| 83 test("Empty source", () { | 131 test("Empty source", () { |
| 84 testZip([], []); | 132 testZip([], []); |
| 85 }); | 133 }); |
| 86 | 134 |
| 87 test("Single Source", () { | 135 test("Single Source", () { |
| 88 testZip([mks([1, 2, 3])], [[1], [2], [3]]); | 136 testZip([ |
| 137 mks([1, 2, 3]) |
| 138 ], [ |
| 139 [1], |
| 140 [2], |
| 141 [3] |
| 142 ]); |
| 89 }); | 143 }); |
| 90 | 144 |
| 91 test("Other-streams", () { | 145 test("Other-streams", () { |
| 92 Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4); | 146 Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4); |
| 93 Stream st2 = new Stream.periodic(const Duration(milliseconds: 5), | 147 Stream st2 = |
| 94 (x) => x + 4).take(3); | 148 new Stream.periodic(const Duration(milliseconds: 5), (x) => x + 4) |
| 149 .take(3); |
| 95 StreamController c = new StreamController.broadcast(); | 150 StreamController c = new StreamController.broadcast(); |
| 96 Stream st3 = c.stream; | 151 Stream st3 = c.stream; |
| 97 testZip([st1, st2, st3], | 152 testZip([ |
| 98 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 153 st1, |
| 99 c..add(7)..add(8)..add(9)..close(); | 154 st2, |
| 155 st3 |
| 156 ], [ |
| 157 [1, 4, 7], |
| 158 [2, 5, 8], |
| 159 [3, 6, 9] |
| 160 ]); |
| 161 c |
| 162 ..add(7) |
| 163 ..add(8) |
| 164 ..add(9) |
| 165 ..close(); |
| 100 }); | 166 }); |
| 101 | 167 |
| 102 test("Error 1", () { | 168 test("Error 1", () { |
| 103 expect(new StreamZip([streamError(mks([1, 2, 3]), 2, "BAD-1"), | 169 expect( |
| 104 mks([4, 5, 6]), | 170 new StreamZip([ |
| 105 mks([7, 8, 9])]).toList(), | 171 streamError(mks([1, 2, 3]), 2, "BAD-1"), |
| 106 throwsA(equals("BAD-1"))); | 172 mks([4, 5, 6]), |
| 173 mks([7, 8, 9]) |
| 174 ]).toList(), |
| 175 throwsA(equals("BAD-1"))); |
| 107 }); | 176 }); |
| 108 | 177 |
| 109 test("Error 2", () { | 178 test("Error 2", () { |
| 110 expect(new StreamZip([mks([1, 2, 3]), | 179 expect( |
| 111 streamError(mks([4, 5, 6]), 5, "BAD-2"), | 180 new StreamZip([ |
| 112 mks([7, 8, 9])]).toList(), | 181 mks([1, 2, 3]), |
| 113 throwsA(equals("BAD-2"))); | 182 streamError(mks([4, 5, 6]), 5, "BAD-2"), |
| 183 mks([7, 8, 9]) |
| 184 ]).toList(), |
| 185 throwsA(equals("BAD-2"))); |
| 114 }); | 186 }); |
| 115 | 187 |
| 116 test("Error 3", () { | 188 test("Error 3", () { |
| 117 expect(new StreamZip([mks([1, 2, 3]), | 189 expect( |
| 118 mks([4, 5, 6]), | 190 new StreamZip([ |
| 119 streamError(mks([7, 8, 9]), 8, "BAD-3")]).toList(), | 191 mks([1, 2, 3]), |
| 120 throwsA(equals("BAD-3"))); | 192 mks([4, 5, 6]), |
| 193 streamError(mks([7, 8, 9]), 8, "BAD-3") |
| 194 ]).toList(), |
| 195 throwsA(equals("BAD-3"))); |
| 121 }); | 196 }); |
| 122 | 197 |
| 123 test("Error at end", () { | 198 test("Error at end", () { |
| 124 expect(new StreamZip([mks([1, 2, 3]), | 199 expect( |
| 125 streamError(mks([4, 5, 6]), 6, "BAD-4"), | 200 new StreamZip([ |
| 126 mks([7, 8, 9])]).toList(), | 201 mks([1, 2, 3]), |
| 127 throwsA(equals("BAD-4"))); | 202 streamError(mks([4, 5, 6]), 6, "BAD-4"), |
| 203 mks([7, 8, 9]) |
| 204 ]).toList(), |
| 205 throwsA(equals("BAD-4"))); |
| 128 }); | 206 }); |
| 129 | 207 |
| 130 test("Error before first end", () { | 208 test("Error before first end", () { |
| 131 // StreamControllers' streams with no "close" called will never be done, | 209 // StreamControllers' streams with no "close" called will never be done, |
| 132 // so the fourth event of the first stream is guaranteed to come first. | 210 // so the fourth event of the first stream is guaranteed to come first. |
| 133 expect(new StreamZip( | 211 expect( |
| 134 [streamError(mks([1, 2, 3, 4]), 4, "BAD-5"), | 212 new StreamZip([ |
| 135 (new StreamController()..add(4)..add(5)..add(6)).stream, | 213 streamError(mks([1, 2, 3, 4]), 4, "BAD-5"), |
| 136 (new StreamController()..add(7)..add(8)..add(9)).stream] | 214 (new StreamController()..add(4)..add(5)..add(6)).stream, |
| 137 ).toList(), | 215 (new StreamController()..add(7)..add(8)..add(9)).stream |
| 138 throwsA(equals("BAD-5"))); | 216 ]).toList(), |
| 217 throwsA(equals("BAD-5"))); |
| 139 }); | 218 }); |
| 140 | 219 |
| 141 test("Error after first end", () { | 220 test("Error after first end", () { |
| 142 StreamController controller = new StreamController(); | 221 StreamController controller = new StreamController(); |
| 143 controller..add(7)..add(8)..add(9); | 222 controller..add(7)..add(8)..add(9); |
| 144 // Transformer that puts error into controller when one of the first two | 223 // Transformer that puts error into controller when one of the first two |
| 145 // streams have sent a done event. | 224 // streams have sent a done event. |
| 146 StreamTransformer trans = new StreamTransformer.fromHandlers( | 225 StreamTransformer trans = |
| 147 handleDone: (EventSink s) { | 226 new StreamTransformer.fromHandlers(handleDone: (EventSink s) { |
| 148 Timer.run(() { controller.addError("BAD-6"); }); | 227 Timer.run(() { |
| 228 controller.addError("BAD-6"); |
| 229 }); |
| 149 s.close(); | 230 s.close(); |
| 150 }); | 231 }); |
| 151 testZip([mks([1, 2, 3]).transform(trans), | 232 testZip([ |
| 152 mks([4, 5, 6]).transform(trans), | 233 mks([1, 2, 3]).transform(trans), |
| 153 controller.stream], | 234 mks([4, 5, 6]).transform(trans), |
| 154 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | 235 controller.stream |
| 236 ], [ |
| 237 [1, 4, 7], |
| 238 [2, 5, 8], |
| 239 [3, 6, 9] |
| 240 ]); |
| 155 }); | 241 }); |
| 156 | 242 |
| 157 test("Pause/Resume", () { | 243 test("Pause/Resume", () { |
| 158 int sc1p = 0; | 244 int sc1p = 0; |
| 159 StreamController c1 = new StreamController( | 245 StreamController c1 = new StreamController(onPause: () { |
| 160 onPause: () { | 246 sc1p++; |
| 161 sc1p++; | 247 }, onResume: () { |
| 162 }, | 248 sc1p--; |
| 163 onResume: () { | 249 }); |
| 164 sc1p--; | |
| 165 }); | |
| 166 | 250 |
| 167 int sc2p = 0; | 251 int sc2p = 0; |
| 168 StreamController c2 = new StreamController( | 252 StreamController c2 = new StreamController(onPause: () { |
| 169 onPause: () { | 253 sc2p++; |
| 170 sc2p++; | 254 }, onResume: () { |
| 171 }, | 255 sc2p--; |
| 172 onResume: () { | 256 }); |
| 173 sc2p--; | |
| 174 }); | |
| 175 | 257 |
| 176 var done = expectAsync((){ | 258 var done = expectAsync0(() { |
| 177 expect(sc1p, equals(1)); | 259 expect(sc1p, equals(1)); |
| 178 expect(sc2p, equals(0)); | 260 expect(sc2p, equals(0)); |
| 179 }); // Call to complete test. | 261 }); // Call to complete test. |
| 180 | 262 |
| 181 Stream zip = new StreamZip([c1.stream, c2.stream]); | 263 Stream zip = new StreamZip([c1.stream, c2.stream]); |
| 182 | 264 |
| 183 const ms25 = const Duration(milliseconds: 25); | 265 const ms25 = const Duration(milliseconds: 25); |
| 184 | 266 |
| 185 // StreamIterator uses pause and resume to control flow. | 267 // StreamIterator uses pause and resume to control flow. |
| 186 StreamIterator it = new StreamIterator(zip); | 268 StreamIterator it = new StreamIterator(zip); |
| 187 | 269 |
| 188 it.moveNext().then((hasMore) { | 270 it.moveNext().then((hasMore) { |
| 189 expect(hasMore, isTrue); | 271 expect(hasMore, isTrue); |
| 190 expect(it.current, equals([1, 2])); | 272 expect(it.current, equals([1, 2])); |
| 191 return it.moveNext(); | 273 return it.moveNext(); |
| 192 }).then((hasMore) { | 274 }).then((hasMore) { |
| 193 expect(hasMore, isTrue); | 275 expect(hasMore, isTrue); |
| 194 expect(it.current, equals([3, 4])); | 276 expect(it.current, equals([3, 4])); |
| 195 c2.add(6); | 277 c2.add(6); |
| 196 return it.moveNext(); | 278 return it.moveNext(); |
| 197 }).then((hasMore) { | 279 }).then((hasMore) { |
| 198 expect(hasMore, isTrue); | 280 expect(hasMore, isTrue); |
| 199 expect(it.current, equals([5, 6])); | 281 expect(it.current, equals([5, 6])); |
| 200 new Future.delayed(ms25).then((_) { c2.add(8); }); | 282 new Future.delayed(ms25).then((_) { |
| 283 c2.add(8); |
| 284 }); |
| 201 return it.moveNext(); | 285 return it.moveNext(); |
| 202 }).then((hasMore) { | 286 }).then((hasMore) { |
| 203 expect(hasMore, isTrue); | 287 expect(hasMore, isTrue); |
| 204 expect(it.current, equals([7, 8])); | 288 expect(it.current, equals([7, 8])); |
| 205 c2.add(9); | 289 c2.add(9); |
| 206 return it.moveNext(); | 290 return it.moveNext(); |
| 207 }).then((hasMore) { | 291 }).then((hasMore) { |
| 208 expect(hasMore, isFalse); | 292 expect(hasMore, isFalse); |
| 209 done(); | 293 done(); |
| 210 }); | 294 }); |
| 211 | 295 |
| 212 c1..add(1)..add(3)..add(5)..add(7)..close(); | 296 c1 |
| 297 ..add(1) |
| 298 ..add(3) |
| 299 ..add(5) |
| 300 ..add(7) |
| 301 ..close(); |
| 213 c2..add(2)..add(4); | 302 c2..add(2)..add(4); |
| 214 }); | 303 }); |
| 215 | 304 |
| 216 test("pause-resume2", () { | 305 test("pause-resume2", () { |
| 217 var s1 = new Stream.fromIterable([0, 2, 4, 6, 8]); | 306 var s1 = new Stream.fromIterable([0, 2, 4, 6, 8]); |
| 218 var s2 = new Stream.fromIterable([1, 3, 5, 7]); | 307 var s2 = new Stream.fromIterable([1, 3, 5, 7]); |
| 219 var sz = new StreamZip([s1, s2]); | 308 var sz = new StreamZip([s1, s2]); |
| 220 int ctr = 0; | 309 int ctr = 0; |
| 221 var sub; | 310 var sub; |
| 222 sub = sz.listen(expectAsync((v) { | 311 sub = sz.listen(expectAsync1((v) { |
| 223 expect(v, equals([ctr * 2, ctr * 2 + 1])); | 312 expect(v, equals([ctr * 2, ctr * 2 + 1])); |
| 224 if (ctr == 1) { | 313 if (ctr == 1) { |
| 225 sub.pause(new Future.delayed(const Duration(milliseconds: 25))); | 314 sub.pause(new Future.delayed(const Duration(milliseconds: 25))); |
| 226 } else if (ctr == 2) { | 315 } else if (ctr == 2) { |
| 227 sub.pause(); | 316 sub.pause(); |
| 228 new Future.delayed(const Duration(milliseconds: 25)).then((_) { | 317 new Future.delayed(const Duration(milliseconds: 25)).then((_) { |
| 229 sub.resume(); | 318 sub.resume(); |
| 230 }); | 319 }); |
| 231 } | 320 } |
| 232 ctr++; | 321 ctr++; |
| 233 }, count: 4)); | 322 }, count: 4)); |
| 234 }); | 323 }); |
| 235 } | 324 } |
| OLD | NEW |