Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: lib/src/stream_completer.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address remaining comments. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_completer;
6
7 import "dart:async";
8 import "delegating_stream_subscription.dart";
9
10 /// A single-subscription [stream] where the contents are provided later.
11 ///
12 /// It is generally recommended that you never create a `Future<Stream>`
13 /// because you can just directly create a stream that doesn't do anything
14 /// until it's ready to do so.
15 /// This class can be used to create such a stream.
16 ///
17 /// The [stream] is a normal stream that you can listen to immediately,
18 /// but until either [setSourceStream] or [setEmpty] is called,
19 /// the stream won't produce any events.
20 ///
21 /// The same effect can be achieved by using a [StreamController]
22 /// and adding the stream using `addStream` when both
23 /// the controller's stream is listened to and the source stream is ready.
24 /// This class attempts to shortcut some of the overhead when possible.
25 /// For example, if the [stream] is only listened to
26 /// after the source stream has been set,
27 /// the listen is performed directly on the source stream.
28 class StreamCompleter<T> {
29 /// The stream of this completer.
30 ///
31 /// When a source stream is provided, its events will be forwarded to
32 /// listeners on this stream.
33 ///
34 /// The stream can be listened either before or after a source stream
35 /// is set.
36 final Stream<T> stream = new _CompleterStream<T>();
nweiz 2015/06/16 01:05:23 I should have mentioned this in the last pass (sor
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Ack. Done.
37
38 /// Convert a `Future<Stream>` to a `Stream`.
39 ///
40 /// This creates a stream using a stream completer,
41 /// and sets the source stream to the result of the future when the
42 /// future completes.
43 ///
44 /// If the future completes with an error, the returned stream will
45 /// instead contain just that error.
46 static Stream fromFuture(Future<Stream> streamFuture) {
47 var completer = new StreamCompleter();
48 streamFuture.then(completer.setSourceStream,
49 onError: (e, s) {
nweiz 2015/06/16 01:05:23 "e, s" -> "_"
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Done.
50 completer.setSourceStream(streamFuture.asStream());
51 });
52 return completer.stream;
53 }
54
55 /// Set a stream as the source of events for the [StreamCompleter]'s
56 /// [stream].
57 ///
58 /// The completer's `stream` will act exactly as [sourceStream].
59 ///
60 /// If the source stream is set before [stream] is listened to,
61 /// the listen call on [stream] is forwarded directly to [sourceStream].
62 ///
63 /// If [stream] is listened to before setting the source stream,
64 /// an intermediate subscription is created. It looks like a completely
65 /// normal subscription, and can be paused or canceled, but it won't
66 /// produce any events until a source stream is provided.
67 ///
68 /// If the `stream` subscription is canceled before a source stream is set,
69 /// then nothing further happens. This means that the `sourceStream` may
70 /// never be listened to, even if the [stream] has been listened to.
71 ///
72 /// Otherwise, when the source stream is then set,
73 /// it is immediately listened to.
74 /// If the existing subscription was paused, the source stream subscription
75 /// is paused as many times, and then all events and callbacks are forwarded
nweiz 2015/06/16 01:05:23 It's kind of strange that this forwards the same *
Lasse Reichstein Nielsen 2015/06/18 12:10:12 If you pause three times, you need to resume three
76 /// between the two subscriptions, so the listener works as if it was
77 /// listening to the source stream subscription directly.
78 ///
79 /// Either [setSourceStream] or [setEmpty] may be called at most once.
80 /// Trying to call either of them again will fail.
81 void setSourceStream(Stream<T> sourceStream) {
82 _CompleterStream completerStream = this.stream;
83 completerStream._linkStream(sourceStream);
84 }
85
86 /// Equivalent to setting an empty stream using [setSourceStream].
87 ///
88 /// Either [setSourceStream] or [setEmpty] may be called at most once.
89 /// Trying to call either of them again will fail.
90 void setEmpty() {
91 // TODO(lrn): Optimize this to not actually create the empty stream.
92 _CompleterStream completerStream = this.stream;
93 completerStream._linkStream(new Stream.fromIterable(const []));
94 }
95 }
96
97 /// Stream that acts as a source stream it is (eventually) linked with.
98 ///
99 /// The linked source stream can be set after a user has started listening on
100 /// this stream. No events occur before the source stream is provided.
101 ///
102 /// If a user listens before events are available, the state of the
103 /// subscription is maintained, and the subscription is then linked
104 /// to the source stream when that becomes available.
105 class _CompleterStream<T> extends Stream<T> {
106 // Bit flags used for the value of [_state].
107 /// Flag marking that the source stream has been set.
108 static const int _streamFlag = 1;
109 /// Flag marking that the stream has been listened to.
nweiz 2015/06/16 01:05:23 Nit: separate these with newlines.
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Gone.
110 static const int _listenerFlag = 2;
111
112 // States used as values for _state.
113 /// Initial state with no listener and no source stream set.
114 static const int _initial = 0;
115 /// State where only the stream has been set.
116 static const int _streamOnly = _streamFlag;
117 /// State where there is only a listener.
118 static const int _listenerOnly = _listenerFlag;
119 /// State where source stream and listener have been linked.
120 static const int _linked = _streamFlag | _listenerFlag;
121
122 /// The current state.
123 ///
124 /// One of [_initial], [_streamOnly], [_listenerOnly],
125 /// or [_linked].
126 int _state = _initial;
127
128 /// Shared variable used to store different values depending on the state.
129 ///
130 /// In the [_streamOnly] state it contains the source stream.
131 /// In the [_listenerOnly] state it contains a delegating subscription
132 /// controller.
133 /// In the [_linked] and [_initial] states it contains `null`.
134 ///
135 /// Do not access this field directly,
136 /// instead use [_stream] or [_controller] to read the value
137 /// appropriate for the current state.
138 var _stateData;
139
140 /// Returns source stream that has been set.
141 ///
142 /// Must only be used when the source stream has been set,
143 /// but the stream has not been listened to yet (state is [_streamOnly]);
nweiz 2015/06/16 01:05:23 ";" -> "."
144 Stream get _stream {
145 assert(_state == _streamOnly);
146 return _stateData;
147 }
148
149 /// Returns the mutable subscription controller with the subscription
150 /// on this stream
151 ///
152 /// Must only be used when the stream has been listened to,
153 /// but the source stream has not been set to yet (state is [_listenerOnly]);
nweiz 2015/06/16 01:05:23 ";" -> "."
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Acknowledged.
154 MutableDelegatingStreamSubscriptionController get _controller {
155 assert(_state == _listenerOnly);
156 return _stateData;
157 }
158
159 // State transition functions.
160
161 /// Sets the source stream, and enters the [_streamOnly] state.
162 ///
163 /// Must only be called from the initial state, before this stream has
164 /// been listened to.
165 void _setStream(Stream stream) {
166 assert(_state == _initial);
167 _stateData = stream;
168 _state = _streamOnly;
169 }
170
171 /// Sets the listener subscription, and enters the [_listenerOnly] state.
172 ///
173 /// Must only be called from the initial state, before the source stream has
174 /// been set.
175 void _setListened(MutableDelegatingStreamSubscriptionController subscription) {
nweiz 2015/06/16 01:05:23 Long line.
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Class really needs a better name. I think I'll rem
176 assert(_state == _initial);
177 _stateData = subscription;
178 _state = _listenerOnly;
179 }
180
181 /// Marks the listener and source stream as linked.
182 ///
183 /// Enters the [_isLinked] state.
184 /// This must be called only from either the [_streamOnly]
185 /// or the [_listenerOnly] state after setting up the link.
186 void _setLinked() {
187 assert(_state == _streamOnly || _state == _listenerOnly);
188 _state = _linked;
189 _stateData = null;
190 }
191
192 // end state transition functions.
193
194 /// Called by the controller when the user supplies a stream
195 void _linkStream(Stream stream) {
196 if (_state == _listenerOnly) {
197 var subscription = _controller.sourceSubscription;
198 if (subscription is _CompleterSubscriptionState) {
199 _controller.sourceSubscription = subscription._linkStream(stream);
200 } else {
201 assert(subscription is _CanceledSubscription);
202 }
203 _setLinked();
204 } else if (_state == _initial) {
205 _setStream(stream);
206 } else {
207 throw new StateError("Stream already linked.");
208 }
209 }
210
211 StreamSubscription listen(void onData(T event),
212 {Function onError,
213 void onDone(),
214 bool cancelOnError}) {
215 if (_state == _initial) {
216 if (cancelOnError == null) cancelOnError = false;
217 var controller =
218 new MutableDelegatingStreamSubscriptionController<T>(null);
219 controller.sourceSubscription =
nweiz 2015/06/16 01:05:23 Why are you assigning this rather than passing it
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Because of the cyclic dependency, the state object
220 new _CompleterSubscriptionState(cancelOnError, controller);
221 var subscription = controller.subscription;
222 subscription.onData(onData);
223 subscription.onError(onError);
224 subscription.onDone(onDone);
225 _setListened(controller);
226 return subscription;
227 }
228 if (_state == _streamOnly) {
nweiz 2015/06/16 01:05:23 Nit: "else if", for compactness and to match other
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Then I think I should also put the final throw in
229 var result = _stream.listen(onData, onError: onError, onDone: onDone,
230 cancelOnError: cancelOnError);
231 _setLinked();
232 return result;
233 }
234 throw new StateError("Stream has already been listened to.");
235 }
236 }
237
238 /// Holds subscription callbacks and state for a [_CompleterSubscription]
239 /// until the real subscription is available.
240 ///
241 /// Always used in a [MutableDelegatingStreamSubscriptionController].
242 class _CompleterSubscriptionState implements StreamSubscription {
243 /// The mutable subscription wrapper.
244 ///
245 /// Used for handling [pause] with resume futures and [asFuture] which
246 /// both need to call a different function (`resume` and `cancel`
247 /// respectively) at a later point. We let those go through the
248 /// wrapper subscription to ensure that they are forwarded to the
249 /// controller source subscription that is current at that time.
250 final MutableDelegatingStreamSubscriptionController _controller;
251
252 /// Whether the subscription cancels on error.
253 ///
254 /// This is forwarded to the real subscription when that is created.
255 final bool _cancelOnError;
256
257 // Callbacks forwarded to the real subscription when it's created.
258
259 ZoneUnaryCallback _onData;
260 Function _onError;
261 ZoneCallback _onDone;
262
263 /// Future set when cancel is called.
264 /// This both marks the subscription as canceled and allows returning
nweiz 2015/06/16 01:05:23 Nit: newline above this.
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Acknowledged.
265 /// the same future every time the cancel function is called.
266 Future cancelFuture;
267
268 /// Count of active pauses.
269 ///
270 /// When the real subscription is created, it is paused this many times.
271 int pauseCount = 0;
272
273 _CompleterSubscriptionState(this._cancelOnError,
274 this._controller);
275
276 void onData(void handleData(data)) {
277 _onData = handleData;
278 }
279
280 void onError(Function handleError) {
281 _onError = handleError;
282 }
283
284 void onDone(void handleDone()) {
285 _onDone = handleDone;
286 }
287
288 void pause([Future resumeFuture]) {
289 pauseCount++;
290 if (resumeFuture != null) {
291 // Go through wrapper subscription in case the real subscription
292 // is linked before the future completes.
293 resumeFuture.whenComplete(_controller.subscription.resume);
294 }
295 }
296
297 void resume() {
298 if (pauseCount > 0) pauseCount--;
299 }
300
301 Future cancel() {
302 var cancelFuture = new Future.value();
nweiz 2015/06/16 01:05:23 Why return a future here? cancel() is allowed to r
Lasse Reichstein Nielsen 2015/06/18 12:10:12 Yes, sadly. That's a consequence of backwards comp
303 // Immediately replace the [_CompleterSubscription._delegate] so
304 // we won't be called again.
305 // This also releases any, now unused, callbacks we are holding on to.
306 _controller.sourceSubscription = new _CanceledSubscription(cancelFuture);
307 return cancelFuture;
308 }
309
310 bool get isPaused {
311 return (cancelFuture != null && pauseCount > 0);
312 }
313
314 Future asFuture([futureValue]) {
315 Completer completer = new Completer();
316 _onDone = () {
317 completer.complete(futureValue);
318 };
319 _onError = (error, StackTrace stackTrace) {
320 // Cancel the wrapper subscription in case the real subscription
321 // is linked before an error triggers this function.
322 _controller.subscription.cancel();
323 completer.completeError(error, stackTrace);
324 };
325 return completer.future;
326 }
327
328 StreamSubscription _linkStream(Stream stream) {
329 if (cancelFuture != null) {
330 return new _CanceledSubscription(cancelFuture);
331 }
332 // If not canceled, create the real subscription and make
333 // sure it has the requested callbacks, cancelOnErrror flag and
334 // number of pauses.
335 var subscription = stream.listen(null, cancelOnError: _cancelOnError);
336 subscription.onData(_onData);
337 subscription.onError(_onError);
338 subscription.onDone(_onDone);
339 while (pauseCount > 0) {
340 subscription.pause();
341 pauseCount--;
342 }
343 return subscription;
344 }
345 }
346
347 /// A subscription that acts as if it has been canceled.
348 ///
349 /// No events are fired and pausing is ignored.
350 /// The [cancel] method always returns the same future.
351 class _CanceledSubscription implements StreamSubscription {
352 /// The future returned by [cancel];
353 Future _doneFuture;
354
355 _CanceledSubscription(this._doneFuture);
356
357 void onData(void handleData(data)) {}
358
359 void onError(Function handleError) {}
360
361 void onDone(void handleDone()) {}
362
363 void pause([Future resumeFuture]) {}
364
365 void resume() {}
366
367 Future cancel() => _doneFuture;
368
369 /// Returns future that never completes.
370 ///
371 /// The `asFuture` result is completed by either an error event
372 /// or a done event. A canceled future never produces either.
373 Future asFuture([futureValue]) => new Completer().future;
374
375 bool get isPaused => false;
376 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698