Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 499386c47b89e415d8942fae26b510cd39ef69ee..34ad2541fb83c73e55a6cdde589625da9843c938 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -1326,6 +1326,169 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
| /** |
| + * A [StreamSink] adapter for a [StreamConsumer]. |
| + * |
| + * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods |
| + * and will delay an [addStream] until all buffered data has been forwarded to |
| + * the stream consumer. |
| + * |
| + * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any |
| + * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. |
| + * When the [addStream] completes, the `StreamSinkAdapter` will again be open |
| + * to all calls. |
| + * |
| + * If events are added to the `StreamSinkAdapter` after the adapter is closed, |
| + * the events will be ignored. |
| + * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. |
| + */ |
| +class StreamSinkAdapter<S> implements StreamSink<S> { |
| + final StreamConsumer<S> _target; |
| + Completer _doneCompleter = new Completer(); |
| + Future _doneFuture; |
| + StreamController<S> _controllerInstance; |
| + Completer _controllerCompleter; |
| + bool _isClosed = false; |
| + bool _isBound = false; |
| + bool _hasError = false; |
| + |
| + /** |
| + * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. |
| + */ |
| + StreamSinkAdapter(StreamConsumer<S> target) : _target = target { |
| + _doneFuture = _doneCompleter.future; |
| + } |
| + |
| + void add(S data) { |
| + if (_isClosed) return; |
| + _controller.add(data); |
| + } |
| + |
| + void addError(error, [StackTrace stackTrace]) => |
| + _controller.addError(error, stackTrace); |
| + |
| + Future addStream(Stream<S> stream) { |
| + _checkIsBound(); |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Hmm, would probably sounds better as "_checkNotBou
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + _isBound = true; |
| + if (_hasError) return done; |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
return _doneFuture;
(I generally try to avoid hav
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + // Wait for any sync operations to complete. |
| + Future targetAddStream() { |
| + return _target.addStream(stream) |
| + .whenComplete(() { |
| + _isBound = false; |
| + }); |
| + } |
| + if (_controllerInstance == null) return targetAddStream(); |
| + var future = _controllerCompleter.future; |
| + _controllerInstance.close(); |
| + return future.then((_) => targetAddStream()); |
| + } |
| + |
| + /** |
| + * Returns a [Future] that completes once all buffered events is accepted by |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
is -> has been.
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + * the underlying [StreamConsumer]. |
| + * |
| + * It's an error to call this method, while an [addStream] is incomplete. |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
-> This method must not be called while an [addStr
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + * |
| + * NOTE: This is not necessarily the same as the data being flushed by the |
| + * operating system. |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Remote this NOTE, keep the next.
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + * NOTE: This method does not guarantee anything except that the stream |
| + * consumer has received all buffered data. It does not guarantee that the |
| + * consumer has acted on the data in any way, or that the data has reached |
| + * its final destination. |
| + */ |
| + Future flush() { |
| + _checkIsBound(); |
| + if (_controllerInstance == null) return new Future.value(this); |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Why is "this" the value of the future? Is this doc
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + // Adding an empty stream-controller will return a future that will complete |
| + // when all data is done. |
| + _isBound = true; |
| + var future = _controllerCompleter.future; |
| + _controllerInstance.close(); |
| + return future.whenComplete(() { |
| + _isBound = false; |
| + }); |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Indentation: Unindent the previous two lines by fo
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + } |
| + |
| + Future close() { |
| + _checkIsBound(); |
| + if (!_isClosed) { |
| + _isClosed = true; |
| + if (_controllerInstance != null) { |
| + _controllerInstance.close(); |
| + } else { |
| + _closeTarget(); |
| + } |
| + } |
| + return done; |
| + } |
| + |
| + Future get done => _doneFuture; |
| + |
| + // Private helper methods. |
| + |
| + void _closeTarget() { |
| + _target.close() |
| + .then((value) => _completeDone(value: value), |
| + onError: (error) => _completeDone(error: error)); |
| + } |
| + |
| + void _completeDone({value, error}) { |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
This function expects one of two arguments.
Make i
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + if (_doneCompleter == null) return; |
| + if (error == null) { |
| + _doneCompleter.complete(value); |
| + } else { |
| + _hasError = true; |
| + _doneCompleter.completeError(error); |
| + } |
| + _doneCompleter = null; |
| + } |
| + |
| + StreamController<S> get _controller { |
| + _checkIsBound(); |
| + if (_isClosed) { |
| + throw new StateError("StreamSink is closed"); |
| + } |
| + if (_controllerInstance == null) { |
| + _controllerInstance = new StreamController<S>(sync: true); |
| + _controllerCompleter = new Completer(); |
| + _target.addStream(_controller.stream) |
| + .then( |
| + (_) { |
| + if (_isBound) { |
| + // A new stream takes over - forward values to that stream. |
| + _controllerCompleter.complete(this); |
| + _controllerCompleter = null; |
| + _controllerInstance = null; |
| + } else { |
| + // No new stream, .close was called. Close _target. |
| + _closeTarget(); |
| + } |
| + }, |
| + onError: (error) { |
| + if (_isBound) { |
| + // A new stream takes over - forward errors to that stream. |
| + _controllerCompleter.completeError(error); |
| + _controllerCompleter = null; |
| + _controllerInstance = null; |
| + } else { |
| + // No new stream. No need to close target, as it have already |
| + // failed. |
| + _completeDone(error: error); |
| + } |
| + }); |
| + } |
| + return _controllerInstance; |
| + } |
| + |
| + void _checkIsBound() { |
| + if (_isBound) { |
| + throw new StateError("StreamSink is bound to a stream"); |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
... is bound to a stream -> ... is processing an a
Anders Johnsen
2014/03/19 13:56:39
Done.
|
| + } |
| + } |
| +} |
| + |
| + |
| +/** |
| * The target of a [Stream.transform] call. |
| * |
| * The [Stream.transform] call will pass itself to this object and then return |