| Index: tests/lib_strong/async/stream_transformation_broadcast_test.dart
|
| diff --git a/tests/lib_strong/async/stream_transformation_broadcast_test.dart b/tests/lib_strong/async/stream_transformation_broadcast_test.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ce9c882e4fcb11ec3a0eec6904bdce9e6304098c
|
| --- /dev/null
|
| +++ b/tests/lib_strong/async/stream_transformation_broadcast_test.dart
|
| @@ -0,0 +1,362 @@
|
| +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// Test that transformations like `map` and `where` preserve broadcast flag.
|
| +library stream_join_test;
|
| +
|
| +import 'dart:async';
|
| +import 'event_helper.dart';
|
| +import 'package:unittest/unittest.dart';
|
| +import "package:expect/expect.dart";
|
| +
|
| +main() {
|
| + testStream("singlesub", () => new StreamController(), (c) => c.stream);
|
| + testStream("broadcast", () => new StreamController.broadcast(),
|
| + (c) => c.stream);
|
| + testStream("asBroadcast", () => new StreamController(),
|
| + (c) => c.stream.asBroadcastStream());
|
| + testStream("broadcast.asBroadcast", () => new StreamController.broadcast(),
|
| + (c) => c.stream.asBroadcastStream());
|
| +}
|
| +
|
| +void testStream(String name,
|
| + StreamController create(),
|
| + Stream getStream(controller)) {
|
| + test("$name-map", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.map((x) => x + 1);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(43, v);
|
| + }));
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| + test("$name-where", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.where((x) => x.isEven);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(37);
|
| + c.add(42);
|
| + c.add(87);
|
| + c.close();
|
| + });
|
| + test("$name-handleError", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.handleError((x, s) {});
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.addError("BAD1");
|
| + c.add(42);
|
| + c.addError("BAD2");
|
| + c.close();
|
| + });
|
| + test("$name-expand", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.expand((x) => x.isEven ? [x] : []);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(37);
|
| + c.add(42);
|
| + c.add(87);
|
| + c.close();
|
| + });
|
| + test("$name-transform", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + // TODO: find name of default transformer
|
| + var t = new StreamTransformer.fromHandlers(
|
| + handleData: (value, EventSink sink) { sink.add(value); }
|
| + );
|
| + Stream newStream = s.transform(t);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| + test("$name-take", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.take(1);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(42);
|
| + c.add(37);
|
| + c.close();
|
| + });
|
| + test("$name-takeWhile", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.takeWhile((x) => x.isEven);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(42);
|
| + c.add(37);
|
| + c.close();
|
| + });
|
| + test("$name-skip", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.skip(1);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(37);
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| + test("$name-skipWhile", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.skipWhile((x) => x.isOdd);
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(37);
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| + test("$name-distinct", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.distinct();
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(42);
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| + test("$name-timeout", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.timeout(const Duration(seconds: 1));
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| + test("$name-asyncMap", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.asyncMap((x) => new Future.value(x + 1));
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(43, v);
|
| + }));
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| + test("$name-asyncExpand", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.asyncExpand((x) => new Stream.fromIterable([x + 1]));
|
| + Expect.equals(s.isBroadcast, newStream.isBroadcast);
|
| + newStream.single.then(expectAsync((v) {
|
| + Expect.equals(43, v);
|
| + }));
|
| + c.add(42);
|
| + c.close();
|
| + });
|
| +
|
| + // The following tests are only on broadcast streams, they require listening
|
| + // more than once.
|
| + if (name.startsWith("singlesub")) return;
|
| +
|
| + test("$name-skip-multilisten", () {
|
| + if (name.startsWith("singlesub") ||
|
| + name.startsWith("asBroadcast")) return;
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.skip(5);
|
| + // Listen immediately, to ensure that an asBroadcast stream is started.
|
| + var sub = newStream.listen((_){});
|
| + int i = 0;
|
| + var expect1 = 11;
|
| + var expect2 = 21;
|
| + var handler2 = expectAsync((v) {
|
| + expect(v, expect2);
|
| + expect2++;
|
| + }, count: 5);
|
| + var handler1 = expectAsync((v) {
|
| + expect(v, expect1);
|
| + expect1++;
|
| + }, count: 15);
|
| + var loop;
|
| + loop = expectAsync(() {
|
| + i++;
|
| + c.add(i);
|
| + if (i == 5) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler1);
|
| + });
|
| + }
|
| + if (i == 15) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler2);
|
| + });
|
| + }
|
| + if (i < 25) {
|
| + scheduleMicrotask(loop);
|
| + } else {
|
| + sub.cancel();
|
| + c.close();
|
| + }
|
| + }, count: 25);
|
| + scheduleMicrotask(loop);
|
| + });
|
| +
|
| + test("$name-take-multilisten", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.take(10);
|
| + // Listen immediately, to ensure that an asBroadcast stream is started.
|
| + var sub = newStream.listen((_){});
|
| + int i = 0;
|
| + var expect1 = 6;
|
| + var expect2 = 11;
|
| + var handler2 = expectAsync((v) {
|
| + expect(v, expect2);
|
| + expect(v <= 20, isTrue);
|
| + expect2++;
|
| + }, count: 10);
|
| + var handler1 = expectAsync((v) {
|
| + expect(v, expect1);
|
| + expect(v <= 15, isTrue);
|
| + expect1++;
|
| + }, count: 10);
|
| + var loop;
|
| + loop = expectAsync(() {
|
| + i++;
|
| + c.add(i);
|
| + if (i == 5) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler1);
|
| + });
|
| + }
|
| + if (i == 10) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler2);
|
| + });
|
| + }
|
| + if (i < 25) {
|
| + scheduleMicrotask(loop);
|
| + } else {
|
| + sub.cancel();
|
| + c.close();
|
| + }
|
| + }, count: 25);
|
| + scheduleMicrotask(loop);
|
| + });
|
| +
|
| + test("$name-skipWhile-multilisten", () {
|
| + if (name.startsWith("singlesub") ||
|
| + name.startsWith("asBroadcast")) return;
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.skipWhile((x) => (x % 10) != 1);
|
| + // Listen immediately, to ensure that an asBroadcast stream is started.
|
| + var sub = newStream.listen((_){});
|
| + int i = 0;
|
| + var expect1 = 11;
|
| + var expect2 = 21;
|
| + var handler2 = expectAsync((v) {
|
| + expect(v, expect2);
|
| + expect2++;
|
| + }, count: 5);
|
| + var handler1 = expectAsync((v) {
|
| + expect(v, expect1);
|
| + expect1++;
|
| + }, count: 15);
|
| + var loop;
|
| + loop = expectAsync(() {
|
| + i++;
|
| + c.add(i);
|
| + if (i == 5) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler1);
|
| + });
|
| + }
|
| + if (i == 15) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler2);
|
| + });
|
| + }
|
| + if (i < 25) {
|
| + scheduleMicrotask(loop);
|
| + } else {
|
| + sub.cancel();
|
| + c.close();
|
| + }
|
| + }, count: 25);
|
| + scheduleMicrotask(loop);
|
| + });
|
| +
|
| + test("$name-takeWhile-multilisten", () {
|
| + var c = create();
|
| + var s = getStream(c);
|
| + Stream newStream = s.takeWhile((x) => (x % 10) != 5);
|
| + // Listen immediately, to ensure that an asBroadcast stream is started.
|
| + var sub = newStream.listen((_){});
|
| + int i = 0;
|
| + // Non-overlapping ranges means the test must not remember its first
|
| + // failure.
|
| + var expect1 = 6;
|
| + var expect2 = 16;
|
| + var handler2 = expectAsync((v) {
|
| + expect(v, expect2);
|
| + expect(v <= 25, isTrue);
|
| + expect2++;
|
| + }, count: 9);
|
| + var handler1 = expectAsync((v) {
|
| + expect(v, expect1);
|
| + expect(v <= 15, isTrue);
|
| + expect1++;
|
| + }, count: 9);
|
| + var loop;
|
| + loop = expectAsync(() {
|
| + i++;
|
| + c.add(i);
|
| + if (i == 5) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler1);
|
| + });
|
| + }
|
| + if (i == 15) {
|
| + scheduleMicrotask(() {
|
| + newStream.listen(handler2);
|
| + });
|
| + }
|
| + if (i < 25) {
|
| + scheduleMicrotask(loop);
|
| + } else {
|
| + sub.cancel();
|
| + c.close();
|
| + }
|
| + }, count: 25);
|
| + scheduleMicrotask(loop);
|
| + });
|
| +}
|
|
|