Index: test/stream_sink_completer_test.dart |
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3892d2679013702aa4fc136ef4982fa766b3e464 |
--- /dev/null |
+++ b/test/stream_sink_completer_test.dart |
@@ -0,0 +1,255 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import "dart:async"; |
+ |
+import "package:async/async.dart"; |
+import "package:test/test.dart"; |
+ |
+import "utils.dart"; |
+ |
+main() { |
+ var completer; |
+ setUp(() { |
+ completer = new StreamSinkCompleter(); |
+ }); |
+ |
+ group("when a stream is linked before events are added", () { |
+ test("data events are forwarded", () { |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ completer.sink..add(1)..add(2)..add(3)..add(4); |
+ |
+ expect(sink.results[0].asValue.value, equals(1)); |
+ expect(sink.results[1].asValue.value, equals(2)); |
+ expect(sink.results[2].asValue.value, equals(3)); |
+ expect(sink.results[3].asValue.value, equals(4)); |
+ }); |
+ |
+ test("error events are forwarded", () { |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ completer.sink..addError("oh no")..addError("that's bad"); |
+ |
+ expect(sink.results[0].asError.error, equals("oh no")); |
+ expect(sink.results[1].asError.error, equals("that's bad")); |
+ }); |
+ |
+ test("addStream is forwarded", () async { |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ |
+ var controller = new StreamController(); |
+ completer.sink.addStream(controller.stream); |
+ |
+ controller.add(1); |
+ controller.addError("oh no"); |
+ controller.add(2); |
+ controller.addError("that's bad"); |
+ await flushMicrotasks(); |
+ |
+ expect(sink.results[0].asValue.value, equals(1)); |
+ expect(sink.results[1].asError.error, equals("oh no")); |
+ expect(sink.results[2].asValue.value, equals(2)); |
+ expect(sink.results[3].asError.error, equals("that's bad")); |
+ expect(sink.isClosed, isFalse); |
+ |
+ controller.close(); |
+ await flushMicrotasks(); |
+ expect(sink.isClosed, isFalse); |
+ }); |
+ |
+ test("close() is forwarded", () { |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ completer.sink.close(); |
+ expect(sink.isClosed, isTrue); |
+ }); |
+ |
+ test("the future from the inner close() is returned", () async { |
+ var closeCompleter = new Completer(); |
+ var sink = new TestSink(onDone: () => closeCompleter.future); |
+ completer.setDestinationSink(sink); |
+ |
+ var closeCompleted = false; |
+ completer.sink.close().then(expectAsync((_) { |
+ closeCompleted = true; |
+ })); |
+ |
+ await flushMicrotasks(); |
+ expect(closeCompleted, isFalse); |
+ |
+ closeCompleter.complete(); |
+ await flushMicrotasks(); |
+ expect(closeCompleted, isTrue); |
+ }); |
+ |
+ test("errors are forwarded from the inner close()", () { |
+ var sink = new TestSink(onDone: () => throw "oh no"); |
+ completer.setDestinationSink(sink); |
+ expect(completer.sink.done, throwsA("oh no")); |
+ expect(completer.sink.close(), throwsA("oh no")); |
+ }); |
+ |
+ test("errors aren't top-leveled if only close() is listened to", () async { |
+ var sink = new TestSink(onDone: () => throw "oh no"); |
+ completer.setDestinationSink(sink); |
+ expect(completer.sink.close(), throwsA("oh no")); |
+ |
+ // Give the event loop a chance to top-level errors if it's going to. |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("errors aren't top-leveled if only done is listened to", () async { |
+ var sink = new TestSink(onDone: () => throw "oh no"); |
+ completer.setDestinationSink(sink); |
+ completer.sink.close(); |
+ expect(completer.sink.done, throwsA("oh no")); |
+ |
+ // Give the event loop a chance to top-level errors if it's going to. |
+ await flushMicrotasks(); |
+ }); |
+ }); |
+ |
+ group("when a stream is linked after events are added", () { |
+ test("data events are forwarded", () async { |
+ completer.sink..add(1)..add(2)..add(3)..add(4); |
+ await flushMicrotasks(); |
+ |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ await flushMicrotasks(); |
+ |
+ expect(sink.results[0].asValue.value, equals(1)); |
+ expect(sink.results[1].asValue.value, equals(2)); |
+ expect(sink.results[2].asValue.value, equals(3)); |
+ expect(sink.results[3].asValue.value, equals(4)); |
+ }); |
+ |
+ test("error events are forwarded", () async { |
+ completer.sink..addError("oh no")..addError("that's bad"); |
+ await flushMicrotasks(); |
+ |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ await flushMicrotasks(); |
+ |
+ expect(sink.results[0].asError.error, equals("oh no")); |
+ expect(sink.results[1].asError.error, equals("that's bad")); |
+ }); |
+ |
+ test("addStream is forwarded", () async { |
+ var controller = new StreamController(); |
+ completer.sink.addStream(controller.stream); |
+ |
+ controller.add(1); |
+ controller.addError("oh no"); |
+ controller.add(2); |
+ controller.addError("that's bad"); |
+ controller.close(); |
+ await flushMicrotasks(); |
+ |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ await flushMicrotasks(); |
+ |
+ expect(sink.results[0].asValue.value, equals(1)); |
+ expect(sink.results[1].asError.error, equals("oh no")); |
+ expect(sink.results[2].asValue.value, equals(2)); |
+ expect(sink.results[3].asError.error, equals("that's bad")); |
+ expect(sink.isClosed, isFalse); |
+ }); |
+ |
+ test("close() is forwarded", () async { |
+ completer.sink.close(); |
+ await flushMicrotasks(); |
+ |
+ var sink = new TestSink(); |
+ completer.setDestinationSink(sink); |
+ await flushMicrotasks(); |
+ |
+ expect(sink.isClosed, isTrue); |
+ }); |
+ |
+ test("the future from the inner close() is returned", () async { |
+ var closeCompleted = false; |
+ completer.sink.close().then(expectAsync((_) { |
+ closeCompleted = true; |
+ })); |
+ await flushMicrotasks(); |
+ |
+ var closeCompleter = new Completer(); |
+ var sink = new TestSink(onDone: () => closeCompleter.future); |
+ completer.setDestinationSink(sink); |
+ await flushMicrotasks(); |
+ expect(closeCompleted, isFalse); |
+ |
+ closeCompleter.complete(); |
+ await flushMicrotasks(); |
+ expect(closeCompleted, isTrue); |
+ }); |
+ |
+ test("errors are forwarded from the inner close()", () async { |
+ expect(completer.sink.done, throwsA("oh no")); |
+ expect(completer.sink.close(), throwsA("oh no")); |
+ await flushMicrotasks(); |
+ |
+ var sink = new TestSink(onDone: () => throw "oh no"); |
+ completer.setDestinationSink(sink); |
+ }); |
+ |
+ test("errors aren't top-leveled if only close() is listened to", () async { |
+ expect(completer.sink.close(), throwsA("oh no")); |
+ await flushMicrotasks(); |
+ |
+ var sink = new TestSink(onDone: () => throw "oh no"); |
+ completer.setDestinationSink(sink); |
+ |
+ // Give the event loop a chance to top-level errors if it's going to. |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("errors aren't top-leveled if only done is listened to", () async { |
+ completer.sink.close(); |
+ expect(completer.sink.done, throwsA("oh no")); |
+ await flushMicrotasks(); |
+ |
+ var sink = new TestSink(onDone: () => throw "oh no"); |
+ completer.setDestinationSink(sink); |
+ |
+ // Give the event loop a chance to top-level errors if it's going to. |
+ await flushMicrotasks(); |
+ }); |
+ }); |
+ |
+ test("the sink is closed, the destination is set, then done is read", |
+ () async { |
+ expect(completer.sink.close(), completes); |
+ await flushMicrotasks(); |
+ |
+ completer.setDestinationSink(new TestSink()); |
+ await flushMicrotasks(); |
+ |
+ expect(completer.sink.done, completes); |
+ }); |
+ |
+ test("done is read, the destination is set, then the sink is closed", |
+ () async { |
+ expect(completer.sink.done, completes); |
+ await flushMicrotasks(); |
+ |
+ completer.setDestinationSink(new TestSink()); |
+ await flushMicrotasks(); |
+ |
+ expect(completer.sink.close(), completes); |
+ }); |
+ |
+ test("doesn't allow the destination sink to be set multiple times", () { |
+ completer.setDestinationSink(new TestSink()); |
+ expect(() => completer.setDestinationSink(new TestSink()), |
+ throwsStateError); |
+ expect(() => completer.setDestinationSink(new TestSink()), |
+ throwsStateError); |
+ }); |
+} |