Index: sdk/lib/io/io_sink.dart |
diff --git a/sdk/lib/io/io_sink.dart b/sdk/lib/io/io_sink.dart |
index df3a7daac36faf6ca070c8c62cc851098e742dfa..802486551935911386fd5440008f7801ad0ae267 100644 |
--- a/sdk/lib/io/io_sink.dart |
+++ b/sdk/lib/io/io_sink.dart |
@@ -7,15 +7,14 @@ part of dart.io; |
/** |
* Helper class to wrap a [StreamConsumer<List<int>>] and provide |
* utility functions for writing to the StreamConsumer directly. The |
- * [IOSink] buffers the input given by [write], [writeAll], [writeln], |
- * [writeCharCode] and [add] and will delay a [consume] or |
- * [writeStream] until the buffer is flushed. |
+ * [IOSink] buffers the input given by all [StringSink] methods and will delay |
+ * a [addStream] until the buffer is flushed. |
* |
- * When the [IOSink] is bound to a stream (through either [consume] |
- * or [writeStream]) any call to the [IOSink] will throw a |
- * [StateError]. |
+ * When the [IOSink] is bound to a stream (through [addStream]) any call |
+ * to the [IOSink] will throw a [StateError]. When the [addStream] compeltes, |
+ * the [IOSink] will again be open for all calls. |
*/ |
-abstract class IOSink<T> |
+abstract class IOSink |
implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> { |
factory IOSink(StreamConsumer<List<int>> target, |
{Encoding encoding: Encoding.UTF_8}) |
@@ -38,50 +37,37 @@ abstract class IOSink<T> |
void addError(AsyncError error); |
/** |
- * Provide functionality for piping to the [IOSink]. |
- */ |
- Future<T> consume(Stream<List<int>> stream); |
- |
- /** |
* Adds all elements of the given [stream] to `this`. |
*/ |
- Future<T> addStream(Stream<List<int>> stream); |
- |
- /** |
- * Like [consume], but will not close the target when done. |
- * |
- * *Deprecated*: use [addStream] instead. |
- */ |
- Future<T> writeStream(Stream<List<int>> stream); |
+ Future addStream(Stream<List<int>> stream); |
/** |
* Close the target. |
*/ |
- // TODO(floitsch): Currently the future cannot be typed because it has |
- // hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest. |
Future close(); |
/** |
- * Get future that will complete when all data has been written to |
- * the IOSink and it has been closed. |
+ * Get a future that will complete when all synchronous have completed, or an |
+ * error happened. This future is identical to the future returned from close. |
*/ |
- Future<T> get done; |
+ Future get done; |
} |
-class _IOSinkImpl<T> implements IOSink<T> { |
+class _IOSinkImpl implements IOSink { |
final StreamConsumer<List<int>> _target; |
- |
- Completer _writeStreamCompleter; |
+ Completer _doneCompleter = new Completer(); |
+ Future _doneFuture; |
StreamController<List<int>> _controllerInstance; |
- Future<T> _pipeFuture; |
- StreamSubscription<List<int>> _bindSubscription; |
- bool _paused = true; |
+ Completer _controllerCompleter; |
+ Encoding _encoding; |
+ bool _isClosed = false; |
+ bool _isBound = false; |
bool _encodingMutable = true; |
- _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding); |
- |
- Encoding _encoding; |
+ _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding) { |
+ _doneFuture = _doneCompleter.future; |
+ } |
Encoding get encoding => _encoding; |
@@ -135,56 +121,62 @@ class _IOSinkImpl<T> implements IOSink<T> { |
} |
void add(List<int> data) { |
- if (_isBound) { |
- throw new StateError("IOSink is already bound to a stream"); |
- } |
_controller.add(data); |
} |
void addError(AsyncError error) { |
- if (_isBound) { |
- throw new StateError("IOSink is already bound to a stream"); |
- } |
_controller.addError(error); |
} |
- Future<T> consume(Stream<List<int>> stream) { |
+ Future addStream(Stream<List<int>> stream) { |
if (_isBound) { |
throw new StateError("IOSink is already bound to a stream"); |
} |
- return _fillFromStream(stream); |
+ _isBound = true; |
+ // Wait for any sync operations to complete. |
+ return _closeController().then((_) { |
+ return _target.addStream(stream) |
+ .whenComplete(() { |
+ _isBound = false; |
+ }); |
+ }); |
} |
- Future<T> writeStream(Stream<List<int>> stream) { |
- return addStream(stream); |
- } |
- |
- Future<T> addStream(Stream<List<int>> stream) { |
- if (_isBound) { |
- throw new StateError("IOSink is already bound to a stream"); |
- } |
- return _fillFromStream(stream, unbind: true); |
+ Future _closeController() { |
+ if (_controllerInstance == null) return new Future.immediate(null); |
+ var future = _controllerCompleter.future; |
+ _controllerInstance.close(); |
+ return future; |
} |
Future close() { |
if (_isBound) { |
- throw new StateError("IOSink is already bound to a stream"); |
+ throw new StateError("IOSink is bound to a stream"); |
+ } |
+ if (!_isClosed) { |
+ _isClosed = true; |
+ if (_controllerInstance != null) { |
+ _controllerInstance.close(); |
+ } else { |
+ _closeTarget(); |
+ } |
} |
- _controller.close(); |
- return _pipeFuture; |
+ return done; |
} |
- Future<T> get done { |
- _controller; |
- return _pipeFuture; |
+ void _closeTarget() { |
+ _target.close() |
+ .then((_) => _completeDone(), |
+ onError: (error) => _completeDone(error)); |
} |
- void _completeWriteStreamCompleter([error]) { |
- if (_writeStreamCompleter == null) return; |
- var tmp = _writeStreamCompleter; |
- _writeStreamCompleter = null; |
+ Future get done => _doneFuture; |
+ |
+ void _completeDone([error]) { |
+ if (_doneCompleter == null) return; |
+ var tmp = _doneCompleter; |
+ _doneCompleter = null; |
if (error == null) { |
- _bindSubscription = null; |
tmp.complete(); |
} else { |
tmp.completeError(error); |
@@ -192,82 +184,43 @@ class _IOSinkImpl<T> implements IOSink<T> { |
} |
StreamController<List<int>> get _controller { |
- if (_controllerInstance == null) { |
- _controllerInstance = new StreamController<List<int>>( |
- onPauseStateChange: _onPauseStateChange, |
- onSubscriptionStateChange: _onSubscriptionStateChange); |
- var future = _controller.stream.pipe(_target); |
- future.then((_) => _completeWriteStreamCompleter(), |
- onError: (error) => _completeWriteStreamCompleter(error)); |
- _pipeFuture = future.then((value) => value); |
- } |
- return _controllerInstance; |
- } |
- |
- bool get _isBound => _bindSubscription != null; |
- |
- void _onPauseStateChange() { |
- _paused = _controller.isPaused; |
- if (_controller.isPaused) { |
- _pause(); |
- } else { |
- _resume(); |
- } |
- } |
- |
- void _pause() { |
- if (_bindSubscription != null) { |
- try { |
- // The subscription can be canceled at this point. |
- _bindSubscription.pause(); |
- } catch (e) { |
- } |
- } |
- } |
- |
- void _resume() { |
- if (_bindSubscription != null) { |
- try { |
- // The subscription can be canceled at this point. |
- _bindSubscription.resume(); |
- } catch (e) { |
- } |
- } |
- } |
- |
- void _onSubscriptionStateChange() { |
- if (_controller.hasListener) { |
- _paused = false; |
- _resume(); |
- } else { |
- if (_bindSubscription != null) { |
- _bindSubscription.cancel(); |
- _bindSubscription = null; |
- } |
+ if (_isBound) { |
+ throw new StateError("IOSink is bound to a stream"); |
} |
- } |
- |
- Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) { |
- _controller; |
- assert(_writeStreamCompleter == null); |
- if (unbind) { |
- _writeStreamCompleter = new Completer<T>(); |
+ if (_isClosed) { |
+ throw new StateError("IOSink is closed"); |
} |
- _bindSubscription = stream.listen( |
- _controller.add, |
- onDone: () { |
- if (unbind) { |
- _completeWriteStreamCompleter(); |
- } else { |
- _controller.close(); |
- } |
- }, |
- onError: _controller.addError); |
- if (_paused) _pause(); |
- if (unbind) { |
- return _writeStreamCompleter.future; |
- } else { |
- return _pipeFuture; |
+ if (_controllerInstance == null) { |
+ _controllerInstance = new StreamController<List<int>>(); |
+ _controllerCompleter = new Completer(); |
+ _target.addStream(_controller.stream) |
+ .then( |
+ (_) { |
+ if (_isBound) { |
+ // A new stream takes over - forward values to that stream. |
+ var completer = _controllerCompleter; |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ completer.complete(); |
+ } else { |
+ // No new stream, .close was called. Close _target. |
+ _closeTarget(); |
+ } |
+ }, |
+ onError: (error) { |
+ if (_isBound) { |
+ // A new stream takes over - forward errors to that stream. |
+ var completer = _controllerCompleter; |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ completer.completeError(error); |
+ } else { |
+ // No new stream. No need to close target, as it have already |
+ // failed. |
+ _completeDone(error); |
+ } |
+ }); |
} |
+ return _controllerInstance; |
} |
} |