OLD | NEW |
| (Empty) |
1 // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library stream_group_by_test; | |
6 | |
7 import "dart:async"; | |
8 | |
9 import "package:expect/expect.dart"; | |
10 import "package:async_helper/async_helper.dart"; | |
11 | |
12 int len(x) => x.length; | |
13 String wrap(x) => "[$x]"; | |
14 | |
15 void main() { | |
16 asyncStart(); | |
17 // groupBy. | |
18 test("splits", () async { | |
19 var grouped = stringStream.groupBy<int>(len); | |
20 var byLength = <int, Future<List<String>>>{}; | |
21 await for (GroupedEvents<int, String> group in grouped) { | |
22 byLength[group.key] = group.values.toList(); | |
23 } | |
24 Expect.listEquals([1, 2, 4, 3], byLength.keys.toList()); | |
25 expectCompletes(byLength[1], ["a", "b"]); | |
26 expectCompletes(byLength[2], ["ab"]); | |
27 expectCompletes(byLength[3], ["abe", "lea"]); | |
28 expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]); | |
29 }); | |
30 | |
31 test("empty", () async { | |
32 var grouped = emptyStream.groupBy<int>(len); | |
33 var byLength = <int, Future<List<String>>>{}; | |
34 await for (GroupedEvents<int, String> group in grouped) { | |
35 byLength[group.key] = group.values.toList(); | |
36 } | |
37 Expect.isTrue(byLength.isEmpty); | |
38 }); | |
39 | |
40 test("single group", () async { | |
41 var grouped = repeatStream(5, "x").groupBy<int>(len); | |
42 var byLength = <int, Future<List<String>>>{}; | |
43 await for (GroupedEvents<int, String> group in grouped) { | |
44 byLength[group.key] = group.values.toList(); | |
45 } | |
46 Expect.listEquals([1], byLength.keys.toList()); | |
47 expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]); | |
48 }); | |
49 | |
50 test("with error", () async { | |
51 var grouped = stringErrorStream(3).groupBy<int>(len); | |
52 var byLength = <int, Future<List<String>>>{}; | |
53 bool caught = false; | |
54 try { | |
55 await for (GroupedEvents<int, String> group in grouped) { | |
56 byLength[group.key] = group.values.toList(); | |
57 } | |
58 } catch (e) { | |
59 Expect.equals("BAD", e); | |
60 caught = true; | |
61 } | |
62 Expect.isTrue(caught); | |
63 Expect.listEquals([1, 2, 4], byLength.keys.toList()); | |
64 expectCompletes(byLength[1], ["a", "b"]); | |
65 expectCompletes(byLength[2], ["ab"]); | |
66 expectCompletes(byLength[4], ["abel"]); | |
67 }); | |
68 | |
69 // For comparison with later tests. | |
70 test("no pause or cancel", () async { | |
71 var grouped = stringStream.groupBy<int>(len); | |
72 var events = []; | |
73 var futures = []; | |
74 await grouped.forEach((sg) { | |
75 var key = sg.key; | |
76 var sub; | |
77 sub = sg.values.listen((value) { | |
78 events.add("$key:$value"); | |
79 }); | |
80 var c = new Completer(); | |
81 futures.add(c.future); | |
82 sub.onDone(() { | |
83 c.complete(null); | |
84 }); | |
85 }); | |
86 await Future.wait(futures); | |
87 Expect.listEquals([ | |
88 "1:a", | |
89 "2:ab", | |
90 "1:b", | |
91 "4:abel", | |
92 "3:abe", | |
93 "4:bell", | |
94 "4:able", | |
95 "4:abba", | |
96 "3:lea", | |
97 ], events); | |
98 }); | |
99 | |
100 test("pause on group", () async { | |
101 // Pausing the individial group's stream just makes it buffer. | |
102 var grouped = stringStream.groupBy<int>(len); | |
103 var events = []; | |
104 var futures = []; | |
105 await grouped.forEach((sg) { | |
106 var key = sg.key; | |
107 var sub; | |
108 sub = sg.values.listen((value) { | |
109 events.add("$key:$value"); | |
110 if (value == "a") { | |
111 // Pause until a later timer event, which is after stringStream | |
112 // has delivered all events. | |
113 sub.pause(new Future.delayed(Duration.ZERO, () {})); | |
114 } | |
115 }); | |
116 var c = new Completer(); | |
117 futures.add(c.future); | |
118 sub.onDone(() { | |
119 c.complete(null); | |
120 }); | |
121 }); | |
122 await Future.wait(futures); | |
123 Expect.listEquals([ | |
124 "1:a", | |
125 "2:ab", | |
126 "4:abel", | |
127 "3:abe", | |
128 "4:bell", | |
129 "4:able", | |
130 "4:abba", | |
131 "3:lea", | |
132 "1:b" | |
133 ], events); | |
134 }); | |
135 | |
136 test("pause on group-stream", () async { | |
137 // Pausing the stream returned by groupBy stops everything. | |
138 var grouped = stringStream.groupBy<int>(len); | |
139 var events = []; | |
140 var futures = []; | |
141 var done = new Completer(); | |
142 var sub; | |
143 sub = grouped.listen((sg) { | |
144 var key = sg.key; | |
145 futures.add(sg.values.forEach((value) { | |
146 events.add("$key:$value"); | |
147 if (value == "a") { | |
148 // Pause everything until a later timer event. | |
149 asyncStart(); | |
150 var eventSnapshot = events.toList(); | |
151 var delay = new Future.delayed(Duration.ZERO).then((_) { | |
152 // No events added. | |
153 Expect.listEquals(eventSnapshot, events); | |
154 asyncEnd(); // Ensures this test has run. | |
155 }); | |
156 sub.pause(delay); | |
157 } | |
158 })); | |
159 }); | |
160 sub.onDone(() { | |
161 done.complete(null); | |
162 }); | |
163 futures.add(done.future); | |
164 await Future.wait(futures); | |
165 Expect.listEquals([ | |
166 "1:a", | |
167 "2:ab", | |
168 "1:b", | |
169 "4:abel", | |
170 "3:abe", | |
171 "4:bell", | |
172 "4:able", | |
173 "4:abba", | |
174 "3:lea", | |
175 ], events); | |
176 }); | |
177 | |
178 test("cancel on group", () async { | |
179 // Cancelling the individial group's stream just makes that one stop. | |
180 var grouped = stringStream.groupBy<int>(len); | |
181 var events = []; | |
182 var futures = []; | |
183 await grouped.forEach((sg) { | |
184 var key = sg.key; | |
185 var sub; | |
186 var c = new Completer(); | |
187 sub = sg.values.listen((value) { | |
188 events.add("$key:$value"); | |
189 if (value == "bell") { | |
190 // Pause until a later timer event, which is after stringStream | |
191 // has delivered all events. | |
192 sub.cancel(); | |
193 c.complete(null); | |
194 } | |
195 }); | |
196 futures.add(c.future); | |
197 sub.onDone(() { | |
198 c.complete(null); | |
199 }); | |
200 }); | |
201 await Future.wait(futures); | |
202 Expect.listEquals([ | |
203 "1:a", | |
204 "2:ab", | |
205 "1:b", | |
206 "4:abel", | |
207 "3:abe", | |
208 "4:bell", | |
209 "3:lea", | |
210 ], events); | |
211 }); | |
212 | |
213 test("cancel on group-stream", () async { | |
214 // Cancel the stream returned by groupBy ends everything. | |
215 var grouped = stringStream.groupBy<int>(len); | |
216 var events = []; | |
217 var futures = []; | |
218 var done = new Completer(); | |
219 var sub; | |
220 sub = grouped.listen((sg) { | |
221 var key = sg.key; | |
222 futures.add(sg.values.forEach((value) { | |
223 events.add("$key:$value"); | |
224 if (value == "bell") { | |
225 // Pause everything until a later timer event. | |
226 futures.add(sub.cancel()); | |
227 done.complete(); | |
228 } | |
229 })); | |
230 }); | |
231 futures.add(done.future); | |
232 await Future.wait(futures); | |
233 Expect.listEquals([ | |
234 "1:a", | |
235 "2:ab", | |
236 "1:b", | |
237 "4:abel", | |
238 "3:abe", | |
239 "4:bell", | |
240 ], events); | |
241 }); | |
242 | |
243 asyncEnd(); | |
244 } | |
245 | |
246 expectCompletes(future, result) { | |
247 asyncStart(); | |
248 future.then((v) { | |
249 if (result is List) { | |
250 Expect.listEquals(result, v); | |
251 } else { | |
252 Expect.equals(v, result); | |
253 } | |
254 asyncEnd(); | |
255 }, onError: (e, s) { | |
256 Expect.fail("$e\n$s"); | |
257 }); | |
258 } | |
259 | |
260 void test(name, func) { | |
261 asyncStart(); | |
262 func().then((_) { | |
263 asyncEnd(); | |
264 }, onError: (e, s) { | |
265 Expect.fail("$name: $e\n$s"); | |
266 }); | |
267 } | |
268 | |
269 var strings = const [ | |
270 "a", | |
271 "ab", | |
272 "b", | |
273 "abel", | |
274 "abe", | |
275 "bell", | |
276 "able", | |
277 "abba", | |
278 "lea" | |
279 ]; | |
280 | |
281 Stream<String> get stringStream async* { | |
282 for (var string in strings) { | |
283 yield string; | |
284 } | |
285 } | |
286 | |
287 Stream get emptyStream async* {} | |
288 | |
289 Stream repeatStream(int count, value) async* { | |
290 for (var i = 0; i < count; i++) { | |
291 yield value; | |
292 } | |
293 } | |
294 | |
295 // Just some valid stack trace. | |
296 var stack = StackTrace.current; | |
297 | |
298 Stream<String> stringErrorStream(int errorAfter) async* { | |
299 for (int i = 0; i < strings.length; i++) { | |
300 yield strings[i]; | |
301 if (i == errorAfter) { | |
302 // Emit error, but continue afterwards. | |
303 yield* new Future.error("BAD", stack).asStream(); | |
304 } | |
305 } | |
306 } | |
307 | |
308 Stream intStream(int count, [int start = 0]) async* { | |
309 for (int i = 0; i < count; i++) { | |
310 yield start++; | |
311 } | |
312 } | |
313 | |
314 Stream timerStream(int count, Duration interval) async* { | |
315 for (int i = 0; i < count; i++) { | |
316 await new Future.delayed(interval); | |
317 yield i; | |
318 } | |
319 } | |
OLD | NEW |