Chromium Code Reviews| Index: lib/src/stream_completer.dart |
| diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..2a1f5ab89fc851fdfaaaee98e7d92749284ee29b |
| --- /dev/null |
| +++ b/lib/src/stream_completer.dart |
| @@ -0,0 +1,185 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.stream_completer; |
| + |
| +import "dart:async"; |
| + |
| +/// A single-subscription [stream] where the contents are provided later. |
| +/// |
| +/// It is generally recommended that you never create a `Future<Stream>` |
| +/// because you can just directly create a stream that doesn't do anything |
| +/// until it's ready to do so. |
| +/// This class can be used to create such a stream. |
| +/// |
| +/// The [stream] is a normal stream that you can listen to immediately, |
| +/// but until either [setSourceStream] or [setEmpty] is called, |
| +/// the stream won't produce any events. |
| +/// |
| +/// The same effect can be achieved by using a [StreamController] |
| +/// and adding the stream using `addStream` when both |
| +/// the controller's stream is listened to and the source stream is ready. |
| +/// This class attempts to shortcut some of the overhead when possible. |
| +/// For example, if the [stream] is only listened to |
| +/// after the source stream has been set, |
| +/// the listen is performed directly on the source stream. |
| +class StreamCompleter<T> { |
| + /// The stream doing the actual work, is returned by [stream]. |
| + final _CompleterStream _stream = new _CompleterStream<T>(); |
| + |
| + /// Convert a `Future<Stream>` to a `Stream`. |
| + /// |
| + /// This creates a stream using a stream completer, |
| + /// and sets the source stream to the result of the future when the |
| + /// future completes. |
| + /// |
| + /// If the future completes with an error, the returned stream will |
| + /// instead contain just that error. |
| + static Stream fromFuture(Future<Stream> streamFuture) { |
| + var completer = new StreamCompleter(); |
| + streamFuture.then(completer.setSourceStream, |
| + onError: (e, s) { |
| + completer.setSourceStream(streamFuture.asStream()); |
| + }); |
| + return completer.stream; |
| + } |
| + |
| + /// The stream of this completer. |
| + /// |
| + /// When a source stream is provided, its events will be forwarded to |
| + /// listeners on this stream. |
| + /// |
| + /// The stream can be listened either before or after a source stream |
| + /// is set. |
|
nweiz
2015/06/30 23:39:48
Document that this is always single-subscription,
Lasse Reichstein Nielsen
2015/07/01 08:24:57
Done.
|
| + Stream<T> get stream => _stream; |
| + |
| + /// Set a stream as the source of events for the [StreamCompleter]'s |
| + /// [stream]. |
| + /// |
| + /// The completer's `stream` will act exactly as [sourceStream]. |
| + /// |
| + /// If the source stream is set before [stream] is listened to, |
| + /// the listen call on [stream] is forwarded directly to [sourceStream]. |
| + /// |
| + /// If [stream] is listened to before setting the source stream, |
| + /// an intermediate subscription is created. It looks like a completely |
| + /// normal subscription, and can be paused or canceled, but it won't |
| + /// produce any events until a source stream is provided. |
| + /// |
| + /// If the `stream` subscription is canceled before a source stream is set, |
| + /// the source stream will be listened to and immediately canceled again. |
| + /// |
| + /// Otherwise, when the source stream is then set, |
| + /// it is immediately listened to, and its events are forwarded to the |
| + /// existing subscription. |
| + /// |
| + /// Either [setSourceStream] or [setEmpty] may be called at most once. |
| + /// Trying to call either of them again will fail. |
| + void setSourceStream(Stream<T> sourceStream) { |
| + if (_stream._isSourceStreamSet) { |
| + throw new StateError("Source stream already set"); |
| + } |
| + _stream._setSourceStream(sourceStream); |
| + } |
| + |
| + /// Equivalent to setting an empty stream using [setSourceStream]. |
| + /// |
| + /// Either [setSourceStream] or [setEmpty] may be called at most once. |
| + /// Trying to call either of them again will fail. |
| + void setEmpty() { |
| + if (_stream._isSourceStreamSet) { |
| + throw new StateError("Source stream already set"); |
| + } |
| + _stream._setEmpty(); |
| + } |
| +} |
| + |
| +/// Stream completed by [StreamCompleter]. |
| +class _CompleterStream<T> extends Stream<T> { |
| + /// Controller for an intermediate stream. |
| + /// |
| + /// Created if the user listens on this stream before the source stream |
| + /// is set, or if using [_setEmpty] so there is no source stream. |
| + StreamController _controller; |
| + |
| + /// Source stream for the events provided by this stream. |
| + /// |
| + /// Set when the completer sets the source stream using [_setSourceStream] |
| + /// or [_setEmpty]. |
| + Stream _sourceStream; |
| + |
| + /// Completer for future returned by [setSourceStream]. |
| + /// |
| + /// The future is completed when the stream has been added or canceled, |
| + /// and if canceling causes an error, the future is completed with that error. |
| + final Completer _doneCompleter = new Completer(); |
|
nweiz
2015/06/30 23:39:48
It looks like this is never completed.
Lasse Reichstein Nielsen
2015/07/01 08:24:57
True. It should be completed when the source strea
|
| + |
| + StreamSubscription<T> listen(onData(T data), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + if (_controller == null) { |
| + if (_sourceStream != null && !_sourceStream.isBroadcast) { |
| + // If the source stream is itself single subscription, |
| + // just listen to it directly instead of creating a controller. |
| + return _sourceStream.listen(onData, onError: onError, onDone: onDone, |
| + cancelOnError: cancelOnError); |
| + } |
| + _createController(); |
| + if (_sourceStream != null) { |
| + _linkStreamToController(); |
| + } |
| + } |
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
| + cancelOnError: cancelOnError); |
| + } |
| + |
| + /// Whether a source stream has been set. |
| + /// |
| + /// Used to throw an error if trying to set a source stream twice. |
| + bool get _isSourceStreamSet => _sourceStream != null; |
| + |
| + /// Sets the source stream providing the events for this stream. |
| + /// |
| + /// If set before the user listens, listen calls will be directed directly |
| + /// to the source stream. If the user listenes earlier, and intermediate |
| + /// stream is created using a stream controller, and the source stream is |
| + /// linked into that stream later. |
| + Future _setSourceStream(Stream<T> sourceStream) { |
|
nweiz
2015/06/30 23:39:47
It looks like this return value is unused. Should
Lasse Reichstein Nielsen
2015/07/01 08:24:57
I removed the _doneCompleter entirely. If someone
|
| + assert(_sourceStream == null); |
| + _sourceStream = sourceStream; |
| + if (_controller != null) { |
| + // User has already listened, so provide the data through controller. |
| + _linkStreamToController(); |
| + } |
| + return _doneCompleter.future; |
| + } |
| + |
| + /// Links source stream to controller when both are available. |
| + void _linkStreamToController() { |
| + assert(_controller != null); |
| + assert(_sourceStream != null); |
| + _controller.addStream(_sourceStream, cancelOnError: false) |
| + .whenComplete(_controller.close); |
| + } |
| + |
| + /// Sets an empty source stream. |
| + /// |
| + /// Uses [_controller] for the stream, then closes the controller |
| + /// immediately. |
| + void _setEmpty() { |
| + assert(_sourceStream == null); |
| + if (_controller == null) { |
| + _createController(); |
| + } |
| + _sourceStream = _controller.stream; // Mark stream as set. |
| + _controller.close(); |
| + } |
| + |
| + // Creates the [_controller]. |
| + void _createController() { |
| + assert(_controller == null); |
| + _controller = new StreamController<T>(sync: true); |
| + } |
| +} |