| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ | 5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ |
| 6 // lands. | 6 // lands. |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "package:async/async.dart" hide ForkableStream, StreamQueue; | 10 import "package:async/async.dart" hide ForkableStream, StreamQueue; |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 71 // again when new events are available. | 71 // again when new events are available. |
| 72 // The request can remove events that it uses, or keep them in the event | 72 // The request can remove events that it uses, or keep them in the event |
| 73 // queue until it has all that it needs. | 73 // queue until it has all that it needs. |
| 74 // | 74 // |
| 75 // This model is very flexible and easily extensible. | 75 // This model is very flexible and easily extensible. |
| 76 // It allows requests that don't consume events (like [hasNext]) or | 76 // It allows requests that don't consume events (like [hasNext]) or |
| 77 // potentially a request that takes either five or zero events, determined | 77 // potentially a request that takes either five or zero events, determined |
| 78 // by the content of the fifth event. | 78 // by the content of the fifth event. |
| 79 | 79 |
| 80 /// Source of events. | 80 /// Source of events. |
| 81 final ForkableStream _sourceStream; | 81 final ForkableStream<T> _sourceStream; |
| 82 | 82 |
| 83 /// Subscription on [_sourceStream] while listening for events. | 83 /// Subscription on [_sourceStream] while listening for events. |
| 84 /// | 84 /// |
| 85 /// Set to subscription when listening, and set to `null` when the | 85 /// Set to subscription when listening, and set to `null` when the |
| 86 /// subscription is done (and [_isDone] is set to true). | 86 /// subscription is done (and [_isDone] is set to true). |
| 87 StreamSubscription _subscription; | 87 StreamSubscription<T> _subscription; |
| 88 | 88 |
| 89 /// Whether we have listened on [_sourceStream] and the subscription is done. | 89 /// Whether we have listened on [_sourceStream] and the subscription is done. |
| 90 bool _isDone = false; | 90 bool _isDone = false; |
| 91 | 91 |
| 92 /// Whether a closing operation has been performed on the stream queue. | 92 /// Whether a closing operation has been performed on the stream queue. |
| 93 /// | 93 /// |
| 94 /// Closing operations are [cancel] and [rest]. | 94 /// Closing operations are [cancel] and [rest]. |
| 95 bool _isClosed = false; | 95 bool _isClosed = false; |
| 96 | 96 |
| 97 /// Queue of events not used by a request yet. | 97 /// Queue of events not used by a request yet. |
| 98 final Queue<Result> _eventQueue = new Queue(); | 98 final Queue<Result> _eventQueue = new Queue(); |
| 99 | 99 |
| 100 /// Queue of pending requests. | 100 /// Queue of pending requests. |
| 101 /// | 101 /// |
| 102 /// Access through methods below to ensure consistency. | 102 /// Access through methods below to ensure consistency. |
| 103 final Queue<_EventRequest> _requestQueue = new Queue(); | 103 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 104 | 104 |
| 105 /// Create a `StreamQueue` of the events of [source]. | 105 /// Create a `StreamQueue` of the events of [source]. |
| 106 StreamQueue(Stream source) | 106 StreamQueue(Stream<T> source) |
| 107 : _sourceStream = source is ForkableStream | 107 : _sourceStream = source is ForkableStream |
| 108 ? source | 108 ? source |
| 109 : new ForkableStream(source); | 109 : new ForkableStream(source); |
| 110 | 110 |
| 111 /// Asks if the stream has any more events. | 111 /// Asks if the stream has any more events. |
| 112 /// | 112 /// |
| 113 /// Returns a future that completes with `true` if the stream has any | 113 /// Returns a future that completes with `true` if the stream has any |
| 114 /// more events, whether data or error. | 114 /// more events, whether data or error. |
| 115 /// If the stream closes without producing any more events, the returned | 115 /// If the stream closes without producing any more events, the returned |
| 116 /// future completes with `false`. | 116 /// future completes with `false`. |
| (...skipping 237 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 354 } | 354 } |
| 355 | 355 |
| 356 if (!_isDone) { | 356 if (!_isDone) { |
| 357 _subscription.pause(); | 357 _subscription.pause(); |
| 358 } | 358 } |
| 359 } | 359 } |
| 360 | 360 |
| 361 /// Extracts the subscription and makes this stream queue unusable. | 361 /// Extracts the subscription and makes this stream queue unusable. |
| 362 /// | 362 /// |
| 363 /// Can only be used by the very last request. | 363 /// Can only be used by the very last request. |
| 364 StreamSubscription _dispose() { | 364 StreamSubscription<T> _dispose() { |
| 365 assert(_isClosed); | 365 assert(_isClosed); |
| 366 var subscription = _subscription; | 366 var subscription = _subscription; |
| 367 _subscription = null; | 367 _subscription = null; |
| 368 _isDone = true; | 368 _isDone = true; |
| 369 return subscription; | 369 return subscription; |
| 370 } | 370 } |
| 371 } | 371 } |
| 372 | 372 |
| 373 /// Request object that receives events when they arrive, until fulfilled. | 373 /// Request object that receives events when they arrive, until fulfilled. |
| 374 /// | 374 /// |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 419 /// this is the last chance to use them. | 419 /// this is the last chance to use them. |
| 420 void close(Queue<Result> events); | 420 void close(Queue<Result> events); |
| 421 } | 421 } |
| 422 | 422 |
| 423 /// Request for a [StreamQueue.next] call. | 423 /// Request for a [StreamQueue.next] call. |
| 424 /// | 424 /// |
| 425 /// Completes the returned future when receiving the first event, | 425 /// Completes the returned future when receiving the first event, |
| 426 /// and is then complete. | 426 /// and is then complete. |
| 427 class _NextRequest<T> implements _EventRequest { | 427 class _NextRequest<T> implements _EventRequest { |
| 428 /// Completer for the future returned by [StreamQueue.next]. | 428 /// Completer for the future returned by [StreamQueue.next]. |
| 429 final Completer _completer; | 429 final _completer = new Completer<T>(); |
| 430 | 430 |
| 431 _NextRequest() : _completer = new Completer<T>(); | 431 _NextRequest(); |
| 432 | 432 |
| 433 Future<T> get future => _completer.future; | 433 Future<T> get future => _completer.future; |
| 434 | 434 |
| 435 bool addEvents(Queue<Result> events) { | 435 bool addEvents(Queue<Result> events) { |
| 436 if (events.isEmpty) return false; | 436 if (events.isEmpty) return false; |
| 437 events.removeFirst().complete(_completer); | 437 events.removeFirst().complete(_completer); |
| 438 return true; | 438 return true; |
| 439 } | 439 } |
| 440 | 440 |
| 441 void close(Queue<Result> events) { | 441 void close(Queue<Result> events) { |
| 442 var errorFuture = | 442 var errorFuture = |
| 443 new Future.sync(() => throw new StateError("No elements")); | 443 new Future.sync(() => throw new StateError("No elements")); |
| 444 _completer.complete(errorFuture); | 444 _completer.complete(errorFuture); |
| 445 } | 445 } |
| 446 } | 446 } |
| 447 | 447 |
| 448 /// Request for a [StreamQueue.skip] call. | 448 /// Request for a [StreamQueue.skip] call. |
| 449 class _SkipRequest implements _EventRequest { | 449 class _SkipRequest implements _EventRequest { |
| 450 /// Completer for the future returned by the skip call. | 450 /// Completer for the future returned by the skip call. |
| 451 final Completer _completer = new Completer<int>(); | 451 final _completer = new Completer<int>(); |
| 452 | 452 |
| 453 /// Number of remaining events to skip. | 453 /// Number of remaining events to skip. |
| 454 /// | 454 /// |
| 455 /// The request [isComplete] when the values reaches zero. | 455 /// The request [isComplete] when the values reaches zero. |
| 456 /// | 456 /// |
| 457 /// Decremented when an event is seen. | 457 /// Decremented when an event is seen. |
| 458 /// Set to zero when an error is seen since errors abort the skip request. | 458 /// Set to zero when an error is seen since errors abort the skip request. |
| 459 int _eventsToSkip; | 459 int _eventsToSkip; |
| 460 | 460 |
| 461 _SkipRequest(this._eventsToSkip); | 461 _SkipRequest(this._eventsToSkip); |
| 462 | 462 |
| 463 /// The future completed when the correct number of events have been skipped. | 463 /// The future completed when the correct number of events have been skipped. |
| 464 Future get future => _completer.future; | 464 Future<int> get future => _completer.future; |
| 465 | 465 |
| 466 bool addEvents(Queue<Result> events) { | 466 bool addEvents(Queue<Result> events) { |
| 467 while (_eventsToSkip > 0) { | 467 while (_eventsToSkip > 0) { |
| 468 if (events.isEmpty) return false; | 468 if (events.isEmpty) return false; |
| 469 _eventsToSkip--; | 469 _eventsToSkip--; |
| 470 var event = events.removeFirst(); | 470 var event = events.removeFirst(); |
| 471 if (event.isError) { | 471 if (event.isError) { |
| 472 event.complete(_completer); | 472 event.complete(_completer); |
| 473 return true; | 473 return true; |
| 474 } | 474 } |
| 475 } | 475 } |
| 476 _completer.complete(0); | 476 _completer.complete(0); |
| 477 return true; | 477 return true; |
| 478 } | 478 } |
| 479 | 479 |
| 480 void close(Queue<Result> events) { | 480 void close(Queue<Result> events) { |
| 481 _completer.complete(_eventsToSkip); | 481 _completer.complete(_eventsToSkip); |
| 482 } | 482 } |
| 483 } | 483 } |
| 484 | 484 |
| 485 /// Request for a [StreamQueue.take] call. | 485 /// Request for a [StreamQueue.take] call. |
| 486 class _TakeRequest<T> implements _EventRequest { | 486 class _TakeRequest<T> implements _EventRequest { |
| 487 /// Completer for the future returned by the take call. | 487 /// Completer for the future returned by the take call. |
| 488 final Completer _completer; | 488 final _completer = new Completer<List<T>>(); |
| 489 | 489 |
| 490 /// List collecting events until enough have been seen. | 490 /// List collecting events until enough have been seen. |
| 491 final List _list = <T>[]; | 491 final List _list = <T>[]; |
| 492 | 492 |
| 493 /// Number of events to capture. | 493 /// Number of events to capture. |
| 494 /// | 494 /// |
| 495 /// The request [isComplete] when the length of [_list] reaches | 495 /// The request [isComplete] when the length of [_list] reaches |
| 496 /// this value. | 496 /// this value. |
| 497 final int _eventsToTake; | 497 final int _eventsToTake; |
| 498 | 498 |
| 499 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 499 _TakeRequest(this._eventsToTake); |
| 500 | 500 |
| 501 /// The future completed when the correct number of events have been captured. | 501 /// The future completed when the correct number of events have been captured. |
| 502 Future get future => _completer.future; | 502 Future<List<T>> get future => _completer.future; |
| 503 | 503 |
| 504 bool addEvents(Queue<Result> events) { | 504 bool addEvents(Queue<Result> events) { |
| 505 while (_list.length < _eventsToTake) { | 505 while (_list.length < _eventsToTake) { |
| 506 if (events.isEmpty) return false; | 506 if (events.isEmpty) return false; |
| 507 var result = events.removeFirst(); | 507 var result = events.removeFirst(); |
| 508 if (result.isError) { | 508 if (result.isError) { |
| 509 result.complete(_completer); | 509 result.complete(_completer); |
| 510 return true; | 510 return true; |
| 511 } | 511 } |
| 512 _list.add(result.asValue.value); | 512 _list.add(result.asValue.value); |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 559 } | 559 } |
| 560 } | 560 } |
| 561 | 561 |
| 562 /// Request for a [StreamQueue.rest] call. | 562 /// Request for a [StreamQueue.rest] call. |
| 563 /// | 563 /// |
| 564 /// The request is always complete, it just waits in the request queue | 564 /// The request is always complete, it just waits in the request queue |
| 565 /// until all previous events are fulfilled, then it takes over the | 565 /// until all previous events are fulfilled, then it takes over the |
| 566 /// stream events subscription and creates a stream from it. | 566 /// stream events subscription and creates a stream from it. |
| 567 class _RestRequest<T> implements _EventRequest { | 567 class _RestRequest<T> implements _EventRequest { |
| 568 /// Completer for the stream returned by the `rest` call. | 568 /// Completer for the stream returned by the `rest` call. |
| 569 final StreamCompleter _completer = new StreamCompleter<T>(); | 569 final _completer = new StreamCompleter<T>(); |
| 570 | 570 |
| 571 /// The [StreamQueue] object that has this request queued. | 571 /// The [StreamQueue] object that has this request queued. |
| 572 /// | 572 /// |
| 573 /// When the event is completed, it needs to cancel the active subscription | 573 /// When the event is completed, it needs to cancel the active subscription |
| 574 /// of the `StreamQueue` object, if any. | 574 /// of the `StreamQueue` object, if any. |
| 575 final StreamQueue _streamQueue; | 575 final StreamQueue<T> _streamQueue; |
| 576 | 576 |
| 577 _RestRequest(this._streamQueue); | 577 _RestRequest(this._streamQueue); |
| 578 | 578 |
| 579 /// The stream which will contain the remaining events of [_streamQueue]. | 579 /// The stream which will contain the remaining events of [_streamQueue]. |
| 580 Stream<T> get stream => _completer.stream; | 580 Stream<T> get stream => _completer.stream; |
| 581 | 581 |
| 582 bool addEvents(Queue<Result> events) { | 582 bool addEvents(Queue<Result> events) { |
| 583 _completeStream(events); | 583 _completeStream(events); |
| 584 return true; | 584 return true; |
| 585 } | 585 } |
| (...skipping 16 matching lines...) Expand all Loading... |
| 602 for (var event in events) { | 602 for (var event in events) { |
| 603 event.addTo(controller); | 603 event.addTo(controller); |
| 604 } | 604 } |
| 605 controller.addStream(_getRestStream(), cancelOnError: false) | 605 controller.addStream(_getRestStream(), cancelOnError: false) |
| 606 .whenComplete(controller.close); | 606 .whenComplete(controller.close); |
| 607 _completer.setSourceStream(controller.stream); | 607 _completer.setSourceStream(controller.stream); |
| 608 } | 608 } |
| 609 } | 609 } |
| 610 | 610 |
| 611 /// Create a stream from the rest of [_streamQueue]'s subscription. | 611 /// Create a stream from the rest of [_streamQueue]'s subscription. |
| 612 Stream _getRestStream() { | 612 Stream<T> _getRestStream() { |
| 613 if (_streamQueue._isDone) { | 613 if (_streamQueue._isDone) { |
| 614 var controller = new StreamController<T>()..close(); | 614 var controller = new StreamController<T>()..close(); |
| 615 return controller.stream; | 615 return controller.stream; |
| 616 // TODO(lrn). Use the following when 1.11 is released. | 616 // TODO(lrn). Use the following when 1.11 is released. |
| 617 // return new Stream<T>.empty(); | 617 // return new Stream<T>.empty(); |
| 618 } | 618 } |
| 619 if (_streamQueue._subscription == null) { | 619 if (_streamQueue._subscription == null) { |
| 620 return _streamQueue._sourceStream; | 620 return _streamQueue._sourceStream; |
| 621 } | 621 } |
| 622 var subscription = _streamQueue._dispose(); | 622 var subscription = _streamQueue._dispose(); |
| 623 subscription.resume(); | 623 subscription.resume(); |
| 624 return new SubscriptionStream<T>(subscription); | 624 return new SubscriptionStream<T>(subscription); |
| 625 } | 625 } |
| 626 } | 626 } |
| 627 | 627 |
| 628 /// Request for a [StreamQueue.hasNext] call. | 628 /// Request for a [StreamQueue.hasNext] call. |
| 629 /// | 629 /// |
| 630 /// Completes the [future] with `true` if it sees any event, | 630 /// Completes the [future] with `true` if it sees any event, |
| 631 /// but doesn't consume the event. | 631 /// but doesn't consume the event. |
| 632 /// If the request is closed without seeing an event, then | 632 /// If the request is closed without seeing an event, then |
| 633 /// the [future] is completed with `false`. | 633 /// the [future] is completed with `false`. |
| 634 class _HasNextRequest<T> implements _EventRequest { | 634 class _HasNextRequest<T> implements _EventRequest { |
| 635 final Completer _completer = new Completer<bool>(); | 635 final _completer = new Completer<bool>(); |
| 636 | 636 |
| 637 Future<bool> get future => _completer.future; | 637 Future<bool> get future => _completer.future; |
| 638 | 638 |
| 639 bool addEvents(Queue<Result> events) { | 639 bool addEvents(Queue<Result> events) { |
| 640 if (events.isNotEmpty) { | 640 if (events.isNotEmpty) { |
| 641 _completer.complete(true); | 641 _completer.complete(true); |
| 642 return true; | 642 return true; |
| 643 } | 643 } |
| 644 return false; | 644 return false; |
| 645 } | 645 } |
| 646 | 646 |
| 647 void close(_) { | 647 void close(_) { |
| 648 _completer.complete(false); | 648 _completer.complete(false); |
| 649 } | 649 } |
| 650 } | 650 } |
| 651 | 651 |
| 652 /// Request for a [StreamQueue.fork] call. | 652 /// Request for a [StreamQueue.fork] call. |
| 653 class _ForkRequest<T> implements _EventRequest { | 653 class _ForkRequest<T> implements _EventRequest { |
| 654 /// Completer for the stream used by the queue by the `fork` call. | 654 /// Completer for the stream used by the queue by the `fork` call. |
| 655 StreamCompleter _completer; | 655 StreamCompleter<T> _completer; |
| 656 | 656 |
| 657 StreamQueue<T> queue; | 657 StreamQueue<T> queue; |
| 658 | 658 |
| 659 /// The [StreamQueue] object that has this request queued. | 659 /// The [StreamQueue] object that has this request queued. |
| 660 final StreamQueue _streamQueue; | 660 final StreamQueue<T> _streamQueue; |
| 661 | 661 |
| 662 _ForkRequest(this._streamQueue) { | 662 _ForkRequest(this._streamQueue) { |
| 663 _completer = new StreamCompleter<T>(); | 663 _completer = new StreamCompleter(); |
| 664 queue = new StreamQueue<T>(_completer.stream); | 664 queue = new StreamQueue(_completer.stream); |
| 665 } | 665 } |
| 666 | 666 |
| 667 bool addEvents(Queue<Result> events) { | 667 bool addEvents(Queue<Result> events) { |
| 668 _completeStream(events); | 668 _completeStream(events); |
| 669 return true; | 669 return true; |
| 670 } | 670 } |
| 671 | 671 |
| 672 void close(Queue<Result> events) { | 672 void close(Queue<Result> events) { |
| 673 _completeStream(events); | 673 _completeStream(events); |
| 674 } | 674 } |
| (...skipping 13 matching lines...) Expand all Loading... |
| 688 event.addTo(controller); | 688 event.addTo(controller); |
| 689 } | 689 } |
| 690 | 690 |
| 691 var fork = _streamQueue._sourceStream.fork(); | 691 var fork = _streamQueue._sourceStream.fork(); |
| 692 controller.addStream(fork, cancelOnError: false) | 692 controller.addStream(fork, cancelOnError: false) |
| 693 .whenComplete(controller.close); | 693 .whenComplete(controller.close); |
| 694 _completer.setSourceStream(controller.stream); | 694 _completer.setSourceStream(controller.stream); |
| 695 } | 695 } |
| 696 } | 696 } |
| 697 } | 697 } |
| OLD | NEW |