Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 499386c47b89e415d8942fae26b510cd39ef69ee..c34c450e02988f11c95252495236188e7a3b8ae7 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -1326,6 +1326,167 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
| /** |
| + * A `StreamSinkAdapter` is an adapter implementation that can be used on a |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Too long first line. How about:
An adapter that
Anders Johnsen
2014/03/19 11:48:55
Done.
|
| + * [StreamConsumer], to facilitate the functionality of a [StreamSink], |
| + * |
| + * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods |
| + * and will delay an [addStream] until the buffer is flushed. |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
the buffer -> that buffer
This sounds like nothin
Anders Johnsen
2014/03/19 11:48:55
Done.
|
| + * |
| + * When the `StreamSinkAdapter` is bound to a stream (through [addStream]) any |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
When -> While
Anders Johnsen
2014/03/19 11:48:55
Done.
|
| + * call to the `StreamSinkAdapter` will throw a [StateError]. |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Is this all calls? What about "done" (which is a g
Anders Johnsen
2014/03/19 11:48:55
Yes, only done... :)
|
| + * When the [addStream] completes, the `StreamSinkAdapter` will again be open |
| + * to all calls. |
| + * |
| + * If events are added to the `StreamSinkAdapter` after the adapter is closed, |
| + * the events will be ignored. |
| + * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. |
| + */ |
| +class StreamSinkAdapter<S> implements StreamSink<S> { |
| + final StreamConsumer<S> _target; |
| + Completer _doneCompleter = new Completer(); |
| + Future _doneFuture; |
| + StreamController<S> _controllerInstance; |
| + Completer _controllerCompleter; |
| + bool _isClosed = false; |
| + bool _isBound = false; |
| + bool _hasError = false; |
| + |
| + /** |
| + * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. |
| + */ |
| + StreamSinkAdapter(StreamConsumer<S> target) : _target = target { |
| + _doneFuture = _doneCompleter.future; |
| + } |
| + |
| + void add(S data) { |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
No _isBound check?
Anders Johnsen
2014/03/19 11:48:55
Is in _controller getter.
|
| + if (_isClosed) return; |
| + _controller.add(data); |
| + } |
| + |
| + void addError(error, [StackTrace stackTrace]) => |
| + _controller.addError(error, stackTrace); |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
No _isBound check?
Anders Johnsen
2014/03/19 11:48:55
Is in _controller getter.
|
| + |
| + Future addStream(Stream<S> stream) { |
| + if (_isBound) { |
| + throw new StateError("StreamSink is already bound to a stream"); |
| + } |
| + _isBound = true; |
| + if (_hasError) return done; |
| + // Wait for any sync operations to complete. |
| + Future targetAddStream() { |
| + return _target.addStream(stream) |
| + .whenComplete(() { |
| + _isBound = false; |
| + }); |
| + } |
| + if (_controllerInstance == null) return targetAddStream(); |
| + var future = _controllerCompleter.future; |
| + _controllerInstance.close(); |
| + return future.then((_) => targetAddStream()); |
| + } |
| + |
| + /** |
| + * Returns a [Future] that completes once all buffered events is accepted by |
| + * the underlying [StreamConsumer]. |
| + * |
| + * It's an error to call this method, while an [addStream] is incomplete. |
| + * |
| + * NOTE: This is not necessarily the same as the data being flushed by the |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
This note is confusion. It says what this call isn
Anders Johnsen
2014/03/19 11:48:55
Done.
|
| + * operating system. |
| + */ |
| + Future flush() { |
| + if (_isBound) { |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Consider having a helper function for this check,
Anders Johnsen
2014/03/19 11:48:55
Done.
|
| + throw new StateError("StreamSink is bound to a stream"); |
| + } |
| + if (_controllerInstance == null) return new Future.value(this); |
| + // Adding an empty stream-controller will return a future that will complete |
| + // when all data is done. |
| + _isBound = true; |
| + var future = _controllerCompleter.future; |
| + _controllerInstance.close(); |
| + return future.whenComplete(() { |
| + _isBound = false; |
| + }); |
| + } |
| + |
| + Future close() { |
| + if (_isBound) { |
| + throw new StateError("StreamSink is bound to a stream"); |
| + } |
| + if (!_isClosed) { |
| + _isClosed = true; |
| + if (_controllerInstance != null) { |
| + _controllerInstance.close(); |
| + } else { |
| + _closeTarget(); |
| + } |
| + } |
| + return done; |
| + } |
| + |
| + Future get done => _doneFuture; |
| + |
| + // Private helper methods. |
| + |
| + void _closeTarget() { |
| + _target.close() |
| + .then((value) => _completeDone(value: value), |
| + onError: (error) => _completeDone(error: error)); |
| + } |
| + |
| + void _completeDone({value, error}) { |
| + if (_doneCompleter == null) return; |
| + if (error == null) { |
| + _doneCompleter.complete(value); |
| + } else { |
| + _hasError = true; |
| + _doneCompleter.completeError(error); |
| + } |
| + _doneCompleter = null; |
| + } |
| + |
| + StreamController<S> get _controller { |
| + if (_isBound) { |
| + throw new StateError("StreamSink is bound to a stream"); |
| + } |
| + if (_isClosed) { |
| + throw new StateError("StreamSink is closed"); |
| + } |
| + if (_controllerInstance == null) { |
| + _controllerInstance = new StreamController<S>(sync: true); |
| + _controllerCompleter = new Completer(); |
| + _target.addStream(_controller.stream) |
| + .then( |
| + (_) { |
| + if (_isBound) { |
| + // A new stream takes over - forward values to that stream. |
| + _controllerCompleter.complete(this); |
| + _controllerCompleter = null; |
| + _controllerInstance = null; |
| + } else { |
| + // No new stream, .close was called. Close _target. |
| + _closeTarget(); |
| + } |
| + }, |
| + onError: (error) { |
| + if (_isBound) { |
| + // A new stream takes over - forward errors to that stream. |
| + _controllerCompleter.completeError(error); |
| + _controllerCompleter = null; |
| + _controllerInstance = null; |
| + } else { |
| + // No new stream. No need to close target, as it have already |
| + // failed. |
| + _completeDone(error: error); |
| + } |
| + }); |
| + } |
| + return _controllerInstance; |
| + } |
| +} |
| + |
| + |
| +/** |
| * The target of a [Stream.transform] call. |
| * |
| * The [Stream.transform] call will pass itself to this object and then return |