 Chromium Code Reviews
 Chromium Code Reviews Issue 1149563010:
  Add new features to package:async.  (Closed) 
  Base URL: https://github.com/dart-lang/async@master
    
  
    Issue 1149563010:
  Add new features to package:async.  (Closed) 
  Base URL: https://github.com/dart-lang/async@master| Index: lib/src/stream_completer.dart | 
| diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..3ef3c0321f1f2215b4044cb0d9d3c17615bcdf15 | 
| --- /dev/null | 
| +++ b/lib/src/stream_completer.dart | 
| @@ -0,0 +1,376 @@ | 
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 
| +// for details. All rights reserved. Use of this source code is governed by a | 
| +// BSD-style license that can be found in the LICENSE file. | 
| + | 
| +library async.stream_completer; | 
| + | 
| +import "dart:async"; | 
| +import "delegating_stream_subscription.dart"; | 
| + | 
| +/// A single-subscription [stream] where the contents are provided later. | 
| +/// | 
| +/// It is generally recommended that you never create a `Future<Stream>` | 
| +/// because you can just directly create a stream that doesn't do anything | 
| +/// until it's ready to do so. | 
| +/// This class can be used to create such a stream. | 
| +/// | 
| +/// The [stream] is a normal stream that you can listen to immediately, | 
| +/// but until either [setSourceStream] or [setEmpty] is called, | 
| +/// the stream won't produce any events. | 
| +/// | 
| +/// The same effect can be achieved by using a [StreamController] | 
| +/// and adding the stream using `addStream` when both | 
| +/// the controller's stream is listened to and the source stream is ready. | 
| +/// This class attempts to shortcut some of the overhead when possible. | 
| +/// For example, if the [stream] is only listened to | 
| +/// after the source stream has been set, | 
| +/// the listen is performed directly on the source stream. | 
| +class StreamCompleter<T> { | 
| + /// The stream of this completer. | 
| + /// | 
| + /// When a source stream is provided, its events will be forwarded to | 
| + /// listeners on this stream. | 
| + /// | 
| + /// The stream can be listened either before or after a source stream | 
| + /// is set. | 
| + final Stream<T> stream = new _CompleterStream<T>(); | 
| 
nweiz
2015/06/16 01:05:23
I should have mentioned this in the last pass (sor
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Ack. Done.
 | 
| + | 
| + /// Convert a `Future<Stream>` to a `Stream`. | 
| + /// | 
| + /// This creates a stream using a stream completer, | 
| + /// and sets the source stream to the result of the future when the | 
| + /// future completes. | 
| + /// | 
| + /// If the future completes with an error, the returned stream will | 
| + /// instead contain just that error. | 
| + static Stream fromFuture(Future<Stream> streamFuture) { | 
| + var completer = new StreamCompleter(); | 
| + streamFuture.then(completer.setSourceStream, | 
| + onError: (e, s) { | 
| 
nweiz
2015/06/16 01:05:23
"e, s" -> "_"
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Done.
 | 
| + completer.setSourceStream(streamFuture.asStream()); | 
| + }); | 
| + return completer.stream; | 
| + } | 
| + | 
| + /// Set a stream as the source of events for the [StreamCompleter]'s | 
| + /// [stream]. | 
| + /// | 
| + /// The completer's `stream` will act exactly as [sourceStream]. | 
| + /// | 
| + /// If the source stream is set before [stream] is listened to, | 
| + /// the listen call on [stream] is forwarded directly to [sourceStream]. | 
| + /// | 
| + /// If [stream] is listened to before setting the source stream, | 
| + /// an intermediate subscription is created. It looks like a completely | 
| + /// normal subscription, and can be paused or canceled, but it won't | 
| + /// produce any events until a source stream is provided. | 
| + /// | 
| + /// If the `stream` subscription is canceled before a source stream is set, | 
| + /// then nothing further happens. This means that the `sourceStream` may | 
| + /// never be listened to, even if the [stream] has been listened to. | 
| + /// | 
| + /// Otherwise, when the source stream is then set, | 
| + /// it is immediately listened to. | 
| + /// If the existing subscription was paused, the source stream subscription | 
| + /// is paused as many times, and then all events and callbacks are forwarded | 
| 
nweiz
2015/06/16 01:05:23
It's kind of strange that this forwards the same *
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
If you pause three times, you need to resume three
 | 
| + /// between the two subscriptions, so the listener works as if it was | 
| + /// listening to the source stream subscription directly. | 
| + /// | 
| + /// Either [setSourceStream] or [setEmpty] may be called at most once. | 
| + /// Trying to call either of them again will fail. | 
| + void setSourceStream(Stream<T> sourceStream) { | 
| + _CompleterStream completerStream = this.stream; | 
| + completerStream._linkStream(sourceStream); | 
| + } | 
| + | 
| + /// Equivalent to setting an empty stream using [setSourceStream]. | 
| + /// | 
| + /// Either [setSourceStream] or [setEmpty] may be called at most once. | 
| + /// Trying to call either of them again will fail. | 
| + void setEmpty() { | 
| + // TODO(lrn): Optimize this to not actually create the empty stream. | 
| + _CompleterStream completerStream = this.stream; | 
| + completerStream._linkStream(new Stream.fromIterable(const [])); | 
| + } | 
| +} | 
| + | 
| +/// Stream that acts as a source stream it is (eventually) linked with. | 
| +/// | 
| +/// The linked source stream can be set after a user has started listening on | 
| +/// this stream. No events occur before the source stream is provided. | 
| +/// | 
| +/// If a user listens before events are available, the state of the | 
| +/// subscription is maintained, and the subscription is then linked | 
| +/// to the source stream when that becomes available. | 
| +class _CompleterStream<T> extends Stream<T> { | 
| + // Bit flags used for the value of [_state]. | 
| + /// Flag marking that the source stream has been set. | 
| + static const int _streamFlag = 1; | 
| + /// Flag marking that the stream has been listened to. | 
| 
nweiz
2015/06/16 01:05:23
Nit: separate these with newlines.
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Gone.
 | 
| + static const int _listenerFlag = 2; | 
| + | 
| + // States used as values for _state. | 
| + /// Initial state with no listener and no source stream set. | 
| + static const int _initial = 0; | 
| + /// State where only the stream has been set. | 
| + static const int _streamOnly = _streamFlag; | 
| + /// State where there is only a listener. | 
| + static const int _listenerOnly = _listenerFlag; | 
| + /// State where source stream and listener have been linked. | 
| + static const int _linked = _streamFlag | _listenerFlag; | 
| + | 
| + /// The current state. | 
| + /// | 
| + /// One of [_initial], [_streamOnly], [_listenerOnly], | 
| + /// or [_linked]. | 
| + int _state = _initial; | 
| + | 
| + /// Shared variable used to store different values depending on the state. | 
| + /// | 
| + /// In the [_streamOnly] state it contains the source stream. | 
| + /// In the [_listenerOnly] state it contains a delegating subscription | 
| + /// controller. | 
| + /// In the [_linked] and [_initial] states it contains `null`. | 
| + /// | 
| + /// Do not access this field directly, | 
| + /// instead use [_stream] or [_controller] to read the value | 
| + /// appropriate for the current state. | 
| + var _stateData; | 
| + | 
| + /// Returns source stream that has been set. | 
| + /// | 
| + /// Must only be used when the source stream has been set, | 
| + /// but the stream has not been listened to yet (state is [_streamOnly]); | 
| 
nweiz
2015/06/16 01:05:23
";" -> "."
 | 
| + Stream get _stream { | 
| + assert(_state == _streamOnly); | 
| + return _stateData; | 
| + } | 
| + | 
| + /// Returns the mutable subscription controller with the subscription | 
| + /// on this stream | 
| + /// | 
| + /// Must only be used when the stream has been listened to, | 
| + /// but the source stream has not been set to yet (state is [_listenerOnly]); | 
| 
nweiz
2015/06/16 01:05:23
";" -> "."
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
 | 
| + MutableDelegatingStreamSubscriptionController get _controller { | 
| + assert(_state == _listenerOnly); | 
| + return _stateData; | 
| + } | 
| + | 
| + // State transition functions. | 
| + | 
| + /// Sets the source stream, and enters the [_streamOnly] state. | 
| + /// | 
| + /// Must only be called from the initial state, before this stream has | 
| + /// been listened to. | 
| + void _setStream(Stream stream) { | 
| + assert(_state == _initial); | 
| + _stateData = stream; | 
| + _state = _streamOnly; | 
| + } | 
| + | 
| + /// Sets the listener subscription, and enters the [_listenerOnly] state. | 
| + /// | 
| + /// Must only be called from the initial state, before the source stream has | 
| + /// been set. | 
| + void _setListened(MutableDelegatingStreamSubscriptionController subscription) { | 
| 
nweiz
2015/06/16 01:05:23
Long line.
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Class really needs a better name.
I think I'll rem
 | 
| + assert(_state == _initial); | 
| + _stateData = subscription; | 
| + _state = _listenerOnly; | 
| + } | 
| + | 
| + /// Marks the listener and source stream as linked. | 
| + /// | 
| + /// Enters the [_isLinked] state. | 
| + /// This must be called only from either the [_streamOnly] | 
| + /// or the [_listenerOnly] state after setting up the link. | 
| + void _setLinked() { | 
| + assert(_state == _streamOnly || _state == _listenerOnly); | 
| + _state = _linked; | 
| + _stateData = null; | 
| + } | 
| + | 
| + // end state transition functions. | 
| + | 
| + /// Called by the controller when the user supplies a stream | 
| + void _linkStream(Stream stream) { | 
| + if (_state == _listenerOnly) { | 
| + var subscription = _controller.sourceSubscription; | 
| + if (subscription is _CompleterSubscriptionState) { | 
| + _controller.sourceSubscription = subscription._linkStream(stream); | 
| + } else { | 
| + assert(subscription is _CanceledSubscription); | 
| + } | 
| + _setLinked(); | 
| + } else if (_state == _initial) { | 
| + _setStream(stream); | 
| + } else { | 
| + throw new StateError("Stream already linked."); | 
| + } | 
| + } | 
| + | 
| + StreamSubscription listen(void onData(T event), | 
| + {Function onError, | 
| + void onDone(), | 
| + bool cancelOnError}) { | 
| + if (_state == _initial) { | 
| + if (cancelOnError == null) cancelOnError = false; | 
| + var controller = | 
| + new MutableDelegatingStreamSubscriptionController<T>(null); | 
| + controller.sourceSubscription = | 
| 
nweiz
2015/06/16 01:05:23
Why are you assigning this rather than passing it
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Because of the cyclic dependency, the state object
 | 
| + new _CompleterSubscriptionState(cancelOnError, controller); | 
| + var subscription = controller.subscription; | 
| + subscription.onData(onData); | 
| + subscription.onError(onError); | 
| + subscription.onDone(onDone); | 
| + _setListened(controller); | 
| + return subscription; | 
| + } | 
| + if (_state == _streamOnly) { | 
| 
nweiz
2015/06/16 01:05:23
Nit: "else if", for compactness and to match other
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Then I think I should also put the final throw in
 | 
| + var result = _stream.listen(onData, onError: onError, onDone: onDone, | 
| + cancelOnError: cancelOnError); | 
| + _setLinked(); | 
| + return result; | 
| + } | 
| + throw new StateError("Stream has already been listened to."); | 
| + } | 
| +} | 
| + | 
| +/// Holds subscription callbacks and state for a [_CompleterSubscription] | 
| +/// until the real subscription is available. | 
| +/// | 
| +/// Always used in a [MutableDelegatingStreamSubscriptionController]. | 
| +class _CompleterSubscriptionState implements StreamSubscription { | 
| + /// The mutable subscription wrapper. | 
| + /// | 
| + /// Used for handling [pause] with resume futures and [asFuture] which | 
| + /// both need to call a different function (`resume` and `cancel` | 
| + /// respectively) at a later point. We let those go through the | 
| + /// wrapper subscription to ensure that they are forwarded to the | 
| + /// controller source subscription that is current at that time. | 
| + final MutableDelegatingStreamSubscriptionController _controller; | 
| + | 
| + /// Whether the subscription cancels on error. | 
| + /// | 
| + /// This is forwarded to the real subscription when that is created. | 
| + final bool _cancelOnError; | 
| + | 
| + // Callbacks forwarded to the real subscription when it's created. | 
| + | 
| + ZoneUnaryCallback _onData; | 
| + Function _onError; | 
| + ZoneCallback _onDone; | 
| + | 
| + /// Future set when cancel is called. | 
| + /// This both marks the subscription as canceled and allows returning | 
| 
nweiz
2015/06/16 01:05:23
Nit: newline above this.
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
 | 
| + /// the same future every time the cancel function is called. | 
| + Future cancelFuture; | 
| + | 
| + /// Count of active pauses. | 
| + /// | 
| + /// When the real subscription is created, it is paused this many times. | 
| + int pauseCount = 0; | 
| + | 
| + _CompleterSubscriptionState(this._cancelOnError, | 
| + this._controller); | 
| + | 
| + void onData(void handleData(data)) { | 
| + _onData = handleData; | 
| + } | 
| + | 
| + void onError(Function handleError) { | 
| + _onError = handleError; | 
| + } | 
| + | 
| + void onDone(void handleDone()) { | 
| + _onDone = handleDone; | 
| + } | 
| + | 
| + void pause([Future resumeFuture]) { | 
| + pauseCount++; | 
| + if (resumeFuture != null) { | 
| + // Go through wrapper subscription in case the real subscription | 
| + // is linked before the future completes. | 
| + resumeFuture.whenComplete(_controller.subscription.resume); | 
| + } | 
| + } | 
| + | 
| + void resume() { | 
| + if (pauseCount > 0) pauseCount--; | 
| + } | 
| + | 
| + Future cancel() { | 
| + var cancelFuture = new Future.value(); | 
| 
nweiz
2015/06/16 01:05:23
Why return a future here? cancel() is allowed to r
 
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Yes, sadly. That's a consequence of backwards comp
 | 
| + // Immediately replace the [_CompleterSubscription._delegate] so | 
| + // we won't be called again. | 
| + // This also releases any, now unused, callbacks we are holding on to. | 
| + _controller.sourceSubscription = new _CanceledSubscription(cancelFuture); | 
| + return cancelFuture; | 
| + } | 
| + | 
| + bool get isPaused { | 
| + return (cancelFuture != null && pauseCount > 0); | 
| + } | 
| + | 
| + Future asFuture([futureValue]) { | 
| + Completer completer = new Completer(); | 
| + _onDone = () { | 
| + completer.complete(futureValue); | 
| + }; | 
| + _onError = (error, StackTrace stackTrace) { | 
| + // Cancel the wrapper subscription in case the real subscription | 
| + // is linked before an error triggers this function. | 
| + _controller.subscription.cancel(); | 
| + completer.completeError(error, stackTrace); | 
| + }; | 
| + return completer.future; | 
| + } | 
| + | 
| + StreamSubscription _linkStream(Stream stream) { | 
| + if (cancelFuture != null) { | 
| + return new _CanceledSubscription(cancelFuture); | 
| + } | 
| + // If not canceled, create the real subscription and make | 
| + // sure it has the requested callbacks, cancelOnErrror flag and | 
| + // number of pauses. | 
| + var subscription = stream.listen(null, cancelOnError: _cancelOnError); | 
| + subscription.onData(_onData); | 
| + subscription.onError(_onError); | 
| + subscription.onDone(_onDone); | 
| + while (pauseCount > 0) { | 
| + subscription.pause(); | 
| + pauseCount--; | 
| + } | 
| + return subscription; | 
| + } | 
| +} | 
| + | 
| +/// A subscription that acts as if it has been canceled. | 
| +/// | 
| +/// No events are fired and pausing is ignored. | 
| +/// The [cancel] method always returns the same future. | 
| +class _CanceledSubscription implements StreamSubscription { | 
| + /// The future returned by [cancel]; | 
| + Future _doneFuture; | 
| + | 
| + _CanceledSubscription(this._doneFuture); | 
| + | 
| + void onData(void handleData(data)) {} | 
| + | 
| + void onError(Function handleError) {} | 
| + | 
| + void onDone(void handleDone()) {} | 
| + | 
| + void pause([Future resumeFuture]) {} | 
| + | 
| + void resume() {} | 
| + | 
| + Future cancel() => _doneFuture; | 
| + | 
| + /// Returns future that never completes. | 
| + /// | 
| + /// The `asFuture` result is completed by either an error event | 
| + /// or a done event. A canceled future never produces either. | 
| + Future asFuture([futureValue]) => new Completer().future; | 
| + | 
| + bool get isPaused => false; | 
| +} |