Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 499386c47b89e415d8942fae26b510cd39ef69ee..34ad2541fb83c73e55a6cdde589625da9843c938 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -1326,6 +1326,169 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
/** |
+ * A [StreamSink] adapter for a [StreamConsumer]. |
+ * |
+ * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods |
+ * and will delay an [addStream] until all buffered data has been forwarded to |
+ * the stream consumer. |
+ * |
+ * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any |
+ * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. |
+ * When the [addStream] completes, the `StreamSinkAdapter` will again be open |
+ * to all calls. |
+ * |
+ * If events are added to the `StreamSinkAdapter` after the adapter is closed, |
+ * the events will be ignored. |
+ * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. |
+ */ |
+class StreamSinkAdapter<S> implements StreamSink<S> { |
+ final StreamConsumer<S> _target; |
+ Completer _doneCompleter = new Completer(); |
+ Future _doneFuture; |
+ StreamController<S> _controllerInstance; |
+ Completer _controllerCompleter; |
+ bool _isClosed = false; |
+ bool _isBound = false; |
+ bool _hasError = false; |
+ |
+ /** |
+ * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. |
+ */ |
+ StreamSinkAdapter(StreamConsumer<S> target) : _target = target { |
+ _doneFuture = _doneCompleter.future; |
+ } |
+ |
+ void add(S data) { |
+ if (_isClosed) return; |
+ _controller.add(data); |
+ } |
+ |
+ void addError(error, [StackTrace stackTrace]) => |
+ _controller.addError(error, stackTrace); |
+ |
+ Future addStream(Stream<S> stream) { |
+ _checkIsBound(); |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Hmm, would probably sounds better as "_checkNotBou
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ _isBound = true; |
+ if (_hasError) return done; |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
return _doneFuture;
(I generally try to avoid hav
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ // Wait for any sync operations to complete. |
+ Future targetAddStream() { |
+ return _target.addStream(stream) |
+ .whenComplete(() { |
+ _isBound = false; |
+ }); |
+ } |
+ if (_controllerInstance == null) return targetAddStream(); |
+ var future = _controllerCompleter.future; |
+ _controllerInstance.close(); |
+ return future.then((_) => targetAddStream()); |
+ } |
+ |
+ /** |
+ * Returns a [Future] that completes once all buffered events is accepted by |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
is -> has been.
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ * the underlying [StreamConsumer]. |
+ * |
+ * It's an error to call this method, while an [addStream] is incomplete. |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
-> This method must not be called while an [addStr
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ * |
+ * NOTE: This is not necessarily the same as the data being flushed by the |
+ * operating system. |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Remote this NOTE, keep the next.
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ * NOTE: This method does not guarantee anything except that the stream |
+ * consumer has received all buffered data. It does not guarantee that the |
+ * consumer has acted on the data in any way, or that the data has reached |
+ * its final destination. |
+ */ |
+ Future flush() { |
+ _checkIsBound(); |
+ if (_controllerInstance == null) return new Future.value(this); |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Why is "this" the value of the future? Is this doc
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ // Adding an empty stream-controller will return a future that will complete |
+ // when all data is done. |
+ _isBound = true; |
+ var future = _controllerCompleter.future; |
+ _controllerInstance.close(); |
+ return future.whenComplete(() { |
+ _isBound = false; |
+ }); |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Indentation: Unindent the previous two lines by fo
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ } |
+ |
+ Future close() { |
+ _checkIsBound(); |
+ if (!_isClosed) { |
+ _isClosed = true; |
+ if (_controllerInstance != null) { |
+ _controllerInstance.close(); |
+ } else { |
+ _closeTarget(); |
+ } |
+ } |
+ return done; |
+ } |
+ |
+ Future get done => _doneFuture; |
+ |
+ // Private helper methods. |
+ |
+ void _closeTarget() { |
+ _target.close() |
+ .then((value) => _completeDone(value: value), |
+ onError: (error) => _completeDone(error: error)); |
+ } |
+ |
+ void _completeDone({value, error}) { |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
This function expects one of two arguments.
Make i
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ if (_doneCompleter == null) return; |
+ if (error == null) { |
+ _doneCompleter.complete(value); |
+ } else { |
+ _hasError = true; |
+ _doneCompleter.completeError(error); |
+ } |
+ _doneCompleter = null; |
+ } |
+ |
+ StreamController<S> get _controller { |
+ _checkIsBound(); |
+ if (_isClosed) { |
+ throw new StateError("StreamSink is closed"); |
+ } |
+ if (_controllerInstance == null) { |
+ _controllerInstance = new StreamController<S>(sync: true); |
+ _controllerCompleter = new Completer(); |
+ _target.addStream(_controller.stream) |
+ .then( |
+ (_) { |
+ if (_isBound) { |
+ // A new stream takes over - forward values to that stream. |
+ _controllerCompleter.complete(this); |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ } else { |
+ // No new stream, .close was called. Close _target. |
+ _closeTarget(); |
+ } |
+ }, |
+ onError: (error) { |
+ if (_isBound) { |
+ // A new stream takes over - forward errors to that stream. |
+ _controllerCompleter.completeError(error); |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ } else { |
+ // No new stream. No need to close target, as it have already |
+ // failed. |
+ _completeDone(error: error); |
+ } |
+ }); |
+ } |
+ return _controllerInstance; |
+ } |
+ |
+ void _checkIsBound() { |
+ if (_isBound) { |
+ throw new StateError("StreamSink is bound to a stream"); |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
... is bound to a stream -> ... is processing an a
Anders Johnsen
2014/03/19 13:56:39
Done.
|
+ } |
+ } |
+} |
+ |
+ |
+/** |
* The target of a [Stream.transform] call. |
* |
* The [Stream.transform] call will pass itself to this object and then return |