Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 499386c47b89e415d8942fae26b510cd39ef69ee..c34c450e02988f11c95252495236188e7a3b8ae7 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -1326,6 +1326,167 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
/** |
+ * A `StreamSinkAdapter` is an adapter implementation that can be used on a |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Too long first line. How about:
An adapter that
Anders Johnsen
2014/03/19 11:48:55
Done.
|
+ * [StreamConsumer], to facilitate the functionality of a [StreamSink], |
+ * |
+ * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods |
+ * and will delay an [addStream] until the buffer is flushed. |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
the buffer -> that buffer
This sounds like nothin
Anders Johnsen
2014/03/19 11:48:55
Done.
|
+ * |
+ * When the `StreamSinkAdapter` is bound to a stream (through [addStream]) any |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
When -> While
Anders Johnsen
2014/03/19 11:48:55
Done.
|
+ * call to the `StreamSinkAdapter` will throw a [StateError]. |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Is this all calls? What about "done" (which is a g
Anders Johnsen
2014/03/19 11:48:55
Yes, only done... :)
|
+ * When the [addStream] completes, the `StreamSinkAdapter` will again be open |
+ * to all calls. |
+ * |
+ * If events are added to the `StreamSinkAdapter` after the adapter is closed, |
+ * the events will be ignored. |
+ * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. |
+ */ |
+class StreamSinkAdapter<S> implements StreamSink<S> { |
+ final StreamConsumer<S> _target; |
+ Completer _doneCompleter = new Completer(); |
+ Future _doneFuture; |
+ StreamController<S> _controllerInstance; |
+ Completer _controllerCompleter; |
+ bool _isClosed = false; |
+ bool _isBound = false; |
+ bool _hasError = false; |
+ |
+ /** |
+ * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. |
+ */ |
+ StreamSinkAdapter(StreamConsumer<S> target) : _target = target { |
+ _doneFuture = _doneCompleter.future; |
+ } |
+ |
+ void add(S data) { |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
No _isBound check?
Anders Johnsen
2014/03/19 11:48:55
Is in _controller getter.
|
+ if (_isClosed) return; |
+ _controller.add(data); |
+ } |
+ |
+ void addError(error, [StackTrace stackTrace]) => |
+ _controller.addError(error, stackTrace); |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
No _isBound check?
Anders Johnsen
2014/03/19 11:48:55
Is in _controller getter.
|
+ |
+ Future addStream(Stream<S> stream) { |
+ if (_isBound) { |
+ throw new StateError("StreamSink is already bound to a stream"); |
+ } |
+ _isBound = true; |
+ if (_hasError) return done; |
+ // Wait for any sync operations to complete. |
+ Future targetAddStream() { |
+ return _target.addStream(stream) |
+ .whenComplete(() { |
+ _isBound = false; |
+ }); |
+ } |
+ if (_controllerInstance == null) return targetAddStream(); |
+ var future = _controllerCompleter.future; |
+ _controllerInstance.close(); |
+ return future.then((_) => targetAddStream()); |
+ } |
+ |
+ /** |
+ * Returns a [Future] that completes once all buffered events is accepted by |
+ * the underlying [StreamConsumer]. |
+ * |
+ * It's an error to call this method, while an [addStream] is incomplete. |
+ * |
+ * NOTE: This is not necessarily the same as the data being flushed by the |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
This note is confusion. It says what this call isn
Anders Johnsen
2014/03/19 11:48:55
Done.
|
+ * operating system. |
+ */ |
+ Future flush() { |
+ if (_isBound) { |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Consider having a helper function for this check,
Anders Johnsen
2014/03/19 11:48:55
Done.
|
+ throw new StateError("StreamSink is bound to a stream"); |
+ } |
+ if (_controllerInstance == null) return new Future.value(this); |
+ // Adding an empty stream-controller will return a future that will complete |
+ // when all data is done. |
+ _isBound = true; |
+ var future = _controllerCompleter.future; |
+ _controllerInstance.close(); |
+ return future.whenComplete(() { |
+ _isBound = false; |
+ }); |
+ } |
+ |
+ Future close() { |
+ if (_isBound) { |
+ throw new StateError("StreamSink is bound to a stream"); |
+ } |
+ if (!_isClosed) { |
+ _isClosed = true; |
+ if (_controllerInstance != null) { |
+ _controllerInstance.close(); |
+ } else { |
+ _closeTarget(); |
+ } |
+ } |
+ return done; |
+ } |
+ |
+ Future get done => _doneFuture; |
+ |
+ // Private helper methods. |
+ |
+ void _closeTarget() { |
+ _target.close() |
+ .then((value) => _completeDone(value: value), |
+ onError: (error) => _completeDone(error: error)); |
+ } |
+ |
+ void _completeDone({value, error}) { |
+ if (_doneCompleter == null) return; |
+ if (error == null) { |
+ _doneCompleter.complete(value); |
+ } else { |
+ _hasError = true; |
+ _doneCompleter.completeError(error); |
+ } |
+ _doneCompleter = null; |
+ } |
+ |
+ StreamController<S> get _controller { |
+ if (_isBound) { |
+ throw new StateError("StreamSink is bound to a stream"); |
+ } |
+ if (_isClosed) { |
+ throw new StateError("StreamSink is closed"); |
+ } |
+ if (_controllerInstance == null) { |
+ _controllerInstance = new StreamController<S>(sync: true); |
+ _controllerCompleter = new Completer(); |
+ _target.addStream(_controller.stream) |
+ .then( |
+ (_) { |
+ if (_isBound) { |
+ // A new stream takes over - forward values to that stream. |
+ _controllerCompleter.complete(this); |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ } else { |
+ // No new stream, .close was called. Close _target. |
+ _closeTarget(); |
+ } |
+ }, |
+ onError: (error) { |
+ if (_isBound) { |
+ // A new stream takes over - forward errors to that stream. |
+ _controllerCompleter.completeError(error); |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ } else { |
+ // No new stream. No need to close target, as it have already |
+ // failed. |
+ _completeDone(error: error); |
+ } |
+ }); |
+ } |
+ return _controllerInstance; |
+ } |
+} |
+ |
+ |
+/** |
* The target of a [Stream.transform] call. |
* |
* The [Stream.transform] call will pass itself to this object and then return |