OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:collection'; | 6 import 'dart:collection'; |
7 | 7 |
8 import "result.dart"; | 8 import "result.dart"; |
9 import "subscription_stream.dart"; | 9 import "subscription_stream.dart"; |
10 import "stream_completer.dart"; | 10 import "stream_completer.dart"; |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
87 | 87 |
88 /// Queue of events not used by a request yet. | 88 /// Queue of events not used by a request yet. |
89 final Queue<Result> _eventQueue = new Queue(); | 89 final Queue<Result> _eventQueue = new Queue(); |
90 | 90 |
91 /// Queue of pending requests. | 91 /// Queue of pending requests. |
92 /// | 92 /// |
93 /// Access through methods below to ensure consistency. | 93 /// Access through methods below to ensure consistency. |
94 final Queue<_EventRequest> _requestQueue = new Queue(); | 94 final Queue<_EventRequest> _requestQueue = new Queue(); |
95 | 95 |
96 /// Create a `StreamQueue` of the events of [source]. | 96 /// Create a `StreamQueue` of the events of [source]. |
97 factory StreamQueue(Stream source) = _StreamQueue<T>; | 97 factory StreamQueue(Stream<T> source) = _StreamQueue<T>; |
98 | 98 |
99 StreamQueue._(); | 99 StreamQueue._(); |
100 | 100 |
101 /// Asks if the stream has any more events. | 101 /// Asks if the stream has any more events. |
102 /// | 102 /// |
103 /// Returns a future that completes with `true` if the stream has any | 103 /// Returns a future that completes with `true` if the stream has any |
104 /// more events, whether data or error. | 104 /// more events, whether data or error. |
105 /// If the stream closes without producing any more events, the returned | 105 /// If the stream closes without producing any more events, the returned |
106 /// future completes with `false`. | 106 /// future completes with `false`. |
107 /// | 107 /// |
(...skipping 159 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
267 _pause(); | 267 _pause(); |
268 } | 268 } |
269 } | 269 } |
270 | 270 |
271 /// Extracts a stream from the event source and makes this stream queue | 271 /// Extracts a stream from the event source and makes this stream queue |
272 /// unusable. | 272 /// unusable. |
273 /// | 273 /// |
274 /// Can only be used by the very last request (the stream queue must | 274 /// Can only be used by the very last request (the stream queue must |
275 /// be closed by that request). | 275 /// be closed by that request). |
276 /// Only used by [rest]. | 276 /// Only used by [rest]. |
277 Stream _extractStream(); | 277 Stream<T> _extractStream(); |
278 | 278 |
279 /// Requests that the event source pauses events. | 279 /// Requests that the event source pauses events. |
280 /// | 280 /// |
281 /// This is called automatically when the request queue is empty. | 281 /// This is called automatically when the request queue is empty. |
282 /// | 282 /// |
283 /// The event source is restarted by the next call to [_ensureListening]. | 283 /// The event source is restarted by the next call to [_ensureListening]. |
284 void _pause(); | 284 void _pause(); |
285 | 285 |
286 /// Ensures that we are listening on events from the event source. | 286 /// Ensures that we are listening on events from the event source. |
287 /// | 287 /// |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
335 } | 335 } |
336 } | 336 } |
337 | 337 |
338 | 338 |
339 /// The default implementation of [StreamQueue]. | 339 /// The default implementation of [StreamQueue]. |
340 /// | 340 /// |
341 /// This queue gets its events from a stream which is listened | 341 /// This queue gets its events from a stream which is listened |
342 /// to when a request needs events. | 342 /// to when a request needs events. |
343 class _StreamQueue<T> extends StreamQueue<T> { | 343 class _StreamQueue<T> extends StreamQueue<T> { |
344 /// Source of events. | 344 /// Source of events. |
345 final Stream _sourceStream; | 345 final Stream<T> _sourceStream; |
346 | 346 |
347 /// Subscription on [_sourceStream] while listening for events. | 347 /// Subscription on [_sourceStream] while listening for events. |
348 /// | 348 /// |
349 /// Set to subscription when listening, and set to `null` when the | 349 /// Set to subscription when listening, and set to `null` when the |
350 /// subscription is done (and [_isDone] is set to true). | 350 /// subscription is done (and [_isDone] is set to true). |
351 StreamSubscription _subscription; | 351 StreamSubscription<T> _subscription; |
352 | 352 |
353 _StreamQueue(this._sourceStream) : super._(); | 353 _StreamQueue(this._sourceStream) : super._(); |
354 | 354 |
355 Future _cancel() { | 355 Future _cancel() { |
356 if (_isDone) return null; | 356 if (_isDone) return null; |
357 if (_subscription == null) _subscription = _sourceStream.listen(null); | 357 if (_subscription == null) _subscription = _sourceStream.listen(null); |
358 var future = _subscription.cancel(); | 358 var future = _subscription.cancel(); |
359 _close(); | 359 _close(); |
360 return future; | 360 return future; |
361 } | 361 } |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
415 /// | 415 /// |
416 /// Events from the source stream are sent to the first request in the | 416 /// Events from the source stream are sent to the first request in the |
417 /// queue until it reports itself as [isComplete]. | 417 /// queue until it reports itself as [isComplete]. |
418 /// | 418 /// |
419 /// When the first request in the queue `isComplete`, either when becoming | 419 /// When the first request in the queue `isComplete`, either when becoming |
420 /// the first request or after receiving an event, its [close] methods is | 420 /// the first request or after receiving an event, its [close] methods is |
421 /// called. | 421 /// called. |
422 /// | 422 /// |
423 /// The [close] method is also called immediately when the source stream | 423 /// The [close] method is also called immediately when the source stream |
424 /// is done. | 424 /// is done. |
425 abstract class _EventRequest { | 425 abstract class _EventRequest<T> { |
426 /// Handle available events. | 426 /// Handle available events. |
427 /// | 427 /// |
428 /// The available events are provided as a queue. The `update` function | 428 /// The available events are provided as a queue. The `update` function |
429 /// should only remove events from the front of the event queue, e.g., | 429 /// should only remove events from the front of the event queue, e.g., |
430 /// using [removeFirst]. | 430 /// using [removeFirst]. |
431 /// | 431 /// |
432 /// Returns `true` if the request is completed, or `false` if it needs | 432 /// Returns `true` if the request is completed, or `false` if it needs |
433 /// more events. | 433 /// more events. |
434 /// The call may keep events in the queue until the requeust is complete, | 434 /// The call may keep events in the queue until the requeust is complete, |
435 /// or it may remove them immediately. | 435 /// or it may remove them immediately. |
436 /// | 436 /// |
437 /// If the method returns true, the request is considered fulfilled, and | 437 /// If the method returns true, the request is considered fulfilled, and |
438 /// will never be called again. | 438 /// will never be called again. |
439 /// | 439 /// |
440 /// This method is called when a request reaches the front of the request | 440 /// This method is called when a request reaches the front of the request |
441 /// queue, and if it returns `false`, it's called again every time a new event | 441 /// queue, and if it returns `false`, it's called again every time a new event |
442 /// becomes available, or when the stream closes. | 442 /// becomes available, or when the stream closes. |
443 /// If the function returns `false` when the stream has already closed | 443 /// If the function returns `false` when the stream has already closed |
444 /// ([isDone] is true), then the request must call | 444 /// ([isDone] is true), then the request must call |
445 /// [StreamQueue._updateRequests] itself when it's ready to continue. | 445 /// [StreamQueue._updateRequests] itself when it's ready to continue. |
446 bool update(Queue<Result> events, bool isDone); | 446 bool update(Queue<Result<T>> events, bool isDone); |
447 } | 447 } |
448 | 448 |
449 /// Request for a [StreamQueue.next] call. | 449 /// Request for a [StreamQueue.next] call. |
450 /// | 450 /// |
451 /// Completes the returned future when receiving the first event, | 451 /// Completes the returned future when receiving the first event, |
452 /// and is then complete. | 452 /// and is then complete. |
453 class _NextRequest<T> implements _EventRequest { | 453 class _NextRequest<T> implements _EventRequest<T> { |
454 /// Completer for the future returned by [StreamQueue.next]. | 454 /// Completer for the future returned by [StreamQueue.next]. |
455 final Completer _completer; | 455 final _completer = new Completer<T>(); |
456 | 456 |
457 _NextRequest() : _completer = new Completer<T>(); | 457 _NextRequest(); |
458 | 458 |
459 Future<T> get future => _completer.future; | 459 Future<T> get future => _completer.future; |
460 | 460 |
461 bool update(Queue<Result> events, bool isDone) { | 461 bool update(Queue<Result<T>> events, bool isDone) { |
462 if (events.isNotEmpty) { | 462 if (events.isNotEmpty) { |
463 events.removeFirst().complete(_completer); | 463 events.removeFirst().complete(_completer); |
464 return true; | 464 return true; |
465 } | 465 } |
466 if (isDone) { | 466 if (isDone) { |
467 var errorFuture = | 467 var errorFuture = |
468 new Future.sync(() => throw new StateError("No elements")); | 468 new Future.sync(() => throw new StateError("No elements")); |
469 _completer.complete(errorFuture); | 469 _completer.complete(errorFuture); |
470 return true; | 470 return true; |
471 } | 471 } |
472 return false; | 472 return false; |
473 } | 473 } |
474 } | 474 } |
475 | 475 |
476 /// Request for a [StreamQueue.skip] call. | 476 /// Request for a [StreamQueue.skip] call. |
477 class _SkipRequest implements _EventRequest { | 477 class _SkipRequest<T> implements _EventRequest<T> { |
478 /// Completer for the future returned by the skip call. | 478 /// Completer for the future returned by the skip call. |
479 final Completer _completer = new Completer<int>(); | 479 final _completer = new Completer<int>(); |
480 | 480 |
481 /// Number of remaining events to skip. | 481 /// Number of remaining events to skip. |
482 /// | 482 /// |
483 /// The request [isComplete] when the values reaches zero. | 483 /// The request [isComplete] when the values reaches zero. |
484 /// | 484 /// |
485 /// Decremented when an event is seen. | 485 /// Decremented when an event is seen. |
486 /// Set to zero when an error is seen since errors abort the skip request. | 486 /// Set to zero when an error is seen since errors abort the skip request. |
487 int _eventsToSkip; | 487 int _eventsToSkip; |
488 | 488 |
489 _SkipRequest(this._eventsToSkip); | 489 _SkipRequest(this._eventsToSkip); |
490 | 490 |
491 /// The future completed when the correct number of events have been skipped. | 491 /// The future completed when the correct number of events have been skipped. |
492 Future get future => _completer.future; | 492 Future<int> get future => _completer.future; |
493 | 493 |
494 bool update(Queue<Result> events, bool isDone) { | 494 bool update(Queue<Result<T>> events, bool isDone) { |
495 while (_eventsToSkip > 0) { | 495 while (_eventsToSkip > 0) { |
496 if (events.isEmpty) { | 496 if (events.isEmpty) { |
497 if (isDone) break; | 497 if (isDone) break; |
498 return false; | 498 return false; |
499 } | 499 } |
500 _eventsToSkip--; | 500 _eventsToSkip--; |
501 | 501 |
502 var event = events.removeFirst(); | 502 var event = events.removeFirst(); |
503 if (event.isError) { | 503 if (event.isError) { |
504 event.complete(_completer); | 504 _completer.completeError(event.asError.error, event.asError.stackTrace); |
Lasse Reichstein Nielsen
2016/03/29 21:53:59
Why not event.complete(_completer)?
Is it because
nweiz
2016/03/30 00:57:19
Yes, that's why.
| |
505 return true; | 505 return true; |
506 } | 506 } |
507 } | 507 } |
508 _completer.complete(_eventsToSkip); | 508 _completer.complete(_eventsToSkip); |
509 return true; | 509 return true; |
510 } | 510 } |
511 } | 511 } |
512 | 512 |
513 /// Request for a [StreamQueue.take] call. | 513 /// Request for a [StreamQueue.take] call. |
514 class _TakeRequest<T> implements _EventRequest { | 514 class _TakeRequest<T> implements _EventRequest<T> { |
515 /// Completer for the future returned by the take call. | 515 /// Completer for the future returned by the take call. |
516 final Completer _completer; | 516 final _completer = new Completer<List<T>>(); |
517 | 517 |
518 /// List collecting events until enough have been seen. | 518 /// List collecting events until enough have been seen. |
519 final List _list = <T>[]; | 519 final _list = <T>[]; |
520 | 520 |
521 /// Number of events to capture. | 521 /// Number of events to capture. |
522 /// | 522 /// |
523 /// The request [isComplete] when the length of [_list] reaches | 523 /// The request [isComplete] when the length of [_list] reaches |
524 /// this value. | 524 /// this value. |
525 final int _eventsToTake; | 525 final int _eventsToTake; |
526 | 526 |
527 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 527 _TakeRequest(this._eventsToTake); |
528 | 528 |
529 /// The future completed when the correct number of events have been captured. | 529 /// The future completed when the correct number of events have been captured. |
530 Future get future => _completer.future; | 530 Future<List<T>> get future => _completer.future; |
531 | 531 |
532 bool update(Queue<Result> events, bool isDone) { | 532 bool update(Queue<Result<T>> events, bool isDone) { |
533 while (_list.length < _eventsToTake) { | 533 while (_list.length < _eventsToTake) { |
534 if (events.isEmpty) { | 534 if (events.isEmpty) { |
535 if (isDone) break; | 535 if (isDone) break; |
536 return false; | 536 return false; |
537 } | 537 } |
538 | 538 |
539 var result = events.removeFirst(); | 539 var event = events.removeFirst(); |
540 if (result.isError) { | 540 if (event.isError) { |
541 result.complete(_completer); | 541 _completer.completeError(event.asError.error, event.asError.stackTrace); |
542 return true; | 542 return true; |
543 } | 543 } |
544 _list.add(result.asValue.value); | 544 _list.add(event.asValue.value); |
545 } | 545 } |
546 _completer.complete(_list); | 546 _completer.complete(_list); |
547 return true; | 547 return true; |
548 } | 548 } |
549 } | 549 } |
550 | 550 |
551 /// Request for a [StreamQueue.cancel] call. | 551 /// Request for a [StreamQueue.cancel] call. |
552 /// | 552 /// |
553 /// The request needs no events, it just waits in the request queue | 553 /// The request needs no events, it just waits in the request queue |
554 /// until all previous events are fulfilled, then it cancels the stream queue | 554 /// until all previous events are fulfilled, then it cancels the stream queue |
555 /// source subscription. | 555 /// source subscription. |
556 class _CancelRequest implements _EventRequest { | 556 class _CancelRequest<T> implements _EventRequest<T> { |
557 /// Completer for the future returned by the `cancel` call. | 557 /// Completer for the future returned by the `cancel` call. |
558 final Completer _completer = new Completer(); | 558 final _completer = new Completer(); |
559 | 559 |
560 /// The [StreamQueue] object that has this request queued. | 560 /// The [StreamQueue] object that has this request queued. |
561 /// | 561 /// |
562 /// When the event is completed, it needs to cancel the active subscription | 562 /// When the event is completed, it needs to cancel the active subscription |
563 /// of the `StreamQueue` object, if any. | 563 /// of the `StreamQueue` object, if any. |
564 final StreamQueue _streamQueue; | 564 final StreamQueue _streamQueue; |
565 | 565 |
566 _CancelRequest(this._streamQueue); | 566 _CancelRequest(this._streamQueue); |
567 | 567 |
568 /// The future completed when the cancel request is completed. | 568 /// The future completed when the cancel request is completed. |
569 Future get future => _completer.future; | 569 Future get future => _completer.future; |
570 | 570 |
571 bool update(Queue<Result> events, bool isDone) { | 571 bool update(Queue<Result<T>> events, bool isDone) { |
572 if (_streamQueue._isDone) { | 572 if (_streamQueue._isDone) { |
573 _completer.complete(); | 573 _completer.complete(); |
574 } else { | 574 } else { |
575 _streamQueue._ensureListening(); | 575 _streamQueue._ensureListening(); |
576 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); | 576 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); |
577 } | 577 } |
578 return true; | 578 return true; |
579 } | 579 } |
580 } | 580 } |
581 | 581 |
582 /// Request for a [StreamQueue.rest] call. | 582 /// Request for a [StreamQueue.rest] call. |
583 /// | 583 /// |
584 /// The request is always complete, it just waits in the request queue | 584 /// The request is always complete, it just waits in the request queue |
585 /// until all previous events are fulfilled, then it takes over the | 585 /// until all previous events are fulfilled, then it takes over the |
586 /// stream events subscription and creates a stream from it. | 586 /// stream events subscription and creates a stream from it. |
587 class _RestRequest<T> implements _EventRequest { | 587 class _RestRequest<T> implements _EventRequest<T> { |
588 /// Completer for the stream returned by the `rest` call. | 588 /// Completer for the stream returned by the `rest` call. |
589 final StreamCompleter _completer = new StreamCompleter<T>(); | 589 final _completer = new StreamCompleter<T>(); |
590 | 590 |
591 /// The [StreamQueue] object that has this request queued. | 591 /// The [StreamQueue] object that has this request queued. |
592 /// | 592 /// |
593 /// When the event is completed, it needs to cancel the active subscription | 593 /// When the event is completed, it needs to cancel the active subscription |
594 /// of the `StreamQueue` object, if any. | 594 /// of the `StreamQueue` object, if any. |
595 final StreamQueue _streamQueue; | 595 final StreamQueue<T> _streamQueue; |
596 | 596 |
597 _RestRequest(this._streamQueue); | 597 _RestRequest(this._streamQueue); |
598 | 598 |
599 /// The stream which will contain the remaining events of [_streamQueue]. | 599 /// The stream which will contain the remaining events of [_streamQueue]. |
600 Stream<T> get stream => _completer.stream; | 600 Stream<T> get stream => _completer.stream; |
601 | 601 |
602 bool update(Queue<Result> events, bool isDone) { | 602 bool update(Queue<Result<T>> events, bool isDone) { |
603 if (events.isEmpty) { | 603 if (events.isEmpty) { |
604 if (_streamQueue._isDone) { | 604 if (_streamQueue._isDone) { |
605 _completer.setEmpty(); | 605 _completer.setEmpty(); |
606 } else { | 606 } else { |
607 _completer.setSourceStream(_streamQueue._extractStream()); | 607 _completer.setSourceStream(_streamQueue._extractStream()); |
608 } | 608 } |
609 } else { | 609 } else { |
610 // There are prefetched events which needs to be added before the | 610 // There are prefetched events which needs to be added before the |
611 // remaining stream. | 611 // remaining stream. |
612 var controller = new StreamController<T>(); | 612 var controller = new StreamController<T>(); |
613 for (var event in events) { | 613 for (var event in events) { |
614 event.addTo(controller); | 614 event.addTo(controller); |
615 } | 615 } |
616 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) | 616 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
617 .whenComplete(controller.close); | 617 .whenComplete(controller.close); |
618 _completer.setSourceStream(controller.stream); | 618 _completer.setSourceStream(controller.stream); |
619 } | 619 } |
620 return true; | 620 return true; |
621 } | 621 } |
622 } | 622 } |
623 | 623 |
624 /// Request for a [StreamQueue.hasNext] call. | 624 /// Request for a [StreamQueue.hasNext] call. |
625 /// | 625 /// |
626 /// Completes the [future] with `true` if it sees any event, | 626 /// Completes the [future] with `true` if it sees any event, |
627 /// but doesn't consume the event. | 627 /// but doesn't consume the event. |
628 /// If the request is closed without seeing an event, then | 628 /// If the request is closed without seeing an event, then |
629 /// the [future] is completed with `false`. | 629 /// the [future] is completed with `false`. |
630 class _HasNextRequest<T> implements _EventRequest { | 630 class _HasNextRequest<T> implements _EventRequest<T> { |
631 final Completer _completer = new Completer<bool>(); | 631 final _completer = new Completer<bool>(); |
632 | 632 |
633 Future<bool> get future => _completer.future; | 633 Future<bool> get future => _completer.future; |
634 | 634 |
635 bool update(Queue<Result> events, bool isDone) { | 635 bool update(Queue<Result<T>> events, bool isDone) { |
636 if (events.isNotEmpty) { | 636 if (events.isNotEmpty) { |
637 _completer.complete(true); | 637 _completer.complete(true); |
638 return true; | 638 return true; |
639 } | 639 } |
640 if (isDone) { | 640 if (isDone) { |
641 _completer.complete(false); | 641 _completer.complete(false); |
642 return true; | 642 return true; |
643 } | 643 } |
644 return false; | 644 return false; |
645 } | 645 } |
646 } | 646 } |
OLD | NEW |