Chromium Code Reviews| Index: sdk/lib/io/io_sink.dart |
| diff --git a/sdk/lib/io/io_sink.dart b/sdk/lib/io/io_sink.dart |
| index df3a7daac36faf6ca070c8c62cc851098e742dfa..2a45451825638fc55e8315e7a3b8d1f7de54e906 100644 |
| --- a/sdk/lib/io/io_sink.dart |
| +++ b/sdk/lib/io/io_sink.dart |
| @@ -8,14 +8,13 @@ part of dart.io; |
| * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
| * utility functions for writing to the StreamConsumer directly. The |
| * [IOSink] buffers the input given by [write], [writeAll], [writeln], |
|
Søren Gjesse
2013/04/15 06:56:30
[write], [writeAll], [writeln], [writeCharCode] =>
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| - * [writeCharCode] and [add] and will delay a [consume] or |
| - * [writeStream] until the buffer is flushed. |
| + * [writeCharCode] and [add] and will delay a [addStream] until |
| + * the buffer is flushed. |
| * |
| - * When the [IOSink] is bound to a stream (through either [consume] |
| - * or [writeStream]) any call to the [IOSink] will throw a |
| - * [StateError]. |
| + * When the [IOSink] is bound to a stream (through [addStream]) any call |
| + * to the [IOSink] will throw a [StateError]. |
|
Søren Gjesse
2013/04/15 06:56:30
Maybe mention that when the addStream future compl
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| */ |
| -abstract class IOSink<T> |
| +abstract class IOSink |
| implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> { |
| factory IOSink(StreamConsumer<List<int>> target, |
| {Encoding encoding: Encoding.UTF_8}) |
| @@ -38,50 +37,37 @@ abstract class IOSink<T> |
| void addError(AsyncError error); |
| /** |
| - * Provide functionality for piping to the [IOSink]. |
| - */ |
| - Future<T> consume(Stream<List<int>> stream); |
| - |
| - /** |
| * Adds all elements of the given [stream] to `this`. |
| */ |
| - Future<T> addStream(Stream<List<int>> stream); |
| - |
| - /** |
| - * Like [consume], but will not close the target when done. |
| - * |
| - * *Deprecated*: use [addStream] instead. |
| - */ |
| - Future<T> writeStream(Stream<List<int>> stream); |
| + Future addStream(Stream<List<int>> stream); |
| /** |
| * Close the target. |
| */ |
| - // TODO(floitsch): Currently the future cannot be typed because it has |
| - // hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest. |
| Future close(); |
| /** |
| - * Get future that will complete when all data has been written to |
| - * the IOSink and it has been closed. |
| + * Get a future that will complete when all synchronous have completed, or an |
| + * error happened. This future is identical to the future returned from close. |
| */ |
| - Future<T> get done; |
| + Future get done; |
| } |
| -class _IOSinkImpl<T> implements IOSink<T> { |
| +class _IOSinkImpl implements IOSink { |
| final StreamConsumer<List<int>> _target; |
| - |
| - Completer _writeStreamCompleter; |
| + Completer _doneCompleter = new Completer(); |
| + Future _doneFuture; |
| StreamController<List<int>> _controllerInstance; |
| - Future<T> _pipeFuture; |
| - StreamSubscription<List<int>> _bindSubscription; |
| - bool _paused = true; |
| + Completer _controllerCompleter; |
| + Encoding _encoding; |
| + bool _isClosed = false; |
| + bool _isBound = false; |
| bool _encodingMutable = true; |
| - _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding); |
| - |
| - Encoding _encoding; |
| + _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding) { |
| + _doneFuture = _doneCompleter.future; |
| + } |
| Encoding get encoding => _encoding; |
| @@ -135,56 +121,61 @@ class _IOSinkImpl<T> implements IOSink<T> { |
| } |
| void add(List<int> data) { |
| - if (_isBound) { |
| - throw new StateError("IOSink is already bound to a stream"); |
| - } |
| _controller.add(data); |
| } |
| void addError(AsyncError error) { |
| - if (_isBound) { |
| - throw new StateError("IOSink is already bound to a stream"); |
| - } |
| _controller.addError(error); |
| } |
| - Future<T> consume(Stream<List<int>> stream) { |
| + Future addStream(Stream<List<int>> stream) { |
| if (_isBound) { |
| throw new StateError("IOSink is already bound to a stream"); |
| } |
| - return _fillFromStream(stream); |
| + _isBound = true; |
|
Søren Gjesse
2013/04/15 06:56:30
This looks good. Maybe add a comment like "wait fo
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| + return _closeController().then((_) { |
| + return _target.addStream(stream) |
| + .whenComplete(() { |
| + _isBound = false; |
| + }); |
| + }); |
| } |
| - Future<T> writeStream(Stream<List<int>> stream) { |
| - return addStream(stream); |
| - } |
| - |
| - Future<T> addStream(Stream<List<int>> stream) { |
| - if (_isBound) { |
| - throw new StateError("IOSink is already bound to a stream"); |
| - } |
| - return _fillFromStream(stream, unbind: true); |
| + Future _closeController() { |
| + if (_controllerInstance == null) return new Future.immediate(null); |
| + var future = _controllerCompleter.future; |
| + _controllerInstance.close(); |
| + return future; |
| } |
| Future close() { |
| if (_isBound) { |
| throw new StateError("IOSink is already bound to a stream"); |
|
Søren Gjesse
2013/04/15 06:56:30
Maybe the "already" in this message is confusing.
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| } |
| - _controller.close(); |
| - return _pipeFuture; |
| + if (!_isClosed) { |
| + _isClosed = true; |
| + if (_controllerInstance != null) { |
| + _controllerInstance.close(); |
| + } else { |
| + _closeTarget(); |
| + } |
| + } |
| + return done; |
| } |
| - Future<T> get done { |
| - _controller; |
| - return _pipeFuture; |
| + void _closeTarget() { |
| + _target.close() |
| + .then((_) => _completeDone(), |
| + onError: (error) => _completeDone(error)); |
| } |
| - void _completeWriteStreamCompleter([error]) { |
| - if (_writeStreamCompleter == null) return; |
| - var tmp = _writeStreamCompleter; |
| - _writeStreamCompleter = null; |
| + Future get done => _doneFuture; |
| + |
| + void _completeDone([error]) { |
| + if (_doneCompleter == null) return; |
| + var tmp = _doneCompleter; |
| + _doneCompleter = null; |
| if (error == null) { |
| - _bindSubscription = null; |
| tmp.complete(); |
| } else { |
| tmp.completeError(error); |
| @@ -192,82 +183,40 @@ class _IOSinkImpl<T> implements IOSink<T> { |
| } |
| StreamController<List<int>> get _controller { |
| - if (_controllerInstance == null) { |
| - _controllerInstance = new StreamController<List<int>>( |
| - onPauseStateChange: _onPauseStateChange, |
| - onSubscriptionStateChange: _onSubscriptionStateChange); |
| - var future = _controller.stream.pipe(_target); |
| - future.then((_) => _completeWriteStreamCompleter(), |
| - onError: (error) => _completeWriteStreamCompleter(error)); |
| - _pipeFuture = future.then((value) => value); |
| - } |
| - return _controllerInstance; |
| - } |
| - |
| - bool get _isBound => _bindSubscription != null; |
| - |
| - void _onPauseStateChange() { |
| - _paused = _controller.isPaused; |
| - if (_controller.isPaused) { |
| - _pause(); |
| - } else { |
| - _resume(); |
| - } |
| - } |
| - |
| - void _pause() { |
| - if (_bindSubscription != null) { |
| - try { |
| - // The subscription can be canceled at this point. |
| - _bindSubscription.pause(); |
| - } catch (e) { |
| - } |
| - } |
| - } |
| - |
| - void _resume() { |
| - if (_bindSubscription != null) { |
| - try { |
| - // The subscription can be canceled at this point. |
| - _bindSubscription.resume(); |
| - } catch (e) { |
| - } |
| - } |
| - } |
| - |
| - void _onSubscriptionStateChange() { |
| - if (_controller.hasListener) { |
| - _paused = false; |
| - _resume(); |
| - } else { |
| - if (_bindSubscription != null) { |
| - _bindSubscription.cancel(); |
| - _bindSubscription = null; |
| - } |
| + if (_isBound) { |
| + throw new StateError("IOSink is already bound to a stream"); |
|
Søren Gjesse
2013/04/15 06:56:30
Remove "already" here as well.
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| } |
| - } |
| - |
| - Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) { |
| - _controller; |
| - assert(_writeStreamCompleter == null); |
| - if (unbind) { |
| - _writeStreamCompleter = new Completer<T>(); |
| + if (_isClosed) { |
| + throw new StateError("IOSink is already closed"); |
|
Søren Gjesse
2013/04/15 06:56:30
And here.
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| } |
| - _bindSubscription = stream.listen( |
| - _controller.add, |
| - onDone: () { |
| - if (unbind) { |
| - _completeWriteStreamCompleter(); |
| - } else { |
| - _controller.close(); |
| - } |
| - }, |
| - onError: _controller.addError); |
| - if (_paused) _pause(); |
| - if (unbind) { |
| - return _writeStreamCompleter.future; |
| - } else { |
| - return _pipeFuture; |
| + if (_controllerInstance == null) { |
|
Søren Gjesse
2013/04/15 06:56:30
This also looks nice!
Anders Johnsen
2013/04/15 07:35:20
Thanks :)
|
| + _controllerInstance = new StreamController<List<int>>(); |
| + _controllerCompleter = new Completer(); |
| + _target.addStream(_controller.stream) |
| + .then( |
| + (_) { |
| + if (_isBound) { |
| + // A new stream takes over - forward errors to that stream. |
|
Søren Gjesse
2013/04/15 06:56:30
I think this comment belongs in the onError case.
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| + var completer = _controllerCompleter; |
| + _controllerCompleter = null; |
| + _controllerInstance = null; |
| + completer.complete(); |
| + } else { |
| + // No new stream, .close was called. Close _target. |
| + _closeTarget(); |
| + } |
| + }, |
| + onError: (error) { |
| + if (_isBound) { |
| + var completer = _controllerCompleter; |
| + _controllerCompleter = null; |
| + _controllerInstance = null; |
| + completer.completeError(error); |
| + } else { |
| + _completeDone(error); |
| + } |
| + }); |
| } |
| + return _controllerInstance; |
| } |
| } |