OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library stream_group_by_test; |
| 6 |
| 7 import "dart:async"; |
| 8 |
| 9 import "package:expect/expect.dart"; |
| 10 import "package:async_helper/async_helper.dart"; |
| 11 |
| 12 int len(x) => x.length; |
| 13 String wrap(x) => "[$x]"; |
| 14 |
| 15 void main() { |
| 16 asyncStart(); |
| 17 // groupBy. |
| 18 test("splits", () async { |
| 19 var grouped = stringStream.groupBy<int>(len); |
| 20 var byLength = <int, Future<List<String>>>{}; |
| 21 await for (StreamGroup<int, String> group in grouped) { |
| 22 byLength[group.key] = group.values.toList(); |
| 23 } |
| 24 Expect.listEquals([1, 2, 4, 3], byLength.keys.toList()); |
| 25 expectCompletes(byLength[1], ["a", "b"]); |
| 26 expectCompletes(byLength[2], ["ab"]); |
| 27 expectCompletes(byLength[3], ["abe", "lea"]); |
| 28 expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]); |
| 29 }); |
| 30 |
| 31 test("empty", () async { |
| 32 var grouped = emptyStream.groupBy<int>(len); |
| 33 var byLength = <int, Future<List<String>>>{}; |
| 34 await for (StreamGroup<int, String> group in grouped) { |
| 35 byLength[group.key] = group.values.toList(); |
| 36 } |
| 37 Expect.isTrue(byLength.isEmpty); |
| 38 }); |
| 39 |
| 40 test("single group", () async { |
| 41 var grouped = repeatStream(5, "x").groupBy<int>(len); |
| 42 var byLength = <int, Future<List<String>>>{}; |
| 43 await for (StreamGroup<int, String> group in grouped) { |
| 44 byLength[group.key] = group.values.toList(); |
| 45 } |
| 46 Expect.listEquals([1], byLength.keys.toList()); |
| 47 expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]); |
| 48 }); |
| 49 |
| 50 test("with error", () async { |
| 51 var grouped = stringErrorStream(3).groupBy<int>(len); |
| 52 var byLength = <int, Future<List<String>>>{}; |
| 53 bool caught = false; |
| 54 try { |
| 55 await for (StreamGroup<int, String> group in grouped) { |
| 56 byLength[group.key] = group.values.toList(); |
| 57 } |
| 58 } catch (e) { |
| 59 Expect.equals("BAD", e); |
| 60 caught = true; |
| 61 } |
| 62 Expect.isTrue(caught); |
| 63 Expect.listEquals([1, 2, 4], byLength.keys.toList()); |
| 64 expectCompletes(byLength[1], ["a", "b"]); |
| 65 expectCompletes(byLength[2], ["ab"]); |
| 66 expectCompletes(byLength[4], ["abel"]); |
| 67 }); |
| 68 |
| 69 // For comparison with later tests. |
| 70 test("no pause or cancel", () async { |
| 71 var grouped = stringStream.groupBy<int>(len); |
| 72 var events = []; |
| 73 var futures = []; |
| 74 await grouped.forEach((sg) { |
| 75 var key = sg.key; |
| 76 var sub; |
| 77 sub = sg.values.listen((value) { |
| 78 events.add("$key:$value"); |
| 79 }); |
| 80 var c = new Completer(); |
| 81 futures.add(c.future); |
| 82 sub.onDone(() { |
| 83 c.complete(null); |
| 84 }); |
| 85 }); |
| 86 await Future.wait(futures); |
| 87 Expect.listEquals([ |
| 88 "1:a", |
| 89 "2:ab", |
| 90 "1:b", |
| 91 "4:abel", |
| 92 "3:abe", |
| 93 "4:bell", |
| 94 "4:able", |
| 95 "4:abba", |
| 96 "3:lea", |
| 97 ], events); |
| 98 }); |
| 99 |
| 100 test("pause on group", () async { |
| 101 // Pausing the individial group's stream just makes it buffer. |
| 102 var grouped = stringStream.groupBy<int>(len); |
| 103 var events = []; |
| 104 var futures = []; |
| 105 await grouped.forEach((sg) { |
| 106 var key = sg.key; |
| 107 var sub; |
| 108 sub = sg.values.listen((value) { |
| 109 events.add("$key:$value"); |
| 110 if (value == "a") { |
| 111 // Pause until a later timer event, which is after stringStream |
| 112 // has delivered all events. |
| 113 sub.pause(new Future.delayed(Duration.ZERO, () {})); |
| 114 } |
| 115 }); |
| 116 var c = new Completer(); |
| 117 futures.add(c.future); |
| 118 sub.onDone(() { |
| 119 c.complete(null); |
| 120 }); |
| 121 }); |
| 122 await Future.wait(futures); |
| 123 Expect.listEquals([ |
| 124 "1:a", |
| 125 "2:ab", |
| 126 "4:abel", |
| 127 "3:abe", |
| 128 "4:bell", |
| 129 "4:able", |
| 130 "4:abba", |
| 131 "3:lea", |
| 132 "1:b" |
| 133 ], events); |
| 134 }); |
| 135 |
| 136 test("pause on group-stream", () async { |
| 137 // Pausing the stream returned by groupBy stops everything. |
| 138 var grouped = stringStream.groupBy<int>(len); |
| 139 var events = []; |
| 140 var futures = []; |
| 141 var done = new Completer(); |
| 142 var sub; |
| 143 sub = grouped.listen((sg) { |
| 144 var key = sg.key; |
| 145 futures.add(sg.values.forEach((value) { |
| 146 events.add("$key:$value"); |
| 147 if (value == "a") { |
| 148 // Pause everything until a later timer event. |
| 149 asyncStart(); |
| 150 var eventSnapshot = events.toList(); |
| 151 var delay = new Future.delayed(Duration.ZERO).then((_) { |
| 152 // No events added. |
| 153 Expect.listEquals(eventSnapshot, events); |
| 154 asyncEnd(); // Ensures this test has run. |
| 155 }); |
| 156 sub.pause(delay); |
| 157 } |
| 158 })); |
| 159 }); |
| 160 sub.onDone(() { |
| 161 done.complete(null); |
| 162 }); |
| 163 futures.add(done.future); |
| 164 await Future.wait(futures); |
| 165 Expect.listEquals([ |
| 166 "1:a", |
| 167 "2:ab", |
| 168 "1:b", |
| 169 "4:abel", |
| 170 "3:abe", |
| 171 "4:bell", |
| 172 "4:able", |
| 173 "4:abba", |
| 174 "3:lea", |
| 175 ], events); |
| 176 }); |
| 177 |
| 178 test("cancel on group", () async { |
| 179 // Cancelling the individial group's stream just makes that one stop. |
| 180 var grouped = stringStream.groupBy<int>(len); |
| 181 var events = []; |
| 182 var futures = []; |
| 183 await grouped.forEach((sg) { |
| 184 var key = sg.key; |
| 185 var sub; |
| 186 var c = new Completer(); |
| 187 sub = sg.values.listen((value) { |
| 188 events.add("$key:$value"); |
| 189 if (value == "bell") { |
| 190 // Pause until a later timer event, which is after stringStream |
| 191 // has delivered all events. |
| 192 sub.cancel(); |
| 193 c.complete(null); |
| 194 } |
| 195 }); |
| 196 futures.add(c.future); |
| 197 sub.onDone(() { |
| 198 c.complete(null); |
| 199 }); |
| 200 }); |
| 201 await Future.wait(futures); |
| 202 Expect.listEquals([ |
| 203 "1:a", |
| 204 "2:ab", |
| 205 "1:b", |
| 206 "4:abel", |
| 207 "3:abe", |
| 208 "4:bell", |
| 209 "3:lea", |
| 210 ], events); |
| 211 }); |
| 212 |
| 213 test("cancel on group-stream", () async { |
| 214 // Cancel the stream returned by groupBy ends everything. |
| 215 var grouped = stringStream.groupBy<int>(len); |
| 216 var events = []; |
| 217 var futures = []; |
| 218 var done = new Completer(); |
| 219 var sub; |
| 220 sub = grouped.listen((sg) { |
| 221 var key = sg.key; |
| 222 futures.add(sg.values.forEach((value) { |
| 223 events.add("$key:$value"); |
| 224 if (value == "bell") { |
| 225 // Pause everything until a later timer event. |
| 226 futures.add(sub.cancel()); |
| 227 done.complete(); |
| 228 } |
| 229 })); |
| 230 }); |
| 231 futures.add(done.future); |
| 232 await Future.wait(futures); |
| 233 Expect.listEquals([ |
| 234 "1:a", |
| 235 "2:ab", |
| 236 "1:b", |
| 237 "4:abel", |
| 238 "3:abe", |
| 239 "4:bell", |
| 240 ], events); |
| 241 }); |
| 242 |
| 243 asyncEnd(); |
| 244 } |
| 245 |
| 246 expectCompletes(future, result) { |
| 247 asyncStart(); |
| 248 future.then((v) { |
| 249 if (result is List) { |
| 250 Expect.listEquals(result, v); |
| 251 } else { |
| 252 Expect.equals(v, result); |
| 253 } |
| 254 asyncEnd(); |
| 255 }, onError: (e, s) { |
| 256 Expect.fail("$e\n$s"); |
| 257 }); |
| 258 } |
| 259 |
| 260 void test(name, func) { |
| 261 asyncStart(); |
| 262 func().then((_) { |
| 263 asyncEnd(); |
| 264 }, onError: (e, s) { |
| 265 Expect.fail("$name: $e\n$s"); |
| 266 }); |
| 267 } |
| 268 |
| 269 var strings = const [ |
| 270 "a", |
| 271 "ab", |
| 272 "b", |
| 273 "abel", |
| 274 "abe", |
| 275 "bell", |
| 276 "able", |
| 277 "abba", |
| 278 "lea" |
| 279 ]; |
| 280 |
| 281 Stream<String> get stringStream async* { |
| 282 for (var string in strings) { |
| 283 yield string; |
| 284 } |
| 285 } |
| 286 |
| 287 Stream get emptyStream async* {} |
| 288 |
| 289 Stream repeatStream(int count, value) async* { |
| 290 for (var i = 0; i < count; i++) { |
| 291 yield value; |
| 292 } |
| 293 } |
| 294 |
| 295 // Just some valid stack trace. |
| 296 var stack = StackTrace.current; |
| 297 |
| 298 Stream<String> stringErrorStream(int errorAfter) async* { |
| 299 for (int i = 0; i < strings.length; i++) { |
| 300 yield strings[i]; |
| 301 if (i == errorAfter) { |
| 302 // Emit error, but continue afterwards. |
| 303 yield* new Future.error("BAD", stack).asStream(); |
| 304 } |
| 305 } |
| 306 } |
| 307 |
| 308 Stream intStream(int count, [int start = 0]) async* { |
| 309 for (int i = 0; i < count; i++) { |
| 310 yield start++; |
| 311 } |
| 312 } |
| 313 |
| 314 Stream timerStream(int count, Duration interval) async* { |
| 315 for (int i = 0; i < count; i++) { |
| 316 await new Future.delayed(interval); |
| 317 yield i; |
| 318 } |
| 319 } |
OLD | NEW |