OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.streams.stream_events; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:collection'; | |
9 | |
10 import "subscription_stream.dart"; | |
11 import "stream_completer.dart"; | |
12 | |
13 /// An asynchronous pull-based interface for accessing stream events. | |
14 /// | |
15 /// Wraps a stream and makes individual events available on request. | |
16 /// | |
17 /// The individual requests are served in the order they are requested. | |
nweiz
2015/06/12 01:24:25
Explain how this is different from a [StreamIterat
Lasse Reichstein Nielsen
2015/06/17 11:08:29
Done.
| |
18 /// | |
19 /// Example: | |
20 /// | |
21 /// var events = new StreamEvents<String>(someStreamOfLines); | |
22 /// var first = await events.next; | |
23 /// while (first.startsWith('#')) { | |
Søren Gjesse
2015/06/11 07:57:06
Maybe add an isDone check in this sample.
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Hmm. Not good for the flow, but I guess I can try
| |
24 /// // Skip comments. | |
25 /// first = await events.next; | |
26 /// } | |
27 /// | |
28 /// if (first.startsWith(MAGIC_MARKER)) { | |
29 /// var headerCount = | |
30 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | |
31 /// handleMessage(headers: await events.take(headerCount), | |
32 /// body: events.rest); | |
33 /// return; | |
34 /// } | |
35 /// // Error handling. | |
36 /// | |
37 class StreamEvents<T> { | |
nweiz
2015/06/12 01:24:26
Plural class names always seem weird to me. What d
Lasse Reichstein Nielsen
2015/06/12 13:04:22
It's not a stream, so "PullStream" won't work.
The
nweiz
2015/06/16 01:05:23
Bob suggested "StreamPump", using the water analog
Lasse Reichstein Nielsen
2015/06/16 13:05:45
I'm not a great fan of analogies in nameing - it o
nweiz
2015/06/16 22:34:11
All names in programming are analogies at some lev
Lasse Reichstein Nielsen
2015/06/17 11:08:29
I'm trying out StreamQueue now. It is slightly irk
| |
38 /// In the initial state, the stream has not been listened to yet. | |
nweiz
2015/06/12 01:24:25
Nit: The first paragraph of a doc comment should b
| |
39 /// It will be listened to when the first event is requested. | |
40 /// The `stateData` field holds the stream and the request queue is empty. | |
41 static const int _INITIAL = 0; | |
nweiz
2015/06/12 01:24:25
Nit: since these fields aren't right up next to on
Lasse Reichstein Nielsen
2015/06/12 13:04:22
In my editor it still works very well to show the
| |
42 | |
43 /// Listening on the stream. | |
44 /// If the request queue is empty and the subscription isn't done, | |
45 /// the subscription is paused. | |
46 /// The `stateData` field holds the subscription. | |
47 static const int _LISTENING = 1; | |
48 | |
49 /// The stream has completed. | |
50 /// The `stateData` field is `null` and the request queue is empty. | |
51 static const int _DONE = 2; | |
52 | |
Søren Gjesse
2015/06/11 07:57:06
Maybe put an additional comment that 4 is reserved
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Will do.
| |
53 /// Flag set when [close] is called. | |
54 /// The `StreamEvents` is closed and no further events can be requested. | |
55 /// The last request in the queue, if any, | |
56 /// is completed and is witing to clean up. | |
nweiz
2015/06/12 01:24:24
"witing" -> "waiting"
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
57 static const int _CLOSED = 8; | |
58 | |
59 /// Flag set when [rest] is called. | |
60 /// Only used for error reporting, otherwise equivalent to [_CLOSED]. | |
61 static const int _CLOSED_REST = 12; | |
62 | |
63 /// Current state. | |
64 /// | |
65 /// Use getters below to check if the state is [_isListening] or [_isDone], | |
66 /// and whether the stream events object [_isClosed]. | |
67 int _state = _INITIAL; | |
68 | |
69 /// Value depending on state. Use getters below to get the value and assert | |
70 /// the expected state. | |
71 var _stateData; | |
72 | |
73 /// Queue of pending requests while state is [_LISTENING]. | |
74 /// Access through methods below to ensure consistency. | |
75 Queue<_EventRequest> _requestQueue = new Queue(); | |
nweiz
2015/06/12 01:24:25
Make this final.
Personally, I prefer to omit the
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Made final.
I prefer type annotations on all field
| |
76 | |
77 StreamEvents(Stream source) : _stateData = source; | |
78 | |
79 bool get _isListening => (_state & _LISTENING) != 0; | |
80 bool get _isClosed => (_state & _CLOSED) != 0; | |
81 bool get _isDone => (_state & _DONE) != 0; | |
82 | |
83 /// Whether the underlying stream is spent. | |
nweiz
2015/06/12 01:24:24
"spent" -> "done"
Just to use consistent terminol
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
84 /// | |
85 /// This may return true before all events have been delivered. | |
86 /// Requesting a new event when [isDone] returns true, | |
87 /// for example using [next], will always fail. | |
88 bool get isDone => _isDone; | |
nweiz
2015/06/12 01:24:25
What's the use case for exposing this to the user?
Lasse Reichstein Nielsen
2015/06/12 13:04:23
It might be a little speculative because it doesn'
nweiz
2015/06/16 01:05:23
The version of this I wrote for ScheduledTest has
Lasse Reichstein Nielsen
2015/06/16 13:05:45
It doesn't solve the problem of what to do with th
nweiz
2015/06/16 22:34:11
It's unlikely that someone will call [hasNext] fol
| |
89 | |
90 /// Return the stream subscription while state is listening. | |
nweiz
2015/06/12 01:24:25
Nit: use present tense (e.g. "Returns" rather than
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
91 StreamSubscription get _subscription { | |
92 assert(_isListening); | |
93 return _stateData; | |
94 } | |
95 | |
96 /// Return the source stream while state is initial. | |
97 Stream get _sourceStream { | |
98 assert(!_isListening); | |
99 assert(!_isDone); | |
100 return _stateData; | |
101 } | |
102 | |
103 // Set the subscription and transition to listening state. | |
104 void set _subscription(StreamSubscription subscription) { | |
105 assert(!_isListening); | |
106 assert(!_isDone); | |
107 _stateData = subscription; | |
108 _state |= _LISTENING; | |
109 } | |
110 | |
111 void _setDone() { | |
112 assert(!_isDone); | |
113 _state = (_state & _CLOSED_REST) | _DONE; | |
114 _stateData = null; | |
115 } | |
116 | |
117 /// Request the next (yet unrequested) event from the stream. | |
nweiz
2015/06/12 01:24:26
Clarify in this documentation that it's valid to h
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
118 /// | |
119 /// When the requested event arrives, the returned future is completed with | |
120 /// the event. This is independent of whether the event is a data event or | |
121 /// an error event. | |
nweiz
2015/06/12 01:24:26
I'd write this as: "If the event is a value event,
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
122 /// | |
123 /// If the stream closed before an event arrives, the future is completed | |
nweiz
2015/06/12 01:24:24
"closed" -> "closes", "is completed" -> "completes
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
124 /// with a [StateError]. | |
125 Future<T> get next { | |
126 if (!_isClosed) { | |
127 Completer completer = new Completer<T>(); | |
128 _addRequest(new _NextRequest(completer)); | |
129 return completer.future; | |
130 } | |
131 throw _failClosed(); | |
132 } | |
133 | |
134 /// Request a stream of all the remaning events of the source stream. | |
nweiz
2015/06/12 01:24:25
"Request" -> "Returns"
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
135 /// | |
136 /// All requested [next], [skip] or [take] operations are completed | |
137 /// first, and then any remaining events are provided as events of | |
138 /// the returned stream. | |
139 /// | |
140 /// Using `rest` closes the stream events object. After getting the | |
141 /// `rest` it is no longer allowed to request other events, like | |
nweiz
2015/06/12 01:24:25
"it is no longer allowed to" -> "the caller may no
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
142 /// after calling [close]. | |
143 Stream<T> get rest { | |
144 if (!_isClosed) { | |
nweiz
2015/06/12 01:24:25
Nit: Short-circuit if it's closed to save on inden
Lasse Reichstein Nielsen
2015/06/12 13:04:22
ACK.
This is not going to be in any inner loops, s
| |
145 _state |= _CLOSED_REST; | |
146 if (_isListening) { | |
147 // We have an active subscription that we want to take over. | |
148 var delayStream = new StreamCompleter<T>(); | |
149 _addRequest(new _RestRequest<T>(delayStream, this)); | |
150 return delayStream.stream; | |
151 } | |
152 assert(_requestQueue.isEmpty); | |
153 if (isDone) { | |
154 // TODO(lrn): Add Stream.empty() constructor. | |
155 return new Stream<T>.fromIterable(const []); | |
156 } | |
157 // We have never listened the source stream, so just return that directly. | |
nweiz
2015/06/12 01:24:26
"listened to"
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
158 Stream result = _sourceStream; | |
159 _setDone(); | |
160 return result; | |
161 } | |
162 throw _failClosed(); | |
163 } | |
164 | |
165 /// Requests to skip the next [count] *data* events. | |
nweiz
2015/06/12 01:24:24
"Requests to skip" -> "Skips"
| |
166 /// | |
167 /// The [count] value must be non-negative. | |
168 /// | |
169 /// When successful, this is equivalent to using [take] | |
170 /// and ignoring the result. | |
171 /// | |
172 /// If an error occurs before `count` data events has | |
nweiz
2015/06/12 01:24:25
"has" -> "have"
| |
173 /// been skipped, the returned future completes with | |
174 /// that error instead. | |
175 /// | |
176 /// If the stream closes before `count` data events, | |
177 /// the remaining unskipped event count is returned. | |
178 /// If the returned future completes with the integer `0`, | |
179 /// then all events were succssfully skipped. If the value | |
180 /// is greater than zero then the stream ended early. | |
181 Future<int> skip(int count) { | |
182 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
183 if (!_isClosed) { | |
184 Completer completer = new Completer<int>(); | |
185 _addRequest(new _SkipRequest(completer, count)); | |
186 return completer.future; | |
187 } | |
188 throw _failClosed(); | |
189 } | |
190 | |
191 /// Requests the next [count] data events as a list. | |
192 /// | |
193 /// The [count] value must be non-negative. | |
nweiz
2015/06/12 01:24:25
"The [count] value" -> "[count]"
Lasse Reichstein Nielsen
2015/06/12 13:04:21
-> "The [count]".
Otherwise it would start the sen
| |
194 /// | |
195 /// Equivalent to calling [next] `count` times and | |
196 /// storing the data values in a list. | |
197 /// | |
198 /// If an error occurs before `count` data events has | |
199 /// been collected, the returned future completes with | |
200 /// that error instead. | |
201 /// | |
202 /// If the stream closes before `count` data events, | |
203 /// the returned future completes with the list | |
204 /// of data collected so far. That is, the returned | |
205 /// list may have fewer than [count] elements. | |
206 Future<List<T>> take(int count) { | |
207 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
208 if (!_isClosed) { | |
209 Completer completer = new Completer<List<T>>(); | |
210 _addRequest(new _TakeRequest(completer, count)); | |
211 return completer.future; | |
212 } | |
213 throw _failClosed(); | |
214 } | |
215 | |
216 /// Release the underlying stream subscription. | |
nweiz
2015/06/12 01:24:26
"Release" -> "Cancels"
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
217 /// | |
218 /// The close operation waits until all previously requested | |
219 /// events have been processed, then it cancels the subscription | |
220 /// providing the events. | |
221 /// | |
222 /// The returned future completes with the result of calling | |
223 /// `cancel`. | |
224 /// | |
225 /// After calling `close`, no further events can be requested. | |
226 /// None of [next], [rest], [skip], [take] or [close] may be | |
227 /// called again. | |
228 Future close() { | |
nweiz
2015/06/12 01:24:24
Consider calling this [cancel], to match [StreamSu
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Good idea!
| |
229 if (!_isClosed) { | |
230 _state |= _CLOSED; | |
231 if (!_isListening) { | |
232 assert(_requestQueue.isEmpty); | |
nweiz
2015/06/12 01:24:24
Add a comment explaining why the request queue wil
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
233 if (!_isDone) _setDone(); | |
234 return new Future.value(); | |
235 } | |
236 Completer completer = new Completer(); | |
237 _addRequest(new _CloseRequest(completer, this)); | |
238 return completer.future; | |
239 } | |
240 throw _failClosed(); | |
nweiz
2015/06/12 01:24:26
Everything in the core libraries that's closable a
Lasse Reichstein Nielsen
2015/06/12 13:04:22
True.
One reason why I didn't allow that was that
| |
241 } | |
242 | |
243 /// Reused error message. | |
nweiz
2015/06/12 01:24:24
Expand on this.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
244 Error _failClosed() { | |
245 String cause = | |
246 ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close"; | |
247 return new StateError("Already closed by a call to $cause"); | |
248 } | |
249 | |
250 /// Called when requesting an event when the requeust queue is empty. | |
251 /// The underlying subscription is paused in that case, except the very | |
252 /// first time when the subscription needs to be created. | |
253 void _listen() { | |
nweiz
2015/06/12 01:24:24
Methods like this and [_pause] that are only calle
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Done.
| |
254 if (_isListening) { | |
255 _subscription.resume(); | |
256 } else if (!_isDone) { | |
257 _subscription = | |
258 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
259 } | |
260 } | |
261 | |
262 /// Pauses the underlying subscription. | |
263 /// Called when the request queue is empty and the subscription isn't closed. | |
264 void _pause() { | |
265 assert(_isListening); | |
266 _subscription.pause(); | |
267 } | |
268 | |
269 // Callbacks receiving the events of the source stream. | |
nweiz
2015/06/12 01:24:25
Nit: If this is referring to a group of methods, a
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
| |
270 void _onData(T data) { | |
271 assert(_requestQueue.isNotEmpty); | |
272 _EventRequest action = _nextAction; | |
273 action.add(data); | |
274 _checkCompleted(); | |
275 } | |
276 | |
277 void _onError(error, StackTrace stack) { | |
278 assert(_requestQueue.isNotEmpty); | |
279 _EventRequest action = _nextAction; | |
280 action.addError(error, stack); | |
281 _checkCompleted(); | |
282 } | |
283 | |
284 void _onDone() { | |
285 _setDone(); | |
286 _closeAllRequests(); | |
287 } | |
288 | |
289 // Request queue management. | |
290 | |
291 /// Get the next action in the queue, but don't remove it. | |
292 _EventRequest get _nextAction { | |
293 assert(_requestQueue.isNotEmpty); | |
nweiz
2015/06/12 01:24:24
This assertion seems redundant, since the line bel
Lasse Reichstein Nielsen
2015/06/12 13:04:22
True.
| |
294 return _requestQueue.first; | |
295 } | |
296 | |
297 /// Add a new request to the queue. | |
298 void _addRequest(_EventRequest action) { | |
299 if (_isDone) { | |
300 action.close(); | |
301 return; | |
302 } | |
303 if (_requestQueue.isEmpty) { | |
304 if (action.isComplete) { | |
nweiz
2015/06/12 01:24:24
Explain under what circumstances an action that wa
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
| |
305 // Skip listening and complete this immediately. | |
306 action.close(); | |
307 return; | |
308 } | |
309 _listen(); | |
310 } | |
311 _requestQueue.add(action); | |
312 } | |
313 | |
314 /// Remove all requests and call their `done` event. | |
315 /// | |
316 /// Used when the source stream dries up. | |
317 void _closeAllRequests() { | |
318 assert(_isDone); | |
319 while (_requestQueue.isNotEmpty) { | |
320 _requestQueue.removeFirst().close(); | |
321 } | |
322 } | |
323 | |
324 /// Check whether the next actions in the queue are complete. | |
325 /// | |
326 /// If so, remove them and call their `complete` method. | |
327 void _checkCompleted() { | |
328 // Close-actions are executed immediately when they become the | |
329 // next (and last) event in the queue. | |
330 // When _isClosed and the queue is not empty, the last element | |
331 // of the queue is the close action. | |
332 while (_requestQueue.isNotEmpty) { | |
333 if (!_requestQueue.first.isComplete) { | |
334 return; | |
335 } | |
336 _requestQueue.removeFirst().close(); | |
337 } | |
338 assert(_requestQueue.isEmpty); | |
339 if (!_isDone) _pause(); | |
340 } | |
341 | |
342 /// Extracts the subscription and makes the events object unusable. | |
343 /// | |
344 /// Can only be used by the very last request. | |
345 StreamSubscription _dispose() { | |
346 assert(_isClosed); | |
347 assert(_isListening); | |
348 assert(_requestQueue.isEmpty); | |
349 StreamSubscription subscription = _subscription; | |
350 _setDone(); | |
351 return subscription; | |
352 } | |
353 } | |
354 | |
nweiz
2015/06/12 01:24:25
Nit: extra newline.
| |
355 | |
356 /// Action to take when a requested event arrives. | |
357 abstract class _EventRequest implements EventSink { | |
nweiz
2015/06/12 01:24:25
It's kind of weird that the documentation for thes
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Agree, I switched to "request" halfway through, an
| |
358 bool get isComplete; | |
Søren Gjesse
2015/06/11 07:57:06
Add close method here as well
I think the 'close'
nweiz
2015/06/12 01:24:25
This should also have [add] and [addError] abstrac
Lasse Reichstein Nielsen
2015/06/12 13:04:22
It's inherited from EventSink.
I picked EventSink
| |
359 } | |
360 | |
361 /// Action completing a [StreamEvents.next] request. | |
362 class _NextRequest implements _EventRequest { | |
363 Completer completer; | |
nweiz
2015/06/12 01:24:25
Even though it doesn't do anything at the language
| |
364 _NextRequest(this.completer); | |
nweiz
2015/06/12 01:24:24
Rather than passing in a completer, consider makin
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Good idea. Will do.
The type parameter needs to go
| |
365 | |
366 void add(data) { | |
367 completer.complete(data); | |
368 completer = null; | |
nweiz
2015/06/12 01:24:24
Does nulling this out buy you anything? It seems l
Lasse Reichstein Nielsen
2015/06/12 13:04:23
We have an isCompleted getter? Whoa!
I'm too used
| |
369 } | |
370 | |
371 void addError(error, [StackTrace stack]) { | |
372 completer.completeError(error, stack); | |
373 completer = null; | |
374 } | |
375 | |
376 void close() { | |
377 if (!isComplete) { | |
378 completer.completeError(new StateError("no elements")); | |
nweiz
2015/06/12 01:24:25
If you do continue to check against null, null out
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Technically not necessary since "close" is always
| |
379 } | |
380 } | |
381 | |
382 bool get isComplete => completer == null; | |
383 } | |
384 | |
385 /// Action completing a [StreamEvents.skip] request. | |
386 class _SkipRequest implements _EventRequest { | |
387 final Completer completer; | |
388 int count; | |
nweiz
2015/06/12 01:24:24
Document this. Also consider calling it something
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
| |
389 _SkipRequest(this.completer, this.count); | |
390 | |
391 void add(data) { | |
392 count--; | |
393 } | |
394 | |
395 void addError(error, [StackTrace stack]) { | |
396 completer.completeError(error, stack); | |
397 count = 0; | |
398 } | |
399 | |
400 void close() { | |
401 if (!completer.isCompleted) { | |
402 completer.complete(count); | |
403 count = 0; | |
404 } | |
405 } | |
406 | |
407 bool get isComplete { | |
408 return count == 0; | |
409 } | |
410 } | |
411 | |
412 /// Action completing a [StreamEvents.take] request. | |
413 class _TakeRequest<T> implements _EventRequest { | |
414 final Completer completer; | |
415 final List list = <T>[]; | |
416 int count; | |
417 _TakeRequest(this.completer, this.count); | |
418 | |
419 void add(data) { | |
420 list.add(data); | |
421 count--; | |
422 } | |
423 | |
424 void addError(error, [StackTrace stack]) { | |
425 completer.completeError(error, stack); | |
426 count = 0; | |
427 } | |
428 | |
429 void close() { | |
430 if (!completer.isCompleted) { | |
431 completer.complete(list); | |
432 } | |
433 } | |
434 | |
435 bool get isComplete => count == 0; | |
436 } | |
437 | |
438 /// Action completing a [StreamEvents.close] request. | |
439 class _CloseRequest implements _EventRequest { | |
440 final Completer completer; | |
441 StreamEvents events; | |
442 | |
443 _CloseRequest(this.completer, this.events); | |
444 | |
445 void add(data) { | |
446 throw new UnsupportedError("event"); | |
447 } | |
448 | |
449 void addError(error, [StackTrace stack]) { | |
450 throw new UnsupportedError("event"); | |
nweiz
2015/06/12 01:24:24
Shouldn't this be "error"?
Lasse Reichstein Nielsen
2015/06/12 13:04:22
No, we don't support receiving events, that's the
| |
451 } | |
452 | |
453 void close() { | |
454 if (events._isListening) { | |
455 completer.complete(events._dispose().cancel()); | |
456 } else { | |
457 completer.complete(); | |
458 } | |
459 } | |
460 | |
461 bool get isComplete => true; | |
462 } | |
463 | |
464 /// Action completing a [StreamEvents.rest] request. | |
465 class _RestRequest<T> implements _EventRequest { | |
466 final StreamCompleter delayStream; | |
467 final StreamEvents events; | |
468 _RestRequest(this.delayStream, this.events); | |
469 | |
470 void add(data) { | |
471 throw new UnsupportedError("event"); | |
472 } | |
473 | |
474 void addError(error, [StackTrace stack]) { | |
475 throw new UnsupportedError("event"); | |
nweiz
2015/06/12 01:24:26
Ditto.
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
| |
476 } | |
477 | |
478 void close() { | |
479 if (events._isListening) { | |
480 StreamSubscription subscription = events._dispose(); | |
481 delayStream.setSourceStream(new SubscriptionStream<T>(subscription)); | |
482 if (subscription.isPaused) subscription.resume(); | |
483 } else { | |
484 assert(events._isDone); | |
485 delayStream.setEmpty(); | |
486 } | |
487 } | |
488 | |
489 bool get isComplete => true; | |
490 } | |
OLD | NEW |