Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(302)

Side by Side Diff: lib/src/stream_completer.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Add all.dart to test. Apparently people like that. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_completer;
6
7 import "dart:async";
8 import "delegating_stream_subscription.dart";
9
10 /// A single-subscription [stream] where the contents are provided later.
11 ///
12 /// It is generally recommended that you never create a `Future<Stream>`
13 /// because you can just directly create a stream that doesn't do anything
14 /// until it's ready to do so.
15 /// This class can be used to create such a stream.
16 ///
17 /// The [stream] is a normal stream that you can listen to immediately,
18 /// but until either [setSourceStream] or [setEmpty] is called,
19 /// the stream won't produce any events.
20 ///
21 /// The same effect can be achieved by using a [StreamController]
22 /// and adding the stream using `addStream` when both
23 /// the controller's stream is listened to and the source stream is ready.
24 /// This class attempts to shortcut some of the overhead when possible.
25 /// For example, if the [stream] is only listened to
26 /// after the source stream has been set,
27 /// the listen is performed directly on the source stream.
28 class StreamCompleter<T> {
29 /// The stream doing the actual work, is returned by [stream].
30 final _CompleterStream _stream = new _CompleterStream<T>();
31
32 /// Convert a `Future<Stream>` to a `Stream`.
33 ///
34 /// This creates a stream using a stream completer,
35 /// and sets the source stream to the result of the future when the
36 /// future completes.
37 ///
38 /// If the future completes with an error, the returned stream will
39 /// instead contain just that error.
40 static Stream fromFuture(Future<Stream> streamFuture) {
41 var completer = new StreamCompleter();
42 streamFuture.then(completer.setSourceStream,
43 onError: (e, s) {
44 completer.setSourceStream(streamFuture.asStream());
45 });
46 return completer.stream;
47 }
48
49 /// The stream of this completer.
50 ///
51 /// When a source stream is provided, its events will be forwarded to
52 /// listeners on this stream.
53 ///
54 /// The stream can be listened either before or after a source stream
55 /// is set.
56 Stream<T> get stream => _stream;
57
58 /// Set a stream as the source of events for the [StreamCompleter]'s
59 /// [stream].
60 ///
61 /// The completer's `stream` will act exactly as [sourceStream].
62 ///
63 /// If the source stream is set before [stream] is listened to,
64 /// the listen call on [stream] is forwarded directly to [sourceStream].
65 ///
66 /// If [stream] is listened to before setting the source stream,
67 /// an intermediate subscription is created. It looks like a completely
68 /// normal subscription, and can be paused or canceled, but it won't
69 /// produce any events until a source stream is provided.
70 ///
71 /// If the `stream` subscription is canceled before a source stream is set,
72 /// the source stream will be listened to and immediately canceled again.
73 ///
74 /// Otherwise, when the source stream is then set,
75 /// it is immediately listened to, and its events are forwarded to the
76 /// existing subscription.
77 ///
78 /// Either [setSourceStream] or [setEmpty] may be called at most once.
79 /// Trying to call either of them again will fail.
80 void setSourceStream(Stream<T> sourceStream) {
81 if (_stream._isSourceStreamSet) {
82 throw new StateError("Source stream already set");
83 }
84 _stream._setSourceStream(sourceStream);
85 }
86
87 /// Equivalent to setting an empty stream using [setSourceStream].
88 ///
89 /// Either [setSourceStream] or [setEmpty] may be called at most once.
90 /// Trying to call either of them again will fail.
91 void setEmpty() {
92 if (_stream._isSourceStreamSet) {
93 throw new StateError("Source stream already set");
94 }
95 _stream._setEmpty();
96 }
97 }
98
99 /// Stream completed by [StreamCompleter].
100 class _CompleterStream<T> extends Stream<T> {
101 /// Controller for an intermediate stream.
102 ///
103 /// Created if the user listens on this stream before the source stream
104 /// is set, or if using [_setEmpty] so there is no source stream.
105 StreamController _controller;
106
107 /// Source stream for the events provided by this stream.
108 ///
109 /// Set when the completer sets the source stream using [_setSourceStream]
110 /// or [_setEmpty].
111 Stream _sourceStream;
112
113 StreamSubscription<T> listen(onData(T data),
114 {Function onError,
115 void onDone(),
116 bool cancelOnError}) {
117 if (_controller == null) {
118 if (_sourceStream != null) {
119 return _sourceStream.listen(onData, onError: onError, onDone: onDone,
120 cancelOnError: cancelOnError);
nweiz 2015/06/18 23:44:25 As written, if a completer is completed with a bro
Lasse Reichstein Nielsen 2015/06/30 10:34:12 Acknowledged.
121 }
122 _createController();
123 }
124 return _controller.stream.listen(onData, onError: onError, onDone: onDone,
125 cancelOnError: cancelOnError);
126 }
127
128 /// Whether a source stream has been set.
129 ///
130 /// Used to throw an error if trying to set a source stream twice.
131 bool get _isSourceStreamSet => _sourceStream != null;
132
133 /// Sets the source stream providing the events for this stream.
134 ///
135 /// If set before the user listens, listen calls will be directed directly
136 /// to the source stream. If the user listenes earlier, and intermediate
137 /// stream is created using a stream controller, and the source stream is
138 /// linked into that stream later.
139 void _setSourceStream(Stream<T> sourceStream) {
140 assert(_sourceStream == null);
141 _sourceStream = sourceStream;
142 if (_controller != null) {
143 // User has already listened, so provide the data through controller.
144 _controller.addStream(sourceStream, cancelOnError: false)
145 .whenComplete(_controller.close);
146 }
147 }
148
149 /// Sets an empty source stream.
150 ///
151 /// Uses [_controller] for the stream, then closes the controller
152 /// immediately.
153 void _setEmpty() {
154 assert(_sourceStream == null);
155 if (_controller == null) {
156 _createController();
157 }
158 _sourceStream = _controller.stream; // Mark stream as set.
159 _controller.close();
160 }
161
162 // Creates the [_controller].
163 void _createController() {
164 assert(_controller == null);
165 _controller = new StreamController<T>(sync: true);
166 }
167 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698