OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.stream_completer; | |
6 | |
7 import "dart:async"; | |
8 import "delegating_stream_subscription.dart"; | |
9 | |
10 /// A single-subscription [stream] where the contents are provided later. | |
11 /// | |
12 /// It is generally recommended that you never create a `Future<Stream>` | |
13 /// because you can just directly create a stream that doesn't do anything | |
14 /// until it's ready to do so. | |
15 /// This class can be used to create such a stream. | |
16 /// | |
17 /// The [stream] is a normal stream that you can listen to immediately, | |
18 /// but until either [setSourceStream] or [setEmpty] is called, | |
19 /// the stream won't produce any events. | |
20 /// | |
21 /// The same effect can be achieved by using a [StreamController] | |
22 /// and adding the stream using `addStream` when both | |
23 /// the controller's stream is listened to and the source stream is ready. | |
24 /// This class attempts to shortcut some of the overhead when possible. | |
25 /// For example, if the [stream] is only listened to | |
26 /// after the source stream has been set, | |
27 /// the listen is performed directly on the source stream. | |
28 class StreamCompleter<T> { | |
29 /// The stream of this completer. | |
30 /// | |
31 /// When a source stream is provided, its events will be forwarded to | |
32 /// listeners on this stream. | |
33 /// | |
34 /// The stream can be listened either before or after a source stream | |
35 /// is set. | |
36 final Stream<T> stream = new _CompleterStream<T>(); | |
nweiz
2015/06/16 01:05:23
I should have mentioned this in the last pass (sor
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Ack. Done.
| |
37 | |
38 /// Convert a `Future<Stream>` to a `Stream`. | |
39 /// | |
40 /// This creates a stream using a stream completer, | |
41 /// and sets the source stream to the result of the future when the | |
42 /// future completes. | |
43 /// | |
44 /// If the future completes with an error, the returned stream will | |
45 /// instead contain just that error. | |
46 static Stream fromFuture(Future<Stream> streamFuture) { | |
47 var completer = new StreamCompleter(); | |
48 streamFuture.then(completer.setSourceStream, | |
49 onError: (e, s) { | |
nweiz
2015/06/16 01:05:23
"e, s" -> "_"
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Done.
| |
50 completer.setSourceStream(streamFuture.asStream()); | |
51 }); | |
52 return completer.stream; | |
53 } | |
54 | |
55 /// Set a stream as the source of events for the [StreamCompleter]'s | |
56 /// [stream]. | |
57 /// | |
58 /// The completer's `stream` will act exactly as [sourceStream]. | |
59 /// | |
60 /// If the source stream is set before [stream] is listened to, | |
61 /// the listen call on [stream] is forwarded directly to [sourceStream]. | |
62 /// | |
63 /// If [stream] is listened to before setting the source stream, | |
64 /// an intermediate subscription is created. It looks like a completely | |
65 /// normal subscription, and can be paused or canceled, but it won't | |
66 /// produce any events until a source stream is provided. | |
67 /// | |
68 /// If the `stream` subscription is canceled before a source stream is set, | |
69 /// then nothing further happens. This means that the `sourceStream` may | |
70 /// never be listened to, even if the [stream] has been listened to. | |
71 /// | |
72 /// Otherwise, when the source stream is then set, | |
73 /// it is immediately listened to. | |
74 /// If the existing subscription was paused, the source stream subscription | |
75 /// is paused as many times, and then all events and callbacks are forwarded | |
nweiz
2015/06/16 01:05:23
It's kind of strange that this forwards the same *
Lasse Reichstein Nielsen
2015/06/18 12:10:12
If you pause three times, you need to resume three
| |
76 /// between the two subscriptions, so the listener works as if it was | |
77 /// listening to the source stream subscription directly. | |
78 /// | |
79 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
80 /// Trying to call either of them again will fail. | |
81 void setSourceStream(Stream<T> sourceStream) { | |
82 _CompleterStream completerStream = this.stream; | |
83 completerStream._linkStream(sourceStream); | |
84 } | |
85 | |
86 /// Equivalent to setting an empty stream using [setSourceStream]. | |
87 /// | |
88 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
89 /// Trying to call either of them again will fail. | |
90 void setEmpty() { | |
91 // TODO(lrn): Optimize this to not actually create the empty stream. | |
92 _CompleterStream completerStream = this.stream; | |
93 completerStream._linkStream(new Stream.fromIterable(const [])); | |
94 } | |
95 } | |
96 | |
97 /// Stream that acts as a source stream it is (eventually) linked with. | |
98 /// | |
99 /// The linked source stream can be set after a user has started listening on | |
100 /// this stream. No events occur before the source stream is provided. | |
101 /// | |
102 /// If a user listens before events are available, the state of the | |
103 /// subscription is maintained, and the subscription is then linked | |
104 /// to the source stream when that becomes available. | |
105 class _CompleterStream<T> extends Stream<T> { | |
106 // Bit flags used for the value of [_state]. | |
107 /// Flag marking that the source stream has been set. | |
108 static const int _streamFlag = 1; | |
109 /// Flag marking that the stream has been listened to. | |
nweiz
2015/06/16 01:05:23
Nit: separate these with newlines.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Gone.
| |
110 static const int _listenerFlag = 2; | |
111 | |
112 // States used as values for _state. | |
113 /// Initial state with no listener and no source stream set. | |
114 static const int _initial = 0; | |
115 /// State where only the stream has been set. | |
116 static const int _streamOnly = _streamFlag; | |
117 /// State where there is only a listener. | |
118 static const int _listenerOnly = _listenerFlag; | |
119 /// State where source stream and listener have been linked. | |
120 static const int _linked = _streamFlag | _listenerFlag; | |
121 | |
122 /// The current state. | |
123 /// | |
124 /// One of [_initial], [_streamOnly], [_listenerOnly], | |
125 /// or [_linked]. | |
126 int _state = _initial; | |
127 | |
128 /// Shared variable used to store different values depending on the state. | |
129 /// | |
130 /// In the [_streamOnly] state it contains the source stream. | |
131 /// In the [_listenerOnly] state it contains a delegating subscription | |
132 /// controller. | |
133 /// In the [_linked] and [_initial] states it contains `null`. | |
134 /// | |
135 /// Do not access this field directly, | |
136 /// instead use [_stream] or [_controller] to read the value | |
137 /// appropriate for the current state. | |
138 var _stateData; | |
139 | |
140 /// Returns source stream that has been set. | |
141 /// | |
142 /// Must only be used when the source stream has been set, | |
143 /// but the stream has not been listened to yet (state is [_streamOnly]); | |
nweiz
2015/06/16 01:05:23
";" -> "."
| |
144 Stream get _stream { | |
145 assert(_state == _streamOnly); | |
146 return _stateData; | |
147 } | |
148 | |
149 /// Returns the mutable subscription controller with the subscription | |
150 /// on this stream | |
151 /// | |
152 /// Must only be used when the stream has been listened to, | |
153 /// but the source stream has not been set to yet (state is [_listenerOnly]); | |
nweiz
2015/06/16 01:05:23
";" -> "."
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
| |
154 MutableDelegatingStreamSubscriptionController get _controller { | |
155 assert(_state == _listenerOnly); | |
156 return _stateData; | |
157 } | |
158 | |
159 // State transition functions. | |
160 | |
161 /// Sets the source stream, and enters the [_streamOnly] state. | |
162 /// | |
163 /// Must only be called from the initial state, before this stream has | |
164 /// been listened to. | |
165 void _setStream(Stream stream) { | |
166 assert(_state == _initial); | |
167 _stateData = stream; | |
168 _state = _streamOnly; | |
169 } | |
170 | |
171 /// Sets the listener subscription, and enters the [_listenerOnly] state. | |
172 /// | |
173 /// Must only be called from the initial state, before the source stream has | |
174 /// been set. | |
175 void _setListened(MutableDelegatingStreamSubscriptionController subscription) { | |
nweiz
2015/06/16 01:05:23
Long line.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Class really needs a better name.
I think I'll rem
| |
176 assert(_state == _initial); | |
177 _stateData = subscription; | |
178 _state = _listenerOnly; | |
179 } | |
180 | |
181 /// Marks the listener and source stream as linked. | |
182 /// | |
183 /// Enters the [_isLinked] state. | |
184 /// This must be called only from either the [_streamOnly] | |
185 /// or the [_listenerOnly] state after setting up the link. | |
186 void _setLinked() { | |
187 assert(_state == _streamOnly || _state == _listenerOnly); | |
188 _state = _linked; | |
189 _stateData = null; | |
190 } | |
191 | |
192 // end state transition functions. | |
193 | |
194 /// Called by the controller when the user supplies a stream | |
195 void _linkStream(Stream stream) { | |
196 if (_state == _listenerOnly) { | |
197 var subscription = _controller.sourceSubscription; | |
198 if (subscription is _CompleterSubscriptionState) { | |
199 _controller.sourceSubscription = subscription._linkStream(stream); | |
200 } else { | |
201 assert(subscription is _CanceledSubscription); | |
202 } | |
203 _setLinked(); | |
204 } else if (_state == _initial) { | |
205 _setStream(stream); | |
206 } else { | |
207 throw new StateError("Stream already linked."); | |
208 } | |
209 } | |
210 | |
211 StreamSubscription listen(void onData(T event), | |
212 {Function onError, | |
213 void onDone(), | |
214 bool cancelOnError}) { | |
215 if (_state == _initial) { | |
216 if (cancelOnError == null) cancelOnError = false; | |
217 var controller = | |
218 new MutableDelegatingStreamSubscriptionController<T>(null); | |
219 controller.sourceSubscription = | |
nweiz
2015/06/16 01:05:23
Why are you assigning this rather than passing it
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Because of the cyclic dependency, the state object
| |
220 new _CompleterSubscriptionState(cancelOnError, controller); | |
221 var subscription = controller.subscription; | |
222 subscription.onData(onData); | |
223 subscription.onError(onError); | |
224 subscription.onDone(onDone); | |
225 _setListened(controller); | |
226 return subscription; | |
227 } | |
228 if (_state == _streamOnly) { | |
nweiz
2015/06/16 01:05:23
Nit: "else if", for compactness and to match other
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Then I think I should also put the final throw in
| |
229 var result = _stream.listen(onData, onError: onError, onDone: onDone, | |
230 cancelOnError: cancelOnError); | |
231 _setLinked(); | |
232 return result; | |
233 } | |
234 throw new StateError("Stream has already been listened to."); | |
235 } | |
236 } | |
237 | |
238 /// Holds subscription callbacks and state for a [_CompleterSubscription] | |
239 /// until the real subscription is available. | |
240 /// | |
241 /// Always used in a [MutableDelegatingStreamSubscriptionController]. | |
242 class _CompleterSubscriptionState implements StreamSubscription { | |
243 /// The mutable subscription wrapper. | |
244 /// | |
245 /// Used for handling [pause] with resume futures and [asFuture] which | |
246 /// both need to call a different function (`resume` and `cancel` | |
247 /// respectively) at a later point. We let those go through the | |
248 /// wrapper subscription to ensure that they are forwarded to the | |
249 /// controller source subscription that is current at that time. | |
250 final MutableDelegatingStreamSubscriptionController _controller; | |
251 | |
252 /// Whether the subscription cancels on error. | |
253 /// | |
254 /// This is forwarded to the real subscription when that is created. | |
255 final bool _cancelOnError; | |
256 | |
257 // Callbacks forwarded to the real subscription when it's created. | |
258 | |
259 ZoneUnaryCallback _onData; | |
260 Function _onError; | |
261 ZoneCallback _onDone; | |
262 | |
263 /// Future set when cancel is called. | |
264 /// This both marks the subscription as canceled and allows returning | |
nweiz
2015/06/16 01:05:23
Nit: newline above this.
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Acknowledged.
| |
265 /// the same future every time the cancel function is called. | |
266 Future cancelFuture; | |
267 | |
268 /// Count of active pauses. | |
269 /// | |
270 /// When the real subscription is created, it is paused this many times. | |
271 int pauseCount = 0; | |
272 | |
273 _CompleterSubscriptionState(this._cancelOnError, | |
274 this._controller); | |
275 | |
276 void onData(void handleData(data)) { | |
277 _onData = handleData; | |
278 } | |
279 | |
280 void onError(Function handleError) { | |
281 _onError = handleError; | |
282 } | |
283 | |
284 void onDone(void handleDone()) { | |
285 _onDone = handleDone; | |
286 } | |
287 | |
288 void pause([Future resumeFuture]) { | |
289 pauseCount++; | |
290 if (resumeFuture != null) { | |
291 // Go through wrapper subscription in case the real subscription | |
292 // is linked before the future completes. | |
293 resumeFuture.whenComplete(_controller.subscription.resume); | |
294 } | |
295 } | |
296 | |
297 void resume() { | |
298 if (pauseCount > 0) pauseCount--; | |
299 } | |
300 | |
301 Future cancel() { | |
302 var cancelFuture = new Future.value(); | |
nweiz
2015/06/16 01:05:23
Why return a future here? cancel() is allowed to r
Lasse Reichstein Nielsen
2015/06/18 12:10:12
Yes, sadly. That's a consequence of backwards comp
| |
303 // Immediately replace the [_CompleterSubscription._delegate] so | |
304 // we won't be called again. | |
305 // This also releases any, now unused, callbacks we are holding on to. | |
306 _controller.sourceSubscription = new _CanceledSubscription(cancelFuture); | |
307 return cancelFuture; | |
308 } | |
309 | |
310 bool get isPaused { | |
311 return (cancelFuture != null && pauseCount > 0); | |
312 } | |
313 | |
314 Future asFuture([futureValue]) { | |
315 Completer completer = new Completer(); | |
316 _onDone = () { | |
317 completer.complete(futureValue); | |
318 }; | |
319 _onError = (error, StackTrace stackTrace) { | |
320 // Cancel the wrapper subscription in case the real subscription | |
321 // is linked before an error triggers this function. | |
322 _controller.subscription.cancel(); | |
323 completer.completeError(error, stackTrace); | |
324 }; | |
325 return completer.future; | |
326 } | |
327 | |
328 StreamSubscription _linkStream(Stream stream) { | |
329 if (cancelFuture != null) { | |
330 return new _CanceledSubscription(cancelFuture); | |
331 } | |
332 // If not canceled, create the real subscription and make | |
333 // sure it has the requested callbacks, cancelOnErrror flag and | |
334 // number of pauses. | |
335 var subscription = stream.listen(null, cancelOnError: _cancelOnError); | |
336 subscription.onData(_onData); | |
337 subscription.onError(_onError); | |
338 subscription.onDone(_onDone); | |
339 while (pauseCount > 0) { | |
340 subscription.pause(); | |
341 pauseCount--; | |
342 } | |
343 return subscription; | |
344 } | |
345 } | |
346 | |
347 /// A subscription that acts as if it has been canceled. | |
348 /// | |
349 /// No events are fired and pausing is ignored. | |
350 /// The [cancel] method always returns the same future. | |
351 class _CanceledSubscription implements StreamSubscription { | |
352 /// The future returned by [cancel]; | |
353 Future _doneFuture; | |
354 | |
355 _CanceledSubscription(this._doneFuture); | |
356 | |
357 void onData(void handleData(data)) {} | |
358 | |
359 void onError(Function handleError) {} | |
360 | |
361 void onDone(void handleDone()) {} | |
362 | |
363 void pause([Future resumeFuture]) {} | |
364 | |
365 void resume() {} | |
366 | |
367 Future cancel() => _doneFuture; | |
368 | |
369 /// Returns future that never completes. | |
370 /// | |
371 /// The `asFuture` result is completed by either an error event | |
372 /// or a done event. A canceled future never produces either. | |
373 Future asFuture([futureValue]) => new Completer().future; | |
374 | |
375 bool get isPaused => false; | |
376 } | |
OLD | NEW |