Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2201)

Unified Diff: sdk/lib/io/io_sink.dart

Issue 196423021: Move _StreamSinkImpl from dart:io to dart:async as StreamSinkAdapter. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/io/io_sink.dart
diff --git a/sdk/lib/io/io_sink.dart b/sdk/lib/io/io_sink.dart
index 0c2752244994d403db9773487ce45177bf22ba35..9efbbc02e9d145f4ce5500b1c0bc2985c2c1d8d4 100644
--- a/sdk/lib/io/io_sink.dart
+++ b/sdk/lib/io/io_sink.dart
@@ -12,7 +12,7 @@ part of dart.io;
*
* When the [IOSink] is bound to a stream (through [addStream]) any call
* to the [IOSink] will throw a [StateError]. When the [addStream] completes,
- * the [IOSink] will again be open for all calls.
+ * the [IOSink] will again be open to all calls.
Lasse Reichstein Nielsen 2014/03/18 15:34:45 ... will again accept all method calls.
Anders Johnsen 2014/03/19 11:48:55 Done.
*
* If data is added to the [IOSink] after the sink is closed, the data will be
* ignored. Use the [done] future to be notified when the [IOSink] is closed.
@@ -124,138 +124,8 @@ abstract class IOSink implements StreamSink<List<int>>, StringSink {
Future get done;
}
-class _StreamSinkImpl<T> implements StreamSink<T> {
- final StreamConsumer<T> _target;
- Completer _doneCompleter = new Completer();
- Future _doneFuture;
- StreamController<T> _controllerInstance;
- Completer _controllerCompleter;
- bool _isClosed = false;
- bool _isBound = false;
- bool _hasError = false;
- _StreamSinkImpl(this._target) {
- _doneFuture = _doneCompleter.future;
- }
-
- void add(T data) {
- if (_isClosed) return;
- _controller.add(data);
- }
-
- void addError(error, [StackTrace stackTrace]) =>
- _controller.addError(error, stackTrace);
-
- Future addStream(Stream<T> stream) {
- if (_isBound) {
- throw new StateError("StreamSink is already bound to a stream");
- }
- _isBound = true;
- if (_hasError) return done;
- // Wait for any sync operations to complete.
- Future targetAddStream() {
- return _target.addStream(stream)
- .whenComplete(() {
- _isBound = false;
- });
- }
- if (_controllerInstance == null) return targetAddStream();
- var future = _controllerCompleter.future;
- _controllerInstance.close();
- return future.then((_) => targetAddStream());
- }
-
- Future flush() {
- if (_isBound) {
- throw new StateError("StreamSink is bound to a stream");
- }
- if (_controllerInstance == null) return new Future.value(this);
- // Adding an empty stream-controller will return a future that will complete
- // when all data is done.
- _isBound = true;
- var future = _controllerCompleter.future;
- _controllerInstance.close();
- return future.whenComplete(() {
- _isBound = false;
- });
- }
-
- Future close() {
- if (_isBound) {
- throw new StateError("StreamSink is bound to a stream");
- }
- if (!_isClosed) {
- _isClosed = true;
- if (_controllerInstance != null) {
- _controllerInstance.close();
- } else {
- _closeTarget();
- }
- }
- return done;
- }
-
- void _closeTarget() {
- _target.close()
- .then((value) => _completeDone(value: value),
- onError: (error) => _completeDone(error: error));
- }
-
- Future get done => _doneFuture;
-
- void _completeDone({value, error}) {
- if (_doneCompleter == null) return;
- if (error == null) {
- _doneCompleter.complete(value);
- } else {
- _hasError = true;
- _doneCompleter.completeError(error);
- }
- _doneCompleter = null;
- }
-
- StreamController<T> get _controller {
- if (_isBound) {
- throw new StateError("StreamSink is bound to a stream");
- }
- if (_isClosed) {
- throw new StateError("StreamSink is closed");
- }
- if (_controllerInstance == null) {
- _controllerInstance = new StreamController<T>(sync: true);
- _controllerCompleter = new Completer();
- _target.addStream(_controller.stream)
- .then(
- (_) {
- if (_isBound) {
- // A new stream takes over - forward values to that stream.
- _controllerCompleter.complete(this);
- _controllerCompleter = null;
- _controllerInstance = null;
- } else {
- // No new stream, .close was called. Close _target.
- _closeTarget();
- }
- },
- onError: (error) {
- if (_isBound) {
- // A new stream takes over - forward errors to that stream.
- _controllerCompleter.completeError(error);
- _controllerCompleter = null;
- _controllerInstance = null;
- } else {
- // No new stream. No need to close target, as it have already
- // failed.
- _completeDone(error: error);
- }
- });
- }
- return _controllerInstance;
- }
-}
-
-
-class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink {
+class _IOSinkImpl extends StreamSinkAdapter<List<int>> implements IOSink {
Encoding _encoding;
bool _encodingMutable = true;

Powered by Google App Engine
This is Rietveld 408576698