Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(458)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Add all.dart to test. Apparently people like that. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_events;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import "subscription_stream.dart";
11 import "stream_completer.dart";
12 import "../result.dart";
13
14 /// An asynchronous pull-based interface for accessing stream events.
15 ///
16 /// Wraps a stream and makes individual events available on request.
17 ///
18 /// You can request (and reserve) one or more events from the stream,
19 /// and after all previous requestes have been fulfilled, stream events
nweiz 2015/06/18 23:44:26 "requestes" -> "requests"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
20 /// go towards fulfilling your request.
21 ///
22 /// For example, if you ask for [next] two times, the returned futures
23 /// will be completed by the next two unreserved events from the stream.
nweiz 2015/06/18 23:44:25 "unreserved" -> "unrequested"? Just to keep the te
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
24 ///
25 /// The stream subscription is paused when there are no active
26 /// requests.
27 ///
28 /// Some streams, including broadcast streams, will buffer
29 /// events while paused, so waiting too long between requests may
30 /// cause memory bloat somewhere else.
31 ///
32 /// The individual requests are served in the order they are requested,
33 /// and the stream subscription is paused when there are no active requests.
nweiz 2015/06/18 23:44:27 Both of these are already mentioned above, so this
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
34 ///
35 /// This is similar to, but more convenient than, a [StreamIterator].
36 /// A `StreamIterator` requires you to manually check when a new event is
37 /// available and you can only access the value of that event until you
38 /// check for the next one. A `StreamQueue` allows you to request, for example,
39 /// three events at a time, either individually, as a group using [take]
40 /// or [skip], or in any combination.
41 ///
42 /// You can also ask to have the [rest] of the stream provided as
43 /// a new stream. This allows, for example, taking the first event
44 /// out of a stream and continue using the rest of the stream as a stream.
nweiz 2015/06/18 23:44:27 "continue using" -> "continuing to use"
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
45 ///
46 /// Example:
47 ///
48 /// var events = new StreamQueue<String>(someStreamOfLines);
49 /// var first = await events.next;
50 /// while (first.startsWith('#')) {
51 /// // Skip comments.
52 /// first = await events.next;
53 /// }
54 ///
55 /// if (first.startsWith(MAGIC_MARKER)) {
56 /// var headerCount =
57 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
58 /// handleMessage(headers: await events.take(headerCount),
59 /// body: events.rest);
60 /// return;
61 /// }
62 /// // Error handling.
63 ///
64 /// When you need no further events the `StreamQueue` should be closed
65 /// using [cancel]. This releases the underlying stream subscription.
66 ///
67 /// The underlying stream subscription is paused when there
68 /// are no requeusts. Some subscriptions, including those of broadcast streams,
69 /// will still buffer events while paused. Creating a `StreamQueue` from
70 /// such a stream and stopping to request events, will cause memory to fill up
nweiz 2015/06/18 23:44:27 "stopping" -> "ceasing", get rid of the comma
71 /// unnecessarily.
nweiz 2015/06/18 23:44:26 This paragraph also seems redundant with the sixth
Lasse Reichstein Nielsen 2015/06/30 10:34:12 Done.
72 class StreamQueue<T> {
73 // This class maintains two queues: one of events and one of requests.
74 // The active request (the one in front of the queue) is called with
75 // the current event queue when it becomes active.
nweiz 2015/06/18 23:44:27 "it" -> "the request" (to clarify whether you're r
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
76 // If it returns true, it's done and will be removed from the request queue.
nweiz 2015/06/18 23:44:26 "it" -> "the request" again. The next "it" is fine
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
77 // If it returns false, it needs more events, and will be called again when
78 // new events are available.
79 // The request can remove events that it uses, or keep them in the event
80 // queue until it has all that it needs.
81 //
82 // This model is very flexible and easily extensible.
83 // It allows requests that don't consume events (like [hasNext]) or
84 // potentially a request that takes either five or zero events, determined
85 // by the content of the fifth event.
86
87 /// Source of events.
88 final Stream _sourceStream;
89
90 /// Number of events that may be prefetched when there is no request.
91 final int _prefetchCount;
92
93 /// Subscription on [_sourceStream] while listening for events.
94 ///
95 /// Set to subscription when listening, and set to `null` when the
96 /// subscription is done (and [_isDone] is set to true).
97 StreamSubscription _subscription;
98
99 /// Whether we have listened on [_sourceStream] and the subscription is done.
100 bool _isDone = false;
101
102 /// Whether a closing operation has been performed on the stream queue.
103 ///
104 /// Closing operations are [cancel] and [rest].
105 bool _isClosed = false;
106
107 /// Queue of events not used by a request yet.
108 final Queue<Result> _eventQueue = new Queue();
109
110 /// Queue of pending requests.
111 /// Access through methods below to ensure consistency.
nweiz 2015/06/18 23:44:26 Nit: add a newline above
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
112 final Queue<_EventRequest> _requestQueue = new Queue();
113
114 /// Create a `StreamQueue` of the events of source.
nweiz 2015/06/18 23:44:26 "source" -> "[source]"
Lasse Reichstein Nielsen 2015/06/30 10:34:12 Done.
115 ///
116 /// Allow prefetching [prefetch] events before pausing the source
117 /// stream even if there are no current requests for them.
118 /// The default is to pause immediately when there is no pending request.
119 /// Even if `prefetch` is greater than zero, the stream won't listened on
120 /// before the first request.
nweiz 2015/06/18 23:44:25 Consider allowing "prefetch: -1" to mean "never pa
Lasse Reichstein Nielsen 2015/06/30 10:34:14 I don't like -1 being magical, the alternative wou
121 StreamQueue(Stream source, {int prefetch: 0})
nweiz 2015/06/18 23:44:26 Consider making [prefetch] default to null and ass
Lasse Reichstein Nielsen 2015/06/30 10:34:12 True. I'll just remove it for now (effectively fix
122 : _sourceStream = source, _prefetchCount = prefetch;
123
124 /// Asks if the stream has any more events.
125 ///
126 /// Returns a future that completes with `true` if the stream has any
127 /// more events, whether data or error.
128 /// If the stream closes without producing any more events, the returned
129 /// future completes with `false`.
130 ///
131 /// Can be used before using [next] to avoid getting an error in the
132 /// future returned by `next` in the case where there are no more events.
133 Future<bool> get hasNext {
134 if (!_isClosed) {
135 _HasNextRequest hasNextRequest = new _HasNextRequest();
nweiz 2015/06/18 23:44:26 Nit: "var" (also below)
Lasse Reichstein Nielsen 2015/06/30 10:34:12 Done.
136 _addRequest(hasNextRequest);
137 return hasNextRequest.future;
138 }
139 throw _failClosed();
140 }
141
142 /// Requests the next (yet unrequested) event from the stream.
143 ///
144 /// When the requested event arrives, the returned future is completed with
145 /// the event.
146 /// If the event is a data event, the returned future completes
147 /// with its value.
148 /// If the event is an error event, the returned future completes with
149 /// its error and stack trace.
150 /// If the stream closes before an event arrives, the returned future
151 /// completes with a [StateError].
152 ///
153 /// It's possible to have several pending [next] calls (or other requests),
154 /// and they will be completed in the order they were requested, by the
155 /// first events that were not used by previous requeusts.
nweiz 2015/06/18 23:44:26 "used" -> "consumed" (slightly more correct for re
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
156 Future<T> get next {
157 if (!_isClosed) {
158 _NextRequest nextRequest = new _NextRequest<T>();
159 _addRequest(nextRequest);
160 return nextRequest.future;
161 }
162 throw _failClosed();
163 }
164
165 /// Returns a stream of all the remaning events of the source stream.
166 ///
167 /// All requested [next], [skip] or [take] operations are completed
168 /// first, and then any remaining events are provided as events of
169 /// the returned stream.
170 ///
171 /// Using `rest` closes the stream events object. After getting the
nweiz 2015/06/18 23:44:26 "the stream events object" -> "this stream queue"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
172 /// `rest` the caller may no longer request other events, like
173 /// after calling [cancel].
174 Stream<T> get rest {
175 if (_isClosed) {
176 throw _failClosed();
177 }
178 var request = new _RestRequest<T>(this);
179 _isClosed = true;
180 _addRequest(request);
181 return request.stream;
182 }
183
184 /// Skips the next [count] *data* events.
185 ///
186 /// The [count] must be non-negative.
187 ///
188 /// When successful, this is equivalent to using [take]
189 /// and ignoring the result.
190 ///
191 /// If an error occurs before `count` data events have been skipped,
192 /// the returned future completes with that error instead.
193 ///
194 /// If the stream closes before `count` data events,
195 /// the remaining unskipped event count is returned.
196 /// If the returned future completes with the integer `0`,
197 /// then all events were succssfully skipped. If the value
198 /// is greater than zero then the stream ended early.
199 Future<int> skip(int count) {
200 if (count < 0) throw new RangeError.range(count, 0, null, "count");
201 if (!_isClosed) {
202 var request = new _SkipRequest(count);
203 _addRequest(request);
204 return request.future;
205 }
206 throw _failClosed();
207 }
208
209 /// Requests the next [count] data events as a list.
210 ///
211 /// The [count] must be non-negative.
212 ///
213 /// Equivalent to calling [next] `count` times and
214 /// storing the data values in a list.
215 ///
216 /// If an error occurs before `count` data events has
217 /// been collected, the returned future completes with
218 /// that error instead.
219 ///
220 /// If the stream closes before `count` data events,
221 /// the returned future completes with the list
222 /// of data collected so far. That is, the returned
223 /// list may have fewer than [count] elements.
224 Future<List<T>> take(int count) {
225 if (count < 0) throw new RangeError.range(count, 0, null, "count");
226 if (!_isClosed) {
227 var request = new _TakeRequest<T>(count);
228 _addRequest(request);
229 return request.future;
230 }
231 throw _failClosed();
232 }
233
234 /// Cancels the underlying stream subscription.
235 ///
236 /// The cancel operation waits until all previously requested
237 /// events have been processed, then it cancels the subscription
238 /// providing the events.
239 ///
240 /// The returned future completes with the result of calling
241 /// `cancel`.
242 ///
243 /// After calling `cancel`, no further events can be requested.
244 /// None of [next], [rest], [skip], [take] or [cancel] may be
245 /// called again.
246 Future cancel() {
247 if (!_isClosed) {
248 _isClosed = true;
249 var request = new _CancelRequest(this);
250 _addRequest(request);
251 return request.future;
252 }
253 throw _failClosed();
254 }
255
256 /// Returns an error for when a request is made after cancel.
257 ///
258 /// Returns a [StateError] with a message saying that either
259 /// [cancel] or [rest] have already been called.
260 Error _failClosed() {
261 return new StateError("Already cancelled");
262 }
263
264 // Callbacks receiving the events of the source stream.
265
266 void _onData(T data) {
267 _eventQueue.add(new Result.value(data));
268 _checkQueues();
269 }
270
271 void _onError(error, StackTrace stack) {
272 _eventQueue.add(new Result.error(error, stack));
273 _checkQueues();
274 }
275
276 void _onDone() {
277 _subscription = null;
278 _isDone = true;
279 _closeAllRequests();
280 }
281
282 // Request queue management.
283
284 /// Add a new request to the queue.
nweiz 2015/06/18 23:44:26 "Add" -> "Adds"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
285 void _addRequest(_EventRequest request) {
286 if (_isDone) {
287 assert(_requestQueue.isEmpty);
288 if (!request.addEvents(_eventQueue)) {
289 request.close(_eventQueue);
290 }
291 return;
292 }
293 if (_requestQueue.isEmpty) {
294 if (request.addEvents(_eventQueue)) return;
295 _ensureListening();
296 }
297 _requestQueue.add(request);
298
nweiz 2015/06/18 23:44:26 Nit: extra newline
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
299 }
300
301 /// Ensures that we are listening on events from [_sourceStream].
302 ///
303 /// Resumes subscription on [_sourceStream], or create it if necessary.
nweiz 2015/06/18 23:44:26 "create" -> "creates"
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
304 StreamSubscription _ensureListening() {
305 assert(!_isDone);
306 if (_subscription == null) {
307 _subscription =
308 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
309 } else {
310 _subscription.resume();
311 }
312 }
313
314
315 /// Remove all requests and close them.
nweiz 2015/06/18 23:44:26 "Remove" -> "Removes", "close" -> "closes"
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
316 ///
317 /// Used when the source stream is done.
318 /// After this, no further requests will be added to the queue,
319 /// requests are immediately served entirely by events already in the event
320 /// queue, if any.
321 void _closeAllRequests() {
322 assert(_isDone);
323 while (_requestQueue.isNotEmpty) {
324 var request = _requestQueue.removeFirst();
325 if (!request.addEvents(_eventQueue)) {
nweiz 2015/06/18 23:44:26 Isn't this guaranteed to return false? [request.ad
Lasse Reichstein Nielsen 2015/06/30 10:34:12 It's only guaranteed to return false for the first
326 request.close(_eventQueue);
327 }
328 }
329 }
330
331 /// Matches events with requests.
332 ///
333 /// Called after receiving an event.
334 void _checkQueues() {
335 while (_requestQueue.isNotEmpty) {
336 if (_requestQueue.first.addEvents(_eventQueue)) {
337 _requestQueue.removeFirst();
338 } else {
339 return;
340 }
341 }
342 if (!_isDone && _eventQueue.length >= _prefetchCount) {
343 _subscription.pause();
344 }
345 }
346
347 /// Extracts the subscription and makes the events object unusable.
nweiz 2015/06/18 23:44:26 "events object" -> "stream queue"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
348 ///
349 /// Can only be used by the very last request.
350 StreamSubscription _dispose() {
351 assert(_isClosed);
352 StreamSubscription subscription = _subscription;
nweiz 2015/06/18 23:44:25 Nit: "var"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
353 _subscription = null;
354 _isDone = true;
355 return subscription;
356 }
357 }
358
359 /// Request object that receives events when they arrive, until fulfilled.
360 ///
361 /// Each request that cannot be fulfilled immediately is represented by
362 /// an `_EventRequest` object in the request queue.
363 ///
364 /// Events from the source stream are sent to the first request in the
365 /// queue until it reports itself as [isComplete].
366 ///
367 /// When the first request in the queue `isComplete`, either when becoming
368 /// the first request or after receiving an event, its [close] methods is
369 /// called.
370 ///
371 /// The [close] method is also called immediately when the source stream
372 /// is done.
373 abstract class _EventRequest implements EventSink {
374 /// Handle available events.
375 ///
376 /// The available events are provided as a queue. The `addEvents` function
377 /// should only access the queue from the start, e.g., using [removeFirst].
nweiz 2015/06/18 23:44:27 "access the queue from the start" -> "remove event
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
378 ///
379 /// Returns `true` if if the request is completed, or `false` if it needs
nweiz 2015/06/18 23:44:27 "if if" -> "if"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
380 /// more events.
381 /// The call may keep events in the queue until the requeust is complete,
382 /// or it may remove them immediately.
383 ///
384 /// This method is called when a request reaches the front of the request
385 /// queue, and if it returns `false`, it's called again every time an event
nweiz 2015/06/18 23:44:25 "an event" -> "a new event"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
386 /// becomes available.
nweiz 2015/06/18 23:44:27 "and finally when the stream closes"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
387 bool addEvents(Queue<Result> events);
388
389 /// Complete the request.
390 ///
391 /// This is called when the source stream is done before the request
392 /// had a chance to receive events. If there are any events available,
393 /// they are in the [events] queue. No further events will become available.
394 ///
395 /// The queue should only be accessed from the start, e.g.,
396 /// using [removeFirst].
397 ///
398 /// If the requests kept events in the queue after an [addEvents] call,
nweiz 2015/06/18 23:44:26 "requests" -> "request"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
399 /// it should remove them here.
400 void close(Queue<Result> events);
401 }
402
403 /// Request for a [StreamQueue.next] call.
404 ///
405 /// Completes the returned future when receiving the first event,
406 /// and is then complete.
407 class _NextRequest<T> implements _EventRequest {
408 /// Completer for the future returned by [StreamQueue.next].
409 ///
410 /// Set to `null` when it is completed, to mark it as already complete.
nweiz 2015/06/18 23:44:27 This is no longer accurate.
411 final Completer _completer;
412
413 _NextRequest() : _completer = new Completer<T>();
nweiz 2015/06/18 23:44:26 Nit: assign _completer in the declaration
Lasse Reichstein Nielsen 2015/06/30 10:34:12 Done.
414
415 Future<T> get future => _completer.future;
416
417 bool addEvents(Queue<Result> events) {
418 if (events.isEmpty) return false;
419 events.removeFirst().complete(_completer);
420 return true;
421 }
422
423 void close(Queue<Result> events) {
424 _completer.completeError(new StateError("no elements"));
nweiz 2015/06/18 23:44:27 "no" -> "No" Also include a stack trace here so t
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Done.
425 }
426 }
427
428 /// Request for a [StreamQueue.skip] call.
429 class _SkipRequest implements _EventRequest {
430 /// Completer for the future returned by the skip call.
431 final Completer _completer = new Completer<int>();
432
433 /// Number of remaining events to skip.
434 ///
435 /// The request [isComplete] when the values reaches zero.
436 ///
437 /// Decremented when an event is seen.
438 /// Set to zero when an error is seen since errors abort the skip request.
439 int _eventsToSkip;
440
441 _SkipRequest(this._eventsToSkip);
442
443 /// The future completed when the correct number of events have been skipped.
444 Future get future => _completer.future;
445
446 bool addEvents(Queue<Result> events) {
447 while (_eventsToSkip > 0) {
448 if (events.isEmpty) return false;
449 _eventsToSkip--;
450 var event = events.removeFirst();
451 if (event.isError) {
452 event.complete(_completer);
453 return true;
454 }
455 }
456 _completer.complete(0);
457 return true;
458 }
459
460 void close(Queue<Result> events) {
461 _completer.complete(_eventsToSkip);
462 }
463 }
464
465 /// Request for a [StreamQueue.take] call.
466 class _TakeRequest<T> implements _EventRequest {
467 /// Completer for the future returned by the take call.
468 final Completer _completer;
469
470 /// List collecting events until enough have been seen.
471 final List _list = <T>[];
472
473 /// Number of events to capture.
474 ///
475 /// The request [isComplete] when the length of [_list] reaches
476 /// this value.
477 final int _eventsToTake;
478
479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
480
481 /// The future completed when the correct number of events have been captured.
482 Future get future => _completer.future;
483
484 bool addEvents(Queue<Events> events) {
485 while (_list.length < _eventsToTake) {
486 if (events.isEmpty) return false;
487 var result = events.removeFirst();
488 if (result.isError) {
489 result.complete(_completer);
490 return true;
491 }
492 _list.add(result.asValue.value);
493 }
494 _completer.complete(_list);
495 return true;
496 }
497
498 void close(Queue<Events> events) {
499 _completer.complete(_list);
500 }
501 }
502
503 /// Request for a [StreamQueue.cancel] call.
504 ///
505 /// The request is always complete, it just waits in the request queue
506 /// until all previous events are fulfilled, then it cancels the stream events
507 /// subscription.
508 class _CancelRequest implements _EventRequest {
509 /// Completer for the future returned by the `cancel` call.
510 final Completer _completer = new Completer();
511
512 /// The [StreamQueue] object that has this request queued.
513 ///
514 /// When the event is completed, it needs to cancel the active subscription
515 /// of the `StreamQueue` object, if any.
516 final StreamQueue _streamQueue;
517
518 _CancelRequest(this._streamQueue);
519
520 /// The future completed when the cancel request is completed.
521 Future get future => _completer.future;
522
523 bool addEvents(Queue<Result> events) {
524 _shutdown();
525 return true;
526 }
527
528 void close(_) {
529 _shutdown();
530 }
531
532 void _shutdown() {
533 if (_streamQueue._subscription == null) {
534 _completer.complete();
535 } else {
536 _completer.complete(_streamQueue._dispose().cancel());
537 }
538 }
539 }
540
541 /// Request for a [StreamQueue.rest] call.
542 ///
543 /// The request is always complete, it just waits in the request queue
544 /// until all previous events are fulfilled, then it takes over the
545 /// stream events subscription and creates a stream from it.
546 class _RestRequest<T> implements _EventRequest {
547 /// Completer for the stream returned by the `rest` call.
548 final StreamCompleter _completer;
549
550 /// The [StreamQueue] object that has this request queued.
551 ///
552 /// When the event is completed, it needs to cancel the active subscription
553 /// of the `StreamQueue` object, if any.
554 final StreamQueue _streamQueue;
555 _RestRequest(this._streamQueue) : _completer = new StreamCompleter<T>();
nweiz 2015/06/18 23:44:27 Nit: newline above. Also move [_completer]'s init
Lasse Reichstein Nielsen 2015/06/30 10:34:12 Done.
556
557 /// The future which will contain the remaining events of [_streamQueue].
nweiz 2015/06/18 23:44:26 "future" -> "stream"
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Done.
558 Stream<T> get stream => _completer.stream;
559
560 bool addEvents(Queue<Result> events) {
561 _completeStream(events);
562 return true;
563 }
564
565 void close(Queue<Result> events) {
566 _completeStream(events);
567 }
568
569 void _completeStream(Queue<Result> events) {
570 Stream getRestStream() {
nweiz 2015/06/18 23:44:26 Why is this a local method (as opposed to a privat
Lasse Reichstein Nielsen 2015/06/30 10:34:14 It's only used here, so I didn't see a need to giv
nweiz 2015/06/30 23:39:47 There are a lot of private methods at the top leve
Lasse Reichstein Nielsen 2015/07/01 08:24:56 Agree. Moved to private instance method.
571 if (_streamQueue._isDone) {
572 return new Stream<T>.empty();
nweiz 2015/06/18 23:44:25 If you're going to use this here, you'll need to u
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Ack. I SOOO wish 1.11 would be released soon. All
nweiz 2015/06/30 23:39:47 Yeah, this is one of the burdens of developing in
573 }
574 if (_streamQueue._subscription == null) {
575 return _streamQueue._sourceStream;
576 }
577 StreamSubscription subscription = _streamQueue._dispose();
578 subscription.resume();
579 return new SubscriptionStream<T>(subscription);
580 }
581 if (events.isEmpty) {
582 if (_streamQueue._isDone) {
583 _completer.setEmpty();
584 } else {
585 _completer.setSourceStream(getRestStream());
586 }
587 } else {
588 // There are prefetched events which needs to be added before the
589 // remaining stream.
590 StreamController controller = new StreamController<T>();
nweiz 2015/06/18 23:44:27 Nit: "var"
591 for (var event in events) event.addTo(controller);
nweiz 2015/06/18 23:44:25 I like this style of loop, but unfortunately it's
Lasse Reichstein Nielsen 2015/06/30 10:34:13 Do they want me to write events.forEach((even
592 controller.addStream(getRestStream(), cancelOnError: false)
593 .whenComplete(controller.close);
594 _completer.setSourceStream(controller.stream);
595 }
596 }
597 }
598
599 /// Request for a [StreamQueue.hasNext] call.
600 ///
601 /// Completes the [future] with `true` if it sees any event,
602 /// but doesn't consume the event.
603 /// If the request is closed without seeing an event, then
604 /// the [future] is completed with `false`.
605 class _HasNextRequest<T> implements _EventRequest {
606 final Completer _completer = new Completer<bool>();
607
608 Future<bool> get future => _completer.future;
609
610 bool addEvents(Queue<Result> events) {
611 if (events.isNotEmpty) {
612 _completer.complete(true);
613 return true;
614 }
615 return false;
616 }
617
618 void close(_) {
619 _completer.complete(false);
620 }
621 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698