Index: packages/async/test/stream_completer_test.dart |
diff --git a/packages/async/test/stream_completer_test.dart b/packages/async/test/stream_completer_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..cd3ceb9fa49a78fce51ac6f299db7a31a8c02dd9 |
--- /dev/null |
+++ b/packages/async/test/stream_completer_test.dart |
@@ -0,0 +1,350 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import "dart:async"; |
+ |
+import "package:async/async.dart" show StreamCompleter; |
+import "package:test/test.dart"; |
+ |
+import "utils.dart"; |
+ |
+main() { |
+ test("a stream is linked before listening", () async { |
+ var completer = new StreamCompleter(); |
+ completer.setSourceStream(createStream()); |
+ expect(completer.stream.toList(), completion([1, 2, 3, 4])); |
+ }); |
+ |
+ test("listened to before a stream is linked", () async { |
+ var completer = new StreamCompleter(); |
+ var done = completer.stream.toList(); |
+ await flushMicrotasks(); |
+ completer.setSourceStream(createStream()); |
+ expect(done, completion([1, 2, 3, 4])); |
+ }); |
+ |
+ test("cancel before linking a stream doesn't listen on stream", () async { |
+ var completer = new StreamCompleter(); |
+ var subscription = completer.stream.listen(null); |
+ subscription.pause(); // Should be ignored. |
+ subscription.cancel(); |
+ completer.setSourceStream(new UnusableStream()); // Doesn't throw. |
+ }); |
+ |
+ test("listen and pause before linking stream", () async { |
+ var controller = new StreamCompleter(); |
+ var events = []; |
+ var subscription = controller.stream.listen(events.add); |
+ var done = subscription.asFuture(); |
+ subscription.pause(); |
+ var sourceController = new StreamController(); |
+ sourceController..add(1)..add(2)..add(3)..add(4); |
+ controller.setSourceStream(sourceController.stream); |
+ await flushMicrotasks(); |
+ expect(sourceController.hasListener, isTrue); |
+ expect(sourceController.isPaused, isTrue); |
+ expect(events, []); |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(sourceController.hasListener, isTrue); |
+ expect(sourceController.isPaused, isFalse); |
+ expect(events, [1, 2, 3, 4]); |
+ sourceController.close(); |
+ await done; |
+ expect(events, [1, 2, 3, 4]); |
+ }); |
+ |
+ test("pause more than once", () async { |
+ var completer = new StreamCompleter(); |
+ var events = []; |
+ var subscription = completer.stream.listen(events.add); |
+ var done = subscription.asFuture(); |
+ subscription.pause(); |
+ subscription.pause(); |
+ subscription.pause(); |
+ completer.setSourceStream(createStream()); |
+ for (int i = 0; i < 3; i++) { |
+ await flushMicrotasks(); |
+ expect(events, []); |
+ subscription.resume(); |
+ } |
+ await done; |
+ expect(events, [1, 2, 3, 4]); |
+ }); |
+ |
+ test("cancel new stream before source is done", () async { |
+ var completer = new StreamCompleter(); |
+ var lastEvent = -1; |
+ var controller = new StreamController(); |
+ var subscription; |
+ subscription = completer.stream.listen( |
+ (value) { |
+ expect(value, lessThan(3)); |
+ lastEvent = value; |
+ if (value == 2) { |
+ subscription.cancel(); |
+ } |
+ }, |
+ onError: unreachable("error"), |
+ onDone: unreachable("done"), |
+ cancelOnError: true); |
+ completer.setSourceStream(controller.stream); |
+ expect(controller.hasListener, isTrue); |
+ |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ controller.add(1); |
+ |
+ await flushMicrotasks(); |
+ expect(lastEvent, 1); |
+ expect(controller.hasListener, isTrue); |
+ controller.add(2); |
+ |
+ await flushMicrotasks(); |
+ expect(lastEvent, 2); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ test("complete with setEmpty before listening", () async { |
+ var completer = new StreamCompleter(); |
+ completer.setEmpty(); |
+ var done = new Completer(); |
+ completer.stream.listen( |
+ unreachable("data"), |
+ onError: unreachable("error"), |
+ onDone: done.complete); |
+ await done.future; |
+ }); |
+ |
+ test("complete with setEmpty after listening", () async { |
+ var completer = new StreamCompleter(); |
+ var done = new Completer(); |
+ completer.stream.listen( |
+ unreachable("data"), |
+ onError: unreachable("error"), |
+ onDone: done.complete); |
+ completer.setEmpty(); |
+ await done.future; |
+ }); |
+ |
+ test("source stream isn't listened to until completer stream is", () async { |
+ var completer = new StreamCompleter(); |
+ var controller; |
+ controller = new StreamController(onListen: () { |
+ scheduleMicrotask(controller.close); |
+ }); |
+ |
+ completer.setSourceStream(controller.stream); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ var subscription = completer.stream.listen(null); |
+ expect(controller.hasListener, isTrue); |
+ await subscription.asFuture(); |
+ }); |
+ |
+ test("cancelOnError true when listening before linking stream", () async { |
+ var completer = new StreamCompleter(); |
+ var lastEvent = -1; |
+ var controller = new StreamController(); |
+ completer.stream.listen( |
+ (value) { |
+ expect(value, lessThan(3)); |
+ lastEvent = value; |
+ }, |
+ onError: (value) { |
+ expect(value, "3"); |
+ lastEvent = value; |
+ }, |
+ onDone: unreachable("done"), |
+ cancelOnError: true); |
+ completer.setSourceStream(controller.stream); |
+ expect(controller.hasListener, isTrue); |
+ |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ controller.add(1); |
+ |
+ await flushMicrotasks(); |
+ expect(lastEvent, 1); |
+ expect(controller.hasListener, isTrue); |
+ controller.add(2); |
+ |
+ await flushMicrotasks(); |
+ expect(lastEvent, 2); |
+ expect(controller.hasListener, isTrue); |
+ controller.addError("3"); |
+ |
+ await flushMicrotasks(); |
+ expect(lastEvent, "3"); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ test("cancelOnError true when listening after linking stream", () async { |
+ var completer = new StreamCompleter(); |
+ var lastEvent = -1; |
+ var controller = new StreamController(); |
+ completer.setSourceStream(controller.stream); |
+ controller.add(1); |
+ expect(controller.hasListener, isFalse); |
+ |
+ completer.stream.listen( |
+ (value) { |
+ expect(value, lessThan(3)); |
+ lastEvent = value; |
+ }, |
+ onError: (value) { |
+ expect(value, "3"); |
+ lastEvent = value; |
+ }, |
+ onDone: unreachable("done"), |
+ cancelOnError: true); |
+ |
+ expect(controller.hasListener, isTrue); |
+ |
+ await flushMicrotasks(); |
+ expect(lastEvent, 1); |
+ expect(controller.hasListener, isTrue); |
+ controller.add(2); |
+ |
+ await flushMicrotasks(); |
+ expect(lastEvent, 2); |
+ expect(controller.hasListener, isTrue); |
+ controller.addError("3"); |
+ |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ test("linking a stream after setSourceStream before listen", () async { |
+ var completer = new StreamCompleter(); |
+ completer.setSourceStream(createStream()); |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ await completer.stream.toList(); |
+ // Still fails after source is done |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ }); |
+ |
+ test("linking a stream after setSourceStream after listen", () async { |
+ var completer = new StreamCompleter(); |
+ var list = completer.stream.toList(); |
+ completer.setSourceStream(createStream()); |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ await list; |
+ // Still fails after source is done. |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ }); |
+ |
+ test("linking a stream after setEmpty before listen", () async { |
+ var completer = new StreamCompleter(); |
+ completer.setEmpty(); |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ await completer.stream.toList(); |
+ // Still fails after source is done |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ }); |
+ |
+ test("linking a stream after setEmpty() after listen", () async { |
+ var completer = new StreamCompleter(); |
+ var list = completer.stream.toList(); |
+ completer.setEmpty(); |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ await list; |
+ // Still fails after source is done. |
+ expect(() => completer.setSourceStream(createStream()), throwsStateError); |
+ expect(() => completer.setEmpty(), throwsStateError); |
+ }); |
+ |
+ test("listening more than once after setting stream", () async { |
+ var completer = new StreamCompleter(); |
+ completer.setSourceStream(createStream()); |
+ var list = completer.stream.toList(); |
+ expect(() => completer.stream.toList(), throwsStateError); |
+ await list; |
+ expect(() => completer.stream.toList(), throwsStateError); |
+ }); |
+ |
+ test("listening more than once before setting stream", () async { |
+ var completer = new StreamCompleter(); |
+ completer.stream.toList(); |
+ expect(() => completer.stream.toList(), throwsStateError); |
+ }); |
+ |
+ test("setting onData etc. before and after setting stream", () async { |
+ var completer = new StreamCompleter(); |
+ var controller = new StreamController(); |
+ var subscription = completer.stream.listen(null); |
+ var lastEvent = 0; |
+ subscription.onData((value) => lastEvent = value); |
+ subscription.onError((value) => lastEvent = "$value"); |
+ subscription.onDone(() => lastEvent = -1); |
+ completer.setSourceStream(controller.stream); |
+ await flushMicrotasks(); |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ expect(lastEvent, 1); |
+ controller.addError(2); |
+ await flushMicrotasks(); |
+ expect(lastEvent, "2"); |
+ subscription.onData((value) => lastEvent = -value); |
+ subscription.onError((value) => lastEvent = "${-value}"); |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ expect(lastEvent, -1); |
+ controller.addError(2); |
+ await flushMicrotasks(); |
+ expect(lastEvent, "-2"); |
+ controller.close(); |
+ await flushMicrotasks(); |
+ expect(lastEvent, -1); |
+ }); |
+ |
+ test("pause w/ resume future accross setting stream", () async { |
+ var completer = new StreamCompleter(); |
+ var resume = new Completer(); |
+ var subscription = completer.stream.listen(unreachable("data")); |
+ subscription.pause(resume.future); |
+ await flushMicrotasks(); |
+ completer.setSourceStream(createStream()); |
+ await flushMicrotasks(); |
+ resume.complete(); |
+ var events = []; |
+ subscription.onData(events.add); |
+ await subscription.asFuture(); |
+ expect(events, [1, 2, 3, 4]); |
+ }); |
+ |
+ test("asFuture with error accross setting stream", () async { |
+ var completer = new StreamCompleter(); |
+ var controller = new StreamController(); |
+ var subscription = completer.stream.listen(unreachable("data"), |
+ cancelOnError: false); |
+ var done = subscription.asFuture(); |
+ expect(controller.hasListener, isFalse); |
+ completer.setSourceStream(controller.stream); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ controller.addError(42); |
+ await done.then(unreachable("data"), onError: (error) { |
+ expect(error, 42); |
+ }); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+} |
+ |
+Stream<int> createStream() async* { |
+ yield 1; |
+ await flushMicrotasks(); |
+ yield 2; |
+ await flushMicrotasks(); |
+ yield 3; |
+ await flushMicrotasks(); |
+ yield 4; |
+} |