Index: tests/lib/async/stream_group_by_test.dart |
diff --git a/tests/lib/async/stream_group_by_test.dart b/tests/lib/async/stream_group_by_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..edba9e089a9e49947a8cfb58ee7bedbaca6c731e |
--- /dev/null |
+++ b/tests/lib/async/stream_group_by_test.dart |
@@ -0,0 +1,319 @@ |
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library stream_group_by_test; |
+ |
+import "dart:async"; |
+ |
+import "package:expect/expect.dart"; |
+import "package:async_helper/async_helper.dart"; |
+ |
+int len(x) => x.length; |
+String wrap(x) => "[$x]"; |
+ |
+void main() { |
+ asyncStart(); |
+ // groupBy. |
+ test("splits", () async { |
+ var grouped = stringStream.groupBy<int>(len); |
+ var byLength = <int, Future<List<String>>>{}; |
+ await for (StreamGroup<int, String> group in grouped) { |
+ byLength[group.key] = group.values.toList(); |
+ } |
+ Expect.listEquals([1, 2, 4, 3], byLength.keys.toList()); |
+ expectCompletes(byLength[1], ["a", "b"]); |
+ expectCompletes(byLength[2], ["ab"]); |
+ expectCompletes(byLength[3], ["abe", "lea"]); |
+ expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]); |
+ }); |
+ |
+ test("empty", () async { |
+ var grouped = emptyStream.groupBy<int>(len); |
+ var byLength = <int, Future<List<String>>>{}; |
+ await for (StreamGroup<int, String> group in grouped) { |
+ byLength[group.key] = group.values.toList(); |
+ } |
+ Expect.isTrue(byLength.isEmpty); |
+ }); |
+ |
+ test("single group", () async { |
+ var grouped = repeatStream(5, "x").groupBy<int>(len); |
+ var byLength = <int, Future<List<String>>>{}; |
+ await for (StreamGroup<int, String> group in grouped) { |
+ byLength[group.key] = group.values.toList(); |
+ } |
+ Expect.listEquals([1], byLength.keys.toList()); |
+ expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]); |
+ }); |
+ |
+ test("with error", () async { |
+ var grouped = stringErrorStream(3).groupBy<int>(len); |
+ var byLength = <int, Future<List<String>>>{}; |
+ bool caught = false; |
+ try { |
+ await for (StreamGroup<int, String> group in grouped) { |
+ byLength[group.key] = group.values.toList(); |
+ } |
+ } catch (e) { |
+ Expect.equals("BAD", e); |
+ caught = true; |
+ } |
+ Expect.isTrue(caught); |
+ Expect.listEquals([1, 2, 4], byLength.keys.toList()); |
+ expectCompletes(byLength[1], ["a", "b"]); |
+ expectCompletes(byLength[2], ["ab"]); |
+ expectCompletes(byLength[4], ["abel"]); |
+ }); |
+ |
+ // For comparison with later tests. |
+ test("no pause or cancel", () async { |
+ var grouped = stringStream.groupBy<int>(len); |
+ var events = []; |
+ var futures = []; |
+ await grouped.forEach((sg) { |
+ var key = sg.key; |
+ var sub; |
+ sub = sg.values.listen((value) { |
+ events.add("$key:$value"); |
+ }); |
+ var c = new Completer(); |
+ futures.add(c.future); |
+ sub.onDone(() { |
+ c.complete(null); |
+ }); |
+ }); |
+ await Future.wait(futures); |
+ Expect.listEquals([ |
+ "1:a", |
+ "2:ab", |
+ "1:b", |
+ "4:abel", |
+ "3:abe", |
+ "4:bell", |
+ "4:able", |
+ "4:abba", |
+ "3:lea", |
+ ], events); |
+ }); |
+ |
+ test("pause on group", () async { |
+ // Pausing the individial group's stream just makes it buffer. |
+ var grouped = stringStream.groupBy<int>(len); |
+ var events = []; |
+ var futures = []; |
+ await grouped.forEach((sg) { |
+ var key = sg.key; |
+ var sub; |
+ sub = sg.values.listen((value) { |
+ events.add("$key:$value"); |
+ if (value == "a") { |
+ // Pause until a later timer event, which is after stringStream |
+ // has delivered all events. |
+ sub.pause(new Future.delayed(Duration.ZERO, () {})); |
+ } |
+ }); |
+ var c = new Completer(); |
+ futures.add(c.future); |
+ sub.onDone(() { |
+ c.complete(null); |
+ }); |
+ }); |
+ await Future.wait(futures); |
+ Expect.listEquals([ |
+ "1:a", |
+ "2:ab", |
+ "4:abel", |
+ "3:abe", |
+ "4:bell", |
+ "4:able", |
+ "4:abba", |
+ "3:lea", |
+ "1:b" |
+ ], events); |
+ }); |
+ |
+ test("pause on group-stream", () async { |
+ // Pausing the stream returned by groupBy stops everything. |
+ var grouped = stringStream.groupBy<int>(len); |
+ var events = []; |
+ var futures = []; |
+ var done = new Completer(); |
+ var sub; |
+ sub = grouped.listen((sg) { |
+ var key = sg.key; |
+ futures.add(sg.values.forEach((value) { |
+ events.add("$key:$value"); |
+ if (value == "a") { |
+ // Pause everything until a later timer event. |
+ asyncStart(); |
+ var eventSnapshot = events.toList(); |
+ var delay = new Future.delayed(Duration.ZERO).then((_) { |
+ // No events added. |
+ Expect.listEquals(eventSnapshot, events); |
+ asyncEnd(); // Ensures this test has run. |
+ }); |
+ sub.pause(delay); |
+ } |
+ })); |
+ }); |
+ sub.onDone(() { |
+ done.complete(null); |
+ }); |
+ futures.add(done.future); |
+ await Future.wait(futures); |
+ Expect.listEquals([ |
+ "1:a", |
+ "2:ab", |
+ "1:b", |
+ "4:abel", |
+ "3:abe", |
+ "4:bell", |
+ "4:able", |
+ "4:abba", |
+ "3:lea", |
+ ], events); |
+ }); |
+ |
+ test("cancel on group", () async { |
+ // Cancelling the individial group's stream just makes that one stop. |
+ var grouped = stringStream.groupBy<int>(len); |
+ var events = []; |
+ var futures = []; |
+ await grouped.forEach((sg) { |
+ var key = sg.key; |
+ var sub; |
+ var c = new Completer(); |
+ sub = sg.values.listen((value) { |
+ events.add("$key:$value"); |
+ if (value == "bell") { |
+ // Pause until a later timer event, which is after stringStream |
+ // has delivered all events. |
+ sub.cancel(); |
+ c.complete(null); |
+ } |
+ }); |
+ futures.add(c.future); |
+ sub.onDone(() { |
+ c.complete(null); |
+ }); |
+ }); |
+ await Future.wait(futures); |
+ Expect.listEquals([ |
+ "1:a", |
+ "2:ab", |
+ "1:b", |
+ "4:abel", |
+ "3:abe", |
+ "4:bell", |
+ "3:lea", |
+ ], events); |
+ }); |
+ |
+ test("cancel on group-stream", () async { |
+ // Cancel the stream returned by groupBy ends everything. |
+ var grouped = stringStream.groupBy<int>(len); |
+ var events = []; |
+ var futures = []; |
+ var done = new Completer(); |
+ var sub; |
+ sub = grouped.listen((sg) { |
+ var key = sg.key; |
+ futures.add(sg.values.forEach((value) { |
+ events.add("$key:$value"); |
+ if (value == "bell") { |
+ // Pause everything until a later timer event. |
+ futures.add(sub.cancel()); |
+ done.complete(); |
+ } |
+ })); |
+ }); |
+ futures.add(done.future); |
+ await Future.wait(futures); |
+ Expect.listEquals([ |
+ "1:a", |
+ "2:ab", |
+ "1:b", |
+ "4:abel", |
+ "3:abe", |
+ "4:bell", |
+ ], events); |
+ }); |
+ |
+ asyncEnd(); |
+} |
+ |
+expectCompletes(future, result) { |
+ asyncStart(); |
+ future.then((v) { |
+ if (result is List) { |
+ Expect.listEquals(result, v); |
+ } else { |
+ Expect.equals(v, result); |
+ } |
+ asyncEnd(); |
+ }, onError: (e, s) { |
+ Expect.fail("$e\n$s"); |
+ }); |
+} |
+ |
+void test(name, func) { |
+ asyncStart(); |
+ func().then((_) { |
+ asyncEnd(); |
+ }, onError: (e, s) { |
+ Expect.fail("$name: $e\n$s"); |
+ }); |
+} |
+ |
+var strings = const [ |
+ "a", |
+ "ab", |
+ "b", |
+ "abel", |
+ "abe", |
+ "bell", |
+ "able", |
+ "abba", |
+ "lea" |
+]; |
+ |
+Stream<String> get stringStream async* { |
+ for (var string in strings) { |
+ yield string; |
+ } |
+} |
+ |
+Stream get emptyStream async* {} |
+ |
+Stream repeatStream(int count, value) async* { |
+ for (var i = 0; i < count; i++) { |
+ yield value; |
+ } |
+} |
+ |
+// Just some valid stack trace. |
+var stack = StackTrace.current; |
+ |
+Stream<String> stringErrorStream(int errorAfter) async* { |
+ for (int i = 0; i < strings.length; i++) { |
+ yield strings[i]; |
+ if (i == errorAfter) { |
+ // Emit error, but continue afterwards. |
+ yield* new Future.error("BAD", stack).asStream(); |
+ } |
+ } |
+} |
+ |
+Stream intStream(int count, [int start = 0]) async* { |
+ for (int i = 0; i < count; i++) { |
+ yield start++; |
+ } |
+} |
+ |
+Stream timerStream(int count, Duration interval) async* { |
+ for (int i = 0; i < count; i++) { |
+ await new Future.delayed(interval); |
+ yield i; |
+ } |
+} |