| Index: sdk/lib/io/io_sink.dart
|
| diff --git a/sdk/lib/io/io_sink.dart b/sdk/lib/io/io_sink.dart
|
| index df3a7daac36faf6ca070c8c62cc851098e742dfa..802486551935911386fd5440008f7801ad0ae267 100644
|
| --- a/sdk/lib/io/io_sink.dart
|
| +++ b/sdk/lib/io/io_sink.dart
|
| @@ -7,15 +7,14 @@ part of dart.io;
|
| /**
|
| * Helper class to wrap a [StreamConsumer<List<int>>] and provide
|
| * utility functions for writing to the StreamConsumer directly. The
|
| - * [IOSink] buffers the input given by [write], [writeAll], [writeln],
|
| - * [writeCharCode] and [add] and will delay a [consume] or
|
| - * [writeStream] until the buffer is flushed.
|
| + * [IOSink] buffers the input given by all [StringSink] methods and will delay
|
| + * a [addStream] until the buffer is flushed.
|
| *
|
| - * When the [IOSink] is bound to a stream (through either [consume]
|
| - * or [writeStream]) any call to the [IOSink] will throw a
|
| - * [StateError].
|
| + * When the [IOSink] is bound to a stream (through [addStream]) any call
|
| + * to the [IOSink] will throw a [StateError]. When the [addStream] compeltes,
|
| + * the [IOSink] will again be open for all calls.
|
| */
|
| -abstract class IOSink<T>
|
| +abstract class IOSink
|
| implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> {
|
| factory IOSink(StreamConsumer<List<int>> target,
|
| {Encoding encoding: Encoding.UTF_8})
|
| @@ -38,50 +37,37 @@ abstract class IOSink<T>
|
| void addError(AsyncError error);
|
|
|
| /**
|
| - * Provide functionality for piping to the [IOSink].
|
| - */
|
| - Future<T> consume(Stream<List<int>> stream);
|
| -
|
| - /**
|
| * Adds all elements of the given [stream] to `this`.
|
| */
|
| - Future<T> addStream(Stream<List<int>> stream);
|
| -
|
| - /**
|
| - * Like [consume], but will not close the target when done.
|
| - *
|
| - * *Deprecated*: use [addStream] instead.
|
| - */
|
| - Future<T> writeStream(Stream<List<int>> stream);
|
| + Future addStream(Stream<List<int>> stream);
|
|
|
| /**
|
| * Close the target.
|
| */
|
| - // TODO(floitsch): Currently the future cannot be typed because it has
|
| - // hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest.
|
| Future close();
|
|
|
| /**
|
| - * Get future that will complete when all data has been written to
|
| - * the IOSink and it has been closed.
|
| + * Get a future that will complete when all synchronous have completed, or an
|
| + * error happened. This future is identical to the future returned from close.
|
| */
|
| - Future<T> get done;
|
| + Future get done;
|
| }
|
|
|
|
|
| -class _IOSinkImpl<T> implements IOSink<T> {
|
| +class _IOSinkImpl implements IOSink {
|
| final StreamConsumer<List<int>> _target;
|
| -
|
| - Completer _writeStreamCompleter;
|
| + Completer _doneCompleter = new Completer();
|
| + Future _doneFuture;
|
| StreamController<List<int>> _controllerInstance;
|
| - Future<T> _pipeFuture;
|
| - StreamSubscription<List<int>> _bindSubscription;
|
| - bool _paused = true;
|
| + Completer _controllerCompleter;
|
| + Encoding _encoding;
|
| + bool _isClosed = false;
|
| + bool _isBound = false;
|
| bool _encodingMutable = true;
|
|
|
| - _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding);
|
| -
|
| - Encoding _encoding;
|
| + _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding) {
|
| + _doneFuture = _doneCompleter.future;
|
| + }
|
|
|
| Encoding get encoding => _encoding;
|
|
|
| @@ -135,56 +121,62 @@ class _IOSinkImpl<T> implements IOSink<T> {
|
| }
|
|
|
| void add(List<int> data) {
|
| - if (_isBound) {
|
| - throw new StateError("IOSink is already bound to a stream");
|
| - }
|
| _controller.add(data);
|
| }
|
|
|
| void addError(AsyncError error) {
|
| - if (_isBound) {
|
| - throw new StateError("IOSink is already bound to a stream");
|
| - }
|
| _controller.addError(error);
|
| }
|
|
|
| - Future<T> consume(Stream<List<int>> stream) {
|
| + Future addStream(Stream<List<int>> stream) {
|
| if (_isBound) {
|
| throw new StateError("IOSink is already bound to a stream");
|
| }
|
| - return _fillFromStream(stream);
|
| + _isBound = true;
|
| + // Wait for any sync operations to complete.
|
| + return _closeController().then((_) {
|
| + return _target.addStream(stream)
|
| + .whenComplete(() {
|
| + _isBound = false;
|
| + });
|
| + });
|
| }
|
|
|
| - Future<T> writeStream(Stream<List<int>> stream) {
|
| - return addStream(stream);
|
| - }
|
| -
|
| - Future<T> addStream(Stream<List<int>> stream) {
|
| - if (_isBound) {
|
| - throw new StateError("IOSink is already bound to a stream");
|
| - }
|
| - return _fillFromStream(stream, unbind: true);
|
| + Future _closeController() {
|
| + if (_controllerInstance == null) return new Future.immediate(null);
|
| + var future = _controllerCompleter.future;
|
| + _controllerInstance.close();
|
| + return future;
|
| }
|
|
|
| Future close() {
|
| if (_isBound) {
|
| - throw new StateError("IOSink is already bound to a stream");
|
| + throw new StateError("IOSink is bound to a stream");
|
| + }
|
| + if (!_isClosed) {
|
| + _isClosed = true;
|
| + if (_controllerInstance != null) {
|
| + _controllerInstance.close();
|
| + } else {
|
| + _closeTarget();
|
| + }
|
| }
|
| - _controller.close();
|
| - return _pipeFuture;
|
| + return done;
|
| }
|
|
|
| - Future<T> get done {
|
| - _controller;
|
| - return _pipeFuture;
|
| + void _closeTarget() {
|
| + _target.close()
|
| + .then((_) => _completeDone(),
|
| + onError: (error) => _completeDone(error));
|
| }
|
|
|
| - void _completeWriteStreamCompleter([error]) {
|
| - if (_writeStreamCompleter == null) return;
|
| - var tmp = _writeStreamCompleter;
|
| - _writeStreamCompleter = null;
|
| + Future get done => _doneFuture;
|
| +
|
| + void _completeDone([error]) {
|
| + if (_doneCompleter == null) return;
|
| + var tmp = _doneCompleter;
|
| + _doneCompleter = null;
|
| if (error == null) {
|
| - _bindSubscription = null;
|
| tmp.complete();
|
| } else {
|
| tmp.completeError(error);
|
| @@ -192,82 +184,43 @@ class _IOSinkImpl<T> implements IOSink<T> {
|
| }
|
|
|
| StreamController<List<int>> get _controller {
|
| - if (_controllerInstance == null) {
|
| - _controllerInstance = new StreamController<List<int>>(
|
| - onPauseStateChange: _onPauseStateChange,
|
| - onSubscriptionStateChange: _onSubscriptionStateChange);
|
| - var future = _controller.stream.pipe(_target);
|
| - future.then((_) => _completeWriteStreamCompleter(),
|
| - onError: (error) => _completeWriteStreamCompleter(error));
|
| - _pipeFuture = future.then((value) => value);
|
| - }
|
| - return _controllerInstance;
|
| - }
|
| -
|
| - bool get _isBound => _bindSubscription != null;
|
| -
|
| - void _onPauseStateChange() {
|
| - _paused = _controller.isPaused;
|
| - if (_controller.isPaused) {
|
| - _pause();
|
| - } else {
|
| - _resume();
|
| - }
|
| - }
|
| -
|
| - void _pause() {
|
| - if (_bindSubscription != null) {
|
| - try {
|
| - // The subscription can be canceled at this point.
|
| - _bindSubscription.pause();
|
| - } catch (e) {
|
| - }
|
| - }
|
| - }
|
| -
|
| - void _resume() {
|
| - if (_bindSubscription != null) {
|
| - try {
|
| - // The subscription can be canceled at this point.
|
| - _bindSubscription.resume();
|
| - } catch (e) {
|
| - }
|
| - }
|
| - }
|
| -
|
| - void _onSubscriptionStateChange() {
|
| - if (_controller.hasListener) {
|
| - _paused = false;
|
| - _resume();
|
| - } else {
|
| - if (_bindSubscription != null) {
|
| - _bindSubscription.cancel();
|
| - _bindSubscription = null;
|
| - }
|
| + if (_isBound) {
|
| + throw new StateError("IOSink is bound to a stream");
|
| }
|
| - }
|
| -
|
| - Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) {
|
| - _controller;
|
| - assert(_writeStreamCompleter == null);
|
| - if (unbind) {
|
| - _writeStreamCompleter = new Completer<T>();
|
| + if (_isClosed) {
|
| + throw new StateError("IOSink is closed");
|
| }
|
| - _bindSubscription = stream.listen(
|
| - _controller.add,
|
| - onDone: () {
|
| - if (unbind) {
|
| - _completeWriteStreamCompleter();
|
| - } else {
|
| - _controller.close();
|
| - }
|
| - },
|
| - onError: _controller.addError);
|
| - if (_paused) _pause();
|
| - if (unbind) {
|
| - return _writeStreamCompleter.future;
|
| - } else {
|
| - return _pipeFuture;
|
| + if (_controllerInstance == null) {
|
| + _controllerInstance = new StreamController<List<int>>();
|
| + _controllerCompleter = new Completer();
|
| + _target.addStream(_controller.stream)
|
| + .then(
|
| + (_) {
|
| + if (_isBound) {
|
| + // A new stream takes over - forward values to that stream.
|
| + var completer = _controllerCompleter;
|
| + _controllerCompleter = null;
|
| + _controllerInstance = null;
|
| + completer.complete();
|
| + } else {
|
| + // No new stream, .close was called. Close _target.
|
| + _closeTarget();
|
| + }
|
| + },
|
| + onError: (error) {
|
| + if (_isBound) {
|
| + // A new stream takes over - forward errors to that stream.
|
| + var completer = _controllerCompleter;
|
| + _controllerCompleter = null;
|
| + _controllerInstance = null;
|
| + completer.completeError(error);
|
| + } else {
|
| + // No new stream. No need to close target, as it have already
|
| + // failed.
|
| + _completeDone(error);
|
| + }
|
| + });
|
| }
|
| + return _controllerInstance;
|
| }
|
| }
|
|
|