| Index: test/stream_completer_test.dart
|
| diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..eb2333052fafa2a93bc14fa4b285088247b0b1ec
|
| --- /dev/null
|
| +++ b/test/stream_completer_test.dart
|
| @@ -0,0 +1,371 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +import "dart:async";
|
| +
|
| +import "package:async/async.dart" show StreamCompleter;
|
| +import "package:test/test.dart";
|
| +
|
| +import "utils.dart";
|
| +
|
| +main() {
|
| + test("a stream is linked before listening", () async {
|
| + var completer = new StreamCompleter();
|
| + completer.setSourceStream(createStream());
|
| + expect(completer.stream.toList(), completion([1, 2, 3, 4]));
|
| + });
|
| +
|
| + test("listened to before a stream is linked", () async {
|
| + var completer = new StreamCompleter();
|
| + var done = completer.stream.toList();
|
| + await flushMicrotasks();
|
| + completer.setSourceStream(createStream());
|
| + expect(done, completion([1, 2, 3, 4]));
|
| + });
|
| +
|
| + test("cancel before linking a stream doesn't listen on stream", () async {
|
| + var completer = new StreamCompleter();
|
| + var subscription = completer.stream.listen(null);
|
| + subscription.pause(); // Should be ignored.
|
| + subscription.cancel();
|
| + completer.setSourceStream(new UnusableStream()); // Doesn't throw.
|
| + });
|
| +
|
| + test("listen and pause before linking stream", () async {
|
| + var controller = new StreamCompleter();
|
| + var events = [];
|
| + var subscription = controller.stream.listen(events.add);
|
| + var done = subscription.asFuture();
|
| + subscription.pause();
|
| + var sourceController = new StreamController();
|
| + sourceController..add(1)..add(2)..add(3)..add(4);
|
| + controller.setSourceStream(sourceController.stream);
|
| + await flushMicrotasks();
|
| + expect(sourceController.hasListener, isTrue);
|
| + expect(sourceController.isPaused, isTrue);
|
| + expect(events, []);
|
| + subscription.resume();
|
| + await flushMicrotasks();
|
| + expect(sourceController.hasListener, isTrue);
|
| + expect(sourceController.isPaused, isFalse);
|
| + expect(events, [1, 2, 3, 4]);
|
| + sourceController.close();
|
| + await done;
|
| + expect(events, [1, 2, 3, 4]);
|
| + });
|
| +
|
| + test("pause more than once", () async {
|
| + var completer = new StreamCompleter();
|
| + var events = [];
|
| + var subscription = completer.stream.listen(events.add);
|
| + var done = subscription.asFuture();
|
| + subscription.pause();
|
| + subscription.pause();
|
| + subscription.pause();
|
| + completer.setSourceStream(createStream());
|
| + for (int i = 0; i < 3; i++) {
|
| + await flushMicrotasks();
|
| + expect(events, []);
|
| + subscription.resume();
|
| + }
|
| + await done;
|
| + expect(events, [1, 2, 3, 4]);
|
| + });
|
| +
|
| + test("cancel new stream before source is done", () async {
|
| + var completer = new StreamCompleter();
|
| + var listened = false;
|
| + var lastEvent = -1;
|
| + var controller = new StreamController();
|
| + var subscription;
|
| + subscription = completer.stream.listen(
|
| + (value) {
|
| + expect(value, lessThan(3));
|
| + lastEvent = value;
|
| + if (value == 2) {
|
| + subscription.cancel();
|
| + }
|
| + },
|
| + onError: unreachable("error"),
|
| + onDone: unreachable("done"),
|
| + cancelOnError: true);
|
| + completer.setSourceStream(controller.stream);
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + controller.add(1);
|
| +
|
| + await flushMicrotasks();
|
| + expect(lastEvent, 1);
|
| + expect(controller.hasListener, isTrue);
|
| + controller.add(2);
|
| +
|
| + await flushMicrotasks();
|
| + expect(lastEvent, 2);
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +
|
| + test("complete with setEmpty before listening", () async {
|
| + var completer = new StreamCompleter();
|
| + completer.setEmpty();
|
| + var done = new Completer();
|
| + completer.stream.listen(
|
| + unreachable("data"),
|
| + onError: unreachable("error"),
|
| + onDone: done.complete);
|
| + await done.future;
|
| + });
|
| +
|
| + test("complete with setEmpty after listening", () async {
|
| + var completer = new StreamCompleter();
|
| + var done = new Completer();
|
| + completer.stream.listen(
|
| + unreachable("data"),
|
| + onError: unreachable("error"),
|
| + onDone: done.complete);
|
| + completer.setEmpty();
|
| + await done.future;
|
| + });
|
| +
|
| + test("source stream isn't listened to until completer stream is", () async {
|
| + var completer = new StreamCompleter();
|
| + var controller;
|
| + controller = new StreamController(onListen: () {
|
| + scheduleMicrotask(controller.close);
|
| + });
|
| +
|
| + completer.setSourceStream(controller.stream);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| + var subscription = completer.stream.listen(null);
|
| + expect(controller.hasListener, isTrue);
|
| + await subscription.asFuture();
|
| + });
|
| +
|
| + test("cancelOnError true when listening before linking stream", () async {
|
| + var completer = new StreamCompleter();
|
| + var listened = false;
|
| + var canceled = false;
|
| + var lastEvent = -1;
|
| + var controller = new StreamController();
|
| + var subscription = completer.stream.listen(
|
| + (value) {
|
| + expect(value, lessThan(3));
|
| + lastEvent = value;
|
| + },
|
| + onError: (value) {
|
| + expect(value, "3");
|
| + lastEvent = value;
|
| + },
|
| + onDone: unreachable("done"),
|
| + cancelOnError: true);
|
| + completer.setSourceStream(controller.stream);
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + controller.add(1);
|
| +
|
| + await flushMicrotasks();
|
| + expect(lastEvent, 1);
|
| + expect(controller.hasListener, isTrue);
|
| + controller.add(2);
|
| +
|
| + await flushMicrotasks();
|
| + expect(lastEvent, 2);
|
| + expect(controller.hasListener, isTrue);
|
| + controller.addError("3");
|
| +
|
| + await flushMicrotasks();
|
| + expect(lastEvent, "3");
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +
|
| + test("cancelOnError true when listening after linking stream", () async {
|
| + var completer = new StreamCompleter();
|
| + var lastEvent = -1;
|
| + var controller = new StreamController();
|
| + completer.setSourceStream(controller.stream);
|
| + controller.add(1);
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + var subscription = completer.stream.listen(
|
| + (value) {
|
| + expect(value, lessThan(3));
|
| + lastEvent = value;
|
| + },
|
| + onError: (value) {
|
| + expect(value, "3");
|
| + lastEvent = value;
|
| + },
|
| + onDone: unreachable("done"),
|
| + cancelOnError: true);
|
| +
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + await flushMicrotasks();
|
| + expect(lastEvent, 1);
|
| + expect(controller.hasListener, isTrue);
|
| + controller.add(2);
|
| +
|
| + await flushMicrotasks();
|
| + expect(lastEvent, 2);
|
| + expect(controller.hasListener, isTrue);
|
| + controller.addError("3");
|
| +
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +
|
| + test("linking a stream after setSourceStream before listen", () async {
|
| + var completer = new StreamCompleter();
|
| + completer.setSourceStream(createStream());
|
| + expect(() => completer.setSourceStream(createStream()), throws);
|
| + expect(() => completer.setEmpty(createStream()), throws);
|
| + await completer.stream.toList();
|
| + // Still fails after source is done
|
| + expect(() => completer.setSourceStream(createStream()), throws);
|
| + expect(() => completer.setEmpty(createStream()), throws);
|
| + });
|
| +
|
| + test("linking a stream after setSourceStream after listen", () async {
|
| + var completer = new StreamCompleter();
|
| + var list = completer.stream.toList();
|
| + completer.setSourceStream(createStream());
|
| + expect(() => completer.setSoureStream(createStream()), throws);
|
| + expect(() => completer.stEmpty(createStream()), throws);
|
| + await list;
|
| + // Still fails after source is done.
|
| + expect(() => completer.setSoureStream(createStream()), throws);
|
| + expect(() => completer.stEmpty(createStream()), throws);
|
| + });
|
| +
|
| + test("linking a stream after setEmpty before listen", () async {
|
| + var completer = new StreamCompleter();
|
| + completer.setEmpty();
|
| + expect(() => completer.setSoureStream(createStream()), throws);
|
| + expect(() => completer.stEmpty(createStream()), throws);
|
| + await completer.stream.toList();
|
| + // Still fails after source is done
|
| + expect(() => completer.setSoureStream(createStream()), throws);
|
| + expect(() => completer.stEmpty(createStream()), throws);
|
| + });
|
| +
|
| + test("linking a stream after setEmpty() after listen", () async {
|
| + var completer = new StreamCompleter();
|
| + var list = completer.stream.toList();
|
| + completer.setEmpty();
|
| + expect(() => completer.setSoureStream(createStream()), throws);
|
| + expect(() => completer.stEmpty(createStream()), throws);
|
| + await list;
|
| + // Still fails after source is done.
|
| + expect(() => completer.setSoureStream(createStream()), throws);
|
| + expect(() => completer.stEmpty(createStream()), throws);
|
| + });
|
| +
|
| + test("listening more than once after setting stream", () async {
|
| + var completer = new StreamCompleter();
|
| + completer.setSourceStream(createStream());
|
| + var list = completer.stream.toList();
|
| + expect(() => completer.stream.oList(), throws);
|
| + await list;
|
| + expect(() => completer.stream.oList(), throws);
|
| + });
|
| +
|
| + test("listening more than once before setting stream", () async {
|
| + var completer = new StreamCompleter();
|
| + var list = completer.stream.toList();
|
| + expect(() => completer.stream.oList(), throws);
|
| + });
|
| +
|
| + test("setting onData etc. before and after setting stream", () async {
|
| + var completer = new StreamCompleter();
|
| + var controller = new StreamController();
|
| + var subscription = completer.stream.listen(null);
|
| + var lastEvent = 0;
|
| + subscription.onData((value) => lastEvent = value);
|
| + subscription.onError((value) => lastEvent = "$value");
|
| + subscription.onDone(() => lastEvent = -1);
|
| + completer.setSourceStream(controller.stream);
|
| + await flushMicrotasks();
|
| + controller.add(1);
|
| + await flushMicrotasks();
|
| + expect(lastEvent, 1);
|
| + controller.addError(2);
|
| + await flushMicrotasks();
|
| + expect(lastEvent, "2");
|
| + subscription.onData((value) => lastEvent = -value);
|
| + subscription.onError((value) => lastEvent = "${-value}");
|
| + controller.add(1);
|
| + await flushMicrotasks();
|
| + expect(lastEvent, -1);
|
| + controller.addError(2);
|
| + await flushMicrotasks();
|
| + expect(lastEvent, "-2");
|
| + controller.close();
|
| + await flushMicrotasks();
|
| + expect(lastEvent, -1);
|
| + });
|
| +
|
| + test("pause w/ resume future accross setting stream", () async {
|
| + var completer = new StreamCompleter();
|
| + var resume = new Completer();
|
| + var subscription = completer.stream.listen(unreachable("data"));
|
| + var lastEvent = 0;
|
| + subscription.pause(resume.future);
|
| + await flushMicrotasks();
|
| + completer.setSourceStream(createStream());
|
| + await flushMicrotasks();
|
| + resume.complete();
|
| + var events = [];
|
| + subscription.onData(events.add);
|
| + await subscription.asFuture();
|
| + expect(events, [1, 2, 3, 4]);
|
| + });
|
| +
|
| + test("asFuture with error accross setting stream", () async {
|
| + var completer = new StreamCompleter();
|
| + var controller = new StreamController();
|
| + var subscription = completer.stream.listen(unreachable("data"),
|
| + cancelOnError: false);
|
| + var done = subscription.asFuture();
|
| + expect(controller.hasListener, isFalse);
|
| + completer.setSourceStream(controller.stream);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + controller.addError(42);
|
| + await done.then(unreachable("data"), onError: (error) {
|
| + expect(error, 42);
|
| + });
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +}
|
| +
|
| +Stream<int> createStream() async* {
|
| + yield 1;
|
| + await flushMicrotasks();
|
| + yield 2;
|
| + await flushMicrotasks();
|
| + yield 3;
|
| + await flushMicrotasks();
|
| + yield 4;
|
| +}
|
| +
|
| +/// A zero-millisecond timer should wait until after all microtasks.
|
| +Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
|
| +
|
| +/// A generic unreachable callback function.
|
| +///
|
| +/// Returns a function that fails the test if it is ever called.
|
| +unreachable(String name) => ([a, b]) => fail("Unreachable: $name");
|
| +
|
| +/// A badly behaved stream which throws if it's ever listened to.
|
| +///
|
| +/// Can be used to test cases where a stream should not be used.
|
| +class UnusableStream extends Stream {
|
| + listen(onData, {onError, onDone, cancelOnError}) {
|
| + throw new UnimplementedError("Gotcha!");
|
| + }
|
| +}
|
|
|