Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: tests/lib/async/stream_group_by_test.dart

Issue 2850393003: Add groupBy to Stream. (Closed)
Patch Set: Remove groupValuesBy, add tests. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« sdk/lib/async/stream.dart ('K') | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library stream_group_by_test;
6
7 import "dart:async";
8
9 import "package:expect/expect.dart";
10 import "package:async_helper/async_helper.dart";
11
12 int len(x) => x.length;
13 String wrap(x) => "[$x]";
14
15 void main() {
16 asyncStart();
17 // groupBy.
18 test("splits", () async {
19 var grouped = stringStream.groupBy<int>(len);
20 var byLength = <int, Future<List<String>>>{};
21 await for (StreamGroup<int, String> group in grouped) {
22 byLength[group.key] = group.values.toList();
23 }
24 Expect.listEquals([1, 2, 4, 3], byLength.keys.toList());
25 expectCompletes(byLength[1], ["a", "b"]);
26 expectCompletes(byLength[2], ["ab"]);
27 expectCompletes(byLength[3], ["abe", "lea"]);
28 expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]);
29 });
30
31 test("empty", () async {
32 var grouped = emptyStream.groupBy<int>(len);
33 var byLength = <int, Future<List<String>>>{};
34 await for (StreamGroup<int, String> group in grouped) {
35 byLength[group.key] = group.values.toList();
36 }
37 Expect.isTrue(byLength.isEmpty);
38 });
39
40 test("single group", () async {
41 var grouped = repeatStream(5, "x").groupBy<int>(len);
42 var byLength = <int, Future<List<String>>>{};
43 await for (StreamGroup<int, String> group in grouped) {
44 byLength[group.key] = group.values.toList();
45 }
46 Expect.listEquals([1], byLength.keys.toList());
47 expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]);
48 });
49
50 test("with error", () async {
51 var grouped = stringErrorStream(3).groupBy<int>(len);
52 var byLength = <int, Future<List<String>>>{};
53 bool caught = false;
54 try {
55 await for (StreamGroup<int, String> group in grouped) {
56 byLength[group.key] = group.values.toList();
57 }
58 } catch (e) {
59 Expect.equals("BAD", e);
60 caught = true;
61 }
62 Expect.isTrue(caught);
63 Expect.listEquals([1, 2, 4], byLength.keys.toList());
64 expectCompletes(byLength[1], ["a", "b"]);
65 expectCompletes(byLength[2], ["ab"]);
66 expectCompletes(byLength[4], ["abel"]);
67 });
68
69 // For comparison with later tests.
70 test("no pause or cancel", () async {
71 var grouped = stringStream.groupBy<int>(len);
72 var events = [];
73 bool caught = false;
74 var futures = [];
75 await grouped.forEach((sg) {
76 var key = sg.key;
77 var sub;
78 sub = sg.values.listen((value) {
79 events.add("$key:$value");
80 });
81 var c = new Completer();
82 futures.add(c.future);
83 sub.onDone(() {
84 c.complete(null);
85 });
86 });
87 await Future.wait(futures);
88 Expect.listEquals([
89 "1:a",
90 "2:ab",
91 "1:b",
92 "4:abel",
93 "3:abe",
94 "4:bell",
95 "4:able",
96 "4:abba",
97 "3:lea",
98 ], events);
99 });
100
101 test("pause on group", () async {
102 // Pausing the individial group's stream just makes it buffer.
103 var grouped = stringStream.groupBy<int>(len);
104 var events = [];
105 bool caught = false;
106 var futures = [];
107 await grouped.forEach((sg) {
108 var key = sg.key;
109 var sub;
110 sub = sg.values.listen((value) {
111 events.add("$key:$value");
112 if (value == "a") {
113 // Pause until a later timer event, which is after stringStream
114 // has delivered all events.
115 sub.pause(new Future.delayed(Duration.ZERO, () {}));
116 }
117 });
118 var c = new Completer();
119 futures.add(c.future);
120 sub.onDone(() {
121 c.complete(null);
122 });
123 });
124 await Future.wait(futures);
125 Expect.listEquals([
126 "1:a",
127 "2:ab",
128 "4:abel",
129 "3:abe",
130 "4:bell",
131 "4:able",
132 "4:abba",
133 "3:lea",
134 "1:b"
135 ], events);
136 });
137
138 test("pause on group-stream", () async {
139 // Pausing the stream returned by groupBy stops everything.
140 var grouped = stringStream.groupBy<int>(len);
141 var events = [];
142 bool caught = false;
143 var futures = [];
144 var done = new Completer();
145 var sub;
146 sub = grouped.listen((sg) {
147 var key = sg.key;
148 futures.add(sg.values.forEach((value) {
149 events.add("$key:$value");
150 if (value == "a") {
151 // Pause everything until a later timer event.
152 asyncStart();
153 var eventSnapshot = events.toList();
154 var delay = new Future.delayed(Duration.ZERO).then((_) {
155 // No events added.
156 Expect.listEquals(eventSnapshot, events);
157 asyncEnd(); // Ensures this test has run.
158 });
159 sub.pause(delay);
160 }
161 }));
162 });
163 sub.onDone(() {
164 done.complete(null);
165 });
166 futures.add(done.future);
167 await Future.wait(futures);
168 Expect.listEquals([
169 "1:a",
170 "2:ab",
171 "1:b",
172 "4:abel",
173 "3:abe",
174 "4:bell",
175 "4:able",
176 "4:abba",
177 "3:lea",
178 ], events);
179 });
180
181 test("cancel on group", () async {
182 // Cancelling the individial group's stream just makes that one stop.
183 var grouped = stringStream.groupBy<int>(len);
184 var events = [];
185 bool caught = false;
186 var futures = [];
187 await grouped.forEach((sg) {
188 var key = sg.key;
189 var sub;
190 var c = new Completer();
191 sub = sg.values.listen((value) {
192 events.add("$key:$value");
193 if (value == "bell") {
194 // Pause until a later timer event, which is after stringStream
195 // has delivered all events.
196 sub.cancel();
197 c.complete(null);
198 }
199 });
200 futures.add(c.future);
201 sub.onDone(() {
202 c.complete(null);
203 });
204 });
205 await Future.wait(futures);
206 Expect.listEquals([
207 "1:a",
208 "2:ab",
209 "1:b",
210 "4:abel",
211 "3:abe",
212 "4:bell",
213 "3:lea",
214 ], events);
215 });
216
217 test("cancel on group-stream", () async {
218 // Cancel the stream returned by groupBy ends everything.
219 var grouped = stringStream.groupBy<int>(len);
220 var events = [];
221 bool caught = false;
222 var futures = [];
223 var done = new Completer();
224 var sub;
225 sub = grouped.listen((sg) {
226 var key = sg.key;
227 futures.add(sg.values.forEach((value) {
228 events.add("$key:$value");
229 if (value == "bell") {
230 // Pause everything until a later timer event.
231 futures.add(sub.cancel());
232 done.complete();
233 }
234 }));
235 });
236 futures.add(done.future);
237 await Future.wait(futures);
238 Expect.listEquals([
239 "1:a",
240 "2:ab",
241 "1:b",
242 "4:abel",
243 "3:abe",
244 "4:bell",
245 ], events);
246 });
247
248 asyncEnd();
249 }
250
251 expectCompletes(future, result) {
252 asyncStart();
253 future.then((v) {
254 if (result is List) {
255 Expect.listEquals(result, v);
256 } else {
257 Expect.equals(v, result);
258 }
259 asyncEnd();
260 }, onError: (e, s) {
261 Expect.fail("$e\n$s");
262 });
263 }
264
265 void test(name, func) {
266 asyncStart();
267 func().then((_) {
268 asyncEnd();
269 }, onError: (e, s) {
270 Expect.fail("$name: $e\n$s");
271 });
272 }
273
274 var strings = const [
275 "a",
276 "ab",
277 "b",
278 "abel",
279 "abe",
280 "bell",
281 "able",
282 "abba",
283 "lea"
284 ];
285
286 Stream<String> get stringStream async* {
287 for (var string in strings) {
288 yield string;
289 }
290 }
291
292 Stream get emptyStream async* {}
293
294 Stream repeatStream(int count, value) async* {
295 for (var i = 0; i < count; i++) {
296 yield value;
297 }
298 }
299
300 // Just some valid stack trace.
301 var stack = StackTrace.current;
302
303 Stream<String> stringErrorStream(int errorAfter) async* {
304 for (int i = 0; i < strings.length; i++) {
305 yield strings[i];
306 if (i == errorAfter) {
307 // Emit error, but continue afterwards.
308 yield* new Future.error("BAD", stack).asStream();
309 }
310 }
311 }
312
313 Stream intStream(int count, [int start = 0]) async* {
314 for (int i = 0; i < count; i++) {
315 yield start++;
316 }
317 }
318
319 Stream timerStream(int count, Duration interval) async* {
320 for (int i = 0; i < count; i++) {
321 await new Future.delayed(interval);
322 yield i;
323 }
324 }
OLDNEW
« sdk/lib/async/stream.dart ('K') | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698