| Index: lib/src/stream_completer.dart
|
| diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..9d743a183043383c46bdcd144f7663b36c343ae1
|
| --- /dev/null
|
| +++ b/lib/src/stream_completer.dart
|
| @@ -0,0 +1,275 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library async.streams.stream_completer;
|
| +
|
| +import 'dart:async';
|
| +
|
| +/// A [stream] where the contents aren't known at creation time.
|
| +///
|
| +/// It is generally recommended that you never create a `Future<Stream>`
|
| +/// because you can just use directly create a stream that doesn't do anything
|
| +/// until it's ready to do so.
|
| +/// This class can be used to create such a stream.
|
| +///
|
| +/// The [stream] is a normal stream that you can listen to immediately,
|
| +/// but until [setSourceStream] is called, the stream won't produce
|
| +/// any events.
|
| +///
|
| +/// The same effect can be achieved by using a [StreamController]
|
| +/// and adding the stream using `addStream` when both
|
| +/// the controller's stream is listened to and the source stream is ready.
|
| +/// This class attempts to shortcut some of the overhead when possible.
|
| +/// For example, if the [stream] is only listened to
|
| +/// after the source stream has been set,
|
| +/// the listen is performed directly on the source stream.
|
| +class StreamCompleter<T> {
|
| + final Stream<T> stream = new _PromiseStream<T>();
|
| +
|
| + /// Set a stream as the source of events for the [StreamCompleter].
|
| + ///
|
| + /// There is no guarantee that the stream will ever be listened to.
|
| + void setSourceStream(Stream<T> stream) {
|
| + _PromiseStream promiseStream = this.stream;
|
| + promiseStream._linkStream(stream);
|
| + }
|
| +
|
| + /// As setting an empty stream using [setSourceStream].
|
| + void setEmpty() {
|
| + // TODO(lrn): Optimize this to not actually create the empty stream.
|
| + _PromiseStream promiseStream = this.stream;
|
| + promiseStream._linkStream(new Stream.fromIterable(const []));
|
| + }
|
| +}
|
| +
|
| +/// Stream that acts as a source stream it is (eventually) linked with.
|
| +///
|
| +/// The linked source stream can be set after a user has started listening on
|
| +/// this stream. No events occur before the source stream is provided.
|
| +///
|
| +/// If a user listens before events are available, the state of the
|
| +/// subscription is maintained, and the subscription is then linked
|
| +/// to the source stream when that becomes available.
|
| +class _PromiseStream<T> extends Stream<T> {
|
| + static const int _UNINITIALIZED = 0;
|
| + static const int _STREAM_SET = 1;
|
| + static const int _LISTENED = 2;
|
| +
|
| + int _state = _UNINITIALIZED;
|
| + var _streamOrSubscription;
|
| +
|
| + void _linkStream(Stream stream) {
|
| + if (_state == _UNINITIALIZED) {
|
| + _streamOrSubscription = stream;
|
| + _state |= _STREAM_SET;
|
| + } else if (_state == _LISTENED) {
|
| + _PromiseSubscription promiseSubscription = _streamOrSubscription;
|
| + promiseSubscription._linkStream(stream);
|
| + _state |= _STREAM_SET;
|
| + } else {
|
| + throw new StateError("Stream already linked.");
|
| + }
|
| + }
|
| +
|
| + StreamSubscription listen(void onData(T event),
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| + int state = _state;
|
| + _state = state | _LISTENED;
|
| + if (state == _UNINITIALIZED) {
|
| + _PromiseSubscription subscription =
|
| + new _PromiseSubscription<T>(true == cancelOnError);
|
| + subscription.onData(onData);
|
| + subscription.onError(onError);
|
| + subscription.onDone(onDone);
|
| + _streamOrSubscription = subscription;
|
| + return subscription;
|
| + }
|
| + if (state == _STREAM_SET) {
|
| + Stream stream = _streamOrSubscription;
|
| + return stream.listen(onData, onError: onError, onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| + }
|
| + print(state);
|
| + throw new StateError("Stream has already been listened to.");
|
| + }
|
| +}
|
| +
|
| +/// Subscription for a [_PromiseStream] that is listened to but has no data.
|
| +///
|
| +/// Maintains the state of a stream subscription that hasn't received any
|
| +/// events until events are available, then it starts forwarding to another
|
| +/// subscription.
|
| +class _PromiseSubscription<T> implements StreamSubscription<T> {
|
| + // State values
|
| + // The subscription is in one of three distinct states:
|
| + // - Initial (remembers whether it's cancelOnError and paused).
|
| + // - Cancelled (before being linked).
|
| + // - Linked (before being cancelled).
|
| + static const int _INIT = 0;
|
| + static const int _INIT_CANCEL_ON_ERROR = 1;
|
| + static const int _CANCELLED = 2; // Exclusive.
|
| + static const int _LINKED = 3; // Exclusive
|
| + static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR.
|
| +
|
| + /// State represents the status of the subscription until the
|
| + /// real subscription becomes available.
|
| + ///
|
| + /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains
|
| + /// a list of length three with the data, error and done handlers that
|
| + /// have been set.
|
| + ///
|
| + /// While `_state` is [_LINKED], [_stateData] contains the real
|
| + /// stream subscription.
|
| + ///
|
| + /// When `_state` is `_CANCELED`, `_stateData` is cleared since the
|
| + /// event handlers won't be needed anyway.
|
| + int _state;
|
| + var _stateData = new List(3);
|
| +
|
| + _PromiseSubscription(bool cancelOnError)
|
| + : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT);
|
| +
|
| + bool get _isLinked => _state == _LINKED;
|
| + bool get _isCancelled => _state == _CANCELLED;
|
| + bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR;
|
| +
|
| + void _linkStream(Stream stream) {
|
| + if (_isLinked) {
|
| + throw new StateError("Already linked to a stream.");
|
| + }
|
| + if (_isCancelled) {
|
| + return;
|
| + }
|
| + bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0;
|
| + StreamSubscription subscription =
|
| + stream.listen(null, cancelOnError: cancelOnError);
|
| + List handlers = _stateData;
|
| + subscription.onData(handlers[0]);
|
| + subscription.onError(handlers[1]);
|
| + subscription.onDone(handlers[2]);
|
| + int state = _state;
|
| + _subscription = subscription;
|
| + while (state >= _PAUSE) {
|
| + subscription.pause();
|
| + state -= _PAUSE;
|
| + }
|
| + }
|
| +
|
| + List get _handlers {
|
| + assert(_isInitial);
|
| + return _stateData;
|
| + }
|
| +
|
| + StreamSubscription get _subscription {
|
| + assert(_isLinked);
|
| + return _stateData;
|
| + }
|
| +
|
| + // Sets state to linked.
|
| + void set _subscription(StreamSubscription subscription) {
|
| + assert(_isInitial);
|
| + _stateData = subscription;
|
| + _state = _LINKED;
|
| + }
|
| +
|
| + void onData(void handleData(T data)) {
|
| + if (_isLinked) {
|
| + _subscription.onData(handleData);
|
| + } else {
|
| + assert(_isInitial);
|
| + _handlers[0] = handleData;
|
| + }
|
| + }
|
| +
|
| + void onError(void handleError(error, StackTrace stackTrace)) {
|
| + if (_isLinked) {
|
| + _subscription.onError(handleError);
|
| + } else {
|
| + assert(_isInitial);
|
| + _handlers[1] = handleError;
|
| + }
|
| + }
|
| +
|
| + void onDone(void handleDone()) {
|
| + if (_isLinked) {
|
| + _subscription.onDone(handleDone);
|
| + } else {
|
| + assert(_isInitial);
|
| + _handlers[2] = handleDone;
|
| + }
|
| + }
|
| +
|
| + void pause([Future resumeFuture]) {
|
| + if (_isLinked) {
|
| + _subscription.pause(resumeFuture);
|
| + } else if (!_isCancelled) {
|
| + _state += _PAUSE;
|
| + if (resumeFuture != null) {
|
| + resumeFuture.whenComplete(this.resume);
|
| + }
|
| + }
|
| + }
|
| +
|
| + void resume() {
|
| + if (_isLinked) {
|
| + _subscription.resume();
|
| + } else if (_state >= _PAUSE) {
|
| + _state -= _PAUSE;
|
| + }
|
| + }
|
| +
|
| + Future cancel() {
|
| + if (_isLinked) {
|
| + return _subscription.cancel();
|
| + } else {
|
| + _stateData = null;
|
| + _state = _CANCELLED;
|
| + return new Future.value();
|
| + }
|
| + }
|
| +
|
| + Future asFuture([futureValue]) {
|
| + if (_isLinked) {
|
| + return _subscription.asFuture(futureValue);
|
| + }
|
| + Completer completer = new Completer();
|
| + if (!_isCancelled) {
|
| + // Asking for a future of a cancelled subscription gives a future
|
| + // which never completes.
|
| + _handlers[1] = _cancelBeforeError(completer.completeError);
|
| + if (futureValue == null) {
|
| + _handlers[2] = completer.complete;
|
| + } else {
|
| + _handlers[2] = () { completer.complete(futureValue); };
|
| + }
|
| + }
|
| + return completer.future;
|
| + }
|
| +
|
| + bool get isPaused {
|
| + if (_isLinked) {
|
| + return _subscription.isPaused;
|
| + }
|
| + return _state >= _PAUSE;
|
| + }
|
| +
|
| + /// Helper function used by [asFuture].
|
| + ///
|
| + /// Returns an error handler which cancels the stream when it receives an
|
| + /// error.
|
| + Function _cancelBeforeError(Function handleError) {
|
| + return (e, s) {
|
| + cancel();
|
| + if (handleError is _BinaryCallback) {
|
| + handleError(e, s);
|
| + } else {
|
| + handleError(e);
|
| + }
|
| + };
|
| + }
|
| +}
|
| +
|
| +typedef _BinaryCallback(a, b);
|
|
|