| Index: mojo/public/dart/third_party/async/lib/src/stream_completer.dart
|
| diff --git a/mojo/public/dart/third_party/async/lib/src/stream_completer.dart b/mojo/public/dart/third_party/async/lib/src/stream_completer.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..c343e6e7bd2330ccda59e9c793f96b0de3fbed50
|
| --- /dev/null
|
| +++ b/mojo/public/dart/third_party/async/lib/src/stream_completer.dart
|
| @@ -0,0 +1,180 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library async.stream_completer;
|
| +
|
| +import "dart:async";
|
| +
|
| +/// A single-subscription [stream] where the contents are provided later.
|
| +///
|
| +/// It is generally recommended that you never create a `Future<Stream>`
|
| +/// because you can just directly create a stream that doesn't do anything
|
| +/// until it's ready to do so.
|
| +/// This class can be used to create such a stream.
|
| +///
|
| +/// The [stream] is a normal stream that you can listen to immediately,
|
| +/// but until either [setSourceStream] or [setEmpty] is called,
|
| +/// the stream won't produce any events.
|
| +///
|
| +/// The same effect can be achieved by using a [StreamController]
|
| +/// and adding the stream using `addStream` when both
|
| +/// the controller's stream is listened to and the source stream is ready.
|
| +/// This class attempts to shortcut some of the overhead when possible.
|
| +/// For example, if the [stream] is only listened to
|
| +/// after the source stream has been set,
|
| +/// the listen is performed directly on the source stream.
|
| +class StreamCompleter<T> {
|
| + /// The stream doing the actual work, is returned by [stream].
|
| + final _CompleterStream _stream = new _CompleterStream<T>();
|
| +
|
| + /// Convert a `Future<Stream>` to a `Stream`.
|
| + ///
|
| + /// This creates a stream using a stream completer,
|
| + /// and sets the source stream to the result of the future when the
|
| + /// future completes.
|
| + ///
|
| + /// If the future completes with an error, the returned stream will
|
| + /// instead contain just that error.
|
| + static Stream fromFuture(Future<Stream> streamFuture) {
|
| + var completer = new StreamCompleter();
|
| + streamFuture.then(completer.setSourceStream,
|
| + onError: (e, s) {
|
| + completer.setSourceStream(streamFuture.asStream());
|
| + });
|
| + return completer.stream;
|
| + }
|
| +
|
| + /// The stream of this completer.
|
| + ///
|
| + /// This stream is always a single-subscription stream.
|
| + ///
|
| + /// When a source stream is provided, its events will be forwarded to
|
| + /// listeners on this stream.
|
| + ///
|
| + /// The stream can be listened either before or after a source stream
|
| + /// is set.
|
| + Stream<T> get stream => _stream;
|
| +
|
| + /// Set a stream as the source of events for the [StreamCompleter]'s
|
| + /// [stream].
|
| + ///
|
| + /// The completer's `stream` will act exactly as [sourceStream].
|
| + ///
|
| + /// If the source stream is set before [stream] is listened to,
|
| + /// the listen call on [stream] is forwarded directly to [sourceStream].
|
| + ///
|
| + /// If [stream] is listened to before setting the source stream,
|
| + /// an intermediate subscription is created. It looks like a completely
|
| + /// normal subscription, and can be paused or canceled, but it won't
|
| + /// produce any events until a source stream is provided.
|
| + ///
|
| + /// If the `stream` subscription is canceled before a source stream is set,
|
| + /// the source stream will be listened to and immediately canceled again.
|
| + ///
|
| + /// Otherwise, when the source stream is then set,
|
| + /// it is immediately listened to, and its events are forwarded to the
|
| + /// existing subscription.
|
| + ///
|
| + /// Either [setSourceStream] or [setEmpty] may be called at most once.
|
| + /// Trying to call either of them again will fail.
|
| + void setSourceStream(Stream<T> sourceStream) {
|
| + if (_stream._isSourceStreamSet) {
|
| + throw new StateError("Source stream already set");
|
| + }
|
| + _stream._setSourceStream(sourceStream);
|
| + }
|
| +
|
| + /// Equivalent to setting an empty stream using [setSourceStream].
|
| + ///
|
| + /// Either [setSourceStream] or [setEmpty] may be called at most once.
|
| + /// Trying to call either of them again will fail.
|
| + void setEmpty() {
|
| + if (_stream._isSourceStreamSet) {
|
| + throw new StateError("Source stream already set");
|
| + }
|
| + _stream._setEmpty();
|
| + }
|
| +}
|
| +
|
| +/// Stream completed by [StreamCompleter].
|
| +class _CompleterStream<T> extends Stream<T> {
|
| + /// Controller for an intermediate stream.
|
| + ///
|
| + /// Created if the user listens on this stream before the source stream
|
| + /// is set, or if using [_setEmpty] so there is no source stream.
|
| + StreamController _controller;
|
| +
|
| + /// Source stream for the events provided by this stream.
|
| + ///
|
| + /// Set when the completer sets the source stream using [_setSourceStream]
|
| + /// or [_setEmpty].
|
| + Stream _sourceStream;
|
| +
|
| + StreamSubscription<T> listen(onData(T data),
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| + if (_controller == null) {
|
| + if (_sourceStream != null && !_sourceStream.isBroadcast) {
|
| + // If the source stream is itself single subscription,
|
| + // just listen to it directly instead of creating a controller.
|
| + return _sourceStream.listen(onData, onError: onError, onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| + }
|
| + _createController();
|
| + if (_sourceStream != null) {
|
| + _linkStreamToController();
|
| + }
|
| + }
|
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| + }
|
| +
|
| + /// Whether a source stream has been set.
|
| + ///
|
| + /// Used to throw an error if trying to set a source stream twice.
|
| + bool get _isSourceStreamSet => _sourceStream != null;
|
| +
|
| + /// Sets the source stream providing the events for this stream.
|
| + ///
|
| + /// If set before the user listens, listen calls will be directed directly
|
| + /// to the source stream. If the user listenes earlier, and intermediate
|
| + /// stream is created using a stream controller, and the source stream is
|
| + /// linked into that stream later.
|
| + void _setSourceStream(Stream<T> sourceStream) {
|
| + assert(_sourceStream == null);
|
| + _sourceStream = sourceStream;
|
| + if (_controller != null) {
|
| + // User has already listened, so provide the data through controller.
|
| + _linkStreamToController();
|
| + }
|
| + }
|
| +
|
| + /// Links source stream to controller when both are available.
|
| + void _linkStreamToController() {
|
| + assert(_controller != null);
|
| + assert(_sourceStream != null);
|
| + _controller.addStream(_sourceStream, cancelOnError: false)
|
| + .whenComplete(_controller.close);
|
| + }
|
| +
|
| + /// Sets an empty source stream.
|
| + ///
|
| + /// Uses [_controller] for the stream, then closes the controller
|
| + /// immediately.
|
| + void _setEmpty() {
|
| + assert(_sourceStream == null);
|
| + if (_controller == null) {
|
| + _createController();
|
| + }
|
| + _sourceStream = _controller.stream; // Mark stream as set.
|
| + _controller.close();
|
| + }
|
| +
|
| + // Creates the [_controller].
|
| + void _createController() {
|
| + assert(_controller == null);
|
| + _controller = new StreamController<T>(sync: true);
|
| + }
|
| +}
|
|
|