Chromium Code Reviews| Index: test/stream_completer_test.dart |
| diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..27c4e56157a75d6d9d280944e6952f6a849677aa |
| --- /dev/null |
| +++ b/test/stream_completer_test.dart |
| @@ -0,0 +1,362 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import "dart:async"; |
| + |
| +import "package:async/async.dart" show StreamCompleter; |
| +import "package:test/test.dart"; |
| + |
| +main() { |
| + test("a stream is linked before listening", () async { |
| + var completer = new StreamCompleter(); |
| + completer.setSourceStream(createStream()); |
| + expect(completer.stream.toList(), completion([1, 2, 3, 4])); |
| + }); |
| + |
| + test("listened to before a stream is linked", () async { |
| + var completer = new StreamCompleter(); |
| + Future done = completer.stream.toList(); |
|
nweiz
2015/06/18 23:44:27
Nit: "var"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
I have actually found full typing to catch bugs in
|
| + await flushMicrotasks(); |
| + completer.setSourceStream(createStream()); |
| + expect(done, completion([1, 2, 3, 4])); |
| + }); |
| + |
| + test("cancel before linking a stream doesn't listen on stream", () async { |
| + var completer = new StreamCompleter(); |
| + var subscription = completer.stream.listen(null); |
| + subscription.pause(); // Should be ignored. |
| + subscription.cancel(); |
| + completer.setSourceStream(new UnusableStream()); // Doesn't throw. |
| + }); |
| + |
| + test("listen and pause before linking stream", () async { |
|
nweiz
2015/06/18 23:44:27
Also test that this pause triggers "onPause" in th
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + var controller = new StreamCompleter(); |
| + var events = []; |
| + var subscription = controller.stream.listen(events.add); |
| + var done = subscription.asFuture(); |
| + subscription.pause(); |
| + controller.setSourceStream(createStream()); |
| + await flushMicrotasks(); |
| + expect(events, []); |
| + subscription.resume(); |
| + await done; |
| + expect(events, [1, 2, 3, 4]); |
| + }); |
| + |
| + test("pause more than once", () async { |
| + var completer = new StreamCompleter(); |
| + var events = []; |
| + var subscription = completer.stream.listen(events.add); |
| + Future done = subscription.asFuture(); |
| + subscription.pause(); |
| + subscription.pause(); |
| + subscription.pause(); |
| + completer.setSourceStream(createStream()); |
| + for (int i = 0; i < 3; i++) { |
| + await flushMicrotasks(); |
| + expect(events, []); |
| + subscription.resume(); |
| + } |
| + await done; |
| + expect(events, [1, 2, 3, 4]); |
| + }); |
| + |
| + test("cancel new stream before source is done.", () async { |
| + var completer = new StreamCompleter(); |
| + var listened = false; |
| + var canceled = false; |
| + var lastEvent = -1; |
| + var controller = new StreamController(onListen: () { listened = true; }, |
|
nweiz
2015/06/18 23:44:27
It's not actually in the style guide right now, bu
Lasse Reichstein Nielsen
2015/06/30 10:34:14
That's silly.
I'll rewrite it to
() => listened
|
| + onCancel: () { canceled = true; }); |
| + var subscription; |
| + subscription = completer.stream.listen( |
| + (v) { |
|
nweiz
2015/06/18 23:44:27
"v" -> "value"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + expect(v, lessThan(3)); |
| + lastEvent = v; |
| + if (v == 2) { |
| + subscription.cancel(); |
| + } |
| + }, |
| + onError: unreachable("error"), |
| + onDone: unreachable("done"), |
| + cancelOnError: true); |
| + completer.setSourceStream(controller.stream); |
| + expect(listened, isTrue); |
| + |
| + await flushMicrotasks(); |
| + expect(canceled, isFalse); |
| + controller.add(1); |
| + |
| + await flushMicrotasks(); |
| + expect(lastEvent, 1); |
| + expect(canceled, isFalse); |
| + controller.add(2); |
| + |
| + await flushMicrotasks(); |
| + expect(lastEvent, 2); |
| + expect(canceled, isTrue); |
| + }); |
| + |
| + test("complete with setEmpty before listening", () async { |
| + var completer = new StreamCompleter(); |
| + completer.setEmpty(); |
| + var done = new Completer(); |
| + completer.stream.listen( |
| + unreachable("data"), |
| + onError: unreachable("error"), |
| + onDone: done.complete); |
| + await done.future; |
| + }); |
| + |
| + test("complete with setEmpty after listening", () async { |
| + var completer = new StreamCompleter(); |
| + var done = new Completer(); |
| + completer.stream.listen( |
| + unreachable("data"), |
| + onError: unreachable("error"), |
| + onDone: done.complete); |
| + completer.setEmpty(); |
| + await done.future; |
| + }); |
| + |
| + test("source stream isn't listened to until completer stream is.", () async { |
|
nweiz
2015/06/18 23:44:27
Nit: no period
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + var completer = new StreamCompleter(); |
| + bool listened = false; |
|
nweiz
2015/06/18 23:44:27
Nit: "var"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + var controller; |
| + controller = new StreamController(onListen: () { |
| + listened = true; |
| + () async { controller.close(); } (); // In later microtask. |
|
nweiz
2015/06/18 23:44:27
I think "scheduleMicrotask" is a little more expli
Lasse Reichstein Nielsen
2015/06/30 10:34:14
And it has exactly the same length. Scary.
|
| + }); |
| + |
| + completer.setSourceStream(controller.stream); |
| + await flushMicrotasks(); |
| + expect(listened, isFalse); |
| + var subscription = completer.stream.listen(null); |
| + expect(listened, isTrue); |
| + await subscription.asFuture(); |
| + }); |
| + |
| + test("cancelOnError true when listening before linking stream", () async { |
| + var completer = new StreamCompleter(); |
| + var listened = false; |
| + var canceled = false; |
| + var lastEvent = -1; |
| + var controller = new StreamController(onListen: () { listened = true; }, |
| + onCancel: () { canceled = true; }); |
| + var subscription = completer.stream.listen( |
| + (v) { |
| + expect(v, lessThan(3)); |
| + lastEvent = v; |
| + }, |
| + onError: (v) { |
| + expect(v, "3"); |
| + lastEvent = v; |
| + }, |
| + onDone: unreachable("done"), |
| + cancelOnError: true); |
| + completer.setSourceStream(controller.stream); |
| + expect(listened, isTrue); |
| + |
| + await flushMicrotasks(); |
| + expect(canceled, isFalse); |
| + controller.add(1); |
| + |
| + await flushMicrotasks(); |
| + expect(lastEvent, 1); |
| + expect(canceled, isFalse); |
| + controller.add(2); |
| + |
| + await flushMicrotasks(); |
| + expect(lastEvent, 2); |
| + expect(canceled, isFalse); |
| + controller.addError("3"); |
| + |
| + await flushMicrotasks(); |
| + expect(lastEvent, "3"); |
| + expect(canceled, isTrue); |
| + }); |
| + |
| + test("cancelOnError true when listening after linking stream", () async { |
| + var completer = new StreamCompleter(); |
| + var listened = false; |
| + var canceled = false; |
| + var lastEvent = -1; |
| + var controller = new StreamController(onListen: () { listened = true; }, |
| + onCancel: () { canceled = true; }); |
| + completer.setSourceStream(controller.stream); |
| + controller.add(1); |
| + expect(listened, isFalse); |
| + |
| + var subscription = completer.stream.listen( |
| + (v) { |
| + expect(v, lessThan(3)); |
| + lastEvent = v; |
| + }, |
| + onError: (v) { |
| + expect(v, "3"); |
| + lastEvent = v; |
| + }, |
| + onDone: unreachable("done"), |
| + cancelOnError: true); |
| + |
| + expect(listened, isTrue); |
| + |
| + await flushMicrotasks(); |
| + expect(lastEvent, 1); |
| + expect(canceled, isFalse); |
| + controller.add(2); |
| + |
| + await flushMicrotasks(); |
| + expect(lastEvent, 2); |
| + expect(canceled, isFalse); |
| + controller.addError("3"); |
| + |
| + await flushMicrotasks(); |
| + expect(canceled, isTrue); |
| + }); |
| + |
| + test("linking a stream after setSourceStream before listen", () async { |
| + var completer = new StreamCompleter(); |
| + completer.setSourceStream(createStream()); |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + await completer.stream.toList(); |
| + // Still fails after source is done |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + }); |
| + |
| + test("linking a stream after setSourceStream after listen", () async { |
| + var completer = new StreamCompleter(); |
| + var list = completer.stream.toList(); |
| + completer.setSourceStream(createStream()); |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + await list; |
| + // Still fails after source is done. |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + }); |
| + |
| + test("linking a stream after setEmpty before listen", () async { |
| + var completer = new StreamCompleter(); |
| + completer.setEmpty(); |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + await completer.stream.toList(); |
| + // Still fails after source is done |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + }); |
| + |
| + test("linking a stream after setEmpty() after listen", () async { |
| + var completer = new StreamCompleter(); |
| + var list = completer.stream.toList(); |
| + completer.setEmpty(); |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + await list; |
| + // Still fails after source is done. |
| + expect(() { completer.setSourceStream(createStream()); }, throws); |
| + expect(() { completer.setEmpty(createStream()); }, throws); |
| + }); |
| + |
| + test("Listening more than once after setting stream", () async { |
|
nweiz
2015/06/18 23:44:27
Nit: "listening" (also below)
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + var completer = new StreamCompleter(); |
| + completer.setSourceStream(createStream()); |
| + var list = completer.stream.toList(); |
| + expect(() { completer.stream.toList(); }, throws); |
| + await list; |
| + expect(() { completer.stream.toList(); }, throws); |
| + }); |
| + |
| + test("Listening more than once before setting stream", () async { |
| + var completer = new StreamCompleter(); |
| + var list = completer.stream.toList(); |
| + expect(() { completer.stream.toList(); }, throws); |
| + }); |
| + |
| + test("setting onData etc. before and after setting stream", () async { |
| + var completer = new StreamCompleter(); |
| + var controller = new StreamController(); |
| + var subscription = completer.stream.listen(null); |
| + var lastEvent = 0; |
| + subscription.onData((v) { lastEvent = v; }); |
| + subscription.onError((v) { lastEvent = "$v"; }); |
| + subscription.onDone(() { lastEvent = -1; }); |
| + completer.setSourceStream(controller.stream); |
| + await flushMicrotasks(); |
| + controller.add(1); |
| + await flushMicrotasks(); |
| + expect(lastEvent, 1); |
| + controller.addError(2); |
| + await flushMicrotasks(); |
| + expect(lastEvent, "2"); |
| + subscription.onData((v) { lastEvent = -v; }); |
| + subscription.onError((v) { lastEvent = "${-v}"; }); |
| + controller.add(1); |
| + await flushMicrotasks(); |
| + expect(lastEvent, -1); |
| + controller.addError(2); |
| + await flushMicrotasks(); |
| + expect(lastEvent, "-2"); |
| + controller.close(); |
| + await flushMicrotasks(); |
| + expect(lastEvent, -1); |
| + }); |
| + |
| + test("pause w/ resume future accross setting stream", () async { |
| + var completer = new StreamCompleter(); |
| + var resume = new Completer(); |
| + var subscription = completer.stream.listen(unreachable("data")); |
| + var lastEvent = 0; |
| + subscription.pause(resume.future); |
| + await flushMicrotasks(); |
| + completer.setSourceStream(createStream()); |
| + await flushMicrotasks(); |
| + resume.complete(); |
| + var events = []; |
| + subscription.onData(events.add); |
| + await subscription.asFuture(); |
| + expect(events, [1, 2, 3, 4]); |
| + }); |
| + |
| + test("asFuture with error accross setting stream", () async { |
| + var completer = new StreamCompleter(); |
| + var controller = new StreamController(); |
| + var subscription = completer.stream.listen(unreachable("data"), |
| + cancelOnError: false); |
| + var done = subscription.asFuture(); |
| + expect(controller.hasListener, isFalse); |
| + completer.setSourceStream(controller.stream); |
| + await flushMicrotasks(); |
| + expect(controller.hasListener, isTrue); |
| + controller.addError(42); |
| + await done.then(unreachable("data"), onError: (error) { |
| + expect(error, 42); |
| + }); |
| + expect(controller.hasListener, isFalse); |
| + }); |
| +} |
| + |
| +/// A zero-millisecond timer should wait until after all microtasks. |
| +Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
|
nweiz
2015/06/18 23:44:27
Consider moving this (and maybe [unreachable] and
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + |
| +Stream<int> createStream() async* { |
| + yield 1; |
| + await flushMicrotasks(); |
| + yield 2; |
| + await flushMicrotasks(); |
| + yield 3; |
| + await flushMicrotasks(); |
| + yield 4; |
| +} |
| + |
| +unreachable(String name) => ([a, b]) { fail("Unreachable: $name"); }; |
| + |
| +class UnusableStream extends Stream { |
| + listen(onData, {onError, onDone, cancelOnError}) { |
| + throw new UnimplementedError("Gotcha!"); |
| + } |
| +} |