Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1642)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_events;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import "subscription_stream.dart";
11 import "stream_completer.dart";
12 import "../result.dart";
13
14 /// An asynchronous pull-based interface for accessing stream events.
15 ///
16 /// Wraps a stream and makes individual events available on request.
17 ///
18 /// You can request (and reserve) one or more events from the stream,
19 /// and after all previous requests have been fulfilled, stream events
20 /// go towards fulfilling your request.
21 ///
22 /// For example, if you ask for [next] two times, the returned futures
23 /// will be completed by the next two unrequested events from the stream.
24 ///
25 /// The stream subscription is paused when there are no active
26 /// requests.
27 ///
28 /// Some streams, including broadcast streams, will buffer
29 /// events while paused, so waiting too long between requests may
30 /// cause memory bloat somewhere else.
31 ///
32 /// This is similar to, but more convenient than, a [StreamIterator].
33 /// A `StreamIterator` requires you to manually check when a new event is
34 /// available and you can only access the value of that event until you
35 /// check for the next one. A `StreamQueue` allows you to request, for example,
36 /// three events at a time, either individually, as a group using [take]
37 /// or [skip], or in any combination.
38 ///
39 /// You can also ask to have the [rest] of the stream provided as
40 /// a new stream. This allows, for example, taking the first event
41 /// out of a stream and continuing to use the rest of the stream as a stream.
42 ///
43 /// Example:
44 ///
45 /// var events = new StreamQueue<String>(someStreamOfLines);
46 /// var first = await events.next;
47 /// while (first.startsWith('#')) {
48 /// // Skip comments.
49 /// first = await events.next;
50 /// }
51 ///
52 /// if (first.startsWith(MAGIC_MARKER)) {
53 /// var headerCount =
54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
55 /// handleMessage(headers: await events.take(headerCount),
56 /// body: events.rest);
57 /// return;
58 /// }
59 /// // Error handling.
60 ///
61 /// When you need no further events the `StreamQueue` should be closed
62 /// using [cancel]. This releases the underlying stream subscription.
63 class StreamQueue<T> {
64 // This class maintains two queues: one of events and one of requests.
65 // The active request (the one in front of the queue) is called with
66 // the current event queue when it becomes active.
67 //
68 // If the request returns true, it's complaete and will be removed from the
nweiz 2015/06/30 23:39:48 "complaete" -> "complete"
Lasse Reichstein Nielsen 2015/07/01 08:24:57 Done.
69 // request queue.
70 // If the request returns false, it needs more events, and will be called
71 // again when new events are available.
72 // The request can remove events that it uses, or keep them in the event
73 // queue until it has all that it needs.
74 //
75 // This model is very flexible and easily extensible.
76 // It allows requests that don't consume events (like [hasNext]) or
77 // potentially a request that takes either five or zero events, determined
78 // by the content of the fifth event.
79
80 /// Source of events.
81 final Stream _sourceStream;
82
83 /// Subscription on [_sourceStream] while listening for events.
84 ///
85 /// Set to subscription when listening, and set to `null` when the
86 /// subscription is done (and [_isDone] is set to true).
87 StreamSubscription _subscription;
88
89 /// Whether we have listened on [_sourceStream] and the subscription is done.
90 bool _isDone = false;
91
92 /// Whether a closing operation has been performed on the stream queue.
93 ///
94 /// Closing operations are [cancel] and [rest].
95 bool _isClosed = false;
96
97 /// Queue of events not used by a request yet.
98 final Queue<Result> _eventQueue = new Queue();
99
100 /// Queue of pending requests.
101 ///
102 /// Access through methods below to ensure consistency.
103 final Queue<_EventRequest> _requestQueue = new Queue();
104
105 /// Create a `StreamQueue` of the events of [source].
106 StreamQueue(Stream source)
107 : _sourceStream = source;
108
109 /// Asks if the stream has any more events.
110 ///
111 /// Returns a future that completes with `true` if the stream has any
112 /// more events, whether data or error.
113 /// If the stream closes without producing any more events, the returned
114 /// future completes with `false`.
115 ///
116 /// Can be used before using [next] to avoid getting an error in the
117 /// future returned by `next` in the case where there are no more events.
118 Future<bool> get hasNext {
119 if (!_isClosed) {
120 var hasNextRequest = new _HasNextRequest();
121 _addRequest(hasNextRequest);
122 return hasNextRequest.future;
123 }
124 throw _failClosed();
125 }
126
127 /// Requests the next (yet unrequested) event from the stream.
128 ///
129 /// When the requested event arrives, the returned future is completed with
130 /// the event.
131 /// If the event is a data event, the returned future completes
132 /// with its value.
133 /// If the event is an error event, the returned future completes with
134 /// its error and stack trace.
135 /// If the stream closes before an event arrives, the returned future
136 /// completes with a [StateError].
137 ///
138 /// It's possible to have several pending [next] calls (or other requests),
139 /// and they will be completed in the order they were requested, by the
140 /// first events that were not consumed by previous requeusts.
141 Future<T> get next {
142 if (!_isClosed) {
143 var nextRequest = new _NextRequest<T>();
144 _addRequest(nextRequest);
145 return nextRequest.future;
146 }
147 throw _failClosed();
148 }
149
150 /// Returns a stream of all the remaning events of the source stream.
151 ///
152 /// All requested [next], [skip] or [take] operations are completed
153 /// first, and then any remaining events are provided as events of
154 /// the returned stream.
155 ///
156 /// Using `rest` closes this stream queue. After getting the
157 /// `rest` the caller may no longer request other events, like
158 /// after calling [cancel].
159 Stream<T> get rest {
160 if (_isClosed) {
161 throw _failClosed();
162 }
163 var request = new _RestRequest<T>(this);
164 _isClosed = true;
165 _addRequest(request);
166 return request.stream;
167 }
168
169 /// Skips the next [count] *data* events.
170 ///
171 /// The [count] must be non-negative.
172 ///
173 /// When successful, this is equivalent to using [take]
174 /// and ignoring the result.
175 ///
176 /// If an error occurs before `count` data events have been skipped,
177 /// the returned future completes with that error instead.
178 ///
179 /// If the stream closes before `count` data events,
180 /// the remaining unskipped event count is returned.
181 /// If the returned future completes with the integer `0`,
182 /// then all events were succssfully skipped. If the value
183 /// is greater than zero then the stream ended early.
184 Future<int> skip(int count) {
185 if (count < 0) throw new RangeError.range(count, 0, null, "count");
186 if (!_isClosed) {
187 var request = new _SkipRequest(count);
188 _addRequest(request);
189 return request.future;
190 }
191 throw _failClosed();
192 }
193
194 /// Requests the next [count] data events as a list.
195 ///
196 /// The [count] must be non-negative.
197 ///
198 /// Equivalent to calling [next] `count` times and
199 /// storing the data values in a list.
200 ///
201 /// If an error occurs before `count` data events has
202 /// been collected, the returned future completes with
203 /// that error instead.
204 ///
205 /// If the stream closes before `count` data events,
206 /// the returned future completes with the list
207 /// of data collected so far. That is, the returned
208 /// list may have fewer than [count] elements.
209 Future<List<T>> take(int count) {
210 if (count < 0) throw new RangeError.range(count, 0, null, "count");
211 if (!_isClosed) {
212 var request = new _TakeRequest<T>(count);
213 _addRequest(request);
214 return request.future;
215 }
216 throw _failClosed();
217 }
218
219 /// Cancels the underlying stream subscription.
220 ///
221 /// The cancel operation waits until all previously requested
222 /// events have been processed, then it cancels the subscription
223 /// providing the events.
224 ///
225 /// The returned future completes with the result of calling
226 /// `cancel`.
227 ///
228 /// After calling `cancel`, no further events can be requested.
229 /// None of [next], [rest], [skip], [take] or [cancel] may be
230 /// called again.
231 Future cancel() {
232 if (!_isClosed) {
233 _isClosed = true;
234 var request = new _CancelRequest(this);
235 _addRequest(request);
236 return request.future;
237 }
238 throw _failClosed();
239 }
240
241 /// Returns an error for when a request is made after cancel.
242 ///
243 /// Returns a [StateError] with a message saying that either
244 /// [cancel] or [rest] have already been called.
245 Error _failClosed() {
246 return new StateError("Already cancelled");
247 }
248
249 // Callbacks receiving the events of the source stream.
250
251 void _onData(T data) {
252 _eventQueue.add(new Result.value(data));
253 _checkQueues();
254 }
255
256 void _onError(error, StackTrace stack) {
257 _eventQueue.add(new Result.error(error, stack));
258 _checkQueues();
259 }
260
261 void _onDone() {
262 _subscription = null;
263 _isDone = true;
264 _closeAllRequests();
265 }
266
267 // Request queue management.
268
269 /// Adds a new request to the queue.
270 void _addRequest(_EventRequest request) {
271 if (_isDone) {
272 assert(_requestQueue.isEmpty);
273 if (!request.addEvents(_eventQueue)) {
274 request.close(_eventQueue);
275 }
276 return;
277 }
278 if (_requestQueue.isEmpty) {
279 if (request.addEvents(_eventQueue)) return;
280 _ensureListening();
281 }
282 _requestQueue.add(request);
283
284 }
285
286 /// Ensures that we are listening on events from [_sourceStream].
287 ///
288 /// Resumes subscription on [_sourceStream], or creates it if necessary.
289 StreamSubscription _ensureListening() {
290 assert(!_isDone);
291 if (_subscription == null) {
292 _subscription =
293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
294 } else {
295 _subscription.resume();
296 }
297 }
298
299 /// Removes all requests and closes them.
300 ///
301 /// Used when the source stream is done.
302 /// After this, no further requests will be added to the queue,
303 /// requests are immediately served entirely by events already in the event
304 /// queue, if any.
305 void _closeAllRequests() {
306 assert(_isDone);
307 while (_requestQueue.isNotEmpty) {
308 var request = _requestQueue.removeFirst();
309 if (!request.addEvents(_eventQueue)) {
310 request.close(_eventQueue);
311 }
312 }
313 }
314
315 /// Matches events with requests.
316 ///
317 /// Called after receiving an event.
318 void _checkQueues() {
319 while (_requestQueue.isNotEmpty) {
320 if (_requestQueue.first.addEvents(_eventQueue)) {
321 _requestQueue.removeFirst();
322 } else {
323 return;
324 }
325 }
326 if (!_isDone) {
327 _subscription.pause();
328 }
329 }
330
331 /// Extracts the subscription and makes this stream queue unusable.
332 ///
333 /// Can only be used by the very last request.
334 StreamSubscription _dispose() {
335 assert(_isClosed);
336 var subscription = _subscription;
337 _subscription = null;
338 _isDone = true;
339 return subscription;
340 }
341 }
342
343 /// Request object that receives events when they arrive, until fulfilled.
344 ///
345 /// Each request that cannot be fulfilled immediately is represented by
346 /// an `_EventRequest` object in the request queue.
347 ///
348 /// Events from the source stream are sent to the first request in the
349 /// queue until it reports itself as [isComplete].
350 ///
351 /// When the first request in the queue `isComplete`, either when becoming
352 /// the first request or after receiving an event, its [close] methods is
353 /// called.
354 ///
355 /// The [close] method is also called immediately when the source stream
356 /// is done.
357 abstract class _EventRequest implements EventSink {
358 /// Handle available events.
359 ///
360 /// The available events are provided as a queue. The `addEvents` function
361 /// should only remove events from the front of the event queue, e.g.,
362 /// using [removeFirst].
363 ///
364 /// Returns `true` if the request is completed, or `false` if it needs
365 /// more events.
366 /// The call may keep events in the queue until the requeust is complete,
367 /// or it may remove them immediately.
368 ///
369 /// If the method returns true, the request is considered fulfilled, and
370 /// will never be called again.
371 ///
372 /// This method is called when a request reaches the front of the request
373 /// queue, and if it returns `false`, it's called again every time a new event
374 /// becomes available, or when the stream closes.
375 bool addEvents(Queue<Result> events);
376
377 /// Complete the request.
378 ///
379 /// This is called when the source stream is done before the request
380 /// had a chance to receive all its events. That is, after a call
381 /// to [addEvents] has returned `false`.
382 /// If there are any unused events available, they are in the [events] queue.
383 /// No further events will become available.
384 ///
385 /// The queue should only remove events from the front of the event queue,
386 /// e.g., using [removeFirst].
387 ///
388 /// If the request kept events in the queue after an [addEvents] call,
389 /// this is the last chance to use them.
390 void close(Queue<Result> events);
391 }
392
393 /// Request for a [StreamQueue.next] call.
394 ///
395 /// Completes the returned future when receiving the first event,
396 /// and is then complete.
397 class _NextRequest<T> implements _EventRequest {
398 /// Completer for the future returned by [StreamQueue.next].
399 final Completer _completer;
400
401 _NextRequest() : _completer = new Completer<T>();
402
403 Future<T> get future => _completer.future;
404
405 bool addEvents(Queue<Result> events) {
406 if (events.isEmpty) return false;
407 events.removeFirst().complete(_completer);
408 return true;
409 }
410
411 void close(Queue<Result> events) {
412 var errorFuture =
413 new Future.sync(() => throw new StateError("No elements"));
414 _completer.complete(errorFuture);
415 }
416 }
417
418 /// Request for a [StreamQueue.skip] call.
419 class _SkipRequest implements _EventRequest {
420 /// Completer for the future returned by the skip call.
421 final Completer _completer = new Completer<int>();
422
423 /// Number of remaining events to skip.
424 ///
425 /// The request [isComplete] when the values reaches zero.
426 ///
427 /// Decremented when an event is seen.
428 /// Set to zero when an error is seen since errors abort the skip request.
429 int _eventsToSkip;
430
431 _SkipRequest(this._eventsToSkip);
432
433 /// The future completed when the correct number of events have been skipped.
434 Future get future => _completer.future;
435
436 bool addEvents(Queue<Result> events) {
437 while (_eventsToSkip > 0) {
438 if (events.isEmpty) return false;
439 _eventsToSkip--;
440 var event = events.removeFirst();
441 if (event.isError) {
442 event.complete(_completer);
443 return true;
444 }
445 }
446 _completer.complete(0);
447 return true;
448 }
449
450 void close(Queue<Result> events) {
451 _completer.complete(_eventsToSkip);
452 }
453 }
454
455 /// Request for a [StreamQueue.take] call.
456 class _TakeRequest<T> implements _EventRequest {
457 /// Completer for the future returned by the take call.
458 final Completer _completer;
459
460 /// List collecting events until enough have been seen.
461 final List _list = <T>[];
462
463 /// Number of events to capture.
464 ///
465 /// The request [isComplete] when the length of [_list] reaches
466 /// this value.
467 final int _eventsToTake;
468
469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
470
471 /// The future completed when the correct number of events have been captured.
472 Future get future => _completer.future;
473
474 bool addEvents(Queue<Events> events) {
475 while (_list.length < _eventsToTake) {
476 if (events.isEmpty) return false;
477 var result = events.removeFirst();
478 if (result.isError) {
479 result.complete(_completer);
480 return true;
481 }
482 _list.add(result.asValue.value);
483 }
484 _completer.complete(_list);
485 return true;
486 }
487
488 void close(Queue<Events> events) {
489 _completer.complete(_list);
490 }
491 }
492
493 /// Request for a [StreamQueue.cancel] call.
494 ///
495 /// The request needs no events, it just waits in the request queue
496 /// until all previous events are fulfilled, then it cancels the stream queue
497 /// source subscription.
498 class _CancelRequest implements _EventRequest {
499 /// Completer for the future returned by the `cancel` call.
500 final Completer _completer = new Completer();
501
502 /// The [StreamQueue] object that has this request queued.
503 ///
504 /// When the event is completed, it needs to cancel the active subscription
505 /// of the `StreamQueue` object, if any.
506 final StreamQueue _streamQueue;
507
508 _CancelRequest(this._streamQueue);
509
510 /// The future completed when the cancel request is completed.
511 Future get future => _completer.future;
512
513 bool addEvents(Queue<Result> events) {
514 _shutdown();
515 return true;
516 }
517
518 void close(_) {
519 _shutdown();
520 }
521
522 void _shutdown() {
523 if (_streamQueue._subscription == null) {
524 _completer.complete();
525 } else {
526 _completer.complete(_streamQueue._dispose().cancel());
527 }
528 }
529 }
530
531 /// Request for a [StreamQueue.rest] call.
532 ///
533 /// The request is always complete, it just waits in the request queue
534 /// until all previous events are fulfilled, then it takes over the
535 /// stream events subscription and creates a stream from it.
536 class _RestRequest<T> implements _EventRequest {
537 /// Completer for the stream returned by the `rest` call.
538 final StreamCompleter _completer = new StreamCompleter<T>();
539
540 /// The [StreamQueue] object that has this request queued.
541 ///
542 /// When the event is completed, it needs to cancel the active subscription
543 /// of the `StreamQueue` object, if any.
544 final StreamQueue _streamQueue;
545
546 _RestRequest(this._streamQueue);
547
548 /// The stream which will contain the remaining events of [_streamQueue].
549 Stream<T> get stream => _completer.stream;
550
551 bool addEvents(Queue<Result> events) {
552 _completeStream(events);
553 return true;
554 }
555
556 void close(Queue<Result> events) {
557 _completeStream(events);
558 }
559
560 void _completeStream(Queue<Result> events) {
561 Stream getRestStream() {
562 if (_streamQueue._isDone) {
563 return new StreamCompleter<T>()..close();
564 // TODO(lrn). Use the following when 1.11 is released.
565 // return new Stream<T>.empty();
566 }
567 if (_streamQueue._subscription == null) {
568 return _streamQueue._sourceStream;
569 }
570 var subscription = _streamQueue._dispose();
571 subscription.resume();
572 return new SubscriptionStream<T>(subscription);
573 }
574 if (events.isEmpty) {
575 if (_streamQueue._isDone) {
576 _completer.setEmpty();
577 } else {
578 _completer.setSourceStream(getRestStream());
579 }
580 } else {
581 // There are prefetched events which needs to be added before the
582 // remaining stream.
583 var controller = new StreamController<T>();
584 for (var event in events) {
585 event.addTo(controller);
586 }
587 controller.addStream(getRestStream(), cancelOnError: false)
588 .whenComplete(controller.close);
589 _completer.setSourceStream(controller.stream);
590 }
591 }
592 }
593
594 /// Request for a [StreamQueue.hasNext] call.
595 ///
596 /// Completes the [future] with `true` if it sees any event,
597 /// but doesn't consume the event.
598 /// If the request is closed without seeing an event, then
599 /// the [future] is completed with `false`.
600 class _HasNextRequest<T> implements _EventRequest {
601 final Completer _completer = new Completer<bool>();
602
603 Future<bool> get future => _completer.future;
604
605 bool addEvents(Queue<Result> events) {
606 if (events.isNotEmpty) {
607 _completer.complete(true);
608 return true;
609 }
610 return false;
611 }
612
613 void close(_) {
614 _completer.complete(false);
615 }
616 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698