OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
nweiz
2015/06/16 01:05:23
As part of moving a bunch of existing async utilit
Lasse Reichstein Nielsen
2015/06/16 13:05:45
True. And if I was just using a controller this co
nweiz
2015/06/16 22:34:11
Here's a (rough untested) sketch of what it would
Lasse Reichstein Nielsen
2015/06/17 11:08:29
And, to make it even more obvious that this is a g
nweiz
2015/06/17 22:58:45
Oh dang, I thought I published that CL yesterday,
| |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.streams.stream_completer; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 /// A [stream] where the contents aren't known at creation time. | |
10 /// | |
11 /// It is generally recommended that you never create a `Future<Stream>` | |
12 /// because you can just use directly create a stream that doesn't do anything | |
nweiz
2015/06/12 01:24:23
"use directly" -> "directly"
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
13 /// until it's ready to do so. | |
14 /// This class can be used to create such a stream. | |
15 /// | |
16 /// The [stream] is a normal stream that you can listen to immediately, | |
17 /// but until [setSourceStream] is called, the stream won't produce | |
18 /// any events. | |
19 /// | |
20 /// The same effect can be achieved by using a [StreamController] | |
21 /// and adding the stream using `addStream` when both | |
22 /// the controller's stream is listened to and the source stream is ready. | |
23 /// This class attempts to shortcut some of the overhead when possible. | |
24 /// For example, if the [stream] is only listened to | |
25 /// after the source stream has been set, | |
26 /// the listen is performed directly on the source stream. | |
27 class StreamCompleter<T> { | |
Søren Gjesse
2015/06/11 07:57:06
Maybe add a constructor with an optional onListen
nweiz
2015/06/12 01:24:23
Consider adding a constructor so that the user can
Lasse Reichstein Nielsen
2015/06/12 13:04:19
I think I'd use a completely different class for a
Lasse Reichstein Nielsen
2015/06/12 13:04:20
You are thinking the case where you don't even wan
| |
28 final Stream<T> stream = new _PromiseStream<T>(); | |
nweiz
2015/06/12 01:24:22
Nit: document this
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
29 | |
nweiz
2015/06/12 01:24:23
Suggestion: add "static Stream fromFuture(Future<S
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Good idea. Done.
| |
30 /// Set a stream as the source of events for the [StreamCompleter]. | |
31 /// | |
32 /// There is no guarantee that the stream will ever be listened to. | |
nweiz
2015/06/12 01:24:23
It would be nice to go further and fully explain h
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
33 void setSourceStream(Stream<T> stream) { | |
34 _PromiseStream promiseStream = this.stream; | |
35 promiseStream._linkStream(stream); | |
36 } | |
37 | |
38 /// As setting an empty stream using [setSourceStream]. | |
39 void setEmpty() { | |
nweiz
2015/06/12 01:24:23
Document that this is mutually exclusive with [set
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
40 // TODO(lrn): Optimize this to not actually create the empty stream. | |
41 _PromiseStream promiseStream = this.stream; | |
42 promiseStream._linkStream(new Stream.fromIterable(const [])); | |
43 } | |
44 } | |
45 | |
46 /// Stream that acts as a source stream it is (eventually) linked with. | |
47 /// | |
48 /// The linked source stream can be set after a user has started listening on | |
49 /// this stream. No events occur before the source stream is provided. | |
50 /// | |
51 /// If a user listens before events are available, the state of the | |
52 /// subscription is maintained, and the subscription is then linked | |
53 /// to the source stream when that becomes available. | |
54 class _PromiseStream<T> extends Stream<T> { | |
nweiz
2015/06/12 01:24:23
Even though it's not public, it would be nice to h
Lasse Reichstein Nielsen
2015/06/12 13:04:19
True. I renamed the public class at some point, so
| |
55 static const int _UNINITIALIZED = 0; | |
56 static const int _STREAM_SET = 1; | |
57 static const int _LISTENED = 2; | |
nweiz
2015/06/12 01:24:22
Document the semantics of each of these variables.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done
| |
58 | |
59 int _state = _UNINITIALIZED; | |
nweiz
2015/06/12 01:24:23
Document that this is a bitfield. At first glance,
Lasse Reichstein Nielsen
2015/06/12 13:04:20
This is no longer used as a bit field. I have cons
| |
60 var _streamOrSubscription; | |
nweiz
2015/06/12 01:24:22
This really seems like it should be two separate f
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I think the way I have (now) encapsulated it shoul
nweiz
2015/06/16 01:05:23
It's definitely better, but it's still a lot of ex
Lasse Reichstein Nielsen
2015/06/16 13:05:44
True. I think I'll try rewriting it without the st
| |
61 | |
62 void _linkStream(Stream stream) { | |
nweiz
2015/06/12 01:24:23
It would be really nice to have all these private
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
| |
63 if (_state == _UNINITIALIZED) { | |
64 _streamOrSubscription = stream; | |
65 _state |= _STREAM_SET; | |
66 } else if (_state == _LISTENED) { | |
67 _PromiseSubscription promiseSubscription = _streamOrSubscription; | |
68 promiseSubscription._linkStream(stream); | |
69 _state |= _STREAM_SET; | |
70 } else { | |
71 throw new StateError("Stream already linked."); | |
72 } | |
73 } | |
74 | |
75 StreamSubscription listen(void onData(T event), | |
76 {Function onError, | |
77 void onDone(), | |
78 bool cancelOnError}) { | |
79 int state = _state; | |
nweiz
2015/06/12 01:24:22
Nit: don't type-annotate local variables. There ar
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Yeah, about that.
I try to use more "var", but I
nweiz
2015/06/16 01:05:22
As Bob likes to say, code is for people to read it
Lasse Reichstein Nielsen
2015/06/16 13:05:45
True.
It's mainly for myself. I like to see the t
| |
80 _state = state | _LISTENED; | |
81 if (state == _UNINITIALIZED) { | |
Søren Gjesse
2015/06/11 07:57:05
Why not just check (_state & _STREAM_SET) == 0? Th
Lasse Reichstein Nielsen
2015/06/12 13:04:19
I still need to read the state before setting it,
| |
82 _PromiseSubscription subscription = | |
83 new _PromiseSubscription<T>(true == cancelOnError); | |
nweiz
2015/06/12 01:24:23
"true == cancelOnError" is really hard to read. It
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Now forwards the parameter all the way to the fina
| |
84 subscription.onData(onData); | |
85 subscription.onError(onError); | |
86 subscription.onDone(onDone); | |
87 _streamOrSubscription = subscription; | |
88 return subscription; | |
89 } | |
90 if (state == _STREAM_SET) { | |
91 Stream stream = _streamOrSubscription; | |
92 return stream.listen(onData, onError: onError, onDone: onDone, | |
93 cancelOnError: cancelOnError); | |
94 } | |
95 print(state); | |
Søren Gjesse
2015/06/11 07:57:06
Debug print.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Acknowledged.
| |
96 throw new StateError("Stream has already been listened to."); | |
97 } | |
98 } | |
99 | |
100 /// Subscription for a [_PromiseStream] that is listened to but has no data. | |
nweiz
2015/06/12 01:24:23
"has no data" is a little misleading, since this w
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I have reworded it. It's a little constrained by w
| |
101 /// | |
102 /// Maintains the state of a stream subscription that hasn't received any | |
103 /// events until events are available, then it starts forwarding to another | |
nweiz
2015/06/12 01:24:22
"events are available" has the same issue as above
Lasse Reichstein Nielsen
2015/06/12 13:04:20
reworded.
| |
104 /// subscription. | |
105 class _PromiseSubscription<T> implements StreamSubscription<T> { | |
106 // State values | |
107 // The subscription is in one of three distinct states: | |
108 // - Initial (remembers whether it's cancelOnError and paused). | |
109 // - Cancelled (before being linked). | |
110 // - Linked (before being cancelled). | |
111 static const int _INIT = 0; | |
112 static const int _INIT_CANCEL_ON_ERROR = 1; | |
113 static const int _CANCELLED = 2; // Exclusive. | |
114 static const int _LINKED = 3; // Exclusive | |
115 static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR. | |
nweiz
2015/06/12 01:24:23
Similarly to the above, it would be a lot clearer
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I have moved the entire state into a separate obje
| |
116 | |
117 /// State represents the status of the subscription until the | |
118 /// real subscription becomes available. | |
119 /// | |
120 /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains | |
121 /// a list of length three with the data, error and done handlers that | |
122 /// have been set. | |
123 /// | |
124 /// While `_state` is [_LINKED], [_stateData] contains the real | |
125 /// stream subscription. | |
126 /// | |
127 /// When `_state` is `_CANCELED`, `_stateData` is cleared since the | |
128 /// event handlers won't be needed anyway. | |
129 int _state; | |
130 var _stateData = new List(3); | |
Søren Gjesse
2015/06/11 07:57:05
Why use a list with three members instead of three
nweiz
2015/06/12 01:24:23
+1
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Space! Saving of!
I started writing it as a real o
| |
131 | |
132 _PromiseSubscription(bool cancelOnError) | |
133 : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT); | |
134 | |
135 bool get _isLinked => _state == _LINKED; | |
Søren Gjesse
2015/06/11 07:57:06
If you relay want to use a bit-field why not ave g
Lasse Reichstein Nielsen
2015/06/12 13:04:21
It's not always a bit field.
It's really a combo:
| |
136 bool get _isCancelled => _state == _CANCELLED; | |
137 bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR; | |
138 | |
139 void _linkStream(Stream stream) { | |
140 if (_isLinked) { | |
141 throw new StateError("Already linked to a stream."); | |
142 } | |
143 if (_isCancelled) { | |
Søren Gjesse
2015/06/11 07:57:06
Shouldn't we listen on and cancel the stream in th
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Probably not.
For one thing, it's unnecessary ove
nweiz
2015/06/16 01:05:23
I agree with Søren that the default should be to s
Lasse Reichstein Nielsen
2015/06/16 13:05:45
Ok. The only problem with that is that listen+canc
| |
144 return; | |
145 } | |
146 bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0; | |
147 StreamSubscription subscription = | |
148 stream.listen(null, cancelOnError: cancelOnError); | |
149 List handlers = _stateData; | |
Søren Gjesse
2015/06/11 07:57:05
Why this 'cast' - documentation?
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Because I like it. It's not obvious what the type
| |
150 subscription.onData(handlers[0]); | |
151 subscription.onError(handlers[1]); | |
152 subscription.onDone(handlers[2]); | |
153 int state = _state; | |
154 _subscription = subscription; | |
155 while (state >= _PAUSE) { | |
156 subscription.pause(); | |
157 state -= _PAUSE; | |
nweiz
2015/06/12 01:24:22
I have no idea what's going on here :-/. If you're
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Rewritten completely (twice), and there is no fanc
| |
158 } | |
159 } | |
160 | |
161 List get _handlers { | |
162 assert(_isInitial); | |
163 return _stateData; | |
164 } | |
165 | |
166 StreamSubscription get _subscription { | |
167 assert(_isLinked); | |
168 return _stateData; | |
169 } | |
170 | |
171 // Sets state to linked. | |
172 void set _subscription(StreamSubscription subscription) { | |
nweiz
2015/06/12 01:24:22
Since this is only called once, I think it would b
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Now a function named setListened that does the sta
| |
173 assert(_isInitial); | |
174 _stateData = subscription; | |
175 _state = _LINKED; | |
176 } | |
177 | |
178 void onData(void handleData(T data)) { | |
179 if (_isLinked) { | |
180 _subscription.onData(handleData); | |
181 } else { | |
182 assert(_isInitial); | |
183 _handlers[0] = handleData; | |
184 } | |
185 } | |
186 | |
187 void onError(void handleError(error, StackTrace stackTrace)) { | |
188 if (_isLinked) { | |
189 _subscription.onError(handleError); | |
190 } else { | |
191 assert(_isInitial); | |
192 _handlers[1] = handleError; | |
193 } | |
194 } | |
195 | |
196 void onDone(void handleDone()) { | |
197 if (_isLinked) { | |
198 _subscription.onDone(handleDone); | |
199 } else { | |
200 assert(_isInitial); | |
201 _handlers[2] = handleDone; | |
202 } | |
203 } | |
204 | |
205 void pause([Future resumeFuture]) { | |
206 if (_isLinked) { | |
207 _subscription.pause(resumeFuture); | |
208 } else if (!_isCancelled) { | |
Søren Gjesse
2015/06/11 07:57:05
assert that it is not already paused (or use |).
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Pauses are cumulative, it can be paused more than
| |
209 _state += _PAUSE; | |
210 if (resumeFuture != null) { | |
211 resumeFuture.whenComplete(this.resume); | |
212 } | |
213 } | |
214 } | |
215 | |
216 void resume() { | |
217 if (_isLinked) { | |
218 _subscription.resume(); | |
219 } else if (_state >= _PAUSE) { | |
Søren Gjesse
2015/06/11 07:57:05
Just check the bit instead of using >=.
Lasse Reichstein Nielsen
2015/06/12 13:04:20
It can be paused more than once, so the bit will o
| |
220 _state -= _PAUSE; | |
221 } | |
222 } | |
223 | |
224 Future cancel() { | |
Søren Gjesse
2015/06/11 07:57:06
What is the semantics of calling cancel several ti
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I'm not sure it's specified, but that's what we us
| |
225 if (_isLinked) { | |
226 return _subscription.cancel(); | |
227 } else { | |
228 _stateData = null; | |
229 _state = _CANCELLED; | |
230 return new Future.value(); | |
231 } | |
232 } | |
233 | |
234 Future asFuture([futureValue]) { | |
235 if (_isLinked) { | |
236 return _subscription.asFuture(futureValue); | |
237 } | |
238 Completer completer = new Completer(); | |
239 if (!_isCancelled) { | |
nweiz
2015/06/12 01:24:23
Nit: Short-circuit if it is cancelled to save some
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Rewritten.
| |
240 // Asking for a future of a cancelled subscription gives a future | |
241 // which never completes. | |
242 _handlers[1] = _cancelBeforeError(completer.completeError); | |
243 if (futureValue == null) { | |
244 _handlers[2] = completer.complete; | |
245 } else { | |
246 _handlers[2] = () { completer.complete(futureValue); }; | |
nweiz
2015/06/12 01:24:23
Nit: Either use "=>" or make this multiple lines.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
I skipped the special casing of null completely (a
| |
247 } | |
248 } | |
249 return completer.future; | |
250 } | |
251 | |
252 bool get isPaused { | |
253 if (_isLinked) { | |
254 return _subscription.isPaused; | |
255 } | |
256 return _state >= _PAUSE; | |
257 } | |
258 | |
259 /// Helper function used by [asFuture]. | |
260 /// | |
261 /// Returns an error handler which cancels the stream when it receives an | |
262 /// error. | |
263 Function _cancelBeforeError(Function handleError) { | |
264 return (e, s) { | |
nweiz
2015/06/12 01:24:23
Nit: use full words for variables.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
265 cancel(); | |
266 if (handleError is _BinaryCallback) { | |
267 handleError(e, s); | |
268 } else { | |
269 handleError(e); | |
270 } | |
271 }; | |
272 } | |
273 } | |
274 | |
275 typedef _BinaryCallback(a, b); | |
Søren Gjesse
2015/06/11 07:57:06
We don't have this typedef publid in and dart: lib
Lasse Reichstein Nielsen
2015/06/12 13:04:19
We do: ZoneBinaryCallback in dart:async.
Also not
| |
OLD | NEW |