Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 1c841c00b39a979aec30c3c0d557335c63a01f34..4b29de8c0a6f842b926557b54f5f9eb7b23e961a 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -1326,169 +1326,6 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
/** |
- * A [StreamSink] adapter for a [StreamConsumer]. |
- * |
- * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods |
- * and will delay an [addStream] until all buffered data has been forwarded to |
- * the stream consumer. |
- * |
- * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any |
- * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. |
- * When the [addStream] completes, the `StreamSinkAdapter` will again be open |
- * to all calls. |
- * |
- * If events are added to the `StreamSinkAdapter` after the adapter is closed, |
- * the events will be ignored. |
- * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. |
- */ |
-class StreamSinkAdapter<S> implements StreamSink<S> { |
- final StreamConsumer<S> _target; |
- Completer _doneCompleter = new Completer(); |
- Future _doneFuture; |
- StreamController<S> _controllerInstance; |
- Completer _controllerCompleter; |
- bool _isClosed = false; |
- bool _isBound = false; |
- bool _hasError = false; |
- |
- /** |
- * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. |
- */ |
- StreamSinkAdapter(StreamConsumer<S> target) : _target = target { |
- _doneFuture = _doneCompleter.future; |
- } |
- |
- void add(S data) { |
- if (_isClosed) return; |
- _controller.add(data); |
- } |
- |
- void addError(error, [StackTrace stackTrace]) => |
- _controller.addError(error, stackTrace); |
- |
- Future addStream(Stream<S> stream) { |
- _checkNotBound(); |
- _isBound = true; |
- if (_hasError) return _doneFuture; |
- // Wait for any sync operations to complete. |
- Future targetAddStream() { |
- return _target.addStream(stream) |
- .whenComplete(() { |
- _isBound = false; |
- }); |
- } |
- if (_controllerInstance == null) return targetAddStream(); |
- var future = _controllerCompleter.future; |
- _controllerInstance.close(); |
- return future.then((_) => targetAddStream()); |
- } |
- |
- /** |
- * Returns a [Future] that completes once all buffered events has been |
- * accepted by the underlying [StreamConsumer]. |
- * |
- * The [Future] will complete with the value `this`. |
- * |
- * This method must not be called while an [addStream] is in progress. |
- * |
- * NOTE: This method does not guarantee anything except that the stream |
- * consumer has received all buffered data. It does not guarantee that the |
- * consumer has acted on the data in any way, or that the data has reached |
- * its final destination. |
- */ |
- Future flush() { |
- _checkNotBound(); |
- if (_controllerInstance == null) return new Future.value(this); |
- // Adding an empty stream-controller will return a future that will complete |
- // when all data is done. |
- _isBound = true; |
- var future = _controllerCompleter.future; |
- _controllerInstance.close(); |
- return future.whenComplete(() { |
- _isBound = false; |
- }); |
- } |
- |
- Future close() { |
- _checkNotBound(); |
- if (!_isClosed) { |
- _isClosed = true; |
- if (_controllerInstance != null) { |
- _controllerInstance.close(); |
- } else { |
- _closeTarget(); |
- } |
- } |
- return _doneFuture; |
- } |
- |
- Future get done => _doneFuture; |
- |
- // Private helper methods. |
- |
- void _closeTarget() { |
- _target.close() .then(_completeDoneValue, onError: _completeDoneError); |
- } |
- |
- void _completeDoneValue(value) { |
- if (_doneCompleter == null) return; |
- _doneCompleter.complete(value); |
- _doneCompleter = null; |
- } |
- |
- void _completeDoneError(error, stackTrace) { |
- if (_doneCompleter == null) return; |
- _hasError = true; |
- _doneCompleter.completeError(error, stackTrace); |
- _doneCompleter = null; |
- } |
- |
- StreamController<S> get _controller { |
- _checkNotBound(); |
- if (_isClosed) { |
- throw new StateError("StreamSink is closed"); |
- } |
- if (_controllerInstance == null) { |
- _controllerInstance = new StreamController<S>(sync: true); |
- _controllerCompleter = new Completer(); |
- _target.addStream(_controller.stream) |
- .then( |
- (_) { |
- if (_isBound) { |
- // A new stream takes over - forward values to that stream. |
- _controllerCompleter.complete(this); |
- _controllerCompleter = null; |
- _controllerInstance = null; |
- } else { |
- // No new stream, .close was called. Close _target. |
- _closeTarget(); |
- } |
- }, |
- onError: (error, stackTrace) { |
- if (_isBound) { |
- // A new stream takes over - forward errors to that stream. |
- _controllerCompleter.completeError(error, stackTrace); |
- _controllerCompleter = null; |
- _controllerInstance = null; |
- } else { |
- // No new stream. No need to close target, as it have already |
- // failed. |
- _completeDoneError(error, stackTrace); |
- } |
- }); |
- } |
- return _controllerInstance; |
- } |
- |
- void _checkNotBound() { |
- if (_isBound) { |
- throw new StateError("StreamSink is processing an addStream call"); |
- } |
- } |
-} |
- |
- |
-/** |
* The target of a [Stream.transform] call. |
* |
* The [Stream.transform] call will pass itself to this object and then return |