Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(547)

Side by Side Diff: lib/src/util/stream_queue.dart

Issue 1262623006: Temporarily bring in code from the async package. (Closed) Base URL: git@github.com:dart-lang/test@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
6 // lands.
7 library test.util.forkable_stream_queue;
8
9 import 'dart:async';
10 import 'dart:collection';
11
12 import "package:async/async.dart" hide ForkableStream, StreamQueue;
13
14 import "forkable_stream.dart";
15
16 /// An asynchronous pull-based interface for accessing stream events.
17 ///
18 /// Wraps a stream and makes individual events available on request.
19 ///
20 /// You can request (and reserve) one or more events from the stream,
21 /// and after all previous requests have been fulfilled, stream events
22 /// go towards fulfilling your request.
23 ///
24 /// For example, if you ask for [next] two times, the returned futures
25 /// will be completed by the next two unrequested events from the stream.
26 ///
27 /// The stream subscription is paused when there are no active
28 /// requests.
29 ///
30 /// Some streams, including broadcast streams, will buffer
31 /// events while paused, so waiting too long between requests may
32 /// cause memory bloat somewhere else.
33 ///
34 /// This is similar to, but more convenient than, a [StreamIterator].
35 /// A `StreamIterator` requires you to manually check when a new event is
36 /// available and you can only access the value of that event until you
37 /// check for the next one. A `StreamQueue` allows you to request, for example,
38 /// three events at a time, either individually, as a group using [take]
39 /// or [skip], or in any combination.
40 ///
41 /// You can also ask to have the [rest] of the stream provided as
42 /// a new stream. This allows, for example, taking the first event
43 /// out of a stream and continuing to use the rest of the stream as a stream.
44 ///
45 /// Example:
46 ///
47 /// var events = new StreamQueue<String>(someStreamOfLines);
48 /// var first = await events.next;
49 /// while (first.startsWith('#')) {
50 /// // Skip comments.
51 /// first = await events.next;
52 /// }
53 ///
54 /// if (first.startsWith(MAGIC_MARKER)) {
55 /// var headerCount =
56 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
57 /// handleMessage(headers: await events.take(headerCount),
58 /// body: events.rest);
59 /// return;
60 /// }
61 /// // Error handling.
62 ///
63 /// When you need no further events the `StreamQueue` should be closed
64 /// using [cancel]. This releases the underlying stream subscription.
65 class StreamQueue<T> {
66 // This class maintains two queues: one of events and one of requests.
67 // The active request (the one in front of the queue) is called with
68 // the current event queue when it becomes active.
69 //
70 // If the request returns true, it's complete and will be removed from the
71 // request queue.
72 // If the request returns false, it needs more events, and will be called
73 // again when new events are available.
74 // The request can remove events that it uses, or keep them in the event
75 // queue until it has all that it needs.
76 //
77 // This model is very flexible and easily extensible.
78 // It allows requests that don't consume events (like [hasNext]) or
79 // potentially a request that takes either five or zero events, determined
80 // by the content of the fifth event.
81
82 /// Source of events.
83 final ForkableStream _sourceStream;
84
85 /// Subscription on [_sourceStream] while listening for events.
86 ///
87 /// Set to subscription when listening, and set to `null` when the
88 /// subscription is done (and [_isDone] is set to true).
89 StreamSubscription _subscription;
90
91 /// Whether we have listened on [_sourceStream] and the subscription is done.
92 bool _isDone = false;
93
94 /// Whether a closing operation has been performed on the stream queue.
95 ///
96 /// Closing operations are [cancel] and [rest].
97 bool _isClosed = false;
98
99 /// Queue of events not used by a request yet.
100 final Queue<Result> _eventQueue = new Queue();
101
102 /// Queue of pending requests.
103 ///
104 /// Access through methods below to ensure consistency.
105 final Queue<_EventRequest> _requestQueue = new Queue();
106
107 /// Create a `StreamQueue` of the events of [source].
108 StreamQueue(Stream source)
109 : _sourceStream = source is ForkableStream
110 ? source
111 : new ForkableStream(source);
112
113 /// Asks if the stream has any more events.
114 ///
115 /// Returns a future that completes with `true` if the stream has any
116 /// more events, whether data or error.
117 /// If the stream closes without producing any more events, the returned
118 /// future completes with `false`.
119 ///
120 /// Can be used before using [next] to avoid getting an error in the
121 /// future returned by `next` in the case where there are no more events.
122 Future<bool> get hasNext {
123 if (!_isClosed) {
124 var hasNextRequest = new _HasNextRequest();
125 _addRequest(hasNextRequest);
126 return hasNextRequest.future;
127 }
128 throw _failClosed();
129 }
130
131 /// Requests the next (yet unrequested) event from the stream.
132 ///
133 /// When the requested event arrives, the returned future is completed with
134 /// the event.
135 /// If the event is a data event, the returned future completes
136 /// with its value.
137 /// If the event is an error event, the returned future completes with
138 /// its error and stack trace.
139 /// If the stream closes before an event arrives, the returned future
140 /// completes with a [StateError].
141 ///
142 /// It's possible to have several pending [next] calls (or other requests),
143 /// and they will be completed in the order they were requested, by the
144 /// first events that were not consumed by previous requeusts.
145 Future<T> get next {
146 if (!_isClosed) {
147 var nextRequest = new _NextRequest<T>();
148 _addRequest(nextRequest);
149 return nextRequest.future;
150 }
151 throw _failClosed();
152 }
153
154 /// Returns a stream of all the remaning events of the source stream.
155 ///
156 /// All requested [next], [skip] or [take] operations are completed
157 /// first, and then any remaining events are provided as events of
158 /// the returned stream.
159 ///
160 /// Using `rest` closes this stream queue. After getting the
161 /// `rest` the caller may no longer request other events, like
162 /// after calling [cancel].
163 Stream<T> get rest {
164 if (_isClosed) {
165 throw _failClosed();
166 }
167 var request = new _RestRequest<T>(this);
168 _isClosed = true;
169 _addRequest(request);
170 return request.stream;
171 }
172
173 /// Skips the next [count] *data* events.
174 ///
175 /// The [count] must be non-negative.
176 ///
177 /// When successful, this is equivalent to using [take]
178 /// and ignoring the result.
179 ///
180 /// If an error occurs before `count` data events have been skipped,
181 /// the returned future completes with that error instead.
182 ///
183 /// If the stream closes before `count` data events,
184 /// the remaining unskipped event count is returned.
185 /// If the returned future completes with the integer `0`,
186 /// then all events were succssfully skipped. If the value
187 /// is greater than zero then the stream ended early.
188 Future<int> skip(int count) {
189 if (count < 0) throw new RangeError.range(count, 0, null, "count");
190 if (!_isClosed) {
191 var request = new _SkipRequest(count);
192 _addRequest(request);
193 return request.future;
194 }
195 throw _failClosed();
196 }
197
198 /// Requests the next [count] data events as a list.
199 ///
200 /// The [count] must be non-negative.
201 ///
202 /// Equivalent to calling [next] `count` times and
203 /// storing the data values in a list.
204 ///
205 /// If an error occurs before `count` data events has
206 /// been collected, the returned future completes with
207 /// that error instead.
208 ///
209 /// If the stream closes before `count` data events,
210 /// the returned future completes with the list
211 /// of data collected so far. That is, the returned
212 /// list may have fewer than [count] elements.
213 Future<List<T>> take(int count) {
214 if (count < 0) throw new RangeError.range(count, 0, null, "count");
215 if (!_isClosed) {
216 var request = new _TakeRequest<T>(count);
217 _addRequest(request);
218 return request.future;
219 }
220 throw _failClosed();
221 }
222
223 /// Creates a new stream queue in the same position as this one.
224 ///
225 /// The fork is subscribed to the same underlying stream as this queue, but
226 /// it's otherwise wholly independent. If requests are made on one, they don't
227 /// move the other forward; if one is closed, the other is still open.
228 ///
229 /// The underlying stream will only be paused when all forks have no
230 /// outstanding requests, and only canceled when all forks are canceled.
231 StreamQueue<T> fork() {
232 if (_isClosed) throw _failClosed();
233
234 var request = new _ForkRequest<T>(this);
235 _addRequest(request);
236 return request.queue;
237 }
238
239 /// Cancels the underlying stream subscription.
240 ///
241 /// If [immediate] is `false` (the default), the cancel operation waits until
242 /// all previously requested events have been processed, then it cancels the
243 /// subscription providing the events.
244 ///
245 /// If [immediate] is `true`, the subscription is instead canceled
246 /// immediately. Any pending events complete with a 'closed'-event, as though
247 /// the stream had closed by itself.
248 ///
249 /// The returned future completes with the result of calling
250 /// `cancel`.
251 ///
252 /// After calling `cancel`, no further events can be requested.
253 /// None of [next], [rest], [skip], [take] or [cancel] may be
254 /// called again.
255 Future cancel({bool immediate: false}) {
256 if (_isClosed) throw _failClosed();
257 _isClosed = true;
258
259 if (_isDone) return new Future.value();
260 if (_subscription == null) _subscription = _sourceStream.listen(null);
261
262 if (!immediate) {
263 var request = new _CancelRequest(this);
264 _addRequest(request);
265 return request.future;
266 }
267
268 var future = _subscription.cancel();
269 _onDone();
270 return future;
271 }
272
273 /// Returns an error for when a request is made after cancel.
274 ///
275 /// Returns a [StateError] with a message saying that either
276 /// [cancel] or [rest] have already been called.
277 Error _failClosed() {
278 return new StateError("Already cancelled");
279 }
280
281 // Callbacks receiving the events of the source stream.
282
283 void _onData(T data) {
284 _eventQueue.add(new Result.value(data));
285 _checkQueues();
286 }
287
288 void _onError(error, StackTrace stack) {
289 _eventQueue.add(new Result.error(error, stack));
290 _checkQueues();
291 }
292
293 void _onDone() {
294 _subscription = null;
295 _isDone = true;
296 _closeAllRequests();
297 }
298
299 // Request queue management.
300
301 /// Adds a new request to the queue.
302 void _addRequest(_EventRequest request) {
303 if (_isDone) {
304 assert(_requestQueue.isEmpty);
305 if (!request.addEvents(_eventQueue)) {
306 request.close(_eventQueue);
307 }
308 return;
309 }
310 if (_requestQueue.isEmpty) {
311 if (request.addEvents(_eventQueue)) return;
312 _ensureListening();
313 }
314 _requestQueue.add(request);
315 }
316
317 /// Ensures that we are listening on events from [_sourceStream].
318 ///
319 /// Resumes subscription on [_sourceStream], or creates it if necessary.
320 void _ensureListening() {
321 assert(!_isDone);
322 if (_subscription == null) {
323 _subscription =
324 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
325 } else {
326 _subscription.resume();
327 }
328 }
329
330 /// Removes all requests and closes them.
331 ///
332 /// Used when the source stream is done.
333 /// After this, no further requests will be added to the queue,
334 /// requests are immediately served entirely by events already in the event
335 /// queue, if any.
336 void _closeAllRequests() {
337 assert(_isDone);
338 while (_requestQueue.isNotEmpty) {
339 var request = _requestQueue.removeFirst();
340 if (!request.addEvents(_eventQueue)) {
341 request.close(_eventQueue);
342 }
343 }
344 }
345
346 /// Matches events with requests.
347 ///
348 /// Called after receiving an event.
349 void _checkQueues() {
350 while (_requestQueue.isNotEmpty) {
351 if (_requestQueue.first.addEvents(_eventQueue)) {
352 _requestQueue.removeFirst();
353 } else {
354 return;
355 }
356 }
357
358 if (!_isDone) {
359 _subscription.pause();
360 }
361 }
362
363 /// Extracts the subscription and makes this stream queue unusable.
364 ///
365 /// Can only be used by the very last request.
366 StreamSubscription _dispose() {
367 assert(_isClosed);
368 var subscription = _subscription;
369 _subscription = null;
370 _isDone = true;
371 return subscription;
372 }
373 }
374
375 /// Request object that receives events when they arrive, until fulfilled.
376 ///
377 /// Each request that cannot be fulfilled immediately is represented by
378 /// an `_EventRequest` object in the request queue.
379 ///
380 /// Events from the source stream are sent to the first request in the
381 /// queue until it reports itself as [isComplete].
382 ///
383 /// When the first request in the queue `isComplete`, either when becoming
384 /// the first request or after receiving an event, its [close] methods is
385 /// called.
386 ///
387 /// The [close] method is also called immediately when the source stream
388 /// is done.
389 abstract class _EventRequest {
390 /// Handle available events.
391 ///
392 /// The available events are provided as a queue. The `addEvents` function
393 /// should only remove events from the front of the event queue, e.g.,
394 /// using [removeFirst].
395 ///
396 /// Returns `true` if the request is completed, or `false` if it needs
397 /// more events.
398 /// The call may keep events in the queue until the requeust is complete,
399 /// or it may remove them immediately.
400 ///
401 /// If the method returns true, the request is considered fulfilled, and
402 /// will never be called again.
403 ///
404 /// This method is called when a request reaches the front of the request
405 /// queue, and if it returns `false`, it's called again every time a new event
406 /// becomes available, or when the stream closes.
407 bool addEvents(Queue<Result> events);
408
409 /// Complete the request.
410 ///
411 /// This is called when the source stream is done before the request
412 /// had a chance to receive all its events. That is, after a call
413 /// to [addEvents] has returned `false`.
414 /// If there are any unused events available, they are in the [events] queue.
415 /// No further events will become available.
416 ///
417 /// The queue should only remove events from the front of the event queue,
418 /// e.g., using [removeFirst].
419 ///
420 /// If the request kept events in the queue after an [addEvents] call,
421 /// this is the last chance to use them.
422 void close(Queue<Result> events);
423 }
424
425 /// Request for a [StreamQueue.next] call.
426 ///
427 /// Completes the returned future when receiving the first event,
428 /// and is then complete.
429 class _NextRequest<T> implements _EventRequest {
430 /// Completer for the future returned by [StreamQueue.next].
431 final Completer _completer;
432
433 _NextRequest() : _completer = new Completer<T>();
434
435 Future<T> get future => _completer.future;
436
437 bool addEvents(Queue<Result> events) {
438 if (events.isEmpty) return false;
439 events.removeFirst().complete(_completer);
440 return true;
441 }
442
443 void close(Queue<Result> events) {
444 var errorFuture =
445 new Future.sync(() => throw new StateError("No elements"));
446 _completer.complete(errorFuture);
447 }
448 }
449
450 /// Request for a [StreamQueue.skip] call.
451 class _SkipRequest implements _EventRequest {
452 /// Completer for the future returned by the skip call.
453 final Completer _completer = new Completer<int>();
454
455 /// Number of remaining events to skip.
456 ///
457 /// The request [isComplete] when the values reaches zero.
458 ///
459 /// Decremented when an event is seen.
460 /// Set to zero when an error is seen since errors abort the skip request.
461 int _eventsToSkip;
462
463 _SkipRequest(this._eventsToSkip);
464
465 /// The future completed when the correct number of events have been skipped.
466 Future get future => _completer.future;
467
468 bool addEvents(Queue<Result> events) {
469 while (_eventsToSkip > 0) {
470 if (events.isEmpty) return false;
471 _eventsToSkip--;
472 var event = events.removeFirst();
473 if (event.isError) {
474 event.complete(_completer);
475 return true;
476 }
477 }
478 _completer.complete(0);
479 return true;
480 }
481
482 void close(Queue<Result> events) {
483 _completer.complete(_eventsToSkip);
484 }
485 }
486
487 /// Request for a [StreamQueue.take] call.
488 class _TakeRequest<T> implements _EventRequest {
489 /// Completer for the future returned by the take call.
490 final Completer _completer;
491
492 /// List collecting events until enough have been seen.
493 final List _list = <T>[];
494
495 /// Number of events to capture.
496 ///
497 /// The request [isComplete] when the length of [_list] reaches
498 /// this value.
499 final int _eventsToTake;
500
501 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
502
503 /// The future completed when the correct number of events have been captured.
504 Future get future => _completer.future;
505
506 bool addEvents(Queue<Result> events) {
507 while (_list.length < _eventsToTake) {
508 if (events.isEmpty) return false;
509 var result = events.removeFirst();
510 if (result.isError) {
511 result.complete(_completer);
512 return true;
513 }
514 _list.add(result.asValue.value);
515 }
516 _completer.complete(_list);
517 return true;
518 }
519
520 void close(Queue<Result> events) {
521 _completer.complete(_list);
522 }
523 }
524
525 /// Request for a [StreamQueue.cancel] call.
526 ///
527 /// The request needs no events, it just waits in the request queue
528 /// until all previous events are fulfilled, then it cancels the stream queue
529 /// source subscription.
530 class _CancelRequest implements _EventRequest {
531 /// Completer for the future returned by the `cancel` call.
532 final Completer _completer = new Completer();
533
534 /// The [StreamQueue] object that has this request queued.
535 ///
536 /// When the event is completed, it needs to cancel the active subscription
537 /// of the `StreamQueue` object, if any.
538 final StreamQueue _streamQueue;
539
540 _CancelRequest(this._streamQueue);
541
542 /// The future completed when the cancel request is completed.
543 Future get future => _completer.future;
544
545 bool addEvents(Queue<Result> events) {
546 _shutdown();
547 return true;
548 }
549
550 void close(_) {
551 _shutdown();
552 }
553
554 void _shutdown() {
555 if (_streamQueue._isDone) {
556 _completer.complete();
557 } else {
558 _streamQueue._ensureListening();
559 _completer.complete(_streamQueue._dispose().cancel());
560 }
561 }
562 }
563
564 /// Request for a [StreamQueue.rest] call.
565 ///
566 /// The request is always complete, it just waits in the request queue
567 /// until all previous events are fulfilled, then it takes over the
568 /// stream events subscription and creates a stream from it.
569 class _RestRequest<T> implements _EventRequest {
570 /// Completer for the stream returned by the `rest` call.
571 final StreamCompleter _completer = new StreamCompleter<T>();
572
573 /// The [StreamQueue] object that has this request queued.
574 ///
575 /// When the event is completed, it needs to cancel the active subscription
576 /// of the `StreamQueue` object, if any.
577 final StreamQueue _streamQueue;
578
579 _RestRequest(this._streamQueue);
580
581 /// The stream which will contain the remaining events of [_streamQueue].
582 Stream<T> get stream => _completer.stream;
583
584 bool addEvents(Queue<Result> events) {
585 _completeStream(events);
586 return true;
587 }
588
589 void close(Queue<Result> events) {
590 _completeStream(events);
591 }
592
593 void _completeStream(Queue<Result> events) {
594 if (events.isEmpty) {
595 if (_streamQueue._isDone) {
596 _completer.setEmpty();
597 } else {
598 _completer.setSourceStream(_getRestStream());
599 }
600 } else {
601 // There are prefetched events which needs to be added before the
602 // remaining stream.
603 var controller = new StreamController<T>();
604 for (var event in events) {
605 event.addTo(controller);
606 }
607 controller.addStream(_getRestStream(), cancelOnError: false)
608 .whenComplete(controller.close);
609 _completer.setSourceStream(controller.stream);
610 }
611 }
612
613 /// Create a stream from the rest of [_streamQueue]'s subscription.
614 Stream _getRestStream() {
615 if (_streamQueue._isDone) {
616 var controller = new StreamController<T>()..close();
617 return controller.stream;
618 // TODO(lrn). Use the following when 1.11 is released.
619 // return new Stream<T>.empty();
620 }
621 if (_streamQueue._subscription == null) {
622 return _streamQueue._sourceStream;
623 }
624 var subscription = _streamQueue._dispose();
625 subscription.resume();
626 return new SubscriptionStream<T>(subscription);
627 }
628 }
629
630 /// Request for a [StreamQueue.hasNext] call.
631 ///
632 /// Completes the [future] with `true` if it sees any event,
633 /// but doesn't consume the event.
634 /// If the request is closed without seeing an event, then
635 /// the [future] is completed with `false`.
636 class _HasNextRequest<T> implements _EventRequest {
637 final Completer _completer = new Completer<bool>();
638
639 Future<bool> get future => _completer.future;
640
641 bool addEvents(Queue<Result> events) {
642 if (events.isNotEmpty) {
643 _completer.complete(true);
644 return true;
645 }
646 return false;
647 }
648
649 void close(_) {
650 _completer.complete(false);
651 }
652 }
653
654 /// Request for a [StreamQueue.fork] call.
655 class _ForkRequest<T> implements _EventRequest {
656 /// Completer for the stream used by the queue by the `fork` call.
657 StreamCompleter _completer;
658
659 StreamQueue<T> queue;
660
661 /// The [StreamQueue] object that has this request queued.
662 final StreamQueue _streamQueue;
663
664 _ForkRequest(this._streamQueue) {
665 _completer = new StreamCompleter<T>();
666 queue = new StreamQueue<T>(_completer.stream);
667 }
668
669 bool addEvents(Queue<Result> events) {
670 _completeStream(events);
671 return true;
672 }
673
674 void close(Queue<Result> events) {
675 _completeStream(events);
676 }
677
678 void _completeStream(Queue<Result> events) {
679 if (events.isEmpty) {
680 if (_streamQueue._isDone) {
681 _completer.setEmpty();
682 } else {
683 _completer.setSourceStream(_streamQueue._sourceStream.fork());
684 }
685 } else {
686 // There are prefetched events which need to be added before the
687 // remaining stream.
688 var controller = new StreamController<T>();
689 for (var event in events) {
690 event.addTo(controller);
691 }
692
693 var fork = _streamQueue._sourceStream.fork();
694 controller.addStream(fork, cancelOnError: false)
695 .whenComplete(controller.close);
696 _completer.setSourceStream(controller.stream);
697 }
698 }
699 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698