Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(90)

Unified Diff: lib/src/stream_completer.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/stream_completer.dart
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
new file mode 100644
index 0000000000000000000000000000000000000000..9d743a183043383c46bdcd144f7663b36c343ae1
--- /dev/null
+++ b/lib/src/stream_completer.dart
@@ -0,0 +1,275 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.streams.stream_completer;
+
+import 'dart:async';
+
+/// A [stream] where the contents aren't known at creation time.
+///
+/// It is generally recommended that you never create a `Future<Stream>`
+/// because you can just use directly create a stream that doesn't do anything
+/// until it's ready to do so.
+/// This class can be used to create such a stream.
+///
+/// The [stream] is a normal stream that you can listen to immediately,
+/// but until [setSourceStream] is called, the stream won't produce
+/// any events.
+///
+/// The same effect can be achieved by using a [StreamController]
+/// and adding the stream using `addStream` when both
+/// the controller's stream is listened to and the source stream is ready.
+/// This class attempts to shortcut some of the overhead when possible.
+/// For example, if the [stream] is only listened to
+/// after the source stream has been set,
+/// the listen is performed directly on the source stream.
+class StreamCompleter<T> {
+ final Stream<T> stream = new _PromiseStream<T>();
+
+ /// Set a stream as the source of events for the [StreamCompleter].
+ ///
+ /// There is no guarantee that the stream will ever be listened to.
+ void setSourceStream(Stream<T> stream) {
+ _PromiseStream promiseStream = this.stream;
+ promiseStream._linkStream(stream);
+ }
+
+ /// As setting an empty stream using [setSourceStream].
+ void setEmpty() {
+ // TODO(lrn): Optimize this to not actually create the empty stream.
+ _PromiseStream promiseStream = this.stream;
+ promiseStream._linkStream(new Stream.fromIterable(const []));
+ }
+}
+
+/// Stream that acts as a source stream it is (eventually) linked with.
+///
+/// The linked source stream can be set after a user has started listening on
+/// this stream. No events occur before the source stream is provided.
+///
+/// If a user listens before events are available, the state of the
+/// subscription is maintained, and the subscription is then linked
+/// to the source stream when that becomes available.
+class _PromiseStream<T> extends Stream<T> {
+ static const int _UNINITIALIZED = 0;
+ static const int _STREAM_SET = 1;
+ static const int _LISTENED = 2;
+
+ int _state = _UNINITIALIZED;
+ var _streamOrSubscription;
+
+ void _linkStream(Stream stream) {
+ if (_state == _UNINITIALIZED) {
+ _streamOrSubscription = stream;
+ _state |= _STREAM_SET;
+ } else if (_state == _LISTENED) {
+ _PromiseSubscription promiseSubscription = _streamOrSubscription;
+ promiseSubscription._linkStream(stream);
+ _state |= _STREAM_SET;
+ } else {
+ throw new StateError("Stream already linked.");
+ }
+ }
+
+ StreamSubscription listen(void onData(T event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ int state = _state;
+ _state = state | _LISTENED;
+ if (state == _UNINITIALIZED) {
+ _PromiseSubscription subscription =
+ new _PromiseSubscription<T>(true == cancelOnError);
+ subscription.onData(onData);
+ subscription.onError(onError);
+ subscription.onDone(onDone);
+ _streamOrSubscription = subscription;
+ return subscription;
+ }
+ if (state == _STREAM_SET) {
+ Stream stream = _streamOrSubscription;
+ return stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
+ }
+ print(state);
+ throw new StateError("Stream has already been listened to.");
+ }
+}
+
+/// Subscription for a [_PromiseStream] that is listened to but has no data.
+///
+/// Maintains the state of a stream subscription that hasn't received any
+/// events until events are available, then it starts forwarding to another
+/// subscription.
+class _PromiseSubscription<T> implements StreamSubscription<T> {
+ // State values
+ // The subscription is in one of three distinct states:
+ // - Initial (remembers whether it's cancelOnError and paused).
+ // - Cancelled (before being linked).
+ // - Linked (before being cancelled).
+ static const int _INIT = 0;
+ static const int _INIT_CANCEL_ON_ERROR = 1;
+ static const int _CANCELLED = 2; // Exclusive.
+ static const int _LINKED = 3; // Exclusive
+ static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR.
+
+ /// State represents the status of the subscription until the
+ /// real subscription becomes available.
+ ///
+ /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains
+ /// a list of length three with the data, error and done handlers that
+ /// have been set.
+ ///
+ /// While `_state` is [_LINKED], [_stateData] contains the real
+ /// stream subscription.
+ ///
+ /// When `_state` is `_CANCELED`, `_stateData` is cleared since the
+ /// event handlers won't be needed anyway.
+ int _state;
+ var _stateData = new List(3);
+
+ _PromiseSubscription(bool cancelOnError)
+ : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT);
+
+ bool get _isLinked => _state == _LINKED;
+ bool get _isCancelled => _state == _CANCELLED;
+ bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR;
+
+ void _linkStream(Stream stream) {
+ if (_isLinked) {
+ throw new StateError("Already linked to a stream.");
+ }
+ if (_isCancelled) {
+ return;
+ }
+ bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0;
+ StreamSubscription subscription =
+ stream.listen(null, cancelOnError: cancelOnError);
+ List handlers = _stateData;
+ subscription.onData(handlers[0]);
+ subscription.onError(handlers[1]);
+ subscription.onDone(handlers[2]);
+ int state = _state;
+ _subscription = subscription;
+ while (state >= _PAUSE) {
+ subscription.pause();
+ state -= _PAUSE;
+ }
+ }
+
+ List get _handlers {
+ assert(_isInitial);
+ return _stateData;
+ }
+
+ StreamSubscription get _subscription {
+ assert(_isLinked);
+ return _stateData;
+ }
+
+ // Sets state to linked.
+ void set _subscription(StreamSubscription subscription) {
+ assert(_isInitial);
+ _stateData = subscription;
+ _state = _LINKED;
+ }
+
+ void onData(void handleData(T data)) {
+ if (_isLinked) {
+ _subscription.onData(handleData);
+ } else {
+ assert(_isInitial);
+ _handlers[0] = handleData;
+ }
+ }
+
+ void onError(void handleError(error, StackTrace stackTrace)) {
+ if (_isLinked) {
+ _subscription.onError(handleError);
+ } else {
+ assert(_isInitial);
+ _handlers[1] = handleError;
+ }
+ }
+
+ void onDone(void handleDone()) {
+ if (_isLinked) {
+ _subscription.onDone(handleDone);
+ } else {
+ assert(_isInitial);
+ _handlers[2] = handleDone;
+ }
+ }
+
+ void pause([Future resumeFuture]) {
+ if (_isLinked) {
+ _subscription.pause(resumeFuture);
+ } else if (!_isCancelled) {
+ _state += _PAUSE;
+ if (resumeFuture != null) {
+ resumeFuture.whenComplete(this.resume);
+ }
+ }
+ }
+
+ void resume() {
+ if (_isLinked) {
+ _subscription.resume();
+ } else if (_state >= _PAUSE) {
+ _state -= _PAUSE;
+ }
+ }
+
+ Future cancel() {
+ if (_isLinked) {
+ return _subscription.cancel();
+ } else {
+ _stateData = null;
+ _state = _CANCELLED;
+ return new Future.value();
+ }
+ }
+
+ Future asFuture([futureValue]) {
+ if (_isLinked) {
+ return _subscription.asFuture(futureValue);
+ }
+ Completer completer = new Completer();
+ if (!_isCancelled) {
+ // Asking for a future of a cancelled subscription gives a future
+ // which never completes.
+ _handlers[1] = _cancelBeforeError(completer.completeError);
+ if (futureValue == null) {
+ _handlers[2] = completer.complete;
+ } else {
+ _handlers[2] = () { completer.complete(futureValue); };
+ }
+ }
+ return completer.future;
+ }
+
+ bool get isPaused {
+ if (_isLinked) {
+ return _subscription.isPaused;
+ }
+ return _state >= _PAUSE;
+ }
+
+ /// Helper function used by [asFuture].
+ ///
+ /// Returns an error handler which cancels the stream when it receives an
+ /// error.
+ Function _cancelBeforeError(Function handleError) {
+ return (e, s) {
+ cancel();
+ if (handleError is _BinaryCallback) {
+ handleError(e, s);
+ } else {
+ handleError(e);
+ }
+ };
+ }
+}
+
+typedef _BinaryCallback(a, b);
« no previous file with comments | « lib/src/delegates.dart ('k') | lib/src/stream_events.dart » ('j') | lib/src/stream_events.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698