| Index: test/stream_sink_completer_test.dart | 
| diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..3892d2679013702aa4fc136ef4982fa766b3e464 | 
| --- /dev/null | 
| +++ b/test/stream_sink_completer_test.dart | 
| @@ -0,0 +1,255 @@ | 
| +// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file | 
| +// for details. All rights reserved. Use of this source code is governed by a | 
| +// BSD-style license that can be found in the LICENSE file. | 
| + | 
| +import "dart:async"; | 
| + | 
| +import "package:async/async.dart"; | 
| +import "package:test/test.dart"; | 
| + | 
| +import "utils.dart"; | 
| + | 
| +main() { | 
| +  var completer; | 
| +  setUp(() { | 
| +    completer = new StreamSinkCompleter(); | 
| +  }); | 
| + | 
| +  group("when a stream is linked before events are added", () { | 
| +    test("data events are forwarded", () { | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| +      completer.sink..add(1)..add(2)..add(3)..add(4); | 
| + | 
| +      expect(sink.results[0].asValue.value, equals(1)); | 
| +      expect(sink.results[1].asValue.value, equals(2)); | 
| +      expect(sink.results[2].asValue.value, equals(3)); | 
| +      expect(sink.results[3].asValue.value, equals(4)); | 
| +    }); | 
| + | 
| +    test("error events are forwarded", () { | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| +      completer.sink..addError("oh no")..addError("that's bad"); | 
| + | 
| +      expect(sink.results[0].asError.error, equals("oh no")); | 
| +      expect(sink.results[1].asError.error, equals("that's bad")); | 
| +    }); | 
| + | 
| +    test("addStream is forwarded", () async { | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| + | 
| +      var controller = new StreamController(); | 
| +      completer.sink.addStream(controller.stream); | 
| + | 
| +      controller.add(1); | 
| +      controller.addError("oh no"); | 
| +      controller.add(2); | 
| +      controller.addError("that's bad"); | 
| +      await flushMicrotasks(); | 
| + | 
| +      expect(sink.results[0].asValue.value, equals(1)); | 
| +      expect(sink.results[1].asError.error, equals("oh no")); | 
| +      expect(sink.results[2].asValue.value, equals(2)); | 
| +      expect(sink.results[3].asError.error, equals("that's bad")); | 
| +      expect(sink.isClosed, isFalse); | 
| + | 
| +      controller.close(); | 
| +      await flushMicrotasks(); | 
| +      expect(sink.isClosed, isFalse); | 
| +    }); | 
| + | 
| +    test("close() is forwarded", () { | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| +      completer.sink.close(); | 
| +      expect(sink.isClosed, isTrue); | 
| +    }); | 
| + | 
| +    test("the future from the inner close() is returned", () async { | 
| +      var closeCompleter = new Completer(); | 
| +      var sink = new TestSink(onDone: () => closeCompleter.future); | 
| +      completer.setDestinationSink(sink); | 
| + | 
| +      var closeCompleted = false; | 
| +      completer.sink.close().then(expectAsync((_) { | 
| +        closeCompleted = true; | 
| +      })); | 
| + | 
| +      await flushMicrotasks(); | 
| +      expect(closeCompleted, isFalse); | 
| + | 
| +      closeCompleter.complete(); | 
| +      await flushMicrotasks(); | 
| +      expect(closeCompleted, isTrue); | 
| +    }); | 
| + | 
| +    test("errors are forwarded from the inner close()", () { | 
| +      var sink = new TestSink(onDone: () => throw "oh no"); | 
| +      completer.setDestinationSink(sink); | 
| +      expect(completer.sink.done, throwsA("oh no")); | 
| +      expect(completer.sink.close(), throwsA("oh no")); | 
| +    }); | 
| + | 
| +    test("errors aren't top-leveled if only close() is listened to", () async { | 
| +      var sink = new TestSink(onDone: () => throw "oh no"); | 
| +      completer.setDestinationSink(sink); | 
| +      expect(completer.sink.close(), throwsA("oh no")); | 
| + | 
| +      // Give the event loop a chance to top-level errors if it's going to. | 
| +      await flushMicrotasks(); | 
| +    }); | 
| + | 
| +    test("errors aren't top-leveled if only done is listened to", () async { | 
| +      var sink = new TestSink(onDone: () => throw "oh no"); | 
| +      completer.setDestinationSink(sink); | 
| +      completer.sink.close(); | 
| +      expect(completer.sink.done, throwsA("oh no")); | 
| + | 
| +      // Give the event loop a chance to top-level errors if it's going to. | 
| +      await flushMicrotasks(); | 
| +    }); | 
| +  }); | 
| + | 
| +  group("when a stream is linked after events are added", () { | 
| +    test("data events are forwarded", () async { | 
| +      completer.sink..add(1)..add(2)..add(3)..add(4); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| +      await flushMicrotasks(); | 
| + | 
| +      expect(sink.results[0].asValue.value, equals(1)); | 
| +      expect(sink.results[1].asValue.value, equals(2)); | 
| +      expect(sink.results[2].asValue.value, equals(3)); | 
| +      expect(sink.results[3].asValue.value, equals(4)); | 
| +    }); | 
| + | 
| +    test("error events are forwarded", () async { | 
| +      completer.sink..addError("oh no")..addError("that's bad"); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| +      await flushMicrotasks(); | 
| + | 
| +      expect(sink.results[0].asError.error, equals("oh no")); | 
| +      expect(sink.results[1].asError.error, equals("that's bad")); | 
| +    }); | 
| + | 
| +    test("addStream is forwarded", () async { | 
| +      var controller = new StreamController(); | 
| +      completer.sink.addStream(controller.stream); | 
| + | 
| +      controller.add(1); | 
| +      controller.addError("oh no"); | 
| +      controller.add(2); | 
| +      controller.addError("that's bad"); | 
| +      controller.close(); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| +      await flushMicrotasks(); | 
| + | 
| +      expect(sink.results[0].asValue.value, equals(1)); | 
| +      expect(sink.results[1].asError.error, equals("oh no")); | 
| +      expect(sink.results[2].asValue.value, equals(2)); | 
| +      expect(sink.results[3].asError.error, equals("that's bad")); | 
| +      expect(sink.isClosed, isFalse); | 
| +    }); | 
| + | 
| +    test("close() is forwarded", () async { | 
| +      completer.sink.close(); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var sink = new TestSink(); | 
| +      completer.setDestinationSink(sink); | 
| +      await flushMicrotasks(); | 
| + | 
| +      expect(sink.isClosed, isTrue); | 
| +    }); | 
| + | 
| +    test("the future from the inner close() is returned", () async { | 
| +      var closeCompleted = false; | 
| +      completer.sink.close().then(expectAsync((_) { | 
| +        closeCompleted = true; | 
| +      })); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var closeCompleter = new Completer(); | 
| +      var sink = new TestSink(onDone: () => closeCompleter.future); | 
| +      completer.setDestinationSink(sink); | 
| +      await flushMicrotasks(); | 
| +      expect(closeCompleted, isFalse); | 
| + | 
| +      closeCompleter.complete(); | 
| +      await flushMicrotasks(); | 
| +      expect(closeCompleted, isTrue); | 
| +    }); | 
| + | 
| +    test("errors are forwarded from the inner close()", () async { | 
| +      expect(completer.sink.done, throwsA("oh no")); | 
| +      expect(completer.sink.close(), throwsA("oh no")); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var sink = new TestSink(onDone: () => throw "oh no"); | 
| +      completer.setDestinationSink(sink); | 
| +    }); | 
| + | 
| +    test("errors aren't top-leveled if only close() is listened to", () async { | 
| +      expect(completer.sink.close(), throwsA("oh no")); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var sink = new TestSink(onDone: () => throw "oh no"); | 
| +      completer.setDestinationSink(sink); | 
| + | 
| +      // Give the event loop a chance to top-level errors if it's going to. | 
| +      await flushMicrotasks(); | 
| +    }); | 
| + | 
| +    test("errors aren't top-leveled if only done is listened to", () async { | 
| +      completer.sink.close(); | 
| +      expect(completer.sink.done, throwsA("oh no")); | 
| +      await flushMicrotasks(); | 
| + | 
| +      var sink = new TestSink(onDone: () => throw "oh no"); | 
| +      completer.setDestinationSink(sink); | 
| + | 
| +      // Give the event loop a chance to top-level errors if it's going to. | 
| +      await flushMicrotasks(); | 
| +    }); | 
| +  }); | 
| + | 
| +  test("the sink is closed, the destination is set, then done is read", | 
| +      () async { | 
| +    expect(completer.sink.close(), completes); | 
| +    await flushMicrotasks(); | 
| + | 
| +    completer.setDestinationSink(new TestSink()); | 
| +    await flushMicrotasks(); | 
| + | 
| +    expect(completer.sink.done, completes); | 
| +  }); | 
| + | 
| +  test("done is read, the destination is set, then the sink is closed", | 
| +      () async { | 
| +    expect(completer.sink.done, completes); | 
| +    await flushMicrotasks(); | 
| + | 
| +    completer.setDestinationSink(new TestSink()); | 
| +    await flushMicrotasks(); | 
| + | 
| +    expect(completer.sink.close(), completes); | 
| +  }); | 
| + | 
| +  test("doesn't allow the destination sink to be set multiple times", () { | 
| +    completer.setDestinationSink(new TestSink()); | 
| +    expect(() => completer.setDestinationSink(new TestSink()), | 
| +        throwsStateError); | 
| +    expect(() => completer.setDestinationSink(new TestSink()), | 
| +        throwsStateError); | 
| +  }); | 
| +} | 
|  |