Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(249)

Unified Diff: tests/lib/async/stream_group_by_test.dart

Issue 2850393003: Add groupBy to Stream. (Closed)
Patch Set: Fix typos and warnings Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tests/lib/async/stream_group_by_test.dart
diff --git a/tests/lib/async/stream_group_by_test.dart b/tests/lib/async/stream_group_by_test.dart
new file mode 100644
index 0000000000000000000000000000000000000000..edba9e089a9e49947a8cfb58ee7bedbaca6c731e
--- /dev/null
+++ b/tests/lib/async/stream_group_by_test.dart
@@ -0,0 +1,319 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library stream_group_by_test;
+
+import "dart:async";
+
+import "package:expect/expect.dart";
+import "package:async_helper/async_helper.dart";
+
+int len(x) => x.length;
+String wrap(x) => "[$x]";
+
+void main() {
+ asyncStart();
+ // groupBy.
+ test("splits", () async {
+ var grouped = stringStream.groupBy<int>(len);
+ var byLength = <int, Future<List<String>>>{};
+ await for (StreamGroup<int, String> group in grouped) {
+ byLength[group.key] = group.values.toList();
+ }
+ Expect.listEquals([1, 2, 4, 3], byLength.keys.toList());
+ expectCompletes(byLength[1], ["a", "b"]);
+ expectCompletes(byLength[2], ["ab"]);
+ expectCompletes(byLength[3], ["abe", "lea"]);
+ expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]);
+ });
+
+ test("empty", () async {
+ var grouped = emptyStream.groupBy<int>(len);
+ var byLength = <int, Future<List<String>>>{};
+ await for (StreamGroup<int, String> group in grouped) {
+ byLength[group.key] = group.values.toList();
+ }
+ Expect.isTrue(byLength.isEmpty);
+ });
+
+ test("single group", () async {
+ var grouped = repeatStream(5, "x").groupBy<int>(len);
+ var byLength = <int, Future<List<String>>>{};
+ await for (StreamGroup<int, String> group in grouped) {
+ byLength[group.key] = group.values.toList();
+ }
+ Expect.listEquals([1], byLength.keys.toList());
+ expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]);
+ });
+
+ test("with error", () async {
+ var grouped = stringErrorStream(3).groupBy<int>(len);
+ var byLength = <int, Future<List<String>>>{};
+ bool caught = false;
+ try {
+ await for (StreamGroup<int, String> group in grouped) {
+ byLength[group.key] = group.values.toList();
+ }
+ } catch (e) {
+ Expect.equals("BAD", e);
+ caught = true;
+ }
+ Expect.isTrue(caught);
+ Expect.listEquals([1, 2, 4], byLength.keys.toList());
+ expectCompletes(byLength[1], ["a", "b"]);
+ expectCompletes(byLength[2], ["ab"]);
+ expectCompletes(byLength[4], ["abel"]);
+ });
+
+ // For comparison with later tests.
+ test("no pause or cancel", () async {
+ var grouped = stringStream.groupBy<int>(len);
+ var events = [];
+ var futures = [];
+ await grouped.forEach((sg) {
+ var key = sg.key;
+ var sub;
+ sub = sg.values.listen((value) {
+ events.add("$key:$value");
+ });
+ var c = new Completer();
+ futures.add(c.future);
+ sub.onDone(() {
+ c.complete(null);
+ });
+ });
+ await Future.wait(futures);
+ Expect.listEquals([
+ "1:a",
+ "2:ab",
+ "1:b",
+ "4:abel",
+ "3:abe",
+ "4:bell",
+ "4:able",
+ "4:abba",
+ "3:lea",
+ ], events);
+ });
+
+ test("pause on group", () async {
+ // Pausing the individial group's stream just makes it buffer.
+ var grouped = stringStream.groupBy<int>(len);
+ var events = [];
+ var futures = [];
+ await grouped.forEach((sg) {
+ var key = sg.key;
+ var sub;
+ sub = sg.values.listen((value) {
+ events.add("$key:$value");
+ if (value == "a") {
+ // Pause until a later timer event, which is after stringStream
+ // has delivered all events.
+ sub.pause(new Future.delayed(Duration.ZERO, () {}));
+ }
+ });
+ var c = new Completer();
+ futures.add(c.future);
+ sub.onDone(() {
+ c.complete(null);
+ });
+ });
+ await Future.wait(futures);
+ Expect.listEquals([
+ "1:a",
+ "2:ab",
+ "4:abel",
+ "3:abe",
+ "4:bell",
+ "4:able",
+ "4:abba",
+ "3:lea",
+ "1:b"
+ ], events);
+ });
+
+ test("pause on group-stream", () async {
+ // Pausing the stream returned by groupBy stops everything.
+ var grouped = stringStream.groupBy<int>(len);
+ var events = [];
+ var futures = [];
+ var done = new Completer();
+ var sub;
+ sub = grouped.listen((sg) {
+ var key = sg.key;
+ futures.add(sg.values.forEach((value) {
+ events.add("$key:$value");
+ if (value == "a") {
+ // Pause everything until a later timer event.
+ asyncStart();
+ var eventSnapshot = events.toList();
+ var delay = new Future.delayed(Duration.ZERO).then((_) {
+ // No events added.
+ Expect.listEquals(eventSnapshot, events);
+ asyncEnd(); // Ensures this test has run.
+ });
+ sub.pause(delay);
+ }
+ }));
+ });
+ sub.onDone(() {
+ done.complete(null);
+ });
+ futures.add(done.future);
+ await Future.wait(futures);
+ Expect.listEquals([
+ "1:a",
+ "2:ab",
+ "1:b",
+ "4:abel",
+ "3:abe",
+ "4:bell",
+ "4:able",
+ "4:abba",
+ "3:lea",
+ ], events);
+ });
+
+ test("cancel on group", () async {
+ // Cancelling the individial group's stream just makes that one stop.
+ var grouped = stringStream.groupBy<int>(len);
+ var events = [];
+ var futures = [];
+ await grouped.forEach((sg) {
+ var key = sg.key;
+ var sub;
+ var c = new Completer();
+ sub = sg.values.listen((value) {
+ events.add("$key:$value");
+ if (value == "bell") {
+ // Pause until a later timer event, which is after stringStream
+ // has delivered all events.
+ sub.cancel();
+ c.complete(null);
+ }
+ });
+ futures.add(c.future);
+ sub.onDone(() {
+ c.complete(null);
+ });
+ });
+ await Future.wait(futures);
+ Expect.listEquals([
+ "1:a",
+ "2:ab",
+ "1:b",
+ "4:abel",
+ "3:abe",
+ "4:bell",
+ "3:lea",
+ ], events);
+ });
+
+ test("cancel on group-stream", () async {
+ // Cancel the stream returned by groupBy ends everything.
+ var grouped = stringStream.groupBy<int>(len);
+ var events = [];
+ var futures = [];
+ var done = new Completer();
+ var sub;
+ sub = grouped.listen((sg) {
+ var key = sg.key;
+ futures.add(sg.values.forEach((value) {
+ events.add("$key:$value");
+ if (value == "bell") {
+ // Pause everything until a later timer event.
+ futures.add(sub.cancel());
+ done.complete();
+ }
+ }));
+ });
+ futures.add(done.future);
+ await Future.wait(futures);
+ Expect.listEquals([
+ "1:a",
+ "2:ab",
+ "1:b",
+ "4:abel",
+ "3:abe",
+ "4:bell",
+ ], events);
+ });
+
+ asyncEnd();
+}
+
+expectCompletes(future, result) {
+ asyncStart();
+ future.then((v) {
+ if (result is List) {
+ Expect.listEquals(result, v);
+ } else {
+ Expect.equals(v, result);
+ }
+ asyncEnd();
+ }, onError: (e, s) {
+ Expect.fail("$e\n$s");
+ });
+}
+
+void test(name, func) {
+ asyncStart();
+ func().then((_) {
+ asyncEnd();
+ }, onError: (e, s) {
+ Expect.fail("$name: $e\n$s");
+ });
+}
+
+var strings = const [
+ "a",
+ "ab",
+ "b",
+ "abel",
+ "abe",
+ "bell",
+ "able",
+ "abba",
+ "lea"
+];
+
+Stream<String> get stringStream async* {
+ for (var string in strings) {
+ yield string;
+ }
+}
+
+Stream get emptyStream async* {}
+
+Stream repeatStream(int count, value) async* {
+ for (var i = 0; i < count; i++) {
+ yield value;
+ }
+}
+
+// Just some valid stack trace.
+var stack = StackTrace.current;
+
+Stream<String> stringErrorStream(int errorAfter) async* {
+ for (int i = 0; i < strings.length; i++) {
+ yield strings[i];
+ if (i == errorAfter) {
+ // Emit error, but continue afterwards.
+ yield* new Future.error("BAD", stack).asStream();
+ }
+ }
+}
+
+Stream intStream(int count, [int start = 0]) async* {
+ for (int i = 0; i < count; i++) {
+ yield start++;
+ }
+}
+
+Stream timerStream(int count, Duration interval) async* {
+ for (int i = 0; i < count; i++) {
+ await new Future.delayed(interval);
+ yield i;
+ }
+}
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698