OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.stream_events; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:collection'; | |
9 | |
10 import "subscription_stream.dart"; | |
11 import "stream_completer.dart"; | |
12 import "../result.dart"; | |
13 | |
14 /// An asynchronous pull-based interface for accessing stream events. | |
15 /// | |
16 /// Wraps a stream and makes individual events available on request. | |
17 /// | |
18 /// You can request (and reserve) one or more events from the stream, | |
19 /// and after all previous requests have been fulfilled, stream events | |
20 /// go towards fulfilling your request. | |
21 /// | |
22 /// For example, if you ask for [next] two times, the returned futures | |
23 /// will be completed by the next two unrequested events from the stream. | |
24 /// | |
25 /// The stream subscription is paused when there are no active | |
26 /// requests. | |
27 /// | |
28 /// Some streams, including broadcast streams, will buffer | |
29 /// events while paused, so waiting too long between requests may | |
30 /// cause memory bloat somewhere else. | |
31 /// | |
32 /// This is similar to, but more convenient than, a [StreamIterator]. | |
33 /// A `StreamIterator` requires you to manually check when a new event is | |
34 /// available and you can only access the value of that event until you | |
35 /// check for the next one. A `StreamQueue` allows you to request, for example, | |
36 /// three events at a time, either individually, as a group using [take] | |
37 /// or [skip], or in any combination. | |
38 /// | |
39 /// You can also ask to have the [rest] of the stream provided as | |
40 /// a new stream. This allows, for example, taking the first event | |
41 /// out of a stream and continuing to use the rest of the stream as a stream. | |
42 /// | |
43 /// Example: | |
44 /// | |
45 /// var events = new StreamQueue<String>(someStreamOfLines); | |
46 /// var first = await events.next; | |
47 /// while (first.startsWith('#')) { | |
48 /// // Skip comments. | |
49 /// first = await events.next; | |
50 /// } | |
51 /// | |
52 /// if (first.startsWith(MAGIC_MARKER)) { | |
53 /// var headerCount = | |
54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | |
55 /// handleMessage(headers: await events.take(headerCount), | |
56 /// body: events.rest); | |
57 /// return; | |
58 /// } | |
59 /// // Error handling. | |
60 /// | |
61 /// When you need no further events the `StreamQueue` should be closed | |
62 /// using [cancel]. This releases the underlying stream subscription. | |
63 class StreamQueue<T> { | |
64 // This class maintains two queues: one of events and one of requests. | |
65 // The active request (the one in front of the queue) is called with | |
66 // the current event queue when it becomes active. | |
67 // | |
68 // If the request returns true, it's complaete and will be removed from the | |
nweiz
2015/06/30 23:39:48
"complaete" -> "complete"
Lasse Reichstein Nielsen
2015/07/01 08:24:57
Done.
| |
69 // request queue. | |
70 // If the request returns false, it needs more events, and will be called | |
71 // again when new events are available. | |
72 // The request can remove events that it uses, or keep them in the event | |
73 // queue until it has all that it needs. | |
74 // | |
75 // This model is very flexible and easily extensible. | |
76 // It allows requests that don't consume events (like [hasNext]) or | |
77 // potentially a request that takes either five or zero events, determined | |
78 // by the content of the fifth event. | |
79 | |
80 /// Source of events. | |
81 final Stream _sourceStream; | |
82 | |
83 /// Subscription on [_sourceStream] while listening for events. | |
84 /// | |
85 /// Set to subscription when listening, and set to `null` when the | |
86 /// subscription is done (and [_isDone] is set to true). | |
87 StreamSubscription _subscription; | |
88 | |
89 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
90 bool _isDone = false; | |
91 | |
92 /// Whether a closing operation has been performed on the stream queue. | |
93 /// | |
94 /// Closing operations are [cancel] and [rest]. | |
95 bool _isClosed = false; | |
96 | |
97 /// Queue of events not used by a request yet. | |
98 final Queue<Result> _eventQueue = new Queue(); | |
99 | |
100 /// Queue of pending requests. | |
101 /// | |
102 /// Access through methods below to ensure consistency. | |
103 final Queue<_EventRequest> _requestQueue = new Queue(); | |
104 | |
105 /// Create a `StreamQueue` of the events of [source]. | |
106 StreamQueue(Stream source) | |
107 : _sourceStream = source; | |
108 | |
109 /// Asks if the stream has any more events. | |
110 /// | |
111 /// Returns a future that completes with `true` if the stream has any | |
112 /// more events, whether data or error. | |
113 /// If the stream closes without producing any more events, the returned | |
114 /// future completes with `false`. | |
115 /// | |
116 /// Can be used before using [next] to avoid getting an error in the | |
117 /// future returned by `next` in the case where there are no more events. | |
118 Future<bool> get hasNext { | |
119 if (!_isClosed) { | |
120 var hasNextRequest = new _HasNextRequest(); | |
121 _addRequest(hasNextRequest); | |
122 return hasNextRequest.future; | |
123 } | |
124 throw _failClosed(); | |
125 } | |
126 | |
127 /// Requests the next (yet unrequested) event from the stream. | |
128 /// | |
129 /// When the requested event arrives, the returned future is completed with | |
130 /// the event. | |
131 /// If the event is a data event, the returned future completes | |
132 /// with its value. | |
133 /// If the event is an error event, the returned future completes with | |
134 /// its error and stack trace. | |
135 /// If the stream closes before an event arrives, the returned future | |
136 /// completes with a [StateError]. | |
137 /// | |
138 /// It's possible to have several pending [next] calls (or other requests), | |
139 /// and they will be completed in the order they were requested, by the | |
140 /// first events that were not consumed by previous requeusts. | |
141 Future<T> get next { | |
142 if (!_isClosed) { | |
143 var nextRequest = new _NextRequest<T>(); | |
144 _addRequest(nextRequest); | |
145 return nextRequest.future; | |
146 } | |
147 throw _failClosed(); | |
148 } | |
149 | |
150 /// Returns a stream of all the remaning events of the source stream. | |
151 /// | |
152 /// All requested [next], [skip] or [take] operations are completed | |
153 /// first, and then any remaining events are provided as events of | |
154 /// the returned stream. | |
155 /// | |
156 /// Using `rest` closes this stream queue. After getting the | |
157 /// `rest` the caller may no longer request other events, like | |
158 /// after calling [cancel]. | |
159 Stream<T> get rest { | |
160 if (_isClosed) { | |
161 throw _failClosed(); | |
162 } | |
163 var request = new _RestRequest<T>(this); | |
164 _isClosed = true; | |
165 _addRequest(request); | |
166 return request.stream; | |
167 } | |
168 | |
169 /// Skips the next [count] *data* events. | |
170 /// | |
171 /// The [count] must be non-negative. | |
172 /// | |
173 /// When successful, this is equivalent to using [take] | |
174 /// and ignoring the result. | |
175 /// | |
176 /// If an error occurs before `count` data events have been skipped, | |
177 /// the returned future completes with that error instead. | |
178 /// | |
179 /// If the stream closes before `count` data events, | |
180 /// the remaining unskipped event count is returned. | |
181 /// If the returned future completes with the integer `0`, | |
182 /// then all events were succssfully skipped. If the value | |
183 /// is greater than zero then the stream ended early. | |
184 Future<int> skip(int count) { | |
185 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
186 if (!_isClosed) { | |
187 var request = new _SkipRequest(count); | |
188 _addRequest(request); | |
189 return request.future; | |
190 } | |
191 throw _failClosed(); | |
192 } | |
193 | |
194 /// Requests the next [count] data events as a list. | |
195 /// | |
196 /// The [count] must be non-negative. | |
197 /// | |
198 /// Equivalent to calling [next] `count` times and | |
199 /// storing the data values in a list. | |
200 /// | |
201 /// If an error occurs before `count` data events has | |
202 /// been collected, the returned future completes with | |
203 /// that error instead. | |
204 /// | |
205 /// If the stream closes before `count` data events, | |
206 /// the returned future completes with the list | |
207 /// of data collected so far. That is, the returned | |
208 /// list may have fewer than [count] elements. | |
209 Future<List<T>> take(int count) { | |
210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
211 if (!_isClosed) { | |
212 var request = new _TakeRequest<T>(count); | |
213 _addRequest(request); | |
214 return request.future; | |
215 } | |
216 throw _failClosed(); | |
217 } | |
218 | |
219 /// Cancels the underlying stream subscription. | |
220 /// | |
221 /// The cancel operation waits until all previously requested | |
222 /// events have been processed, then it cancels the subscription | |
223 /// providing the events. | |
224 /// | |
225 /// The returned future completes with the result of calling | |
226 /// `cancel`. | |
227 /// | |
228 /// After calling `cancel`, no further events can be requested. | |
229 /// None of [next], [rest], [skip], [take] or [cancel] may be | |
230 /// called again. | |
231 Future cancel() { | |
232 if (!_isClosed) { | |
233 _isClosed = true; | |
234 var request = new _CancelRequest(this); | |
235 _addRequest(request); | |
236 return request.future; | |
237 } | |
238 throw _failClosed(); | |
239 } | |
240 | |
241 /// Returns an error for when a request is made after cancel. | |
242 /// | |
243 /// Returns a [StateError] with a message saying that either | |
244 /// [cancel] or [rest] have already been called. | |
245 Error _failClosed() { | |
246 return new StateError("Already cancelled"); | |
247 } | |
248 | |
249 // Callbacks receiving the events of the source stream. | |
250 | |
251 void _onData(T data) { | |
252 _eventQueue.add(new Result.value(data)); | |
253 _checkQueues(); | |
254 } | |
255 | |
256 void _onError(error, StackTrace stack) { | |
257 _eventQueue.add(new Result.error(error, stack)); | |
258 _checkQueues(); | |
259 } | |
260 | |
261 void _onDone() { | |
262 _subscription = null; | |
263 _isDone = true; | |
264 _closeAllRequests(); | |
265 } | |
266 | |
267 // Request queue management. | |
268 | |
269 /// Adds a new request to the queue. | |
270 void _addRequest(_EventRequest request) { | |
271 if (_isDone) { | |
272 assert(_requestQueue.isEmpty); | |
273 if (!request.addEvents(_eventQueue)) { | |
274 request.close(_eventQueue); | |
275 } | |
276 return; | |
277 } | |
278 if (_requestQueue.isEmpty) { | |
279 if (request.addEvents(_eventQueue)) return; | |
280 _ensureListening(); | |
281 } | |
282 _requestQueue.add(request); | |
283 | |
284 } | |
285 | |
286 /// Ensures that we are listening on events from [_sourceStream]. | |
287 /// | |
288 /// Resumes subscription on [_sourceStream], or creates it if necessary. | |
289 StreamSubscription _ensureListening() { | |
290 assert(!_isDone); | |
291 if (_subscription == null) { | |
292 _subscription = | |
293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
294 } else { | |
295 _subscription.resume(); | |
296 } | |
297 } | |
298 | |
299 /// Removes all requests and closes them. | |
300 /// | |
301 /// Used when the source stream is done. | |
302 /// After this, no further requests will be added to the queue, | |
303 /// requests are immediately served entirely by events already in the event | |
304 /// queue, if any. | |
305 void _closeAllRequests() { | |
306 assert(_isDone); | |
307 while (_requestQueue.isNotEmpty) { | |
308 var request = _requestQueue.removeFirst(); | |
309 if (!request.addEvents(_eventQueue)) { | |
310 request.close(_eventQueue); | |
311 } | |
312 } | |
313 } | |
314 | |
315 /// Matches events with requests. | |
316 /// | |
317 /// Called after receiving an event. | |
318 void _checkQueues() { | |
319 while (_requestQueue.isNotEmpty) { | |
320 if (_requestQueue.first.addEvents(_eventQueue)) { | |
321 _requestQueue.removeFirst(); | |
322 } else { | |
323 return; | |
324 } | |
325 } | |
326 if (!_isDone) { | |
327 _subscription.pause(); | |
328 } | |
329 } | |
330 | |
331 /// Extracts the subscription and makes this stream queue unusable. | |
332 /// | |
333 /// Can only be used by the very last request. | |
334 StreamSubscription _dispose() { | |
335 assert(_isClosed); | |
336 var subscription = _subscription; | |
337 _subscription = null; | |
338 _isDone = true; | |
339 return subscription; | |
340 } | |
341 } | |
342 | |
343 /// Request object that receives events when they arrive, until fulfilled. | |
344 /// | |
345 /// Each request that cannot be fulfilled immediately is represented by | |
346 /// an `_EventRequest` object in the request queue. | |
347 /// | |
348 /// Events from the source stream are sent to the first request in the | |
349 /// queue until it reports itself as [isComplete]. | |
350 /// | |
351 /// When the first request in the queue `isComplete`, either when becoming | |
352 /// the first request or after receiving an event, its [close] methods is | |
353 /// called. | |
354 /// | |
355 /// The [close] method is also called immediately when the source stream | |
356 /// is done. | |
357 abstract class _EventRequest implements EventSink { | |
358 /// Handle available events. | |
359 /// | |
360 /// The available events are provided as a queue. The `addEvents` function | |
361 /// should only remove events from the front of the event queue, e.g., | |
362 /// using [removeFirst]. | |
363 /// | |
364 /// Returns `true` if the request is completed, or `false` if it needs | |
365 /// more events. | |
366 /// The call may keep events in the queue until the requeust is complete, | |
367 /// or it may remove them immediately. | |
368 /// | |
369 /// If the method returns true, the request is considered fulfilled, and | |
370 /// will never be called again. | |
371 /// | |
372 /// This method is called when a request reaches the front of the request | |
373 /// queue, and if it returns `false`, it's called again every time a new event | |
374 /// becomes available, or when the stream closes. | |
375 bool addEvents(Queue<Result> events); | |
376 | |
377 /// Complete the request. | |
378 /// | |
379 /// This is called when the source stream is done before the request | |
380 /// had a chance to receive all its events. That is, after a call | |
381 /// to [addEvents] has returned `false`. | |
382 /// If there are any unused events available, they are in the [events] queue. | |
383 /// No further events will become available. | |
384 /// | |
385 /// The queue should only remove events from the front of the event queue, | |
386 /// e.g., using [removeFirst]. | |
387 /// | |
388 /// If the request kept events in the queue after an [addEvents] call, | |
389 /// this is the last chance to use them. | |
390 void close(Queue<Result> events); | |
391 } | |
392 | |
393 /// Request for a [StreamQueue.next] call. | |
394 /// | |
395 /// Completes the returned future when receiving the first event, | |
396 /// and is then complete. | |
397 class _NextRequest<T> implements _EventRequest { | |
398 /// Completer for the future returned by [StreamQueue.next]. | |
399 final Completer _completer; | |
400 | |
401 _NextRequest() : _completer = new Completer<T>(); | |
402 | |
403 Future<T> get future => _completer.future; | |
404 | |
405 bool addEvents(Queue<Result> events) { | |
406 if (events.isEmpty) return false; | |
407 events.removeFirst().complete(_completer); | |
408 return true; | |
409 } | |
410 | |
411 void close(Queue<Result> events) { | |
412 var errorFuture = | |
413 new Future.sync(() => throw new StateError("No elements")); | |
414 _completer.complete(errorFuture); | |
415 } | |
416 } | |
417 | |
418 /// Request for a [StreamQueue.skip] call. | |
419 class _SkipRequest implements _EventRequest { | |
420 /// Completer for the future returned by the skip call. | |
421 final Completer _completer = new Completer<int>(); | |
422 | |
423 /// Number of remaining events to skip. | |
424 /// | |
425 /// The request [isComplete] when the values reaches zero. | |
426 /// | |
427 /// Decremented when an event is seen. | |
428 /// Set to zero when an error is seen since errors abort the skip request. | |
429 int _eventsToSkip; | |
430 | |
431 _SkipRequest(this._eventsToSkip); | |
432 | |
433 /// The future completed when the correct number of events have been skipped. | |
434 Future get future => _completer.future; | |
435 | |
436 bool addEvents(Queue<Result> events) { | |
437 while (_eventsToSkip > 0) { | |
438 if (events.isEmpty) return false; | |
439 _eventsToSkip--; | |
440 var event = events.removeFirst(); | |
441 if (event.isError) { | |
442 event.complete(_completer); | |
443 return true; | |
444 } | |
445 } | |
446 _completer.complete(0); | |
447 return true; | |
448 } | |
449 | |
450 void close(Queue<Result> events) { | |
451 _completer.complete(_eventsToSkip); | |
452 } | |
453 } | |
454 | |
455 /// Request for a [StreamQueue.take] call. | |
456 class _TakeRequest<T> implements _EventRequest { | |
457 /// Completer for the future returned by the take call. | |
458 final Completer _completer; | |
459 | |
460 /// List collecting events until enough have been seen. | |
461 final List _list = <T>[]; | |
462 | |
463 /// Number of events to capture. | |
464 /// | |
465 /// The request [isComplete] when the length of [_list] reaches | |
466 /// this value. | |
467 final int _eventsToTake; | |
468 | |
469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | |
470 | |
471 /// The future completed when the correct number of events have been captured. | |
472 Future get future => _completer.future; | |
473 | |
474 bool addEvents(Queue<Events> events) { | |
475 while (_list.length < _eventsToTake) { | |
476 if (events.isEmpty) return false; | |
477 var result = events.removeFirst(); | |
478 if (result.isError) { | |
479 result.complete(_completer); | |
480 return true; | |
481 } | |
482 _list.add(result.asValue.value); | |
483 } | |
484 _completer.complete(_list); | |
485 return true; | |
486 } | |
487 | |
488 void close(Queue<Events> events) { | |
489 _completer.complete(_list); | |
490 } | |
491 } | |
492 | |
493 /// Request for a [StreamQueue.cancel] call. | |
494 /// | |
495 /// The request needs no events, it just waits in the request queue | |
496 /// until all previous events are fulfilled, then it cancels the stream queue | |
497 /// source subscription. | |
498 class _CancelRequest implements _EventRequest { | |
499 /// Completer for the future returned by the `cancel` call. | |
500 final Completer _completer = new Completer(); | |
501 | |
502 /// The [StreamQueue] object that has this request queued. | |
503 /// | |
504 /// When the event is completed, it needs to cancel the active subscription | |
505 /// of the `StreamQueue` object, if any. | |
506 final StreamQueue _streamQueue; | |
507 | |
508 _CancelRequest(this._streamQueue); | |
509 | |
510 /// The future completed when the cancel request is completed. | |
511 Future get future => _completer.future; | |
512 | |
513 bool addEvents(Queue<Result> events) { | |
514 _shutdown(); | |
515 return true; | |
516 } | |
517 | |
518 void close(_) { | |
519 _shutdown(); | |
520 } | |
521 | |
522 void _shutdown() { | |
523 if (_streamQueue._subscription == null) { | |
524 _completer.complete(); | |
525 } else { | |
526 _completer.complete(_streamQueue._dispose().cancel()); | |
527 } | |
528 } | |
529 } | |
530 | |
531 /// Request for a [StreamQueue.rest] call. | |
532 /// | |
533 /// The request is always complete, it just waits in the request queue | |
534 /// until all previous events are fulfilled, then it takes over the | |
535 /// stream events subscription and creates a stream from it. | |
536 class _RestRequest<T> implements _EventRequest { | |
537 /// Completer for the stream returned by the `rest` call. | |
538 final StreamCompleter _completer = new StreamCompleter<T>(); | |
539 | |
540 /// The [StreamQueue] object that has this request queued. | |
541 /// | |
542 /// When the event is completed, it needs to cancel the active subscription | |
543 /// of the `StreamQueue` object, if any. | |
544 final StreamQueue _streamQueue; | |
545 | |
546 _RestRequest(this._streamQueue); | |
547 | |
548 /// The stream which will contain the remaining events of [_streamQueue]. | |
549 Stream<T> get stream => _completer.stream; | |
550 | |
551 bool addEvents(Queue<Result> events) { | |
552 _completeStream(events); | |
553 return true; | |
554 } | |
555 | |
556 void close(Queue<Result> events) { | |
557 _completeStream(events); | |
558 } | |
559 | |
560 void _completeStream(Queue<Result> events) { | |
561 Stream getRestStream() { | |
562 if (_streamQueue._isDone) { | |
563 return new StreamCompleter<T>()..close(); | |
564 // TODO(lrn). Use the following when 1.11 is released. | |
565 // return new Stream<T>.empty(); | |
566 } | |
567 if (_streamQueue._subscription == null) { | |
568 return _streamQueue._sourceStream; | |
569 } | |
570 var subscription = _streamQueue._dispose(); | |
571 subscription.resume(); | |
572 return new SubscriptionStream<T>(subscription); | |
573 } | |
574 if (events.isEmpty) { | |
575 if (_streamQueue._isDone) { | |
576 _completer.setEmpty(); | |
577 } else { | |
578 _completer.setSourceStream(getRestStream()); | |
579 } | |
580 } else { | |
581 // There are prefetched events which needs to be added before the | |
582 // remaining stream. | |
583 var controller = new StreamController<T>(); | |
584 for (var event in events) { | |
585 event.addTo(controller); | |
586 } | |
587 controller.addStream(getRestStream(), cancelOnError: false) | |
588 .whenComplete(controller.close); | |
589 _completer.setSourceStream(controller.stream); | |
590 } | |
591 } | |
592 } | |
593 | |
594 /// Request for a [StreamQueue.hasNext] call. | |
595 /// | |
596 /// Completes the [future] with `true` if it sees any event, | |
597 /// but doesn't consume the event. | |
598 /// If the request is closed without seeing an event, then | |
599 /// the [future] is completed with `false`. | |
600 class _HasNextRequest<T> implements _EventRequest { | |
601 final Completer _completer = new Completer<bool>(); | |
602 | |
603 Future<bool> get future => _completer.future; | |
604 | |
605 bool addEvents(Queue<Result> events) { | |
606 if (events.isNotEmpty) { | |
607 _completer.complete(true); | |
608 return true; | |
609 } | |
610 return false; | |
611 } | |
612 | |
613 void close(_) { | |
614 _completer.complete(false); | |
615 } | |
616 } | |
OLD | NEW |