Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(319)

Unified Diff: sdk/lib/io/io_sink.dart

Issue 14028017: Remove .writeStream, .consume and rewrite IOSink to correctly implement a (sane) well-defined behav… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Review comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/http_parser.dart ('k') | sdk/lib/io/process.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/io_sink.dart
diff --git a/sdk/lib/io/io_sink.dart b/sdk/lib/io/io_sink.dart
index df3a7daac36faf6ca070c8c62cc851098e742dfa..802486551935911386fd5440008f7801ad0ae267 100644
--- a/sdk/lib/io/io_sink.dart
+++ b/sdk/lib/io/io_sink.dart
@@ -7,15 +7,14 @@ part of dart.io;
/**
* Helper class to wrap a [StreamConsumer<List<int>>] and provide
* utility functions for writing to the StreamConsumer directly. The
- * [IOSink] buffers the input given by [write], [writeAll], [writeln],
- * [writeCharCode] and [add] and will delay a [consume] or
- * [writeStream] until the buffer is flushed.
+ * [IOSink] buffers the input given by all [StringSink] methods and will delay
+ * a [addStream] until the buffer is flushed.
*
- * When the [IOSink] is bound to a stream (through either [consume]
- * or [writeStream]) any call to the [IOSink] will throw a
- * [StateError].
+ * When the [IOSink] is bound to a stream (through [addStream]) any call
+ * to the [IOSink] will throw a [StateError]. When the [addStream] compeltes,
+ * the [IOSink] will again be open for all calls.
*/
-abstract class IOSink<T>
+abstract class IOSink
implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> {
factory IOSink(StreamConsumer<List<int>> target,
{Encoding encoding: Encoding.UTF_8})
@@ -38,50 +37,37 @@ abstract class IOSink<T>
void addError(AsyncError error);
/**
- * Provide functionality for piping to the [IOSink].
- */
- Future<T> consume(Stream<List<int>> stream);
-
- /**
* Adds all elements of the given [stream] to `this`.
*/
- Future<T> addStream(Stream<List<int>> stream);
-
- /**
- * Like [consume], but will not close the target when done.
- *
- * *Deprecated*: use [addStream] instead.
- */
- Future<T> writeStream(Stream<List<int>> stream);
+ Future addStream(Stream<List<int>> stream);
/**
* Close the target.
*/
- // TODO(floitsch): Currently the future cannot be typed because it has
- // hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest.
Future close();
/**
- * Get future that will complete when all data has been written to
- * the IOSink and it has been closed.
+ * Get a future that will complete when all synchronous have completed, or an
+ * error happened. This future is identical to the future returned from close.
*/
- Future<T> get done;
+ Future get done;
}
-class _IOSinkImpl<T> implements IOSink<T> {
+class _IOSinkImpl implements IOSink {
final StreamConsumer<List<int>> _target;
-
- Completer _writeStreamCompleter;
+ Completer _doneCompleter = new Completer();
+ Future _doneFuture;
StreamController<List<int>> _controllerInstance;
- Future<T> _pipeFuture;
- StreamSubscription<List<int>> _bindSubscription;
- bool _paused = true;
+ Completer _controllerCompleter;
+ Encoding _encoding;
+ bool _isClosed = false;
+ bool _isBound = false;
bool _encodingMutable = true;
- _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding);
-
- Encoding _encoding;
+ _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding) {
+ _doneFuture = _doneCompleter.future;
+ }
Encoding get encoding => _encoding;
@@ -135,56 +121,62 @@ class _IOSinkImpl<T> implements IOSink<T> {
}
void add(List<int> data) {
- if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
- }
_controller.add(data);
}
void addError(AsyncError error) {
- if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
- }
_controller.addError(error);
}
- Future<T> consume(Stream<List<int>> stream) {
+ Future addStream(Stream<List<int>> stream) {
if (_isBound) {
throw new StateError("IOSink is already bound to a stream");
}
- return _fillFromStream(stream);
+ _isBound = true;
+ // Wait for any sync operations to complete.
+ return _closeController().then((_) {
+ return _target.addStream(stream)
+ .whenComplete(() {
+ _isBound = false;
+ });
+ });
}
- Future<T> writeStream(Stream<List<int>> stream) {
- return addStream(stream);
- }
-
- Future<T> addStream(Stream<List<int>> stream) {
- if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
- }
- return _fillFromStream(stream, unbind: true);
+ Future _closeController() {
+ if (_controllerInstance == null) return new Future.immediate(null);
+ var future = _controllerCompleter.future;
+ _controllerInstance.close();
+ return future;
}
Future close() {
if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
+ throw new StateError("IOSink is bound to a stream");
+ }
+ if (!_isClosed) {
+ _isClosed = true;
+ if (_controllerInstance != null) {
+ _controllerInstance.close();
+ } else {
+ _closeTarget();
+ }
}
- _controller.close();
- return _pipeFuture;
+ return done;
}
- Future<T> get done {
- _controller;
- return _pipeFuture;
+ void _closeTarget() {
+ _target.close()
+ .then((_) => _completeDone(),
+ onError: (error) => _completeDone(error));
}
- void _completeWriteStreamCompleter([error]) {
- if (_writeStreamCompleter == null) return;
- var tmp = _writeStreamCompleter;
- _writeStreamCompleter = null;
+ Future get done => _doneFuture;
+
+ void _completeDone([error]) {
+ if (_doneCompleter == null) return;
+ var tmp = _doneCompleter;
+ _doneCompleter = null;
if (error == null) {
- _bindSubscription = null;
tmp.complete();
} else {
tmp.completeError(error);
@@ -192,82 +184,43 @@ class _IOSinkImpl<T> implements IOSink<T> {
}
StreamController<List<int>> get _controller {
- if (_controllerInstance == null) {
- _controllerInstance = new StreamController<List<int>>(
- onPauseStateChange: _onPauseStateChange,
- onSubscriptionStateChange: _onSubscriptionStateChange);
- var future = _controller.stream.pipe(_target);
- future.then((_) => _completeWriteStreamCompleter(),
- onError: (error) => _completeWriteStreamCompleter(error));
- _pipeFuture = future.then((value) => value);
- }
- return _controllerInstance;
- }
-
- bool get _isBound => _bindSubscription != null;
-
- void _onPauseStateChange() {
- _paused = _controller.isPaused;
- if (_controller.isPaused) {
- _pause();
- } else {
- _resume();
- }
- }
-
- void _pause() {
- if (_bindSubscription != null) {
- try {
- // The subscription can be canceled at this point.
- _bindSubscription.pause();
- } catch (e) {
- }
- }
- }
-
- void _resume() {
- if (_bindSubscription != null) {
- try {
- // The subscription can be canceled at this point.
- _bindSubscription.resume();
- } catch (e) {
- }
- }
- }
-
- void _onSubscriptionStateChange() {
- if (_controller.hasListener) {
- _paused = false;
- _resume();
- } else {
- if (_bindSubscription != null) {
- _bindSubscription.cancel();
- _bindSubscription = null;
- }
+ if (_isBound) {
+ throw new StateError("IOSink is bound to a stream");
}
- }
-
- Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) {
- _controller;
- assert(_writeStreamCompleter == null);
- if (unbind) {
- _writeStreamCompleter = new Completer<T>();
+ if (_isClosed) {
+ throw new StateError("IOSink is closed");
}
- _bindSubscription = stream.listen(
- _controller.add,
- onDone: () {
- if (unbind) {
- _completeWriteStreamCompleter();
- } else {
- _controller.close();
- }
- },
- onError: _controller.addError);
- if (_paused) _pause();
- if (unbind) {
- return _writeStreamCompleter.future;
- } else {
- return _pipeFuture;
+ if (_controllerInstance == null) {
+ _controllerInstance = new StreamController<List<int>>();
+ _controllerCompleter = new Completer();
+ _target.addStream(_controller.stream)
+ .then(
+ (_) {
+ if (_isBound) {
+ // A new stream takes over - forward values to that stream.
+ var completer = _controllerCompleter;
+ _controllerCompleter = null;
+ _controllerInstance = null;
+ completer.complete();
+ } else {
+ // No new stream, .close was called. Close _target.
+ _closeTarget();
+ }
+ },
+ onError: (error) {
+ if (_isBound) {
+ // A new stream takes over - forward errors to that stream.
+ var completer = _controllerCompleter;
+ _controllerCompleter = null;
+ _controllerInstance = null;
+ completer.completeError(error);
+ } else {
+ // No new stream. No need to close target, as it have already
+ // failed.
+ _completeDone(error);
+ }
+ });
}
+ return _controllerInstance;
}
}
« no previous file with comments | « sdk/lib/io/http_parser.dart ('k') | sdk/lib/io/process.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698