OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import "dart:async"; |
| 6 import "package:async/stream_zip.dart"; |
| 7 import "package:test/test.dart"; |
| 8 |
| 9 /// Create an error with the same values as [base], except that it throwsA |
| 10 /// when seeing the value [errorValue]. |
| 11 Stream streamError(Stream base, int errorValue, error) { |
| 12 return base.map((x) => (x == errorValue) ? throw error : x); |
| 13 } |
| 14 |
| 15 /// Make a [Stream] from an [Iterable] by adding events to a stream controller |
| 16 /// at periodic intervals. |
| 17 Stream mks(Iterable iterable) { |
| 18 Iterator iterator = iterable.iterator; |
| 19 StreamController controller = new StreamController(); |
| 20 // Some varying time between 3 and 10 ms. |
| 21 int ms = ((++ctr) * 5) % 7 + 3; |
| 22 new Timer.periodic(new Duration(milliseconds: ms), (Timer timer) { |
| 23 if (iterator.moveNext()) { |
| 24 controller.add(iterator.current); |
| 25 } else { |
| 26 controller.close(); |
| 27 timer.cancel(); |
| 28 } |
| 29 }); |
| 30 return controller.stream; |
| 31 } |
| 32 |
| 33 /// Counter used to give varying delays for streams. |
| 34 int ctr = 0; |
| 35 |
| 36 main() { |
| 37 // Test that zipping [streams] gives the results iterated by [expectedData]. |
| 38 testZip(Iterable streams, Iterable expectedData) { |
| 39 List data = []; |
| 40 Stream zip = new StreamZip(streams); |
| 41 zip.listen(data.add, onDone: expectAsync(() { |
| 42 expect(data, equals(expectedData)); |
| 43 })); |
| 44 } |
| 45 |
| 46 test("Basic", () { |
| 47 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9])], |
| 48 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); |
| 49 }); |
| 50 |
| 51 test("Uneven length 1", () { |
| 52 testZip([mks([1, 2, 3, 99, 100]), mks([4, 5, 6]), mks([7, 8, 9])], |
| 53 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); |
| 54 }); |
| 55 |
| 56 test("Uneven length 2", () { |
| 57 testZip([mks([1, 2, 3]), mks([4, 5, 6, 99, 100]), mks([7, 8, 9])], |
| 58 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); |
| 59 }); |
| 60 |
| 61 test("Uneven length 3", () { |
| 62 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], |
| 63 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); |
| 64 }); |
| 65 |
| 66 test("Uneven length 4", () { |
| 67 testZip([mks([1, 2, 3, 98]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], |
| 68 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); |
| 69 }); |
| 70 |
| 71 test("Empty 1", () { |
| 72 testZip([mks([]), mks([4, 5, 6]), mks([7, 8, 9])], []); |
| 73 }); |
| 74 |
| 75 test("Empty 2", () { |
| 76 testZip([mks([1, 2, 3]), mks([]), mks([7, 8, 9])], []); |
| 77 }); |
| 78 |
| 79 test("Empty 3", () { |
| 80 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([])], []); |
| 81 }); |
| 82 |
| 83 test("Empty source", () { |
| 84 testZip([], []); |
| 85 }); |
| 86 |
| 87 test("Single Source", () { |
| 88 testZip([mks([1, 2, 3])], [[1], [2], [3]]); |
| 89 }); |
| 90 |
| 91 test("Other-streams", () { |
| 92 Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4); |
| 93 Stream st2 = new Stream.periodic(const Duration(milliseconds: 5), |
| 94 (x) => x + 4).take(3); |
| 95 StreamController c = new StreamController.broadcast(); |
| 96 Stream st3 = c.stream; |
| 97 testZip([st1, st2, st3], |
| 98 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); |
| 99 c..add(7)..add(8)..add(9)..close(); |
| 100 }); |
| 101 |
| 102 test("Error 1", () { |
| 103 expect(new StreamZip([streamError(mks([1, 2, 3]), 2, "BAD-1"), |
| 104 mks([4, 5, 6]), |
| 105 mks([7, 8, 9])]).toList(), |
| 106 throwsA(equals("BAD-1"))); |
| 107 }); |
| 108 |
| 109 test("Error 2", () { |
| 110 expect(new StreamZip([mks([1, 2, 3]), |
| 111 streamError(mks([4, 5, 6]), 5, "BAD-2"), |
| 112 mks([7, 8, 9])]).toList(), |
| 113 throwsA(equals("BAD-2"))); |
| 114 }); |
| 115 |
| 116 test("Error 3", () { |
| 117 expect(new StreamZip([mks([1, 2, 3]), |
| 118 mks([4, 5, 6]), |
| 119 streamError(mks([7, 8, 9]), 8, "BAD-3")]).toList(), |
| 120 throwsA(equals("BAD-3"))); |
| 121 }); |
| 122 |
| 123 test("Error at end", () { |
| 124 expect(new StreamZip([mks([1, 2, 3]), |
| 125 streamError(mks([4, 5, 6]), 6, "BAD-4"), |
| 126 mks([7, 8, 9])]).toList(), |
| 127 throwsA(equals("BAD-4"))); |
| 128 }); |
| 129 |
| 130 test("Error before first end", () { |
| 131 // StreamControllers' streams with no "close" called will never be done, |
| 132 // so the fourth event of the first stream is guaranteed to come first. |
| 133 expect(new StreamZip( |
| 134 [streamError(mks([1, 2, 3, 4]), 4, "BAD-5"), |
| 135 (new StreamController()..add(4)..add(5)..add(6)).stream, |
| 136 (new StreamController()..add(7)..add(8)..add(9)).stream] |
| 137 ).toList(), |
| 138 throwsA(equals("BAD-5"))); |
| 139 }); |
| 140 |
| 141 test("Error after first end", () { |
| 142 StreamController controller = new StreamController(); |
| 143 controller..add(7)..add(8)..add(9); |
| 144 // Transformer that puts error into controller when one of the first two |
| 145 // streams have sent a done event. |
| 146 StreamTransformer trans = new StreamTransformer.fromHandlers( |
| 147 handleDone: (EventSink s) { |
| 148 Timer.run(() { controller.addError("BAD-6"); }); |
| 149 s.close(); |
| 150 }); |
| 151 testZip([mks([1, 2, 3]).transform(trans), |
| 152 mks([4, 5, 6]).transform(trans), |
| 153 controller.stream], |
| 154 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); |
| 155 }); |
| 156 |
| 157 test("Pause/Resume", () { |
| 158 int sc1p = 0; |
| 159 StreamController c1 = new StreamController( |
| 160 onPause: () { |
| 161 sc1p++; |
| 162 }, |
| 163 onResume: () { |
| 164 sc1p--; |
| 165 }); |
| 166 |
| 167 int sc2p = 0; |
| 168 StreamController c2 = new StreamController( |
| 169 onPause: () { |
| 170 sc2p++; |
| 171 }, |
| 172 onResume: () { |
| 173 sc2p--; |
| 174 }); |
| 175 |
| 176 var done = expectAsync((){ |
| 177 expect(sc1p, equals(1)); |
| 178 expect(sc2p, equals(0)); |
| 179 }); // Call to complete test. |
| 180 |
| 181 Stream zip = new StreamZip([c1.stream, c2.stream]); |
| 182 |
| 183 const ms25 = const Duration(milliseconds: 25); |
| 184 |
| 185 // StreamIterator uses pause and resume to control flow. |
| 186 StreamIterator it = new StreamIterator(zip); |
| 187 |
| 188 it.moveNext().then((hasMore) { |
| 189 expect(hasMore, isTrue); |
| 190 expect(it.current, equals([1, 2])); |
| 191 return it.moveNext(); |
| 192 }).then((hasMore) { |
| 193 expect(hasMore, isTrue); |
| 194 expect(it.current, equals([3, 4])); |
| 195 c2.add(6); |
| 196 return it.moveNext(); |
| 197 }).then((hasMore) { |
| 198 expect(hasMore, isTrue); |
| 199 expect(it.current, equals([5, 6])); |
| 200 new Future.delayed(ms25).then((_) { c2.add(8); }); |
| 201 return it.moveNext(); |
| 202 }).then((hasMore) { |
| 203 expect(hasMore, isTrue); |
| 204 expect(it.current, equals([7, 8])); |
| 205 c2.add(9); |
| 206 return it.moveNext(); |
| 207 }).then((hasMore) { |
| 208 expect(hasMore, isFalse); |
| 209 done(); |
| 210 }); |
| 211 |
| 212 c1..add(1)..add(3)..add(5)..add(7)..close(); |
| 213 c2..add(2)..add(4); |
| 214 }); |
| 215 |
| 216 test("pause-resume2", () { |
| 217 var s1 = new Stream.fromIterable([0, 2, 4, 6, 8]); |
| 218 var s2 = new Stream.fromIterable([1, 3, 5, 7]); |
| 219 var sz = new StreamZip([s1, s2]); |
| 220 int ctr = 0; |
| 221 var sub; |
| 222 sub = sz.listen(expectAsync((v) { |
| 223 expect(v, equals([ctr * 2, ctr * 2 + 1])); |
| 224 if (ctr == 1) { |
| 225 sub.pause(new Future.delayed(const Duration(milliseconds: 25))); |
| 226 } else if (ctr == 2) { |
| 227 sub.pause(); |
| 228 new Future.delayed(const Duration(milliseconds: 25)).then((_) { |
| 229 sub.resume(); |
| 230 }); |
| 231 } |
| 232 ctr++; |
| 233 }, count: 4)); |
| 234 }); |
| 235 } |
OLD | NEW |