Chromium Code Reviews| Index: lib/src/stream_completer.dart |
| diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..3ef3c0321f1f2215b4044cb0d9d3c17615bcdf15 |
| --- /dev/null |
| +++ b/lib/src/stream_completer.dart |
| @@ -0,0 +1,376 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.stream_completer; |
| + |
| +import "dart:async"; |
| +import "delegating_stream_subscription.dart"; |
| + |
| +/// A single-subscription [stream] where the contents are provided later. |
| +/// |
| +/// It is generally recommended that you never create a `Future<Stream>` |
| +/// because you can just directly create a stream that doesn't do anything |
| +/// until it's ready to do so. |
| +/// This class can be used to create such a stream. |
| +/// |
| +/// The [stream] is a normal stream that you can listen to immediately, |
| +/// but until either [setSourceStream] or [setEmpty] is called, |
| +/// the stream won't produce any events. |
| +/// |
| +/// The same effect can be achieved by using a [StreamController] |
| +/// and adding the stream using `addStream` when both |
| +/// the controller's stream is listened to and the source stream is ready. |
| +/// This class attempts to shortcut some of the overhead when possible. |
| +/// For example, if the [stream] is only listened to |
| +/// after the source stream has been set, |
| +/// the listen is performed directly on the source stream. |
| +class StreamCompleter<T> { |
| + /// The stream of this completer. |
| + /// |
| + /// When a source stream is provided, its events will be forwarded to |
| + /// listeners on this stream. |
| + /// |
| + /// The stream can be listened either before or after a source stream |
| + /// is set. |
| + final Stream<T> stream = new _CompleterStream<T>(); |
|
nweiz
2015/06/16 01:05:23
I should have mentioned this in the last pass (sor
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Ack. Done.
|
| + |
| + /// Convert a `Future<Stream>` to a `Stream`. |
| + /// |
| + /// This creates a stream using a stream completer, |
| + /// and sets the source stream to the result of the future when the |
| + /// future completes. |
| + /// |
| + /// If the future completes with an error, the returned stream will |
| + /// instead contain just that error. |
| + static Stream fromFuture(Future<Stream> streamFuture) { |
| + var completer = new StreamCompleter(); |
| + streamFuture.then(completer.setSourceStream, |
| + onError: (e, s) { |
|
nweiz
2015/06/16 01:05:23
"e, s" -> "_"
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Done.
|
| + completer.setSourceStream(streamFuture.asStream()); |
| + }); |
| + return completer.stream; |
| + } |
| + |
| + /// Set a stream as the source of events for the [StreamCompleter]'s |
| + /// [stream]. |
| + /// |
| + /// The completer's `stream` will act exactly as [sourceStream]. |
| + /// |
| + /// If the source stream is set before [stream] is listened to, |
| + /// the listen call on [stream] is forwarded directly to [sourceStream]. |
| + /// |
| + /// If [stream] is listened to before setting the source stream, |
| + /// an intermediate subscription is created. It looks like a completely |
| + /// normal subscription, and can be paused or canceled, but it won't |
| + /// produce any events until a source stream is provided. |
| + /// |
| + /// If the `stream` subscription is canceled before a source stream is set, |
| + /// then nothing further happens. This means that the `sourceStream` may |
| + /// never be listened to, even if the [stream] has been listened to. |
| + /// |
| + /// Otherwise, when the source stream is then set, |
| + /// it is immediately listened to. |
| + /// If the existing subscription was paused, the source stream subscription |
| + /// is paused as many times, and then all events and callbacks are forwarded |
|
nweiz
2015/06/16 01:05:23
It's kind of strange that this forwards the same *
Lasse Reichstein Nielsen
2015/06/18 12:10:12
If you pause three times, you need to resume three
|
| + /// between the two subscriptions, so the listener works as if it was |
| + /// listening to the source stream subscription directly. |
| + /// |
| + /// Either [setSourceStream] or [setEmpty] may be called at most once. |
| + /// Trying to call either of them again will fail. |
| + void setSourceStream(Stream<T> sourceStream) { |
| + _CompleterStream completerStream = this.stream; |
| + completerStream._linkStream(sourceStream); |
| + } |
| + |
| + /// Equivalent to setting an empty stream using [setSourceStream]. |
| + /// |
| + /// Either [setSourceStream] or [setEmpty] may be called at most once. |
| + /// Trying to call either of them again will fail. |
| + void setEmpty() { |
| + // TODO(lrn): Optimize this to not actually create the empty stream. |
| + _CompleterStream completerStream = this.stream; |
| + completerStream._linkStream(new Stream.fromIterable(const [])); |
| + } |
| +} |
| + |
| +/// Stream that acts as a source stream it is (eventually) linked with. |
| +/// |
| +/// The linked source stream can be set after a user has started listening on |
| +/// this stream. No events occur before the source stream is provided. |
| +/// |
| +/// If a user listens before events are available, the state of the |
| +/// subscription is maintained, and the subscription is then linked |
| +/// to the source stream when that becomes available. |
| +class _CompleterStream<T> extends Stream<T> { |
| + // Bit flags used for the value of [_state]. |
| + /// Flag marking that the source stream has been set. |
| + static const int _streamFlag = 1; |
| + /// Flag marking that the stream has been listened to. |
|
nweiz
2015/06/16 01:05:23
Nit: separate these with newlines.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Gone.
|
| + static const int _listenerFlag = 2; |
| + |
| + // States used as values for _state. |
| + /// Initial state with no listener and no source stream set. |
| + static const int _initial = 0; |
| + /// State where only the stream has been set. |
| + static const int _streamOnly = _streamFlag; |
| + /// State where there is only a listener. |
| + static const int _listenerOnly = _listenerFlag; |
| + /// State where source stream and listener have been linked. |
| + static const int _linked = _streamFlag | _listenerFlag; |
| + |
| + /// The current state. |
| + /// |
| + /// One of [_initial], [_streamOnly], [_listenerOnly], |
| + /// or [_linked]. |
| + int _state = _initial; |
| + |
| + /// Shared variable used to store different values depending on the state. |
| + /// |
| + /// In the [_streamOnly] state it contains the source stream. |
| + /// In the [_listenerOnly] state it contains a delegating subscription |
| + /// controller. |
| + /// In the [_linked] and [_initial] states it contains `null`. |
| + /// |
| + /// Do not access this field directly, |
| + /// instead use [_stream] or [_controller] to read the value |
| + /// appropriate for the current state. |
| + var _stateData; |
| + |
| + /// Returns source stream that has been set. |
| + /// |
| + /// Must only be used when the source stream has been set, |
| + /// but the stream has not been listened to yet (state is [_streamOnly]); |
|
nweiz
2015/06/16 01:05:23
";" -> "."
|
| + Stream get _stream { |
| + assert(_state == _streamOnly); |
| + return _stateData; |
| + } |
| + |
| + /// Returns the mutable subscription controller with the subscription |
| + /// on this stream |
| + /// |
| + /// Must only be used when the stream has been listened to, |
| + /// but the source stream has not been set to yet (state is [_listenerOnly]); |
|
nweiz
2015/06/16 01:05:23
";" -> "."
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
|
| + MutableDelegatingStreamSubscriptionController get _controller { |
| + assert(_state == _listenerOnly); |
| + return _stateData; |
| + } |
| + |
| + // State transition functions. |
| + |
| + /// Sets the source stream, and enters the [_streamOnly] state. |
| + /// |
| + /// Must only be called from the initial state, before this stream has |
| + /// been listened to. |
| + void _setStream(Stream stream) { |
| + assert(_state == _initial); |
| + _stateData = stream; |
| + _state = _streamOnly; |
| + } |
| + |
| + /// Sets the listener subscription, and enters the [_listenerOnly] state. |
| + /// |
| + /// Must only be called from the initial state, before the source stream has |
| + /// been set. |
| + void _setListened(MutableDelegatingStreamSubscriptionController subscription) { |
|
nweiz
2015/06/16 01:05:23
Long line.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Class really needs a better name.
I think I'll rem
|
| + assert(_state == _initial); |
| + _stateData = subscription; |
| + _state = _listenerOnly; |
| + } |
| + |
| + /// Marks the listener and source stream as linked. |
| + /// |
| + /// Enters the [_isLinked] state. |
| + /// This must be called only from either the [_streamOnly] |
| + /// or the [_listenerOnly] state after setting up the link. |
| + void _setLinked() { |
| + assert(_state == _streamOnly || _state == _listenerOnly); |
| + _state = _linked; |
| + _stateData = null; |
| + } |
| + |
| + // end state transition functions. |
| + |
| + /// Called by the controller when the user supplies a stream |
| + void _linkStream(Stream stream) { |
| + if (_state == _listenerOnly) { |
| + var subscription = _controller.sourceSubscription; |
| + if (subscription is _CompleterSubscriptionState) { |
| + _controller.sourceSubscription = subscription._linkStream(stream); |
| + } else { |
| + assert(subscription is _CanceledSubscription); |
| + } |
| + _setLinked(); |
| + } else if (_state == _initial) { |
| + _setStream(stream); |
| + } else { |
| + throw new StateError("Stream already linked."); |
| + } |
| + } |
| + |
| + StreamSubscription listen(void onData(T event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + if (_state == _initial) { |
| + if (cancelOnError == null) cancelOnError = false; |
| + var controller = |
| + new MutableDelegatingStreamSubscriptionController<T>(null); |
| + controller.sourceSubscription = |
|
nweiz
2015/06/16 01:05:23
Why are you assigning this rather than passing it
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Because of the cyclic dependency, the state object
|
| + new _CompleterSubscriptionState(cancelOnError, controller); |
| + var subscription = controller.subscription; |
| + subscription.onData(onData); |
| + subscription.onError(onError); |
| + subscription.onDone(onDone); |
| + _setListened(controller); |
| + return subscription; |
| + } |
| + if (_state == _streamOnly) { |
|
nweiz
2015/06/16 01:05:23
Nit: "else if", for compactness and to match other
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Then I think I should also put the final throw in
|
| + var result = _stream.listen(onData, onError: onError, onDone: onDone, |
| + cancelOnError: cancelOnError); |
| + _setLinked(); |
| + return result; |
| + } |
| + throw new StateError("Stream has already been listened to."); |
| + } |
| +} |
| + |
| +/// Holds subscription callbacks and state for a [_CompleterSubscription] |
| +/// until the real subscription is available. |
| +/// |
| +/// Always used in a [MutableDelegatingStreamSubscriptionController]. |
| +class _CompleterSubscriptionState implements StreamSubscription { |
| + /// The mutable subscription wrapper. |
| + /// |
| + /// Used for handling [pause] with resume futures and [asFuture] which |
| + /// both need to call a different function (`resume` and `cancel` |
| + /// respectively) at a later point. We let those go through the |
| + /// wrapper subscription to ensure that they are forwarded to the |
| + /// controller source subscription that is current at that time. |
| + final MutableDelegatingStreamSubscriptionController _controller; |
| + |
| + /// Whether the subscription cancels on error. |
| + /// |
| + /// This is forwarded to the real subscription when that is created. |
| + final bool _cancelOnError; |
| + |
| + // Callbacks forwarded to the real subscription when it's created. |
| + |
| + ZoneUnaryCallback _onData; |
| + Function _onError; |
| + ZoneCallback _onDone; |
| + |
| + /// Future set when cancel is called. |
| + /// This both marks the subscription as canceled and allows returning |
|
nweiz
2015/06/16 01:05:23
Nit: newline above this.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
|
| + /// the same future every time the cancel function is called. |
| + Future cancelFuture; |
| + |
| + /// Count of active pauses. |
| + /// |
| + /// When the real subscription is created, it is paused this many times. |
| + int pauseCount = 0; |
| + |
| + _CompleterSubscriptionState(this._cancelOnError, |
| + this._controller); |
| + |
| + void onData(void handleData(data)) { |
| + _onData = handleData; |
| + } |
| + |
| + void onError(Function handleError) { |
| + _onError = handleError; |
| + } |
| + |
| + void onDone(void handleDone()) { |
| + _onDone = handleDone; |
| + } |
| + |
| + void pause([Future resumeFuture]) { |
| + pauseCount++; |
| + if (resumeFuture != null) { |
| + // Go through wrapper subscription in case the real subscription |
| + // is linked before the future completes. |
| + resumeFuture.whenComplete(_controller.subscription.resume); |
| + } |
| + } |
| + |
| + void resume() { |
| + if (pauseCount > 0) pauseCount--; |
| + } |
| + |
| + Future cancel() { |
| + var cancelFuture = new Future.value(); |
|
nweiz
2015/06/16 01:05:23
Why return a future here? cancel() is allowed to r
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Yes, sadly. That's a consequence of backwards comp
|
| + // Immediately replace the [_CompleterSubscription._delegate] so |
| + // we won't be called again. |
| + // This also releases any, now unused, callbacks we are holding on to. |
| + _controller.sourceSubscription = new _CanceledSubscription(cancelFuture); |
| + return cancelFuture; |
| + } |
| + |
| + bool get isPaused { |
| + return (cancelFuture != null && pauseCount > 0); |
| + } |
| + |
| + Future asFuture([futureValue]) { |
| + Completer completer = new Completer(); |
| + _onDone = () { |
| + completer.complete(futureValue); |
| + }; |
| + _onError = (error, StackTrace stackTrace) { |
| + // Cancel the wrapper subscription in case the real subscription |
| + // is linked before an error triggers this function. |
| + _controller.subscription.cancel(); |
| + completer.completeError(error, stackTrace); |
| + }; |
| + return completer.future; |
| + } |
| + |
| + StreamSubscription _linkStream(Stream stream) { |
| + if (cancelFuture != null) { |
| + return new _CanceledSubscription(cancelFuture); |
| + } |
| + // If not canceled, create the real subscription and make |
| + // sure it has the requested callbacks, cancelOnErrror flag and |
| + // number of pauses. |
| + var subscription = stream.listen(null, cancelOnError: _cancelOnError); |
| + subscription.onData(_onData); |
| + subscription.onError(_onError); |
| + subscription.onDone(_onDone); |
| + while (pauseCount > 0) { |
| + subscription.pause(); |
| + pauseCount--; |
| + } |
| + return subscription; |
| + } |
| +} |
| + |
| +/// A subscription that acts as if it has been canceled. |
| +/// |
| +/// No events are fired and pausing is ignored. |
| +/// The [cancel] method always returns the same future. |
| +class _CanceledSubscription implements StreamSubscription { |
| + /// The future returned by [cancel]; |
| + Future _doneFuture; |
| + |
| + _CanceledSubscription(this._doneFuture); |
| + |
| + void onData(void handleData(data)) {} |
| + |
| + void onError(Function handleError) {} |
| + |
| + void onDone(void handleDone()) {} |
| + |
| + void pause([Future resumeFuture]) {} |
| + |
| + void resume() {} |
| + |
| + Future cancel() => _doneFuture; |
| + |
| + /// Returns future that never completes. |
| + /// |
| + /// The `asFuture` result is completed by either an error event |
| + /// or a done event. A canceled future never produces either. |
| + Future asFuture([futureValue]) => new Completer().future; |
| + |
| + bool get isPaused => false; |
| +} |