Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(157)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1305063002: Make it possible to not complete a request when the event source is done. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events; 5 library async.stream_events;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "subscription_stream.dart"; 10 import "subscription_stream.dart";
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
53 /// var headerCount = 53 /// var headerCount =
54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); 54 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
55 /// handleMessage(headers: await events.take(headerCount), 55 /// handleMessage(headers: await events.take(headerCount),
56 /// body: events.rest); 56 /// body: events.rest);
57 /// return; 57 /// return;
58 /// } 58 /// }
59 /// // Error handling. 59 /// // Error handling.
60 /// 60 ///
61 /// When you need no further events the `StreamQueue` should be closed 61 /// When you need no further events the `StreamQueue` should be closed
62 /// using [cancel]. This releases the underlying stream subscription. 62 /// using [cancel]. This releases the underlying stream subscription.
63 class StreamQueue<T> { 63 abstract class StreamQueue<T> {
64 // This class maintains two queues: one of events and one of requests. 64 // This class maintains two queues: one of events and one of requests.
65 // The active request (the one in front of the queue) is called with 65 // The active request (the one in front of the queue) is called with
66 // the current event queue when it becomes active. 66 // the current event queue when it becomes active.
67 // 67 //
68 // If the request returns true, it's complete and will be removed from the 68 // If the request returns true, it's complete and will be removed from the
69 // request queue. 69 // request queue.
70 // If the request returns false, it needs more events, and will be called 70 // If the request returns false, it needs more events, and will be called
71 // again when new events are available. 71 // again when new events are available.
72 // The request can remove events that it uses, or keep them in the event 72 // The request can remove events that it uses, or keep them in the event
73 // queue until it has all that it needs. 73 // queue until it has all that it needs.
74 // 74 //
75 // This model is very flexible and easily extensible. 75 // This model is very flexible and easily extensible.
76 // It allows requests that don't consume events (like [hasNext]) or 76 // It allows requests that don't consume events (like [hasNext]) or
77 // potentially a request that takes either five or zero events, determined 77 // potentially a request that takes either five or zero events, determined
78 // by the content of the fifth event. 78 // by the content of the fifth event.
79 79
80 /// Source of events. 80 /// Whether the event source done.
nweiz 2015/08/21 19:43:57 "is done"
Lasse Reichstein Nielsen 2015/08/24 08:31:33 Done. I mean: Is done! :)
81 final Stream _sourceStream;
82
83 /// Subscription on [_sourceStream] while listening for events.
84 ///
85 /// Set to subscription when listening, and set to `null` when the
86 /// subscription is done (and [_isDone] is set to true).
87 StreamSubscription _subscription;
88
89 /// Whether we have listened on [_sourceStream] and the subscription is done.
90 bool _isDone = false; 81 bool _isDone = false;
91 82
92 /// Whether a closing operation has been performed on the stream queue. 83 /// Whether a closing operation has been performed on the stream queue.
93 /// 84 ///
94 /// Closing operations are [cancel] and [rest]. 85 /// Closing operations are [cancel] and [rest].
95 bool _isClosed = false; 86 bool _isClosed = false;
96 87
97 /// Queue of events not used by a request yet. 88 /// Queue of events not used by a request yet.
98 final Queue<Result> _eventQueue = new Queue(); 89 final Queue<Result> _eventQueue = new Queue();
99 90
100 /// Queue of pending requests. 91 /// Queue of pending requests.
101 /// 92 ///
102 /// Access through methods below to ensure consistency. 93 /// Access through methods below to ensure consistency.
103 final Queue<_EventRequest> _requestQueue = new Queue(); 94 final Queue<_EventRequest> _requestQueue = new Queue();
104 95
105 /// Create a `StreamQueue` of the events of [source]. 96 /// Create a `StreamQueue` of the events of [source].
106 StreamQueue(Stream source) 97 factory StreamQueue(Stream source) = _StreamQueue<T>;
107 : _sourceStream = source; 98
99 StreamQueue._();
108 100
109 /// Asks if the stream has any more events. 101 /// Asks if the stream has any more events.
110 /// 102 ///
111 /// Returns a future that completes with `true` if the stream has any 103 /// Returns a future that completes with `true` if the stream has any
112 /// more events, whether data or error. 104 /// more events, whether data or error.
113 /// If the stream closes without producing any more events, the returned 105 /// If the stream closes without producing any more events, the returned
114 /// future completes with `false`. 106 /// future completes with `false`.
115 /// 107 ///
116 /// Can be used before using [next] to avoid getting an error in the 108 /// Can be used before using [next] to avoid getting an error in the
117 /// future returned by `next` in the case where there are no more events. 109 /// future returned by `next` in the case where there are no more events.
110 /// Another alternative is to use `take(1)` which returns either zero or
111 /// one events.
118 Future<bool> get hasNext { 112 Future<bool> get hasNext {
119 if (!_isClosed) { 113 if (!_isClosed) {
120 var hasNextRequest = new _HasNextRequest(); 114 var hasNextRequest = new _HasNextRequest();
121 _addRequest(hasNextRequest); 115 _addRequest(hasNextRequest);
122 return hasNextRequest.future; 116 return hasNextRequest.future;
123 } 117 }
124 throw _failClosed(); 118 throw _failClosed();
125 } 119 }
126 120
127 /// Requests the next (yet unrequested) event from the stream. 121 /// Requests the next (yet unrequested) event from the stream.
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
209 Future<List<T>> take(int count) { 203 Future<List<T>> take(int count) {
210 if (count < 0) throw new RangeError.range(count, 0, null, "count"); 204 if (count < 0) throw new RangeError.range(count, 0, null, "count");
211 if (!_isClosed) { 205 if (!_isClosed) {
212 var request = new _TakeRequest<T>(count); 206 var request = new _TakeRequest<T>(count);
213 _addRequest(request); 207 _addRequest(request);
214 return request.future; 208 return request.future;
215 } 209 }
216 throw _failClosed(); 210 throw _failClosed();
217 } 211 }
218 212
219 /// Cancels the underlying stream subscription. 213 /// Cancels the underlying event source.
220 /// 214 ///
221 /// If [immediate] is `false` (the default), the cancel operation waits until 215 /// If [immediate] is `false` (the default), the cancel operation waits until
222 /// all previously requested events have been processed, then it cancels the 216 /// all previously requested events have been processed, then it cancels the
223 /// subscription providing the events. 217 /// subscription providing the events.
224 /// 218 ///
225 /// If [immediate] is `true`, the subscription is instead canceled 219 /// If [immediate] is `true`, the source is instead canceled
226 /// immediately. Any pending events are completed as though the underlying 220 /// immediately. Any pending events are completed as though the underlying
227 /// stream had closed. 221 /// stream had closed.
228 /// 222 ///
229 /// The returned future completes with the result of calling 223 /// The returned future completes with the result of calling
230 /// `cancel`. 224 /// `cancel`.
231 /// 225 ///
232 /// After calling `cancel`, no further events can be requested. 226 /// After calling `cancel`, no further events can be requested.
233 /// None of [next], [rest], [skip], [take] or [cancel] may be 227 /// None of [next], [rest], [skip], [take] or [cancel] may be
234 /// called again. 228 /// called again.
235 Future cancel({bool immediate: false}) { 229 Future cancel({bool immediate: false}) {
236 if (_isClosed) throw _failClosed(); 230 if (_isClosed) throw _failClosed();
237 _isClosed = true; 231 _isClosed = true;
238 232
239 if (!immediate) { 233 if (!immediate) {
240 var request = new _CancelRequest(this); 234 var request = new _CancelRequest(this);
241 _addRequest(request); 235 _addRequest(request);
242 return request.future; 236 return request.future;
243 } 237 }
244 238
245 if (_isDone) return new Future.value(); 239 if (_isDone && _eventQueue.isEmpty) return new Future.value();
246 if (_subscription == null) _subscription = _sourceStream.listen(null); 240 return _cancel();
247 var future = _subscription.cancel();
248 _onDone();
249 return future;
250 } 241 }
251 242
243 /// Cancels the underlying event source.
244 Future _cancel();
nweiz 2015/08/21 19:43:57 It might be clearer to explicitly divide these pri
Lasse Reichstein Nielsen 2015/08/24 08:31:34 Good idea.
245
252 /// Returns an error for when a request is made after cancel. 246 /// Returns an error for when a request is made after cancel.
253 /// 247 ///
254 /// Returns a [StateError] with a message saying that either 248 /// Returns a [StateError] with a message saying that either
255 /// [cancel] or [rest] have already been called. 249 /// [cancel] or [rest] have already been called.
256 Error _failClosed() { 250 Error _failClosed() {
257 return new StateError("Already cancelled"); 251 return new StateError("Already cancelled");
258 } 252 }
259 253
260 // Callbacks receiving the events of the source stream. 254 // Callbacks receiving the events of the source stream.
261 255 void _addResult(Result result) {
nweiz 2015/08/21 19:43:57 Nit: keep this empty line.
Lasse Reichstein Nielsen 2015/08/24 08:31:34 I've documented the function instead.
262 void _onData(T data) { 256 _eventQueue.add(result);
263 _eventQueue.add(new Result.value(data));
264 _checkQueues();
265 }
266
267 void _onError(error, StackTrace stack) {
268 _eventQueue.add(new Result.error(error, stack));
269 _checkQueues(); 257 _checkQueues();
270 } 258 }
271 259
272 void _onDone() { 260 void _onDone() {
nweiz 2015/08/21 19:43:57 Document this.
Lasse Reichstein Nielsen 2015/08/24 08:31:33 Done.
273 _subscription = null;
274 _isDone = true; 261 _isDone = true;
275 _closeAllRequests(); 262 _checkQueues();
276 } 263 }
277 264
278 // Request queue management. 265 // Request queue management.
279 266
280 /// Adds a new request to the queue. 267 /// Adds a new request to the queue.
281 void _addRequest(_EventRequest request) { 268 void _addRequest(_EventRequest request) {
282 if (_isDone) {
283 assert(_requestQueue.isEmpty);
284 if (!request.addEvents(_eventQueue)) {
285 request.close(_eventQueue);
286 }
287 return;
288 }
289 if (_requestQueue.isEmpty) { 269 if (_requestQueue.isEmpty) {
290 if (request.addEvents(_eventQueue)) return; 270 if (request.addEvents(_eventQueue, _isDone)) return;
291 _ensureListening(); 271 _ensureListening();
292 } 272 }
293 _requestQueue.add(request); 273 _requestQueue.add(request);
294 } 274 }
295 275
296 /// Ensures that we are listening on events from [_sourceStream]. 276 /// Ensures that we are listening on events from the event source.
297 /// 277 ///
298 /// Resumes subscription on [_sourceStream], or creates it if necessary. 278 /// Starts listening for the first time or resumes after a [_pause].
279 void _ensureListening();
280
281 /// Matches events with requests.
282 ///
283 /// Called after receiving an event or when the event source closes.
284 /// May be called by requests which have stalled after the event queue
nweiz 2015/08/21 19:43:57 "stalled" is confusing for me here—it's not a term
Lasse Reichstein Nielsen 2015/08/24 08:31:34 Done.
285 /// is empty or when the event source is done.
286 void _checkQueues() {
287 while (_requestQueue.isNotEmpty) {
288 if (_requestQueue.first.addEvents(_eventQueue, _isDone)) {
289 _requestQueue.removeFirst();
290 } else {
291 return;
292 }
293 }
294 if (!_isDone) {
nweiz 2015/08/21 19:43:57 Nit: I find it tends to look better when block-lev
Lasse Reichstein Nielsen 2015/08/24 08:31:34 I've added some extra lines (not all between block
295 _pause();
296 }
297 }
298
299 /// Requests that the event source pauses events.
300 ///
301 /// The event source is restarted by the next call to [_ensureListening].
302 void _pause();
303
304 /// Extracts a stream from the event source and makes this stream queue
305 /// unusable.
306 ///
307 /// Can only be used by the very last request.
308 /// Only used by [rest].
309 Stream _extractStream();
310 }
311
312
313 /// The default implementation of [StreamQueue].
314 ///
315 /// This queue gets its events from a stream which is listened
316 /// to when a request needs events.
317 class _StreamQueue<T> extends StreamQueue<T> {
318
nweiz 2015/08/21 19:43:57 Nit: extra newline.
Lasse Reichstein Nielsen 2015/08/24 08:31:33 Done.
319 /// Source of events.
320 final Stream _sourceStream;
321
322 /// Subscription on [_sourceStream] while listening for events.
323 ///
324 /// Set to subscription when listening, and set to `null` when the
325 /// subscription is done (and [_isDone] is set to true).
326 StreamSubscription _subscription;
327
328 _StreamQueue(this._sourceStream) : super._();
329
330 Future _cancel() {
331 if (_isDone) return null;
332 if (_subscription == null) _subscription = _sourceStream.listen(null);
333 var future = _subscription.cancel();
334 _onDone();
335 return future;
336 }
337
338 void _onData(T data) {
nweiz 2015/08/21 19:43:57 For what it's worth, these methods are so small I'
Lasse Reichstein Nielsen 2015/08/24 08:31:33 Done.
339 _addResult(new Result.value(data));
340 }
341
342 void _onError(error, StackTrace stack) {
343 _addResult(new Result.error(error, stack));
344 }
345
346 void _onDone() {
347 _subscription = null;
348 super._onDone();
349 }
350
299 void _ensureListening() { 351 void _ensureListening() {
300 assert(!_isDone); 352 assert(!_isDone);
301 if (_subscription == null) { 353 if (_subscription == null) {
302 _subscription = 354 _subscription =
303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); 355 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
304 } else { 356 } else {
305 _subscription.resume(); 357 _subscription.resume();
306 } 358 }
307 } 359 }
308 360
309 /// Removes all requests and closes them. 361 void _pause() {
310 /// 362 _subscription.pause();
311 /// Used when the source stream is done.
312 /// After this, no further requests will be added to the queue,
313 /// requests are immediately served entirely by events already in the event
314 /// queue, if any.
315 void _closeAllRequests() {
316 assert(_isDone);
317 while (_requestQueue.isNotEmpty) {
318 var request = _requestQueue.removeFirst();
319 if (!request.addEvents(_eventQueue)) {
320 request.close(_eventQueue);
321 }
322 }
323 } 363 }
324 364
325 /// Matches events with requests. 365 Stream<T> _extractStream() {
326 /// 366 assert(_isClosed);
327 /// Called after receiving an event. 367 if (_isDone) {
328 void _checkQueues() { 368 return new Stream<T>.empty();
329 while (_requestQueue.isNotEmpty) {
330 if (_requestQueue.first.addEvents(_eventQueue)) {
331 _requestQueue.removeFirst();
332 } else {
333 return;
334 }
335 } 369 }
336 if (!_isDone) { 370 if (_subscription == null) {
337 _subscription.pause(); 371 return _sourceStream;
nweiz 2015/08/21 19:43:57 Nit: consider making this a single-line if.
Lasse Reichstein Nielsen 2015/08/24 08:31:34 I prefer to not hide significant control flow (ret
338 } 372 }
339 }
340
341 /// Extracts the subscription and makes this stream queue unusable.
342 ///
343 /// Can only be used by the very last request.
344 StreamSubscription _dispose() {
345 assert(_isClosed);
346 var subscription = _subscription; 373 var subscription = _subscription;
347 _subscription = null; 374 _subscription = null;
348 _isDone = true; 375 _isDone = true;
349 return subscription; 376 if (!subscription.isPaused) subscription.pause();
nweiz 2015/08/21 19:43:57 Why pause and re-resume it?
Lasse Reichstein Nielsen 2015/08/24 08:31:34 We need to resume it if it was paused, because the
377 var result = new SubscriptionStream<T>(subscription);
378 subscription.resume();
379 return result;
350 } 380 }
351 } 381 }
352 382
383
384
nweiz 2015/08/21 19:43:57 Nit: three newlines here seems excessive. It's als
Lasse Reichstein Nielsen 2015/08/24 08:31:33 True, should only be two.
353 /// Request object that receives events when they arrive, until fulfilled. 385 /// Request object that receives events when they arrive, until fulfilled.
354 /// 386 ///
355 /// Each request that cannot be fulfilled immediately is represented by 387 /// Each request that cannot be fulfilled immediately is represented by
356 /// an `_EventRequest` object in the request queue. 388 /// an `_EventRequest` object in the request queue.
357 /// 389 ///
358 /// Events from the source stream are sent to the first request in the 390 /// Events from the source stream are sent to the first request in the
359 /// queue until it reports itself as [isComplete]. 391 /// queue until it reports itself as [isComplete].
360 /// 392 ///
361 /// When the first request in the queue `isComplete`, either when becoming 393 /// When the first request in the queue `isComplete`, either when becoming
362 /// the first request or after receiving an event, its [close] methods is 394 /// the first request or after receiving an event, its [close] methods is
(...skipping 12 matching lines...) Expand all
375 /// more events. 407 /// more events.
376 /// The call may keep events in the queue until the requeust is complete, 408 /// The call may keep events in the queue until the requeust is complete,
377 /// or it may remove them immediately. 409 /// or it may remove them immediately.
378 /// 410 ///
379 /// If the method returns true, the request is considered fulfilled, and 411 /// If the method returns true, the request is considered fulfilled, and
380 /// will never be called again. 412 /// will never be called again.
381 /// 413 ///
382 /// This method is called when a request reaches the front of the request 414 /// This method is called when a request reaches the front of the request
383 /// queue, and if it returns `false`, it's called again every time a new event 415 /// queue, and if it returns `false`, it's called again every time a new event
384 /// becomes available, or when the stream closes. 416 /// becomes available, or when the stream closes.
385 bool addEvents(Queue<Result> events); 417 /// If the function returns `false` when the stream has already closed
nweiz 2015/08/21 19:43:57 "the function" -> "this method" or "it" What are
Lasse Reichstein Nielsen 2015/08/23 11:30:00 That's exactly for "lookahead". The look-ahead may
nweiz 2015/08/25 00:38:21 SGTM
386 418 /// ([isDone] is true), then the request must call [StreamQueue._checkQueues]
387 /// Complete the request. 419 /// itself when it's ready to continue.
388 /// 420 bool addEvents(Queue<Result> events, bool isDone);
389 /// This is called when the source stream is done before the request
390 /// had a chance to receive all its events. That is, after a call
391 /// to [addEvents] has returned `false`.
392 /// If there are any unused events available, they are in the [events] queue.
393 /// No further events will become available.
394 ///
395 /// The queue should only remove events from the front of the event queue,
396 /// e.g., using [removeFirst].
397 ///
398 /// If the request kept events in the queue after an [addEvents] call,
399 /// this is the last chance to use them.
400 void close(Queue<Result> events);
401 } 421 }
402 422
403 /// Request for a [StreamQueue.next] call. 423 /// Request for a [StreamQueue.next] call.
404 /// 424 ///
405 /// Completes the returned future when receiving the first event, 425 /// Completes the returned future when receiving the first event,
406 /// and is then complete. 426 /// and is then complete.
407 class _NextRequest<T> implements _EventRequest { 427 class _NextRequest<T> implements _EventRequest {
408 /// Completer for the future returned by [StreamQueue.next]. 428 /// Completer for the future returned by [StreamQueue.next].
409 final Completer _completer; 429 final Completer _completer;
410 430
411 _NextRequest() : _completer = new Completer<T>(); 431 _NextRequest() : _completer = new Completer<T>();
412 432
413 Future<T> get future => _completer.future; 433 Future<T> get future => _completer.future;
414 434
415 bool addEvents(Queue<Result> events) { 435 bool addEvents(Queue<Result> events, bool isDone) {
416 if (events.isEmpty) return false; 436 if (events.isNotEmpty) {
417 events.removeFirst().complete(_completer); 437 events.removeFirst().complete(_completer);
418 return true; 438 return true;
419 } 439 }
420 440 if (isDone) {
421 void close(Queue<Result> events) { 441 var errorFuture =
422 var errorFuture = 442 new Future.sync(() => throw new StateError("No elements"));
423 new Future.sync(() => throw new StateError("No elements")); 443 _completer.complete(errorFuture);
424 _completer.complete(errorFuture); 444 return true;
445 }
446 return false;
425 } 447 }
426 } 448 }
427 449
428 /// Request for a [StreamQueue.skip] call. 450 /// Request for a [StreamQueue.skip] call.
429 class _SkipRequest implements _EventRequest { 451 class _SkipRequest implements _EventRequest {
430 /// Completer for the future returned by the skip call. 452 /// Completer for the future returned by the skip call.
431 final Completer _completer = new Completer<int>(); 453 final Completer _completer = new Completer<int>();
432 454
433 /// Number of remaining events to skip. 455 /// Number of remaining events to skip.
434 /// 456 ///
435 /// The request [isComplete] when the values reaches zero. 457 /// The request [isComplete] when the values reaches zero.
436 /// 458 ///
437 /// Decremented when an event is seen. 459 /// Decremented when an event is seen.
438 /// Set to zero when an error is seen since errors abort the skip request. 460 /// Set to zero when an error is seen since errors abort the skip request.
439 int _eventsToSkip; 461 int _eventsToSkip;
440 462
441 _SkipRequest(this._eventsToSkip); 463 _SkipRequest(this._eventsToSkip);
442 464
443 /// The future completed when the correct number of events have been skipped. 465 /// The future completed when the correct number of events have been skipped.
444 Future get future => _completer.future; 466 Future get future => _completer.future;
445 467
446 bool addEvents(Queue<Result> events) { 468 bool addEvents(Queue<Result> events, bool isDone) {
447 while (_eventsToSkip > 0) { 469 while (_eventsToSkip > 0) {
448 if (events.isEmpty) return false; 470 if (events.isEmpty) {
471 if (isDone) break;
472 return false;
473 }
449 _eventsToSkip--; 474 _eventsToSkip--;
450 var event = events.removeFirst(); 475 var event = events.removeFirst();
451 if (event.isError) { 476 if (event.isError) {
452 event.complete(_completer); 477 event.complete(_completer);
453 return true; 478 return true;
454 } 479 }
455 } 480 }
456 _completer.complete(0); 481 _completer.complete(_eventsToSkip);
457 return true; 482 return true;
458 } 483 }
459
460 void close(Queue<Result> events) {
461 _completer.complete(_eventsToSkip);
462 }
463 } 484 }
464 485
465 /// Request for a [StreamQueue.take] call. 486 /// Request for a [StreamQueue.take] call.
466 class _TakeRequest<T> implements _EventRequest { 487 class _TakeRequest<T> implements _EventRequest {
467 /// Completer for the future returned by the take call. 488 /// Completer for the future returned by the take call.
468 final Completer _completer; 489 final Completer _completer;
469 490
470 /// List collecting events until enough have been seen. 491 /// List collecting events until enough have been seen.
471 final List _list = <T>[]; 492 final List _list = <T>[];
472 493
473 /// Number of events to capture. 494 /// Number of events to capture.
474 /// 495 ///
475 /// The request [isComplete] when the length of [_list] reaches 496 /// The request [isComplete] when the length of [_list] reaches
476 /// this value. 497 /// this value.
477 final int _eventsToTake; 498 final int _eventsToTake;
478 499
479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); 500 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
480 501
481 /// The future completed when the correct number of events have been captured. 502 /// The future completed when the correct number of events have been captured.
482 Future get future => _completer.future; 503 Future get future => _completer.future;
483 504
484 bool addEvents(Queue<Result> events) { 505 bool addEvents(Queue<Result> events, bool isDone) {
485 while (_list.length < _eventsToTake) { 506 while (_list.length < _eventsToTake) {
486 if (events.isEmpty) return false; 507 if (events.isEmpty) {
508 if (isDone) break;
509 return false;
510 }
487 var result = events.removeFirst(); 511 var result = events.removeFirst();
488 if (result.isError) { 512 if (result.isError) {
489 result.complete(_completer); 513 result.complete(_completer);
490 return true; 514 return true;
491 } 515 }
492 _list.add(result.asValue.value); 516 _list.add(result.asValue.value);
493 } 517 }
494 _completer.complete(_list); 518 _completer.complete(_list);
495 return true; 519 return true;
496 } 520 }
497
498 void close(Queue<Result> events) {
499 _completer.complete(_list);
500 }
501 } 521 }
502 522
503 /// Request for a [StreamQueue.cancel] call. 523 /// Request for a [StreamQueue.cancel] call.
504 /// 524 ///
505 /// The request needs no events, it just waits in the request queue 525 /// The request needs no events, it just waits in the request queue
506 /// until all previous events are fulfilled, then it cancels the stream queue 526 /// until all previous events are fulfilled, then it cancels the stream queue
507 /// source subscription. 527 /// source subscription.
508 class _CancelRequest implements _EventRequest { 528 class _CancelRequest implements _EventRequest {
509 /// Completer for the future returned by the `cancel` call. 529 /// Completer for the future returned by the `cancel` call.
510 final Completer _completer = new Completer(); 530 final Completer _completer = new Completer();
511 531
512 /// The [StreamQueue] object that has this request queued. 532 /// The [StreamQueue] object that has this request queued.
513 /// 533 ///
514 /// When the event is completed, it needs to cancel the active subscription 534 /// When the event is completed, it needs to cancel the active subscription
515 /// of the `StreamQueue` object, if any. 535 /// of the `StreamQueue` object, if any.
516 final StreamQueue _streamQueue; 536 final StreamQueue _streamQueue;
517 537
518 _CancelRequest(this._streamQueue); 538 _CancelRequest(this._streamQueue);
519 539
520 /// The future completed when the cancel request is completed. 540 /// The future completed when the cancel request is completed.
521 Future get future => _completer.future; 541 Future get future => _completer.future;
522 542
523 bool addEvents(Queue<Result> events) { 543 bool addEvents(Queue<Result> events, bool isDone) {
524 _shutdown(); 544 _completer.complete(_streamQueue._cancel());
525 return true; 545 return true;
526 } 546 }
527
528 void close(_) {
529 _shutdown();
530 }
531
532 void _shutdown() {
533 if (_streamQueue._subscription == null) {
534 _completer.complete();
535 } else {
536 _completer.complete(_streamQueue._dispose().cancel());
537 }
538 }
539 } 547 }
540 548
541 /// Request for a [StreamQueue.rest] call. 549 /// Request for a [StreamQueue.rest] call.
542 /// 550 ///
543 /// The request is always complete, it just waits in the request queue 551 /// The request is always complete, it just waits in the request queue
544 /// until all previous events are fulfilled, then it takes over the 552 /// until all previous events are fulfilled, then it takes over the
545 /// stream events subscription and creates a stream from it. 553 /// stream events subscription and creates a stream from it.
546 class _RestRequest<T> implements _EventRequest { 554 class _RestRequest<T> implements _EventRequest {
547 /// Completer for the stream returned by the `rest` call. 555 /// Completer for the stream returned by the `rest` call.
548 final StreamCompleter _completer = new StreamCompleter<T>(); 556 final StreamCompleter _completer = new StreamCompleter<T>();
549 557
550 /// The [StreamQueue] object that has this request queued. 558 /// The [StreamQueue] object that has this request queued.
551 /// 559 ///
552 /// When the event is completed, it needs to cancel the active subscription 560 /// When the event is completed, it needs to cancel the active subscription
553 /// of the `StreamQueue` object, if any. 561 /// of the `StreamQueue` object, if any.
554 final StreamQueue _streamQueue; 562 final StreamQueue _streamQueue;
555 563
556 _RestRequest(this._streamQueue); 564 _RestRequest(this._streamQueue);
557 565
558 /// The stream which will contain the remaining events of [_streamQueue]. 566 /// The stream which will contain the remaining events of [_streamQueue].
559 Stream<T> get stream => _completer.stream; 567 Stream<T> get stream => _completer.stream;
560 568
561 bool addEvents(Queue<Result> events) { 569 bool addEvents(Queue<Result> events, bool isDone) {
562 _completeStream(events);
563 return true;
564 }
565
566 void close(Queue<Result> events) {
567 _completeStream(events);
568 }
569
570 void _completeStream(Queue<Result> events) {
571 if (events.isEmpty) { 570 if (events.isEmpty) {
572 if (_streamQueue._isDone) { 571 if (_streamQueue._isDone) {
573 _completer.setEmpty(); 572 _completer.setEmpty();
574 } else { 573 } else {
575 _completer.setSourceStream(_getRestStream()); 574 _completer.setSourceStream(_streamQueue._extractStream());
576 } 575 }
577 } else { 576 } else {
578 // There are prefetched events which needs to be added before the 577 // There are prefetched events which needs to be added before the
579 // remaining stream. 578 // remaining stream.
580 var controller = new StreamController<T>(); 579 var controller = new StreamController<T>();
581 for (var event in events) { 580 for (var event in events) {
582 event.addTo(controller); 581 event.addTo(controller);
583 } 582 }
584 controller.addStream(_getRestStream(), cancelOnError: false) 583 controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
585 .whenComplete(controller.close); 584 .whenComplete(controller.close);
586 _completer.setSourceStream(controller.stream); 585 _completer.setSourceStream(controller.stream);
587 } 586 }
588 } 587 return true;
589
590 /// Create a stream from the rest of [_streamQueue]'s subscription.
591 Stream _getRestStream() {
592 if (_streamQueue._isDone) {
593 var controller = new StreamController<T>()..close();
594 return controller.stream;
595 // TODO(lrn). Use the following when 1.11 is released.
596 // return new Stream<T>.empty();
597 }
598 if (_streamQueue._subscription == null) {
599 return _streamQueue._sourceStream;
600 }
601 var subscription = _streamQueue._dispose();
602 subscription.resume();
603 return new SubscriptionStream<T>(subscription);
604 } 588 }
605 } 589 }
606 590
607 /// Request for a [StreamQueue.hasNext] call. 591 /// Request for a [StreamQueue.hasNext] call.
608 /// 592 ///
609 /// Completes the [future] with `true` if it sees any event, 593 /// Completes the [future] with `true` if it sees any event,
610 /// but doesn't consume the event. 594 /// but doesn't consume the event.
611 /// If the request is closed without seeing an event, then 595 /// If the request is closed without seeing an event, then
612 /// the [future] is completed with `false`. 596 /// the [future] is completed with `false`.
613 class _HasNextRequest<T> implements _EventRequest { 597 class _HasNextRequest<T> implements _EventRequest {
614 final Completer _completer = new Completer<bool>(); 598 final Completer _completer = new Completer<bool>();
615 599
616 Future<bool> get future => _completer.future; 600 Future<bool> get future => _completer.future;
617 601
618 bool addEvents(Queue<Result> events) { 602 bool addEvents(Queue<Result> events, bool isDone) {
619 if (events.isNotEmpty) { 603 if (events.isNotEmpty) {
620 _completer.complete(true); 604 _completer.complete(true);
621 return true; 605 return true;
622 } 606 }
607 if (isDone) {
608 _completer.complete(false);
609 return true;
610 }
623 return false; 611 return false;
624 } 612 }
625
626 void close(_) {
627 _completer.complete(false);
628 }
629 } 613 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698