Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(449)

Unified Diff: sdk/lib/io/io_stream_consumer.dart

Issue 12389050: Remane io_stream_consumer.dart to io_sink and move out all implementation from IOSink to _IOSinkImp… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Make _DetachedSocket pass dart-analyzer. Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/io_sink.dart ('k') | sdk/lib/io/iolib_sources.gypi » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/io_stream_consumer.dart
diff --git a/sdk/lib/io/io_stream_consumer.dart b/sdk/lib/io/io_stream_consumer.dart
deleted file mode 100644
index 9ae639e570af3428ec9c8c86920afda6339b9d3a..0000000000000000000000000000000000000000
--- a/sdk/lib/io/io_stream_consumer.dart
+++ /dev/null
@@ -1,173 +0,0 @@
-// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-part of dart.io;
-
-/**
- * Helper class to wrap a [StreamConsumer<List<int>, T>] and provide utility
- * functions for writing to the StreamConsumer directly. The [IOSink]
- * buffers the input given by [add] and [addString] and will delay a [consume]
- * or [addStream] until the buffer is flushed.
- *
- * When the [IOSink] is bound to a stream (through either [consume]
- * or [addStream]) any call to the [IOSink] will throw a
- * [StateError].
- */
-class IOSink<T> implements StreamConsumer<List<int>, T> {
- final StreamConsumer<List<int>, T> _target;
-
- StreamController<List<int>> _controllerInstance;
- Future<T> _pipeFuture;
- StreamSubscription<List<int>> _bindSubscription;
- bool _paused = true;
-
- IOSink(StreamConsumer<List<int>, T> target) : _target = target;
-
- /**
- * Provide functionality for piping to the [IOSink].
- */
- Future<T> consume(Stream<List<int>> stream) {
- if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
- }
- return _fillFromStream(stream);
- }
-
- /**
- * Like [consume], but will not close the target when done.
- */
- Future<T> addStream(Stream<List<int>> stream) {
- if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
- }
- return _fillFromStream(stream, unbind: true);
- }
-
- /**
- * Write a list of bytes to the target.
- */
- void add(List<int> data) {
- if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
- }
- _controller.add(data);
- }
-
- /**
- * Write a String to the target.
- */
- void addString(String string, [Encoding encoding = Encoding.UTF_8]) {
- add(_encodeString(string, encoding));
- }
-
- /**
- * Close the target.
- */
- void close() {
- if (_isBound) {
- throw new StateError("IOSink is already bound to a stream");
- }
- _controller.close();
- }
-
- /**
- * Get future that will complete when all data has been written to
- * the IOSink and it has been closed.
- */
- Future<T> get done {
- _controller;
- return _pipeFuture.then((_) => this);
- }
-
- StreamController<List<int>> get _controller {
- if (_controllerInstance == null) {
- _controllerInstance = new StreamController<List<int>>(
- onPauseStateChange: _onPauseStateChange,
- onSubscriptionStateChange: _onSubscriptionStateChange);
- _pipeFuture = _controller.stream.pipe(_target).then((_) => this);
- }
- return _controllerInstance;
- }
-
- bool get _isBound => _bindSubscription != null;
-
- void _onPauseStateChange() {
- _paused = _controller.isPaused;
- if (_controller.isPaused) {
- _pause();
- } else {
- _resume();
- }
- }
-
- void _pause() {
- if (_bindSubscription != null) {
- try {
- // The subscription can be canceled at this point.
- _bindSubscription.pause();
- } catch (e) {
- }
- }
- }
-
- void _resume() {
- if (_bindSubscription != null) {
- try {
- // The subscription can be canceled at this point.
- _bindSubscription.resume();
- } catch (e) {
- }
- }
- }
-
- void _onSubscriptionStateChange() {
- if (_controller.hasSubscribers) {
- _paused = false;
- _resume();
- } else {
- if (_bindSubscription != null) {
- _bindSubscription.cancel();
- _bindSubscription = null;
- }
- }
- }
-
- Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) {
- _controller;
- Completer<T> unbindCompleter;
- if (unbind) {
- unbindCompleter = new Completer<T>();
- }
- completeUnbind([error]) {
- if (unbindCompleter == null) return;
- var tmp = unbindCompleter;
- unbindCompleter = null;
- if (error == null) {
- _bindSubscription = null;
- tmp.complete();
- } else {
- tmp.completeError(error);
- }
- }
- _bindSubscription = stream.listen(
- _controller.add,
- onDone: () {
- if (unbind) {
- completeUnbind();
- } else {
- _controller.close();
- }
- },
- onError: _controller.signalError);
- if (_paused) _pause();
- if (unbind) {
- _pipeFuture
- .then((_) => completeUnbind(),
- onError: (error) => completeUnbind(error));
- return unbindCompleter.future;
- } else {
- return _pipeFuture.then((_) => this);
- }
- }
-}
« no previous file with comments | « sdk/lib/io/io_sink.dart ('k') | sdk/lib/io/iolib_sources.gypi » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698