Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: lib/src/stream_completer.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_completer;
6
7 import "dart:async";
8
9 /// A single-subscription [stream] where the contents are provided later.
10 ///
11 /// It is generally recommended that you never create a `Future<Stream>`
12 /// because you can just directly create a stream that doesn't do anything
13 /// until it's ready to do so.
14 /// This class can be used to create such a stream.
15 ///
16 /// The [stream] is a normal stream that you can listen to immediately,
17 /// but until either [setSourceStream] or [setEmpty] is called,
18 /// the stream won't produce any events.
19 ///
20 /// The same effect can be achieved by using a [StreamController]
21 /// and adding the stream using `addStream` when both
22 /// the controller's stream is listened to and the source stream is ready.
23 /// This class attempts to shortcut some of the overhead when possible.
24 /// For example, if the [stream] is only listened to
25 /// after the source stream has been set,
26 /// the listen is performed directly on the source stream.
27 class StreamCompleter<T> {
28 /// The stream doing the actual work, is returned by [stream].
29 final _CompleterStream _stream = new _CompleterStream<T>();
30
31 /// Convert a `Future<Stream>` to a `Stream`.
32 ///
33 /// This creates a stream using a stream completer,
34 /// and sets the source stream to the result of the future when the
35 /// future completes.
36 ///
37 /// If the future completes with an error, the returned stream will
38 /// instead contain just that error.
39 static Stream fromFuture(Future<Stream> streamFuture) {
40 var completer = new StreamCompleter();
41 streamFuture.then(completer.setSourceStream,
42 onError: (e, s) {
43 completer.setSourceStream(streamFuture.asStream());
44 });
45 return completer.stream;
46 }
47
48 /// The stream of this completer.
49 ///
50 /// When a source stream is provided, its events will be forwarded to
51 /// listeners on this stream.
52 ///
53 /// The stream can be listened either before or after a source stream
54 /// is set.
nweiz 2015/06/30 23:39:48 Document that this is always single-subscription,
Lasse Reichstein Nielsen 2015/07/01 08:24:57 Done.
55 Stream<T> get stream => _stream;
56
57 /// Set a stream as the source of events for the [StreamCompleter]'s
58 /// [stream].
59 ///
60 /// The completer's `stream` will act exactly as [sourceStream].
61 ///
62 /// If the source stream is set before [stream] is listened to,
63 /// the listen call on [stream] is forwarded directly to [sourceStream].
64 ///
65 /// If [stream] is listened to before setting the source stream,
66 /// an intermediate subscription is created. It looks like a completely
67 /// normal subscription, and can be paused or canceled, but it won't
68 /// produce any events until a source stream is provided.
69 ///
70 /// If the `stream` subscription is canceled before a source stream is set,
71 /// the source stream will be listened to and immediately canceled again.
72 ///
73 /// Otherwise, when the source stream is then set,
74 /// it is immediately listened to, and its events are forwarded to the
75 /// existing subscription.
76 ///
77 /// Either [setSourceStream] or [setEmpty] may be called at most once.
78 /// Trying to call either of them again will fail.
79 void setSourceStream(Stream<T> sourceStream) {
80 if (_stream._isSourceStreamSet) {
81 throw new StateError("Source stream already set");
82 }
83 _stream._setSourceStream(sourceStream);
84 }
85
86 /// Equivalent to setting an empty stream using [setSourceStream].
87 ///
88 /// Either [setSourceStream] or [setEmpty] may be called at most once.
89 /// Trying to call either of them again will fail.
90 void setEmpty() {
91 if (_stream._isSourceStreamSet) {
92 throw new StateError("Source stream already set");
93 }
94 _stream._setEmpty();
95 }
96 }
97
98 /// Stream completed by [StreamCompleter].
99 class _CompleterStream<T> extends Stream<T> {
100 /// Controller for an intermediate stream.
101 ///
102 /// Created if the user listens on this stream before the source stream
103 /// is set, or if using [_setEmpty] so there is no source stream.
104 StreamController _controller;
105
106 /// Source stream for the events provided by this stream.
107 ///
108 /// Set when the completer sets the source stream using [_setSourceStream]
109 /// or [_setEmpty].
110 Stream _sourceStream;
111
112 /// Completer for future returned by [setSourceStream].
113 ///
114 /// The future is completed when the stream has been added or canceled,
115 /// and if canceling causes an error, the future is completed with that error.
116 final Completer _doneCompleter = new Completer();
nweiz 2015/06/30 23:39:48 It looks like this is never completed.
Lasse Reichstein Nielsen 2015/07/01 08:24:57 True. It should be completed when the source strea
117
118 StreamSubscription<T> listen(onData(T data),
119 {Function onError,
120 void onDone(),
121 bool cancelOnError}) {
122 if (_controller == null) {
123 if (_sourceStream != null && !_sourceStream.isBroadcast) {
124 // If the source stream is itself single subscription,
125 // just listen to it directly instead of creating a controller.
126 return _sourceStream.listen(onData, onError: onError, onDone: onDone,
127 cancelOnError: cancelOnError);
128 }
129 _createController();
130 if (_sourceStream != null) {
131 _linkStreamToController();
132 }
133 }
134 return _controller.stream.listen(onData, onError: onError, onDone: onDone,
135 cancelOnError: cancelOnError);
136 }
137
138 /// Whether a source stream has been set.
139 ///
140 /// Used to throw an error if trying to set a source stream twice.
141 bool get _isSourceStreamSet => _sourceStream != null;
142
143 /// Sets the source stream providing the events for this stream.
144 ///
145 /// If set before the user listens, listen calls will be directed directly
146 /// to the source stream. If the user listenes earlier, and intermediate
147 /// stream is created using a stream controller, and the source stream is
148 /// linked into that stream later.
149 Future _setSourceStream(Stream<T> sourceStream) {
nweiz 2015/06/30 23:39:47 It looks like this return value is unused. Should
Lasse Reichstein Nielsen 2015/07/01 08:24:57 I removed the _doneCompleter entirely. If someone
150 assert(_sourceStream == null);
151 _sourceStream = sourceStream;
152 if (_controller != null) {
153 // User has already listened, so provide the data through controller.
154 _linkStreamToController();
155 }
156 return _doneCompleter.future;
157 }
158
159 /// Links source stream to controller when both are available.
160 void _linkStreamToController() {
161 assert(_controller != null);
162 assert(_sourceStream != null);
163 _controller.addStream(_sourceStream, cancelOnError: false)
164 .whenComplete(_controller.close);
165 }
166
167 /// Sets an empty source stream.
168 ///
169 /// Uses [_controller] for the stream, then closes the controller
170 /// immediately.
171 void _setEmpty() {
172 assert(_sourceStream == null);
173 if (_controller == null) {
174 _createController();
175 }
176 _sourceStream = _controller.stream; // Mark stream as set.
177 _controller.close();
178 }
179
180 // Creates the [_controller].
181 void _createController() {
182 assert(_controller == null);
183 _controller = new StreamController<T>(sync: true);
184 }
185 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698