Index: lib/src/stream_completer.dart |
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9d743a183043383c46bdcd144f7663b36c343ae1 |
--- /dev/null |
+++ b/lib/src/stream_completer.dart |
@@ -0,0 +1,275 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.streams.stream_completer; |
+ |
+import 'dart:async'; |
+ |
+/// A [stream] where the contents aren't known at creation time. |
+/// |
+/// It is generally recommended that you never create a `Future<Stream>` |
+/// because you can just use directly create a stream that doesn't do anything |
+/// until it's ready to do so. |
+/// This class can be used to create such a stream. |
+/// |
+/// The [stream] is a normal stream that you can listen to immediately, |
+/// but until [setSourceStream] is called, the stream won't produce |
+/// any events. |
+/// |
+/// The same effect can be achieved by using a [StreamController] |
+/// and adding the stream using `addStream` when both |
+/// the controller's stream is listened to and the source stream is ready. |
+/// This class attempts to shortcut some of the overhead when possible. |
+/// For example, if the [stream] is only listened to |
+/// after the source stream has been set, |
+/// the listen is performed directly on the source stream. |
+class StreamCompleter<T> { |
+ final Stream<T> stream = new _PromiseStream<T>(); |
+ |
+ /// Set a stream as the source of events for the [StreamCompleter]. |
+ /// |
+ /// There is no guarantee that the stream will ever be listened to. |
+ void setSourceStream(Stream<T> stream) { |
+ _PromiseStream promiseStream = this.stream; |
+ promiseStream._linkStream(stream); |
+ } |
+ |
+ /// As setting an empty stream using [setSourceStream]. |
+ void setEmpty() { |
+ // TODO(lrn): Optimize this to not actually create the empty stream. |
+ _PromiseStream promiseStream = this.stream; |
+ promiseStream._linkStream(new Stream.fromIterable(const [])); |
+ } |
+} |
+ |
+/// Stream that acts as a source stream it is (eventually) linked with. |
+/// |
+/// The linked source stream can be set after a user has started listening on |
+/// this stream. No events occur before the source stream is provided. |
+/// |
+/// If a user listens before events are available, the state of the |
+/// subscription is maintained, and the subscription is then linked |
+/// to the source stream when that becomes available. |
+class _PromiseStream<T> extends Stream<T> { |
+ static const int _UNINITIALIZED = 0; |
+ static const int _STREAM_SET = 1; |
+ static const int _LISTENED = 2; |
+ |
+ int _state = _UNINITIALIZED; |
+ var _streamOrSubscription; |
+ |
+ void _linkStream(Stream stream) { |
+ if (_state == _UNINITIALIZED) { |
+ _streamOrSubscription = stream; |
+ _state |= _STREAM_SET; |
+ } else if (_state == _LISTENED) { |
+ _PromiseSubscription promiseSubscription = _streamOrSubscription; |
+ promiseSubscription._linkStream(stream); |
+ _state |= _STREAM_SET; |
+ } else { |
+ throw new StateError("Stream already linked."); |
+ } |
+ } |
+ |
+ StreamSubscription listen(void onData(T event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ int state = _state; |
+ _state = state | _LISTENED; |
+ if (state == _UNINITIALIZED) { |
+ _PromiseSubscription subscription = |
+ new _PromiseSubscription<T>(true == cancelOnError); |
+ subscription.onData(onData); |
+ subscription.onError(onError); |
+ subscription.onDone(onDone); |
+ _streamOrSubscription = subscription; |
+ return subscription; |
+ } |
+ if (state == _STREAM_SET) { |
+ Stream stream = _streamOrSubscription; |
+ return stream.listen(onData, onError: onError, onDone: onDone, |
+ cancelOnError: cancelOnError); |
+ } |
+ print(state); |
+ throw new StateError("Stream has already been listened to."); |
+ } |
+} |
+ |
+/// Subscription for a [_PromiseStream] that is listened to but has no data. |
+/// |
+/// Maintains the state of a stream subscription that hasn't received any |
+/// events until events are available, then it starts forwarding to another |
+/// subscription. |
+class _PromiseSubscription<T> implements StreamSubscription<T> { |
+ // State values |
+ // The subscription is in one of three distinct states: |
+ // - Initial (remembers whether it's cancelOnError and paused). |
+ // - Cancelled (before being linked). |
+ // - Linked (before being cancelled). |
+ static const int _INIT = 0; |
+ static const int _INIT_CANCEL_ON_ERROR = 1; |
+ static const int _CANCELLED = 2; // Exclusive. |
+ static const int _LINKED = 3; // Exclusive |
+ static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR. |
+ |
+ /// State represents the status of the subscription until the |
+ /// real subscription becomes available. |
+ /// |
+ /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains |
+ /// a list of length three with the data, error and done handlers that |
+ /// have been set. |
+ /// |
+ /// While `_state` is [_LINKED], [_stateData] contains the real |
+ /// stream subscription. |
+ /// |
+ /// When `_state` is `_CANCELED`, `_stateData` is cleared since the |
+ /// event handlers won't be needed anyway. |
+ int _state; |
+ var _stateData = new List(3); |
+ |
+ _PromiseSubscription(bool cancelOnError) |
+ : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT); |
+ |
+ bool get _isLinked => _state == _LINKED; |
+ bool get _isCancelled => _state == _CANCELLED; |
+ bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR; |
+ |
+ void _linkStream(Stream stream) { |
+ if (_isLinked) { |
+ throw new StateError("Already linked to a stream."); |
+ } |
+ if (_isCancelled) { |
+ return; |
+ } |
+ bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0; |
+ StreamSubscription subscription = |
+ stream.listen(null, cancelOnError: cancelOnError); |
+ List handlers = _stateData; |
+ subscription.onData(handlers[0]); |
+ subscription.onError(handlers[1]); |
+ subscription.onDone(handlers[2]); |
+ int state = _state; |
+ _subscription = subscription; |
+ while (state >= _PAUSE) { |
+ subscription.pause(); |
+ state -= _PAUSE; |
+ } |
+ } |
+ |
+ List get _handlers { |
+ assert(_isInitial); |
+ return _stateData; |
+ } |
+ |
+ StreamSubscription get _subscription { |
+ assert(_isLinked); |
+ return _stateData; |
+ } |
+ |
+ // Sets state to linked. |
+ void set _subscription(StreamSubscription subscription) { |
+ assert(_isInitial); |
+ _stateData = subscription; |
+ _state = _LINKED; |
+ } |
+ |
+ void onData(void handleData(T data)) { |
+ if (_isLinked) { |
+ _subscription.onData(handleData); |
+ } else { |
+ assert(_isInitial); |
+ _handlers[0] = handleData; |
+ } |
+ } |
+ |
+ void onError(void handleError(error, StackTrace stackTrace)) { |
+ if (_isLinked) { |
+ _subscription.onError(handleError); |
+ } else { |
+ assert(_isInitial); |
+ _handlers[1] = handleError; |
+ } |
+ } |
+ |
+ void onDone(void handleDone()) { |
+ if (_isLinked) { |
+ _subscription.onDone(handleDone); |
+ } else { |
+ assert(_isInitial); |
+ _handlers[2] = handleDone; |
+ } |
+ } |
+ |
+ void pause([Future resumeFuture]) { |
+ if (_isLinked) { |
+ _subscription.pause(resumeFuture); |
+ } else if (!_isCancelled) { |
+ _state += _PAUSE; |
+ if (resumeFuture != null) { |
+ resumeFuture.whenComplete(this.resume); |
+ } |
+ } |
+ } |
+ |
+ void resume() { |
+ if (_isLinked) { |
+ _subscription.resume(); |
+ } else if (_state >= _PAUSE) { |
+ _state -= _PAUSE; |
+ } |
+ } |
+ |
+ Future cancel() { |
+ if (_isLinked) { |
+ return _subscription.cancel(); |
+ } else { |
+ _stateData = null; |
+ _state = _CANCELLED; |
+ return new Future.value(); |
+ } |
+ } |
+ |
+ Future asFuture([futureValue]) { |
+ if (_isLinked) { |
+ return _subscription.asFuture(futureValue); |
+ } |
+ Completer completer = new Completer(); |
+ if (!_isCancelled) { |
+ // Asking for a future of a cancelled subscription gives a future |
+ // which never completes. |
+ _handlers[1] = _cancelBeforeError(completer.completeError); |
+ if (futureValue == null) { |
+ _handlers[2] = completer.complete; |
+ } else { |
+ _handlers[2] = () { completer.complete(futureValue); }; |
+ } |
+ } |
+ return completer.future; |
+ } |
+ |
+ bool get isPaused { |
+ if (_isLinked) { |
+ return _subscription.isPaused; |
+ } |
+ return _state >= _PAUSE; |
+ } |
+ |
+ /// Helper function used by [asFuture]. |
+ /// |
+ /// Returns an error handler which cancels the stream when it receives an |
+ /// error. |
+ Function _cancelBeforeError(Function handleError) { |
+ return (e, s) { |
+ cancel(); |
+ if (handleError is _BinaryCallback) { |
+ handleError(e, s); |
+ } else { |
+ handleError(e); |
+ } |
+ }; |
+ } |
+} |
+ |
+typedef _BinaryCallback(a, b); |