Index: tests/lib_strong/async/stream_transformation_broadcast_test.dart |
diff --git a/tests/lib_strong/async/stream_transformation_broadcast_test.dart b/tests/lib_strong/async/stream_transformation_broadcast_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ce9c882e4fcb11ec3a0eec6904bdce9e6304098c |
--- /dev/null |
+++ b/tests/lib_strong/async/stream_transformation_broadcast_test.dart |
@@ -0,0 +1,362 @@ |
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+// Test that transformations like `map` and `where` preserve broadcast flag. |
+library stream_join_test; |
+ |
+import 'dart:async'; |
+import 'event_helper.dart'; |
+import 'package:unittest/unittest.dart'; |
+import "package:expect/expect.dart"; |
+ |
+main() { |
+ testStream("singlesub", () => new StreamController(), (c) => c.stream); |
+ testStream("broadcast", () => new StreamController.broadcast(), |
+ (c) => c.stream); |
+ testStream("asBroadcast", () => new StreamController(), |
+ (c) => c.stream.asBroadcastStream()); |
+ testStream("broadcast.asBroadcast", () => new StreamController.broadcast(), |
+ (c) => c.stream.asBroadcastStream()); |
+} |
+ |
+void testStream(String name, |
+ StreamController create(), |
+ Stream getStream(controller)) { |
+ test("$name-map", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.map((x) => x + 1); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(43, v); |
+ })); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ test("$name-where", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.where((x) => x.isEven); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(37); |
+ c.add(42); |
+ c.add(87); |
+ c.close(); |
+ }); |
+ test("$name-handleError", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.handleError((x, s) {}); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.addError("BAD1"); |
+ c.add(42); |
+ c.addError("BAD2"); |
+ c.close(); |
+ }); |
+ test("$name-expand", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.expand((x) => x.isEven ? [x] : []); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(37); |
+ c.add(42); |
+ c.add(87); |
+ c.close(); |
+ }); |
+ test("$name-transform", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ // TODO: find name of default transformer |
+ var t = new StreamTransformer.fromHandlers( |
+ handleData: (value, EventSink sink) { sink.add(value); } |
+ ); |
+ Stream newStream = s.transform(t); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ test("$name-take", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.take(1); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(42); |
+ c.add(37); |
+ c.close(); |
+ }); |
+ test("$name-takeWhile", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.takeWhile((x) => x.isEven); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(42); |
+ c.add(37); |
+ c.close(); |
+ }); |
+ test("$name-skip", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.skip(1); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(37); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ test("$name-skipWhile", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.skipWhile((x) => x.isOdd); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(37); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ test("$name-distinct", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.distinct(); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(42); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ test("$name-timeout", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.timeout(const Duration(seconds: 1)); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(42, v); |
+ })); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ test("$name-asyncMap", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.asyncMap((x) => new Future.value(x + 1)); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(43, v); |
+ })); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ test("$name-asyncExpand", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.asyncExpand((x) => new Stream.fromIterable([x + 1])); |
+ Expect.equals(s.isBroadcast, newStream.isBroadcast); |
+ newStream.single.then(expectAsync((v) { |
+ Expect.equals(43, v); |
+ })); |
+ c.add(42); |
+ c.close(); |
+ }); |
+ |
+ // The following tests are only on broadcast streams, they require listening |
+ // more than once. |
+ if (name.startsWith("singlesub")) return; |
+ |
+ test("$name-skip-multilisten", () { |
+ if (name.startsWith("singlesub") || |
+ name.startsWith("asBroadcast")) return; |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.skip(5); |
+ // Listen immediately, to ensure that an asBroadcast stream is started. |
+ var sub = newStream.listen((_){}); |
+ int i = 0; |
+ var expect1 = 11; |
+ var expect2 = 21; |
+ var handler2 = expectAsync((v) { |
+ expect(v, expect2); |
+ expect2++; |
+ }, count: 5); |
+ var handler1 = expectAsync((v) { |
+ expect(v, expect1); |
+ expect1++; |
+ }, count: 15); |
+ var loop; |
+ loop = expectAsync(() { |
+ i++; |
+ c.add(i); |
+ if (i == 5) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler1); |
+ }); |
+ } |
+ if (i == 15) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler2); |
+ }); |
+ } |
+ if (i < 25) { |
+ scheduleMicrotask(loop); |
+ } else { |
+ sub.cancel(); |
+ c.close(); |
+ } |
+ }, count: 25); |
+ scheduleMicrotask(loop); |
+ }); |
+ |
+ test("$name-take-multilisten", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.take(10); |
+ // Listen immediately, to ensure that an asBroadcast stream is started. |
+ var sub = newStream.listen((_){}); |
+ int i = 0; |
+ var expect1 = 6; |
+ var expect2 = 11; |
+ var handler2 = expectAsync((v) { |
+ expect(v, expect2); |
+ expect(v <= 20, isTrue); |
+ expect2++; |
+ }, count: 10); |
+ var handler1 = expectAsync((v) { |
+ expect(v, expect1); |
+ expect(v <= 15, isTrue); |
+ expect1++; |
+ }, count: 10); |
+ var loop; |
+ loop = expectAsync(() { |
+ i++; |
+ c.add(i); |
+ if (i == 5) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler1); |
+ }); |
+ } |
+ if (i == 10) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler2); |
+ }); |
+ } |
+ if (i < 25) { |
+ scheduleMicrotask(loop); |
+ } else { |
+ sub.cancel(); |
+ c.close(); |
+ } |
+ }, count: 25); |
+ scheduleMicrotask(loop); |
+ }); |
+ |
+ test("$name-skipWhile-multilisten", () { |
+ if (name.startsWith("singlesub") || |
+ name.startsWith("asBroadcast")) return; |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.skipWhile((x) => (x % 10) != 1); |
+ // Listen immediately, to ensure that an asBroadcast stream is started. |
+ var sub = newStream.listen((_){}); |
+ int i = 0; |
+ var expect1 = 11; |
+ var expect2 = 21; |
+ var handler2 = expectAsync((v) { |
+ expect(v, expect2); |
+ expect2++; |
+ }, count: 5); |
+ var handler1 = expectAsync((v) { |
+ expect(v, expect1); |
+ expect1++; |
+ }, count: 15); |
+ var loop; |
+ loop = expectAsync(() { |
+ i++; |
+ c.add(i); |
+ if (i == 5) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler1); |
+ }); |
+ } |
+ if (i == 15) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler2); |
+ }); |
+ } |
+ if (i < 25) { |
+ scheduleMicrotask(loop); |
+ } else { |
+ sub.cancel(); |
+ c.close(); |
+ } |
+ }, count: 25); |
+ scheduleMicrotask(loop); |
+ }); |
+ |
+ test("$name-takeWhile-multilisten", () { |
+ var c = create(); |
+ var s = getStream(c); |
+ Stream newStream = s.takeWhile((x) => (x % 10) != 5); |
+ // Listen immediately, to ensure that an asBroadcast stream is started. |
+ var sub = newStream.listen((_){}); |
+ int i = 0; |
+ // Non-overlapping ranges means the test must not remember its first |
+ // failure. |
+ var expect1 = 6; |
+ var expect2 = 16; |
+ var handler2 = expectAsync((v) { |
+ expect(v, expect2); |
+ expect(v <= 25, isTrue); |
+ expect2++; |
+ }, count: 9); |
+ var handler1 = expectAsync((v) { |
+ expect(v, expect1); |
+ expect(v <= 15, isTrue); |
+ expect1++; |
+ }, count: 9); |
+ var loop; |
+ loop = expectAsync(() { |
+ i++; |
+ c.add(i); |
+ if (i == 5) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler1); |
+ }); |
+ } |
+ if (i == 15) { |
+ scheduleMicrotask(() { |
+ newStream.listen(handler2); |
+ }); |
+ } |
+ if (i < 25) { |
+ scheduleMicrotask(loop); |
+ } else { |
+ sub.cancel(); |
+ c.close(); |
+ } |
+ }, count: 25); |
+ scheduleMicrotask(loop); |
+ }); |
+} |