Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(139)

Side by Side Diff: tests/lib/async/stream_group_by_test.dart

Issue 2850393003: Add groupBy to Stream. (Closed)
Patch Set: Fix typos and warnings Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library stream_group_by_test;
6
7 import "dart:async";
8
9 import "package:expect/expect.dart";
10 import "package:async_helper/async_helper.dart";
11
12 int len(x) => x.length;
13 String wrap(x) => "[$x]";
14
15 void main() {
16 asyncStart();
17 // groupBy.
18 test("splits", () async {
19 var grouped = stringStream.groupBy<int>(len);
20 var byLength = <int, Future<List<String>>>{};
21 await for (StreamGroup<int, String> group in grouped) {
22 byLength[group.key] = group.values.toList();
23 }
24 Expect.listEquals([1, 2, 4, 3], byLength.keys.toList());
25 expectCompletes(byLength[1], ["a", "b"]);
26 expectCompletes(byLength[2], ["ab"]);
27 expectCompletes(byLength[3], ["abe", "lea"]);
28 expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]);
29 });
30
31 test("empty", () async {
32 var grouped = emptyStream.groupBy<int>(len);
33 var byLength = <int, Future<List<String>>>{};
34 await for (StreamGroup<int, String> group in grouped) {
35 byLength[group.key] = group.values.toList();
36 }
37 Expect.isTrue(byLength.isEmpty);
38 });
39
40 test("single group", () async {
41 var grouped = repeatStream(5, "x").groupBy<int>(len);
42 var byLength = <int, Future<List<String>>>{};
43 await for (StreamGroup<int, String> group in grouped) {
44 byLength[group.key] = group.values.toList();
45 }
46 Expect.listEquals([1], byLength.keys.toList());
47 expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]);
48 });
49
50 test("with error", () async {
51 var grouped = stringErrorStream(3).groupBy<int>(len);
52 var byLength = <int, Future<List<String>>>{};
53 bool caught = false;
54 try {
55 await for (StreamGroup<int, String> group in grouped) {
56 byLength[group.key] = group.values.toList();
57 }
58 } catch (e) {
59 Expect.equals("BAD", e);
60 caught = true;
61 }
62 Expect.isTrue(caught);
63 Expect.listEquals([1, 2, 4], byLength.keys.toList());
64 expectCompletes(byLength[1], ["a", "b"]);
65 expectCompletes(byLength[2], ["ab"]);
66 expectCompletes(byLength[4], ["abel"]);
67 });
68
69 // For comparison with later tests.
70 test("no pause or cancel", () async {
71 var grouped = stringStream.groupBy<int>(len);
72 var events = [];
73 var futures = [];
74 await grouped.forEach((sg) {
75 var key = sg.key;
76 var sub;
77 sub = sg.values.listen((value) {
78 events.add("$key:$value");
79 });
80 var c = new Completer();
81 futures.add(c.future);
82 sub.onDone(() {
83 c.complete(null);
84 });
85 });
86 await Future.wait(futures);
87 Expect.listEquals([
88 "1:a",
89 "2:ab",
90 "1:b",
91 "4:abel",
92 "3:abe",
93 "4:bell",
94 "4:able",
95 "4:abba",
96 "3:lea",
97 ], events);
98 });
99
100 test("pause on group", () async {
101 // Pausing the individial group's stream just makes it buffer.
102 var grouped = stringStream.groupBy<int>(len);
103 var events = [];
104 var futures = [];
105 await grouped.forEach((sg) {
106 var key = sg.key;
107 var sub;
108 sub = sg.values.listen((value) {
109 events.add("$key:$value");
110 if (value == "a") {
111 // Pause until a later timer event, which is after stringStream
112 // has delivered all events.
113 sub.pause(new Future.delayed(Duration.ZERO, () {}));
114 }
115 });
116 var c = new Completer();
117 futures.add(c.future);
118 sub.onDone(() {
119 c.complete(null);
120 });
121 });
122 await Future.wait(futures);
123 Expect.listEquals([
124 "1:a",
125 "2:ab",
126 "4:abel",
127 "3:abe",
128 "4:bell",
129 "4:able",
130 "4:abba",
131 "3:lea",
132 "1:b"
133 ], events);
134 });
135
136 test("pause on group-stream", () async {
137 // Pausing the stream returned by groupBy stops everything.
138 var grouped = stringStream.groupBy<int>(len);
139 var events = [];
140 var futures = [];
141 var done = new Completer();
142 var sub;
143 sub = grouped.listen((sg) {
144 var key = sg.key;
145 futures.add(sg.values.forEach((value) {
146 events.add("$key:$value");
147 if (value == "a") {
148 // Pause everything until a later timer event.
149 asyncStart();
150 var eventSnapshot = events.toList();
151 var delay = new Future.delayed(Duration.ZERO).then((_) {
152 // No events added.
153 Expect.listEquals(eventSnapshot, events);
154 asyncEnd(); // Ensures this test has run.
155 });
156 sub.pause(delay);
157 }
158 }));
159 });
160 sub.onDone(() {
161 done.complete(null);
162 });
163 futures.add(done.future);
164 await Future.wait(futures);
165 Expect.listEquals([
166 "1:a",
167 "2:ab",
168 "1:b",
169 "4:abel",
170 "3:abe",
171 "4:bell",
172 "4:able",
173 "4:abba",
174 "3:lea",
175 ], events);
176 });
177
178 test("cancel on group", () async {
179 // Cancelling the individial group's stream just makes that one stop.
180 var grouped = stringStream.groupBy<int>(len);
181 var events = [];
182 var futures = [];
183 await grouped.forEach((sg) {
184 var key = sg.key;
185 var sub;
186 var c = new Completer();
187 sub = sg.values.listen((value) {
188 events.add("$key:$value");
189 if (value == "bell") {
190 // Pause until a later timer event, which is after stringStream
191 // has delivered all events.
192 sub.cancel();
193 c.complete(null);
194 }
195 });
196 futures.add(c.future);
197 sub.onDone(() {
198 c.complete(null);
199 });
200 });
201 await Future.wait(futures);
202 Expect.listEquals([
203 "1:a",
204 "2:ab",
205 "1:b",
206 "4:abel",
207 "3:abe",
208 "4:bell",
209 "3:lea",
210 ], events);
211 });
212
213 test("cancel on group-stream", () async {
214 // Cancel the stream returned by groupBy ends everything.
215 var grouped = stringStream.groupBy<int>(len);
216 var events = [];
217 var futures = [];
218 var done = new Completer();
219 var sub;
220 sub = grouped.listen((sg) {
221 var key = sg.key;
222 futures.add(sg.values.forEach((value) {
223 events.add("$key:$value");
224 if (value == "bell") {
225 // Pause everything until a later timer event.
226 futures.add(sub.cancel());
227 done.complete();
228 }
229 }));
230 });
231 futures.add(done.future);
232 await Future.wait(futures);
233 Expect.listEquals([
234 "1:a",
235 "2:ab",
236 "1:b",
237 "4:abel",
238 "3:abe",
239 "4:bell",
240 ], events);
241 });
242
243 asyncEnd();
244 }
245
246 expectCompletes(future, result) {
247 asyncStart();
248 future.then((v) {
249 if (result is List) {
250 Expect.listEquals(result, v);
251 } else {
252 Expect.equals(v, result);
253 }
254 asyncEnd();
255 }, onError: (e, s) {
256 Expect.fail("$e\n$s");
257 });
258 }
259
260 void test(name, func) {
261 asyncStart();
262 func().then((_) {
263 asyncEnd();
264 }, onError: (e, s) {
265 Expect.fail("$name: $e\n$s");
266 });
267 }
268
269 var strings = const [
270 "a",
271 "ab",
272 "b",
273 "abel",
274 "abe",
275 "bell",
276 "able",
277 "abba",
278 "lea"
279 ];
280
281 Stream<String> get stringStream async* {
282 for (var string in strings) {
283 yield string;
284 }
285 }
286
287 Stream get emptyStream async* {}
288
289 Stream repeatStream(int count, value) async* {
290 for (var i = 0; i < count; i++) {
291 yield value;
292 }
293 }
294
295 // Just some valid stack trace.
296 var stack = StackTrace.current;
297
298 Stream<String> stringErrorStream(int errorAfter) async* {
299 for (int i = 0; i < strings.length; i++) {
300 yield strings[i];
301 if (i == errorAfter) {
302 // Emit error, but continue afterwards.
303 yield* new Future.error("BAD", stack).asStream();
304 }
305 }
306 }
307
308 Stream intStream(int count, [int start = 0]) async* {
309 for (int i = 0; i < count; i++) {
310 yield start++;
311 }
312 }
313
314 Stream timerStream(int count, Duration interval) async* {
315 for (int i = 0; i < count; i++) {
316 await new Future.delayed(interval);
317 yield i;
318 }
319 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698