Index: lib/src/stream_completer.dart |
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3ef3c0321f1f2215b4044cb0d9d3c17615bcdf15 |
--- /dev/null |
+++ b/lib/src/stream_completer.dart |
@@ -0,0 +1,376 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.stream_completer; |
+ |
+import "dart:async"; |
+import "delegating_stream_subscription.dart"; |
+ |
+/// A single-subscription [stream] where the contents are provided later. |
+/// |
+/// It is generally recommended that you never create a `Future<Stream>` |
+/// because you can just directly create a stream that doesn't do anything |
+/// until it's ready to do so. |
+/// This class can be used to create such a stream. |
+/// |
+/// The [stream] is a normal stream that you can listen to immediately, |
+/// but until either [setSourceStream] or [setEmpty] is called, |
+/// the stream won't produce any events. |
+/// |
+/// The same effect can be achieved by using a [StreamController] |
+/// and adding the stream using `addStream` when both |
+/// the controller's stream is listened to and the source stream is ready. |
+/// This class attempts to shortcut some of the overhead when possible. |
+/// For example, if the [stream] is only listened to |
+/// after the source stream has been set, |
+/// the listen is performed directly on the source stream. |
+class StreamCompleter<T> { |
+ /// The stream of this completer. |
+ /// |
+ /// When a source stream is provided, its events will be forwarded to |
+ /// listeners on this stream. |
+ /// |
+ /// The stream can be listened either before or after a source stream |
+ /// is set. |
+ final Stream<T> stream = new _CompleterStream<T>(); |
nweiz
2015/06/16 01:05:23
I should have mentioned this in the last pass (sor
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Ack. Done.
|
+ |
+ /// Convert a `Future<Stream>` to a `Stream`. |
+ /// |
+ /// This creates a stream using a stream completer, |
+ /// and sets the source stream to the result of the future when the |
+ /// future completes. |
+ /// |
+ /// If the future completes with an error, the returned stream will |
+ /// instead contain just that error. |
+ static Stream fromFuture(Future<Stream> streamFuture) { |
+ var completer = new StreamCompleter(); |
+ streamFuture.then(completer.setSourceStream, |
+ onError: (e, s) { |
nweiz
2015/06/16 01:05:23
"e, s" -> "_"
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Done.
|
+ completer.setSourceStream(streamFuture.asStream()); |
+ }); |
+ return completer.stream; |
+ } |
+ |
+ /// Set a stream as the source of events for the [StreamCompleter]'s |
+ /// [stream]. |
+ /// |
+ /// The completer's `stream` will act exactly as [sourceStream]. |
+ /// |
+ /// If the source stream is set before [stream] is listened to, |
+ /// the listen call on [stream] is forwarded directly to [sourceStream]. |
+ /// |
+ /// If [stream] is listened to before setting the source stream, |
+ /// an intermediate subscription is created. It looks like a completely |
+ /// normal subscription, and can be paused or canceled, but it won't |
+ /// produce any events until a source stream is provided. |
+ /// |
+ /// If the `stream` subscription is canceled before a source stream is set, |
+ /// then nothing further happens. This means that the `sourceStream` may |
+ /// never be listened to, even if the [stream] has been listened to. |
+ /// |
+ /// Otherwise, when the source stream is then set, |
+ /// it is immediately listened to. |
+ /// If the existing subscription was paused, the source stream subscription |
+ /// is paused as many times, and then all events and callbacks are forwarded |
nweiz
2015/06/16 01:05:23
It's kind of strange that this forwards the same *
Lasse Reichstein Nielsen
2015/06/18 12:10:12
If you pause three times, you need to resume three
|
+ /// between the two subscriptions, so the listener works as if it was |
+ /// listening to the source stream subscription directly. |
+ /// |
+ /// Either [setSourceStream] or [setEmpty] may be called at most once. |
+ /// Trying to call either of them again will fail. |
+ void setSourceStream(Stream<T> sourceStream) { |
+ _CompleterStream completerStream = this.stream; |
+ completerStream._linkStream(sourceStream); |
+ } |
+ |
+ /// Equivalent to setting an empty stream using [setSourceStream]. |
+ /// |
+ /// Either [setSourceStream] or [setEmpty] may be called at most once. |
+ /// Trying to call either of them again will fail. |
+ void setEmpty() { |
+ // TODO(lrn): Optimize this to not actually create the empty stream. |
+ _CompleterStream completerStream = this.stream; |
+ completerStream._linkStream(new Stream.fromIterable(const [])); |
+ } |
+} |
+ |
+/// Stream that acts as a source stream it is (eventually) linked with. |
+/// |
+/// The linked source stream can be set after a user has started listening on |
+/// this stream. No events occur before the source stream is provided. |
+/// |
+/// If a user listens before events are available, the state of the |
+/// subscription is maintained, and the subscription is then linked |
+/// to the source stream when that becomes available. |
+class _CompleterStream<T> extends Stream<T> { |
+ // Bit flags used for the value of [_state]. |
+ /// Flag marking that the source stream has been set. |
+ static const int _streamFlag = 1; |
+ /// Flag marking that the stream has been listened to. |
nweiz
2015/06/16 01:05:23
Nit: separate these with newlines.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Gone.
|
+ static const int _listenerFlag = 2; |
+ |
+ // States used as values for _state. |
+ /// Initial state with no listener and no source stream set. |
+ static const int _initial = 0; |
+ /// State where only the stream has been set. |
+ static const int _streamOnly = _streamFlag; |
+ /// State where there is only a listener. |
+ static const int _listenerOnly = _listenerFlag; |
+ /// State where source stream and listener have been linked. |
+ static const int _linked = _streamFlag | _listenerFlag; |
+ |
+ /// The current state. |
+ /// |
+ /// One of [_initial], [_streamOnly], [_listenerOnly], |
+ /// or [_linked]. |
+ int _state = _initial; |
+ |
+ /// Shared variable used to store different values depending on the state. |
+ /// |
+ /// In the [_streamOnly] state it contains the source stream. |
+ /// In the [_listenerOnly] state it contains a delegating subscription |
+ /// controller. |
+ /// In the [_linked] and [_initial] states it contains `null`. |
+ /// |
+ /// Do not access this field directly, |
+ /// instead use [_stream] or [_controller] to read the value |
+ /// appropriate for the current state. |
+ var _stateData; |
+ |
+ /// Returns source stream that has been set. |
+ /// |
+ /// Must only be used when the source stream has been set, |
+ /// but the stream has not been listened to yet (state is [_streamOnly]); |
nweiz
2015/06/16 01:05:23
";" -> "."
|
+ Stream get _stream { |
+ assert(_state == _streamOnly); |
+ return _stateData; |
+ } |
+ |
+ /// Returns the mutable subscription controller with the subscription |
+ /// on this stream |
+ /// |
+ /// Must only be used when the stream has been listened to, |
+ /// but the source stream has not been set to yet (state is [_listenerOnly]); |
nweiz
2015/06/16 01:05:23
";" -> "."
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
|
+ MutableDelegatingStreamSubscriptionController get _controller { |
+ assert(_state == _listenerOnly); |
+ return _stateData; |
+ } |
+ |
+ // State transition functions. |
+ |
+ /// Sets the source stream, and enters the [_streamOnly] state. |
+ /// |
+ /// Must only be called from the initial state, before this stream has |
+ /// been listened to. |
+ void _setStream(Stream stream) { |
+ assert(_state == _initial); |
+ _stateData = stream; |
+ _state = _streamOnly; |
+ } |
+ |
+ /// Sets the listener subscription, and enters the [_listenerOnly] state. |
+ /// |
+ /// Must only be called from the initial state, before the source stream has |
+ /// been set. |
+ void _setListened(MutableDelegatingStreamSubscriptionController subscription) { |
nweiz
2015/06/16 01:05:23
Long line.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Class really needs a better name.
I think I'll rem
|
+ assert(_state == _initial); |
+ _stateData = subscription; |
+ _state = _listenerOnly; |
+ } |
+ |
+ /// Marks the listener and source stream as linked. |
+ /// |
+ /// Enters the [_isLinked] state. |
+ /// This must be called only from either the [_streamOnly] |
+ /// or the [_listenerOnly] state after setting up the link. |
+ void _setLinked() { |
+ assert(_state == _streamOnly || _state == _listenerOnly); |
+ _state = _linked; |
+ _stateData = null; |
+ } |
+ |
+ // end state transition functions. |
+ |
+ /// Called by the controller when the user supplies a stream |
+ void _linkStream(Stream stream) { |
+ if (_state == _listenerOnly) { |
+ var subscription = _controller.sourceSubscription; |
+ if (subscription is _CompleterSubscriptionState) { |
+ _controller.sourceSubscription = subscription._linkStream(stream); |
+ } else { |
+ assert(subscription is _CanceledSubscription); |
+ } |
+ _setLinked(); |
+ } else if (_state == _initial) { |
+ _setStream(stream); |
+ } else { |
+ throw new StateError("Stream already linked."); |
+ } |
+ } |
+ |
+ StreamSubscription listen(void onData(T event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ if (_state == _initial) { |
+ if (cancelOnError == null) cancelOnError = false; |
+ var controller = |
+ new MutableDelegatingStreamSubscriptionController<T>(null); |
+ controller.sourceSubscription = |
nweiz
2015/06/16 01:05:23
Why are you assigning this rather than passing it
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Because of the cyclic dependency, the state object
|
+ new _CompleterSubscriptionState(cancelOnError, controller); |
+ var subscription = controller.subscription; |
+ subscription.onData(onData); |
+ subscription.onError(onError); |
+ subscription.onDone(onDone); |
+ _setListened(controller); |
+ return subscription; |
+ } |
+ if (_state == _streamOnly) { |
nweiz
2015/06/16 01:05:23
Nit: "else if", for compactness and to match other
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Then I think I should also put the final throw in
|
+ var result = _stream.listen(onData, onError: onError, onDone: onDone, |
+ cancelOnError: cancelOnError); |
+ _setLinked(); |
+ return result; |
+ } |
+ throw new StateError("Stream has already been listened to."); |
+ } |
+} |
+ |
+/// Holds subscription callbacks and state for a [_CompleterSubscription] |
+/// until the real subscription is available. |
+/// |
+/// Always used in a [MutableDelegatingStreamSubscriptionController]. |
+class _CompleterSubscriptionState implements StreamSubscription { |
+ /// The mutable subscription wrapper. |
+ /// |
+ /// Used for handling [pause] with resume futures and [asFuture] which |
+ /// both need to call a different function (`resume` and `cancel` |
+ /// respectively) at a later point. We let those go through the |
+ /// wrapper subscription to ensure that they are forwarded to the |
+ /// controller source subscription that is current at that time. |
+ final MutableDelegatingStreamSubscriptionController _controller; |
+ |
+ /// Whether the subscription cancels on error. |
+ /// |
+ /// This is forwarded to the real subscription when that is created. |
+ final bool _cancelOnError; |
+ |
+ // Callbacks forwarded to the real subscription when it's created. |
+ |
+ ZoneUnaryCallback _onData; |
+ Function _onError; |
+ ZoneCallback _onDone; |
+ |
+ /// Future set when cancel is called. |
+ /// This both marks the subscription as canceled and allows returning |
nweiz
2015/06/16 01:05:23
Nit: newline above this.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
|
+ /// the same future every time the cancel function is called. |
+ Future cancelFuture; |
+ |
+ /// Count of active pauses. |
+ /// |
+ /// When the real subscription is created, it is paused this many times. |
+ int pauseCount = 0; |
+ |
+ _CompleterSubscriptionState(this._cancelOnError, |
+ this._controller); |
+ |
+ void onData(void handleData(data)) { |
+ _onData = handleData; |
+ } |
+ |
+ void onError(Function handleError) { |
+ _onError = handleError; |
+ } |
+ |
+ void onDone(void handleDone()) { |
+ _onDone = handleDone; |
+ } |
+ |
+ void pause([Future resumeFuture]) { |
+ pauseCount++; |
+ if (resumeFuture != null) { |
+ // Go through wrapper subscription in case the real subscription |
+ // is linked before the future completes. |
+ resumeFuture.whenComplete(_controller.subscription.resume); |
+ } |
+ } |
+ |
+ void resume() { |
+ if (pauseCount > 0) pauseCount--; |
+ } |
+ |
+ Future cancel() { |
+ var cancelFuture = new Future.value(); |
nweiz
2015/06/16 01:05:23
Why return a future here? cancel() is allowed to r
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Yes, sadly. That's a consequence of backwards comp
|
+ // Immediately replace the [_CompleterSubscription._delegate] so |
+ // we won't be called again. |
+ // This also releases any, now unused, callbacks we are holding on to. |
+ _controller.sourceSubscription = new _CanceledSubscription(cancelFuture); |
+ return cancelFuture; |
+ } |
+ |
+ bool get isPaused { |
+ return (cancelFuture != null && pauseCount > 0); |
+ } |
+ |
+ Future asFuture([futureValue]) { |
+ Completer completer = new Completer(); |
+ _onDone = () { |
+ completer.complete(futureValue); |
+ }; |
+ _onError = (error, StackTrace stackTrace) { |
+ // Cancel the wrapper subscription in case the real subscription |
+ // is linked before an error triggers this function. |
+ _controller.subscription.cancel(); |
+ completer.completeError(error, stackTrace); |
+ }; |
+ return completer.future; |
+ } |
+ |
+ StreamSubscription _linkStream(Stream stream) { |
+ if (cancelFuture != null) { |
+ return new _CanceledSubscription(cancelFuture); |
+ } |
+ // If not canceled, create the real subscription and make |
+ // sure it has the requested callbacks, cancelOnErrror flag and |
+ // number of pauses. |
+ var subscription = stream.listen(null, cancelOnError: _cancelOnError); |
+ subscription.onData(_onData); |
+ subscription.onError(_onError); |
+ subscription.onDone(_onDone); |
+ while (pauseCount > 0) { |
+ subscription.pause(); |
+ pauseCount--; |
+ } |
+ return subscription; |
+ } |
+} |
+ |
+/// A subscription that acts as if it has been canceled. |
+/// |
+/// No events are fired and pausing is ignored. |
+/// The [cancel] method always returns the same future. |
+class _CanceledSubscription implements StreamSubscription { |
+ /// The future returned by [cancel]; |
+ Future _doneFuture; |
+ |
+ _CanceledSubscription(this._doneFuture); |
+ |
+ void onData(void handleData(data)) {} |
+ |
+ void onError(Function handleError) {} |
+ |
+ void onDone(void handleDone()) {} |
+ |
+ void pause([Future resumeFuture]) {} |
+ |
+ void resume() {} |
+ |
+ Future cancel() => _doneFuture; |
+ |
+ /// Returns future that never completes. |
+ /// |
+ /// The `asFuture` result is completed by either an error event |
+ /// or a done event. A canceled future never produces either. |
+ Future asFuture([futureValue]) => new Completer().future; |
+ |
+ bool get isPaused => false; |
+} |