| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index 1c841c00b39a979aec30c3c0d557335c63a01f34..4b29de8c0a6f842b926557b54f5f9eb7b23e961a 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -1326,169 +1326,6 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> {
|
|
|
|
|
| /**
|
| - * A [StreamSink] adapter for a [StreamConsumer].
|
| - *
|
| - * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods
|
| - * and will delay an [addStream] until all buffered data has been forwarded to
|
| - * the stream consumer.
|
| - *
|
| - * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any
|
| - * call to the `StreamSinkAdapter` will throw a [StateError], except [done].
|
| - * When the [addStream] completes, the `StreamSinkAdapter` will again be open
|
| - * to all calls.
|
| - *
|
| - * If events are added to the `StreamSinkAdapter` after the adapter is closed,
|
| - * the events will be ignored.
|
| - * Use the [done] future to be notified when the `StreamSinkAdapter` is closed.
|
| - */
|
| -class StreamSinkAdapter<S> implements StreamSink<S> {
|
| - final StreamConsumer<S> _target;
|
| - Completer _doneCompleter = new Completer();
|
| - Future _doneFuture;
|
| - StreamController<S> _controllerInstance;
|
| - Completer _controllerCompleter;
|
| - bool _isClosed = false;
|
| - bool _isBound = false;
|
| - bool _hasError = false;
|
| -
|
| - /**
|
| - * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer].
|
| - */
|
| - StreamSinkAdapter(StreamConsumer<S> target) : _target = target {
|
| - _doneFuture = _doneCompleter.future;
|
| - }
|
| -
|
| - void add(S data) {
|
| - if (_isClosed) return;
|
| - _controller.add(data);
|
| - }
|
| -
|
| - void addError(error, [StackTrace stackTrace]) =>
|
| - _controller.addError(error, stackTrace);
|
| -
|
| - Future addStream(Stream<S> stream) {
|
| - _checkNotBound();
|
| - _isBound = true;
|
| - if (_hasError) return _doneFuture;
|
| - // Wait for any sync operations to complete.
|
| - Future targetAddStream() {
|
| - return _target.addStream(stream)
|
| - .whenComplete(() {
|
| - _isBound = false;
|
| - });
|
| - }
|
| - if (_controllerInstance == null) return targetAddStream();
|
| - var future = _controllerCompleter.future;
|
| - _controllerInstance.close();
|
| - return future.then((_) => targetAddStream());
|
| - }
|
| -
|
| - /**
|
| - * Returns a [Future] that completes once all buffered events has been
|
| - * accepted by the underlying [StreamConsumer].
|
| - *
|
| - * The [Future] will complete with the value `this`.
|
| - *
|
| - * This method must not be called while an [addStream] is in progress.
|
| - *
|
| - * NOTE: This method does not guarantee anything except that the stream
|
| - * consumer has received all buffered data. It does not guarantee that the
|
| - * consumer has acted on the data in any way, or that the data has reached
|
| - * its final destination.
|
| - */
|
| - Future flush() {
|
| - _checkNotBound();
|
| - if (_controllerInstance == null) return new Future.value(this);
|
| - // Adding an empty stream-controller will return a future that will complete
|
| - // when all data is done.
|
| - _isBound = true;
|
| - var future = _controllerCompleter.future;
|
| - _controllerInstance.close();
|
| - return future.whenComplete(() {
|
| - _isBound = false;
|
| - });
|
| - }
|
| -
|
| - Future close() {
|
| - _checkNotBound();
|
| - if (!_isClosed) {
|
| - _isClosed = true;
|
| - if (_controllerInstance != null) {
|
| - _controllerInstance.close();
|
| - } else {
|
| - _closeTarget();
|
| - }
|
| - }
|
| - return _doneFuture;
|
| - }
|
| -
|
| - Future get done => _doneFuture;
|
| -
|
| - // Private helper methods.
|
| -
|
| - void _closeTarget() {
|
| - _target.close() .then(_completeDoneValue, onError: _completeDoneError);
|
| - }
|
| -
|
| - void _completeDoneValue(value) {
|
| - if (_doneCompleter == null) return;
|
| - _doneCompleter.complete(value);
|
| - _doneCompleter = null;
|
| - }
|
| -
|
| - void _completeDoneError(error, stackTrace) {
|
| - if (_doneCompleter == null) return;
|
| - _hasError = true;
|
| - _doneCompleter.completeError(error, stackTrace);
|
| - _doneCompleter = null;
|
| - }
|
| -
|
| - StreamController<S> get _controller {
|
| - _checkNotBound();
|
| - if (_isClosed) {
|
| - throw new StateError("StreamSink is closed");
|
| - }
|
| - if (_controllerInstance == null) {
|
| - _controllerInstance = new StreamController<S>(sync: true);
|
| - _controllerCompleter = new Completer();
|
| - _target.addStream(_controller.stream)
|
| - .then(
|
| - (_) {
|
| - if (_isBound) {
|
| - // A new stream takes over - forward values to that stream.
|
| - _controllerCompleter.complete(this);
|
| - _controllerCompleter = null;
|
| - _controllerInstance = null;
|
| - } else {
|
| - // No new stream, .close was called. Close _target.
|
| - _closeTarget();
|
| - }
|
| - },
|
| - onError: (error, stackTrace) {
|
| - if (_isBound) {
|
| - // A new stream takes over - forward errors to that stream.
|
| - _controllerCompleter.completeError(error, stackTrace);
|
| - _controllerCompleter = null;
|
| - _controllerInstance = null;
|
| - } else {
|
| - // No new stream. No need to close target, as it have already
|
| - // failed.
|
| - _completeDoneError(error, stackTrace);
|
| - }
|
| - });
|
| - }
|
| - return _controllerInstance;
|
| - }
|
| -
|
| - void _checkNotBound() {
|
| - if (_isBound) {
|
| - throw new StateError("StreamSink is processing an addStream call");
|
| - }
|
| - }
|
| -}
|
| -
|
| -
|
| -/**
|
| * The target of a [Stream.transform] call.
|
| *
|
| * The [Stream.transform] call will pass itself to this object and then return
|
|
|