Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Unified Diff: packages/async/lib/src/stream_completer.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/async/lib/src/result_future.dart ('k') | packages/async/lib/src/stream_group.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/async/lib/src/stream_completer.dart
diff --git a/packages/async/lib/src/stream_completer.dart b/packages/async/lib/src/stream_completer.dart
new file mode 100644
index 0000000000000000000000000000000000000000..c343e6e7bd2330ccda59e9c793f96b0de3fbed50
--- /dev/null
+++ b/packages/async/lib/src/stream_completer.dart
@@ -0,0 +1,180 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_completer;
+
+import "dart:async";
+
+/// A single-subscription [stream] where the contents are provided later.
+///
+/// It is generally recommended that you never create a `Future<Stream>`
+/// because you can just directly create a stream that doesn't do anything
+/// until it's ready to do so.
+/// This class can be used to create such a stream.
+///
+/// The [stream] is a normal stream that you can listen to immediately,
+/// but until either [setSourceStream] or [setEmpty] is called,
+/// the stream won't produce any events.
+///
+/// The same effect can be achieved by using a [StreamController]
+/// and adding the stream using `addStream` when both
+/// the controller's stream is listened to and the source stream is ready.
+/// This class attempts to shortcut some of the overhead when possible.
+/// For example, if the [stream] is only listened to
+/// after the source stream has been set,
+/// the listen is performed directly on the source stream.
+class StreamCompleter<T> {
+ /// The stream doing the actual work, is returned by [stream].
+ final _CompleterStream _stream = new _CompleterStream<T>();
+
+ /// Convert a `Future<Stream>` to a `Stream`.
+ ///
+ /// This creates a stream using a stream completer,
+ /// and sets the source stream to the result of the future when the
+ /// future completes.
+ ///
+ /// If the future completes with an error, the returned stream will
+ /// instead contain just that error.
+ static Stream fromFuture(Future<Stream> streamFuture) {
+ var completer = new StreamCompleter();
+ streamFuture.then(completer.setSourceStream,
+ onError: (e, s) {
+ completer.setSourceStream(streamFuture.asStream());
+ });
+ return completer.stream;
+ }
+
+ /// The stream of this completer.
+ ///
+ /// This stream is always a single-subscription stream.
+ ///
+ /// When a source stream is provided, its events will be forwarded to
+ /// listeners on this stream.
+ ///
+ /// The stream can be listened either before or after a source stream
+ /// is set.
+ Stream<T> get stream => _stream;
+
+ /// Set a stream as the source of events for the [StreamCompleter]'s
+ /// [stream].
+ ///
+ /// The completer's `stream` will act exactly as [sourceStream].
+ ///
+ /// If the source stream is set before [stream] is listened to,
+ /// the listen call on [stream] is forwarded directly to [sourceStream].
+ ///
+ /// If [stream] is listened to before setting the source stream,
+ /// an intermediate subscription is created. It looks like a completely
+ /// normal subscription, and can be paused or canceled, but it won't
+ /// produce any events until a source stream is provided.
+ ///
+ /// If the `stream` subscription is canceled before a source stream is set,
+ /// the source stream will be listened to and immediately canceled again.
+ ///
+ /// Otherwise, when the source stream is then set,
+ /// it is immediately listened to, and its events are forwarded to the
+ /// existing subscription.
+ ///
+ /// Either [setSourceStream] or [setEmpty] may be called at most once.
+ /// Trying to call either of them again will fail.
+ void setSourceStream(Stream<T> sourceStream) {
+ if (_stream._isSourceStreamSet) {
+ throw new StateError("Source stream already set");
+ }
+ _stream._setSourceStream(sourceStream);
+ }
+
+ /// Equivalent to setting an empty stream using [setSourceStream].
+ ///
+ /// Either [setSourceStream] or [setEmpty] may be called at most once.
+ /// Trying to call either of them again will fail.
+ void setEmpty() {
+ if (_stream._isSourceStreamSet) {
+ throw new StateError("Source stream already set");
+ }
+ _stream._setEmpty();
+ }
+}
+
+/// Stream completed by [StreamCompleter].
+class _CompleterStream<T> extends Stream<T> {
+ /// Controller for an intermediate stream.
+ ///
+ /// Created if the user listens on this stream before the source stream
+ /// is set, or if using [_setEmpty] so there is no source stream.
+ StreamController _controller;
+
+ /// Source stream for the events provided by this stream.
+ ///
+ /// Set when the completer sets the source stream using [_setSourceStream]
+ /// or [_setEmpty].
+ Stream _sourceStream;
+
+ StreamSubscription<T> listen(onData(T data),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ if (_controller == null) {
+ if (_sourceStream != null && !_sourceStream.isBroadcast) {
+ // If the source stream is itself single subscription,
+ // just listen to it directly instead of creating a controller.
+ return _sourceStream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
+ }
+ _createController();
+ if (_sourceStream != null) {
+ _linkStreamToController();
+ }
+ }
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
+ }
+
+ /// Whether a source stream has been set.
+ ///
+ /// Used to throw an error if trying to set a source stream twice.
+ bool get _isSourceStreamSet => _sourceStream != null;
+
+ /// Sets the source stream providing the events for this stream.
+ ///
+ /// If set before the user listens, listen calls will be directed directly
+ /// to the source stream. If the user listenes earlier, and intermediate
+ /// stream is created using a stream controller, and the source stream is
+ /// linked into that stream later.
+ void _setSourceStream(Stream<T> sourceStream) {
+ assert(_sourceStream == null);
+ _sourceStream = sourceStream;
+ if (_controller != null) {
+ // User has already listened, so provide the data through controller.
+ _linkStreamToController();
+ }
+ }
+
+ /// Links source stream to controller when both are available.
+ void _linkStreamToController() {
+ assert(_controller != null);
+ assert(_sourceStream != null);
+ _controller.addStream(_sourceStream, cancelOnError: false)
+ .whenComplete(_controller.close);
+ }
+
+ /// Sets an empty source stream.
+ ///
+ /// Uses [_controller] for the stream, then closes the controller
+ /// immediately.
+ void _setEmpty() {
+ assert(_sourceStream == null);
+ if (_controller == null) {
+ _createController();
+ }
+ _sourceStream = _controller.stream; // Mark stream as set.
+ _controller.close();
+ }
+
+ // Creates the [_controller].
+ void _createController() {
+ assert(_controller == null);
+ _controller = new StreamController<T>(sync: true);
+ }
+}
« no previous file with comments | « packages/async/lib/src/result_future.dart ('k') | packages/async/lib/src/stream_group.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698