OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import "dart:async"; | |
6 import "package:sequence_zip/stream_zip.dart"; | |
7 import "package:unittest/unittest.dart"; | |
8 | |
9 /// Create an error with the same values as [base], except that it throwsA | |
10 /// when seeing the value [errorValue]. | |
11 Stream streamError(Stream base, int errorValue, error) { | |
12 return base.map((x) => (x == errorValue) ? throw error : x); | |
13 } | |
14 | |
15 /// Make a [Stream] from an [Iterable] by adding events to a stream controller | |
16 /// at periodic intervals. | |
17 Stream mks(Iterable iterable) { | |
18 Iterator iterator = iterable.iterator; | |
19 StreamController controller = new StreamController(); | |
20 // Some varying time between 3 and 10 ms. | |
21 int ms = ((++ctr) * 5) % 7 + 3; | |
22 new Timer.periodic(new Duration(milliseconds: ms), (Timer timer) { | |
23 if (iterator.moveNext()) { | |
24 controller.add(iterator.current); | |
25 } else { | |
26 controller.close(); | |
27 timer.cancel(); | |
28 } | |
29 }); | |
30 return controller.stream; | |
31 } | |
32 | |
33 /// Counter used to give varying delays for streams. | |
34 int ctr = 0; | |
35 | |
36 main() { | |
37 // Test that zipping [streams] gives the results iterated by [expectedData]. | |
38 testZip(Iterable streams, Iterable expectedData) { | |
39 List data = []; | |
40 Stream zip = new StreamZip(streams); | |
41 zip.listen(data.add, onDone: expectAsync0(() { | |
42 expect(data, equals(expectedData)); | |
43 })); | |
44 } | |
45 | |
46 test("Basic", () { | |
47 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9])], | |
48 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | |
49 }); | |
50 | |
51 test("Uneven length 1", () { | |
52 testZip([mks([1, 2, 3, 99, 100]), mks([4, 5, 6]), mks([7, 8, 9])], | |
53 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | |
54 }); | |
55 | |
56 test("Uneven length 2", () { | |
57 testZip([mks([1, 2, 3]), mks([4, 5, 6, 99, 100]), mks([7, 8, 9])], | |
58 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | |
59 }); | |
60 | |
61 test("Uneven length 3", () { | |
62 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], | |
63 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | |
64 }); | |
65 | |
66 test("Uneven length 4", () { | |
67 testZip([mks([1, 2, 3, 98]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])], | |
68 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | |
69 }); | |
70 | |
71 test("Empty 1", () { | |
72 testZip([mks([]), mks([4, 5, 6]), mks([7, 8, 9])], []); | |
73 }); | |
74 | |
75 test("Empty 2", () { | |
76 testZip([mks([1, 2, 3]), mks([]), mks([7, 8, 9])], []); | |
77 }); | |
78 | |
79 test("Empty 3", () { | |
80 testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([])], []); | |
81 }); | |
82 | |
83 test("Empty source", () { | |
84 testZip([], []); | |
85 }); | |
86 | |
87 test("Single Source", () { | |
88 testZip([mks([1, 2, 3])], [[1], [2], [3]]); | |
89 }); | |
90 | |
91 test("Other-streams", () { | |
92 Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4); | |
93 Stream st2 = new Stream.periodic(const Duration(milliseconds: 5), | |
94 (x) => x + 4).take(3); | |
95 StreamController c = new StreamController.broadcast(); | |
96 Stream st3 = c.stream; | |
97 testZip([st1, st2, st3], | |
98 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | |
99 c..add(7)..add(8)..add(9)..close(); | |
100 }); | |
101 | |
102 test("Error 1", () { | |
103 expect(new StreamZip([streamError(mks([1, 2, 3]), 2, "BAD-1"), | |
104 mks([4, 5, 6]), | |
105 mks([7, 8, 9])]).toList(), | |
106 throwsA(equals("BAD-1"))); | |
107 }); | |
108 | |
109 test("Error 2", () { | |
110 expect(new StreamZip([mks([1, 2, 3]), | |
111 streamError(mks([4, 5, 6]), 5, "BAD-2"), | |
112 mks([7, 8, 9])]).toList(), | |
113 throwsA(equals("BAD-2"))); | |
114 }); | |
115 | |
116 test("Error 3", () { | |
117 expect(new StreamZip([mks([1, 2, 3]), | |
118 mks([4, 5, 6]), | |
119 streamError(mks([7, 8, 9]), 8, "BAD-3")]).toList(), | |
120 throwsA(equals("BAD-3"))); | |
121 }); | |
122 | |
123 test("Error at end", () { | |
124 expect(new StreamZip([mks([1, 2, 3]), | |
125 streamError(mks([4, 5, 6]), 6, "BAD-4"), | |
126 mks([7, 8, 9])]).toList(), | |
127 throwsA(equals("BAD-4"))); | |
128 }); | |
129 | |
130 test("Error before first end", () { | |
131 // StreamControllers' streams with no "close" called will never be done, | |
132 // so the fourth event of the first stream is guaranteed to come first. | |
133 expect(new StreamZip( | |
134 [streamError(mks([1, 2, 3, 4]), 4, "BAD-5"), | |
135 (new StreamController()..add(4)..add(5)..add(6)).stream, | |
136 (new StreamController()..add(7)..add(8)..add(9)).stream] | |
137 ).toList(), | |
138 throwsA(equals("BAD-5"))); | |
139 }); | |
140 | |
141 test("Error after first end", () { | |
142 StreamController controller = new StreamController(); | |
143 controller..add(7)..add(8)..add(9); | |
144 // Transformer that puts error into controller when one of the first two | |
145 // streams have sent a done event. | |
146 StreamTransformer trans = new StreamTransformer.fromHandlers( | |
147 handleDone: (EventSink s) { | |
148 Timer.run(() { controller.addError("BAD-6"); }); | |
149 s.close(); | |
150 }); | |
151 testZip([mks([1, 2, 3]).transform(trans), | |
152 mks([4, 5, 6]).transform(trans), | |
153 controller.stream], | |
154 [[1, 4, 7], [2, 5, 8], [3, 6, 9]]); | |
155 }); | |
156 | |
157 test("Pause/Resume", () { | |
158 var done = expectAsync0((){}); // Call to complete test. | |
159 | |
160 int sc1p = 0; | |
161 StreamController c1 = new StreamController( | |
162 onPause: () { | |
163 sc1p++; | |
164 }, | |
165 onResume: () { | |
166 sc1p--; | |
167 }); | |
168 | |
169 int sc2p = 0; | |
170 StreamController c2 = new StreamController( | |
171 onPause: () { | |
172 sc2p++; | |
173 }, | |
174 onResume: () { | |
175 sc2p--; | |
176 }); | |
177 Stream zip = new StreamZip([c1.stream, c2.stream]); | |
178 | |
179 const ms25 = const Duration(milliseconds: 25); | |
180 | |
181 // StreamIterator uses pause and resume to control flow. | |
182 StreamIterator it = new StreamIterator(zip); | |
183 | |
184 it.moveNext().then((hasMore) { | |
185 expect(hasMore, isTrue); | |
186 expect(it.current, equals([1, 2])); | |
187 return it.moveNext(); | |
188 }).then((hasMore) { | |
189 expect(hasMore, isTrue); | |
190 expect(it.current, equals([3, 4])); | |
191 c2.add(6); | |
192 return it.moveNext(); | |
193 }).then((hasMore) { | |
194 expect(hasMore, isTrue); | |
195 expect(it.current, equals([5, 6])); | |
196 new Future.delayed(ms25).then((_) { c2.add(8); }); | |
197 return it.moveNext(); | |
198 }).then((hasMore) { | |
199 expect(hasMore, isTrue); | |
200 expect(it.current, equals([7, 8])); | |
201 c2.add(9); | |
202 return it.moveNext(); | |
203 }).then((hasMore) { | |
204 expect(hasMore, isFalse); | |
205 done(); | |
206 }); | |
207 | |
208 c1..add(1)..add(3)..add(5)..add(7)..close(); | |
209 c2..add(2)..add(4); | |
210 }); | |
211 | |
212 test("pause-resume2", () { | |
213 var s1 = new Stream.fromIterable([0, 2, 4, 6, 8]); | |
214 var s2 = new Stream.fromIterable([1, 3, 5, 7]); | |
215 var sz = new StreamZip([s1, s2]); | |
216 int ctr = 0; | |
217 var sub; | |
218 sub = sz.listen(expectAsync1((v) { | |
219 expect(v, equals([ctr * 2, ctr * 2 + 1])); | |
220 if (ctr == 1) { | |
221 sub.pause(new Future.delayed(const Duration(milliseconds: 25))); | |
222 } else if (ctr == 2) { | |
223 sub.pause(); | |
224 new Future.delayed(const Duration(milliseconds: 25)).then((_) { | |
225 sub.resume(); | |
226 }); | |
227 } | |
228 ctr++; | |
229 }, count: 4)); | |
230 }); | |
231 } | |
OLD | NEW |