Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(567)

Unified Diff: lib/src/stream_completer.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address remaining comments. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/stream_completer.dart
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
new file mode 100644
index 0000000000000000000000000000000000000000..3ef3c0321f1f2215b4044cb0d9d3c17615bcdf15
--- /dev/null
+++ b/lib/src/stream_completer.dart
@@ -0,0 +1,376 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_completer;
+
+import "dart:async";
+import "delegating_stream_subscription.dart";
+
+/// A single-subscription [stream] where the contents are provided later.
+///
+/// It is generally recommended that you never create a `Future<Stream>`
+/// because you can just directly create a stream that doesn't do anything
+/// until it's ready to do so.
+/// This class can be used to create such a stream.
+///
+/// The [stream] is a normal stream that you can listen to immediately,
+/// but until either [setSourceStream] or [setEmpty] is called,
+/// the stream won't produce any events.
+///
+/// The same effect can be achieved by using a [StreamController]
+/// and adding the stream using `addStream` when both
+/// the controller's stream is listened to and the source stream is ready.
+/// This class attempts to shortcut some of the overhead when possible.
+/// For example, if the [stream] is only listened to
+/// after the source stream has been set,
+/// the listen is performed directly on the source stream.
+class StreamCompleter<T> {
+ /// The stream of this completer.
+ ///
+ /// When a source stream is provided, its events will be forwarded to
+ /// listeners on this stream.
+ ///
+ /// The stream can be listened either before or after a source stream
+ /// is set.
+ final Stream<T> stream = new _CompleterStream<T>();
nweiz 2015/06/16 01:05:23 I should have mentioned this in the last pass (sor
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Ack. Done.
+
+ /// Convert a `Future<Stream>` to a `Stream`.
+ ///
+ /// This creates a stream using a stream completer,
+ /// and sets the source stream to the result of the future when the
+ /// future completes.
+ ///
+ /// If the future completes with an error, the returned stream will
+ /// instead contain just that error.
+ static Stream fromFuture(Future<Stream> streamFuture) {
+ var completer = new StreamCompleter();
+ streamFuture.then(completer.setSourceStream,
+ onError: (e, s) {
nweiz 2015/06/16 01:05:23 "e, s" -> "_"
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Done.
+ completer.setSourceStream(streamFuture.asStream());
+ });
+ return completer.stream;
+ }
+
+ /// Set a stream as the source of events for the [StreamCompleter]'s
+ /// [stream].
+ ///
+ /// The completer's `stream` will act exactly as [sourceStream].
+ ///
+ /// If the source stream is set before [stream] is listened to,
+ /// the listen call on [stream] is forwarded directly to [sourceStream].
+ ///
+ /// If [stream] is listened to before setting the source stream,
+ /// an intermediate subscription is created. It looks like a completely
+ /// normal subscription, and can be paused or canceled, but it won't
+ /// produce any events until a source stream is provided.
+ ///
+ /// If the `stream` subscription is canceled before a source stream is set,
+ /// then nothing further happens. This means that the `sourceStream` may
+ /// never be listened to, even if the [stream] has been listened to.
+ ///
+ /// Otherwise, when the source stream is then set,
+ /// it is immediately listened to.
+ /// If the existing subscription was paused, the source stream subscription
+ /// is paused as many times, and then all events and callbacks are forwarded
nweiz 2015/06/16 01:05:23 It's kind of strange that this forwards the same *
Lasse Reichstein Nielsen 2015/06/18 12:10:12 If you pause three times, you need to resume three
+ /// between the two subscriptions, so the listener works as if it was
+ /// listening to the source stream subscription directly.
+ ///
+ /// Either [setSourceStream] or [setEmpty] may be called at most once.
+ /// Trying to call either of them again will fail.
+ void setSourceStream(Stream<T> sourceStream) {
+ _CompleterStream completerStream = this.stream;
+ completerStream._linkStream(sourceStream);
+ }
+
+ /// Equivalent to setting an empty stream using [setSourceStream].
+ ///
+ /// Either [setSourceStream] or [setEmpty] may be called at most once.
+ /// Trying to call either of them again will fail.
+ void setEmpty() {
+ // TODO(lrn): Optimize this to not actually create the empty stream.
+ _CompleterStream completerStream = this.stream;
+ completerStream._linkStream(new Stream.fromIterable(const []));
+ }
+}
+
+/// Stream that acts as a source stream it is (eventually) linked with.
+///
+/// The linked source stream can be set after a user has started listening on
+/// this stream. No events occur before the source stream is provided.
+///
+/// If a user listens before events are available, the state of the
+/// subscription is maintained, and the subscription is then linked
+/// to the source stream when that becomes available.
+class _CompleterStream<T> extends Stream<T> {
+ // Bit flags used for the value of [_state].
+ /// Flag marking that the source stream has been set.
+ static const int _streamFlag = 1;
+ /// Flag marking that the stream has been listened to.
nweiz 2015/06/16 01:05:23 Nit: separate these with newlines.
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Gone.
+ static const int _listenerFlag = 2;
+
+ // States used as values for _state.
+ /// Initial state with no listener and no source stream set.
+ static const int _initial = 0;
+ /// State where only the stream has been set.
+ static const int _streamOnly = _streamFlag;
+ /// State where there is only a listener.
+ static const int _listenerOnly = _listenerFlag;
+ /// State where source stream and listener have been linked.
+ static const int _linked = _streamFlag | _listenerFlag;
+
+ /// The current state.
+ ///
+ /// One of [_initial], [_streamOnly], [_listenerOnly],
+ /// or [_linked].
+ int _state = _initial;
+
+ /// Shared variable used to store different values depending on the state.
+ ///
+ /// In the [_streamOnly] state it contains the source stream.
+ /// In the [_listenerOnly] state it contains a delegating subscription
+ /// controller.
+ /// In the [_linked] and [_initial] states it contains `null`.
+ ///
+ /// Do not access this field directly,
+ /// instead use [_stream] or [_controller] to read the value
+ /// appropriate for the current state.
+ var _stateData;
+
+ /// Returns source stream that has been set.
+ ///
+ /// Must only be used when the source stream has been set,
+ /// but the stream has not been listened to yet (state is [_streamOnly]);
nweiz 2015/06/16 01:05:23 ";" -> "."
+ Stream get _stream {
+ assert(_state == _streamOnly);
+ return _stateData;
+ }
+
+ /// Returns the mutable subscription controller with the subscription
+ /// on this stream
+ ///
+ /// Must only be used when the stream has been listened to,
+ /// but the source stream has not been set to yet (state is [_listenerOnly]);
nweiz 2015/06/16 01:05:23 ";" -> "."
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Acknowledged.
+ MutableDelegatingStreamSubscriptionController get _controller {
+ assert(_state == _listenerOnly);
+ return _stateData;
+ }
+
+ // State transition functions.
+
+ /// Sets the source stream, and enters the [_streamOnly] state.
+ ///
+ /// Must only be called from the initial state, before this stream has
+ /// been listened to.
+ void _setStream(Stream stream) {
+ assert(_state == _initial);
+ _stateData = stream;
+ _state = _streamOnly;
+ }
+
+ /// Sets the listener subscription, and enters the [_listenerOnly] state.
+ ///
+ /// Must only be called from the initial state, before the source stream has
+ /// been set.
+ void _setListened(MutableDelegatingStreamSubscriptionController subscription) {
nweiz 2015/06/16 01:05:23 Long line.
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Class really needs a better name. I think I'll rem
+ assert(_state == _initial);
+ _stateData = subscription;
+ _state = _listenerOnly;
+ }
+
+ /// Marks the listener and source stream as linked.
+ ///
+ /// Enters the [_isLinked] state.
+ /// This must be called only from either the [_streamOnly]
+ /// or the [_listenerOnly] state after setting up the link.
+ void _setLinked() {
+ assert(_state == _streamOnly || _state == _listenerOnly);
+ _state = _linked;
+ _stateData = null;
+ }
+
+ // end state transition functions.
+
+ /// Called by the controller when the user supplies a stream
+ void _linkStream(Stream stream) {
+ if (_state == _listenerOnly) {
+ var subscription = _controller.sourceSubscription;
+ if (subscription is _CompleterSubscriptionState) {
+ _controller.sourceSubscription = subscription._linkStream(stream);
+ } else {
+ assert(subscription is _CanceledSubscription);
+ }
+ _setLinked();
+ } else if (_state == _initial) {
+ _setStream(stream);
+ } else {
+ throw new StateError("Stream already linked.");
+ }
+ }
+
+ StreamSubscription listen(void onData(T event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ if (_state == _initial) {
+ if (cancelOnError == null) cancelOnError = false;
+ var controller =
+ new MutableDelegatingStreamSubscriptionController<T>(null);
+ controller.sourceSubscription =
nweiz 2015/06/16 01:05:23 Why are you assigning this rather than passing it
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Because of the cyclic dependency, the state object
+ new _CompleterSubscriptionState(cancelOnError, controller);
+ var subscription = controller.subscription;
+ subscription.onData(onData);
+ subscription.onError(onError);
+ subscription.onDone(onDone);
+ _setListened(controller);
+ return subscription;
+ }
+ if (_state == _streamOnly) {
nweiz 2015/06/16 01:05:23 Nit: "else if", for compactness and to match other
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Then I think I should also put the final throw in
+ var result = _stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
+ _setLinked();
+ return result;
+ }
+ throw new StateError("Stream has already been listened to.");
+ }
+}
+
+/// Holds subscription callbacks and state for a [_CompleterSubscription]
+/// until the real subscription is available.
+///
+/// Always used in a [MutableDelegatingStreamSubscriptionController].
+class _CompleterSubscriptionState implements StreamSubscription {
+ /// The mutable subscription wrapper.
+ ///
+ /// Used for handling [pause] with resume futures and [asFuture] which
+ /// both need to call a different function (`resume` and `cancel`
+ /// respectively) at a later point. We let those go through the
+ /// wrapper subscription to ensure that they are forwarded to the
+ /// controller source subscription that is current at that time.
+ final MutableDelegatingStreamSubscriptionController _controller;
+
+ /// Whether the subscription cancels on error.
+ ///
+ /// This is forwarded to the real subscription when that is created.
+ final bool _cancelOnError;
+
+ // Callbacks forwarded to the real subscription when it's created.
+
+ ZoneUnaryCallback _onData;
+ Function _onError;
+ ZoneCallback _onDone;
+
+ /// Future set when cancel is called.
+ /// This both marks the subscription as canceled and allows returning
nweiz 2015/06/16 01:05:23 Nit: newline above this.
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Acknowledged.
+ /// the same future every time the cancel function is called.
+ Future cancelFuture;
+
+ /// Count of active pauses.
+ ///
+ /// When the real subscription is created, it is paused this many times.
+ int pauseCount = 0;
+
+ _CompleterSubscriptionState(this._cancelOnError,
+ this._controller);
+
+ void onData(void handleData(data)) {
+ _onData = handleData;
+ }
+
+ void onError(Function handleError) {
+ _onError = handleError;
+ }
+
+ void onDone(void handleDone()) {
+ _onDone = handleDone;
+ }
+
+ void pause([Future resumeFuture]) {
+ pauseCount++;
+ if (resumeFuture != null) {
+ // Go through wrapper subscription in case the real subscription
+ // is linked before the future completes.
+ resumeFuture.whenComplete(_controller.subscription.resume);
+ }
+ }
+
+ void resume() {
+ if (pauseCount > 0) pauseCount--;
+ }
+
+ Future cancel() {
+ var cancelFuture = new Future.value();
nweiz 2015/06/16 01:05:23 Why return a future here? cancel() is allowed to r
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Yes, sadly. That's a consequence of backwards comp
+ // Immediately replace the [_CompleterSubscription._delegate] so
+ // we won't be called again.
+ // This also releases any, now unused, callbacks we are holding on to.
+ _controller.sourceSubscription = new _CanceledSubscription(cancelFuture);
+ return cancelFuture;
+ }
+
+ bool get isPaused {
+ return (cancelFuture != null && pauseCount > 0);
+ }
+
+ Future asFuture([futureValue]) {
+ Completer completer = new Completer();
+ _onDone = () {
+ completer.complete(futureValue);
+ };
+ _onError = (error, StackTrace stackTrace) {
+ // Cancel the wrapper subscription in case the real subscription
+ // is linked before an error triggers this function.
+ _controller.subscription.cancel();
+ completer.completeError(error, stackTrace);
+ };
+ return completer.future;
+ }
+
+ StreamSubscription _linkStream(Stream stream) {
+ if (cancelFuture != null) {
+ return new _CanceledSubscription(cancelFuture);
+ }
+ // If not canceled, create the real subscription and make
+ // sure it has the requested callbacks, cancelOnErrror flag and
+ // number of pauses.
+ var subscription = stream.listen(null, cancelOnError: _cancelOnError);
+ subscription.onData(_onData);
+ subscription.onError(_onError);
+ subscription.onDone(_onDone);
+ while (pauseCount > 0) {
+ subscription.pause();
+ pauseCount--;
+ }
+ return subscription;
+ }
+}
+
+/// A subscription that acts as if it has been canceled.
+///
+/// No events are fired and pausing is ignored.
+/// The [cancel] method always returns the same future.
+class _CanceledSubscription implements StreamSubscription {
+ /// The future returned by [cancel];
+ Future _doneFuture;
+
+ _CanceledSubscription(this._doneFuture);
+
+ void onData(void handleData(data)) {}
+
+ void onError(Function handleError) {}
+
+ void onDone(void handleDone()) {}
+
+ void pause([Future resumeFuture]) {}
+
+ void resume() {}
+
+ Future cancel() => _doneFuture;
+
+ /// Returns future that never completes.
+ ///
+ /// The `asFuture` result is completed by either an error event
+ /// or a done event. A canceled future never produces either.
+ Future asFuture([futureValue]) => new Completer().future;
+
+ bool get isPaused => false;
+}

Powered by Google App Engine
This is Rietveld 408576698