Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(296)

Side by Side Diff: lib/src/stream_queue.dart

Issue 2660333005: Change generic comment syntax to real generic syntax. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:collection'; 6 import 'dart:collection';
7 7
8 import 'package:collection/collection.dart'; 8 import 'package:collection/collection.dart';
9 9
10 import "cancelable_operation.dart"; 10 import "cancelable_operation.dart";
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after
124 /// one events. 124 /// one events.
125 Future<bool> get hasNext { 125 Future<bool> get hasNext {
126 if (!_isClosed) { 126 if (!_isClosed) {
127 var hasNextRequest = new _HasNextRequest(); 127 var hasNextRequest = new _HasNextRequest();
128 _addRequest(hasNextRequest); 128 _addRequest(hasNextRequest);
129 return hasNextRequest.future; 129 return hasNextRequest.future;
130 } 130 }
131 throw _failClosed(); 131 throw _failClosed();
132 } 132 }
133 133
134
135 /// Look at the next [count] data events without consuming them. 134 /// Look at the next [count] data events without consuming them.
136 /// 135 ///
137 /// Works like [take] except that the events are left in the queue. 136 /// Works like [take] except that the events are left in the queue.
138 /// If one of the next [count] events is an error, the returned future 137 /// If one of the next [count] events is an error, the returned future
139 /// completes with this error, and the error is still left in the queue. 138 /// completes with this error, and the error is still left in the queue.
140 Future<List<T>> lookAhead(int count) { 139 Future<List<T>> lookAhead(int count) {
141 if (count < 0) throw new RangeError.range(count, 0, null, "count"); 140 if (count < 0) throw new RangeError.range(count, 0, null, "count");
142 if (!_isClosed) { 141 if (!_isClosed) {
143 var request = new _LookAheadRequest<T>(count); 142 var request = new _LookAheadRequest<T>(count);
144 _addRequest(request); 143 _addRequest(request);
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
346 /// ```dart 345 /// ```dart
347 /// final _stdinQueue = new StreamQueue(stdin); 346 /// final _stdinQueue = new StreamQueue(stdin);
348 /// 347 ///
349 /// /// Returns an operation that completes when the user sends a line to 348 /// /// Returns an operation that completes when the user sends a line to
350 /// /// standard input. 349 /// /// standard input.
351 /// /// 350 /// ///
352 /// /// If the operation is canceled, stops waiting for user input. 351 /// /// If the operation is canceled, stops waiting for user input.
353 /// CancelableOperation<String> nextStdinLine() => 352 /// CancelableOperation<String> nextStdinLine() =>
354 /// _stdinQueue.cancelable((queue) => queue.next); 353 /// _stdinQueue.cancelable((queue) => queue.next);
355 /// ``` 354 /// ```
356 CancelableOperation/*<S>*/ cancelable/*<S>*/( 355 CancelableOperation<S> cancelable<S>(
357 Future/*<S>*/ callback(StreamQueue<T> queue)) { 356 Future<S> callback(StreamQueue<T> queue)) {
358 var transaction = startTransaction(); 357 var transaction = startTransaction();
359 var completer = new CancelableCompleter/*<S>*/(onCancel: () { 358 var completer = new CancelableCompleter<S>(onCancel: () {
360 transaction.reject(); 359 transaction.reject();
361 }); 360 });
362 361
363 var queue = transaction.newQueue(); 362 var queue = transaction.newQueue();
364 completer.complete(callback(queue).whenComplete(() { 363 completer.complete(callback(queue).whenComplete(() {
365 if (!completer.isCanceled) transaction.commit(queue); 364 if (!completer.isCanceled) transaction.commit(queue);
366 })); 365 }));
367 366
368 return completer.operation; 367 return completer.operation;
369 } 368 }
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
487 /// immediately, it skips the queue. 486 /// immediately, it skips the queue.
488 void _addRequest(_EventRequest request) { 487 void _addRequest(_EventRequest request) {
489 if (_requestQueue.isEmpty) { 488 if (_requestQueue.isEmpty) {
490 if (request.update(_eventQueue, _isDone)) return; 489 if (request.update(_eventQueue, _isDone)) return;
491 _ensureListening(); 490 _ensureListening();
492 } 491 }
493 _requestQueue.add(request); 492 _requestQueue.add(request);
494 } 493 }
495 } 494 }
496 495
497
498 /// The default implementation of [StreamQueue]. 496 /// The default implementation of [StreamQueue].
499 /// 497 ///
500 /// This queue gets its events from a stream which is listened 498 /// This queue gets its events from a stream which is listened
501 /// to when a request needs events. 499 /// to when a request needs events.
502 class _StreamQueue<T> extends StreamQueue<T> { 500 class _StreamQueue<T> extends StreamQueue<T> {
503 /// Source of events. 501 /// Source of events.
504 final Stream<T> _sourceStream; 502 final Stream<T> _sourceStream;
505 503
506 /// Subscription on [_sourceStream] while listening for events. 504 /// Subscription on [_sourceStream] while listening for events.
507 /// 505 ///
508 /// Set to subscription when listening, and set to `null` when the 506 /// Set to subscription when listening, and set to `null` when the
509 /// subscription is done (and [_isDone] is set to true). 507 /// subscription is done (and [_isDone] is set to true).
510 StreamSubscription<T> _subscription; 508 StreamSubscription<T> _subscription;
511 509
512 _StreamQueue(this._sourceStream) : super._(); 510 _StreamQueue(this._sourceStream) : super._();
513 511
514 Future _cancel() { 512 Future _cancel() {
515 if (_isDone) return null; 513 if (_isDone) return null;
516 if (_subscription == null) _subscription = _sourceStream.listen(null); 514 if (_subscription == null) _subscription = _sourceStream.listen(null);
517 var future = _subscription.cancel(); 515 var future = _subscription.cancel();
518 _close(); 516 _close();
519 return future; 517 return future;
520 } 518 }
521 519
522 void _ensureListening() { 520 void _ensureListening() {
523 if (_isDone) return; 521 if (_isDone) return;
524 if (_subscription == null) { 522 if (_subscription == null) {
525 _subscription = 523 _subscription = _sourceStream.listen((data) {
526 _sourceStream.listen( 524 _addResult(new Result.value(data));
527 (data) { 525 }, onError: (error, StackTrace stackTrace) {
528 _addResult(new Result.value(data)); 526 _addResult(new Result.error(error, stackTrace));
529 }, 527 }, onDone: () {
530 onError: (error, StackTrace stackTrace) { 528 _subscription = null;
531 _addResult(new Result.error(error, stackTrace)); 529 this._close();
532 }, 530 });
533 onDone: () {
534 _subscription = null;
535 this._close();
536 });
537 } else { 531 } else {
538 _subscription.resume(); 532 _subscription.resume();
539 } 533 }
540 } 534 }
541 535
542 void _pause() { 536 void _pause() {
543 _subscription.pause(); 537 _subscription.pause();
544 } 538 }
545 539
546 Stream<T> _extractStream() { 540 Stream<T> _extractStream() {
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
642 } 636 }
643 637
644 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s 638 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s
645 // request queue, and runs the next request. 639 // request queue, and runs the next request.
646 void _done() { 640 void _done() {
647 _splitter.close(); 641 _splitter.close();
648 for (var queue in _queues) { 642 for (var queue in _queues) {
649 queue._cancel(); 643 queue._cancel();
650 } 644 }
651 645
652 assert((_parent._requestQueue.first as _TransactionRequest) 646 assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
653 .transaction == this); 647 this);
654 _parent._requestQueue.removeFirst(); 648 _parent._requestQueue.removeFirst();
655 _parent._updateRequests(); 649 _parent._updateRequests();
656 } 650 }
657 651
658 /// Throws a [StateError] if [accept] or [reject] has already been called. 652 /// Throws a [StateError] if [accept] or [reject] has already been called.
659 void _assertActive() { 653 void _assertActive() {
660 if (_committed) { 654 if (_committed) {
661 throw new StateError("This transaction has already been accepted."); 655 throw new StateError("This transaction has already been accepted.");
662 } else if (_rejected) { 656 } else if (_rejected) {
663 throw new StateError("This transaction has already been rejected."); 657 throw new StateError("This transaction has already been rejected.");
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
714 _NextRequest(); 708 _NextRequest();
715 709
716 Future<T> get future => _completer.future; 710 Future<T> get future => _completer.future;
717 711
718 bool update(QueueList<Result<T>> events, bool isDone) { 712 bool update(QueueList<Result<T>> events, bool isDone) {
719 if (events.isNotEmpty) { 713 if (events.isNotEmpty) {
720 events.removeFirst().complete(_completer); 714 events.removeFirst().complete(_completer);
721 return true; 715 return true;
722 } 716 }
723 if (isDone) { 717 if (isDone) {
724 _completer.completeError(new StateError("No elements"), 718 _completer.completeError(
725 StackTrace.current); 719 new StateError("No elements"), StackTrace.current);
726 return true; 720 return true;
727 } 721 }
728 return false; 722 return false;
729 } 723 }
730 } 724 }
731 725
732
733 /// Request for a [StreamQueue.peek] call. 726 /// Request for a [StreamQueue.peek] call.
734 /// 727 ///
735 /// Completes the returned future when receiving the first event, 728 /// Completes the returned future when receiving the first event,
736 /// and is then complete, but doesn't consume the event. 729 /// and is then complete, but doesn't consume the event.
737 class _PeekRequest<T> implements _EventRequest<T> { 730 class _PeekRequest<T> implements _EventRequest<T> {
738 /// Completer for the future returned by [StreamQueue.next]. 731 /// Completer for the future returned by [StreamQueue.next].
739 final _completer = new Completer<T>(); 732 final _completer = new Completer<T>();
740 733
741 _PeekRequest(); 734 _PeekRequest();
742 735
743 Future<T> get future => _completer.future; 736 Future<T> get future => _completer.future;
744 737
745 bool update(QueueList<Result<T>> events, bool isDone) { 738 bool update(QueueList<Result<T>> events, bool isDone) {
746 if (events.isNotEmpty) { 739 if (events.isNotEmpty) {
747 events.first.complete(_completer); 740 events.first.complete(_completer);
748 return true; 741 return true;
749 } 742 }
750 if (isDone) { 743 if (isDone) {
751 _completer.completeError(new StateError("No elements"), 744 _completer.completeError(
752 StackTrace.current); 745 new StateError("No elements"), StackTrace.current);
753 return true; 746 return true;
754 } 747 }
755 return false; 748 return false;
756 } 749 }
757 } 750 }
758 751
759
760 /// Request for a [StreamQueue.skip] call. 752 /// Request for a [StreamQueue.skip] call.
761 class _SkipRequest<T> implements _EventRequest<T> { 753 class _SkipRequest<T> implements _EventRequest<T> {
762 /// Completer for the future returned by the skip call. 754 /// Completer for the future returned by the skip call.
763 final _completer = new Completer<int>(); 755 final _completer = new Completer<int>();
764 756
765 /// Number of remaining events to skip. 757 /// Number of remaining events to skip.
766 /// 758 ///
767 /// The request [isComplete] when the values reaches zero. 759 /// The request [isComplete] when the values reaches zero.
768 /// 760 ///
769 /// Decremented when an event is seen. 761 /// Decremented when an event is seen.
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
807 /// The request [isComplete] when the length of [_list] reaches 799 /// The request [isComplete] when the length of [_list] reaches
808 /// this value. 800 /// this value.
809 final int _eventsToTake; 801 final int _eventsToTake;
810 802
811 _ListRequest(this._eventsToTake); 803 _ListRequest(this._eventsToTake);
812 804
813 /// The future completed when the correct number of events have been captured. 805 /// The future completed when the correct number of events have been captured.
814 Future<List<T>> get future => _completer.future; 806 Future<List<T>> get future => _completer.future;
815 } 807 }
816 808
817
818 /// Request for a [StreamQueue.take] call. 809 /// Request for a [StreamQueue.take] call.
819 class _TakeRequest<T> extends _ListRequest<T> { 810 class _TakeRequest<T> extends _ListRequest<T> {
820 _TakeRequest(int eventsToTake) : super(eventsToTake); 811 _TakeRequest(int eventsToTake) : super(eventsToTake);
821 812
822 bool update(QueueList<Result<T>> events, bool isDone) { 813 bool update(QueueList<Result<T>> events, bool isDone) {
823 while (_list.length < _eventsToTake) { 814 while (_list.length < _eventsToTake) {
824 if (events.isEmpty) { 815 if (events.isEmpty) {
825 if (isDone) break; 816 if (isDone) break;
826 return false; 817 return false;
827 } 818 }
828 819
829 var event = events.removeFirst(); 820 var event = events.removeFirst();
830 if (event.isError) { 821 if (event.isError) {
831 event.asError.complete(_completer); 822 event.asError.complete(_completer);
832 return true; 823 return true;
833 } 824 }
834 _list.add(event.asValue.value); 825 _list.add(event.asValue.value);
835 } 826 }
836 _completer.complete(_list); 827 _completer.complete(_list);
837 return true; 828 return true;
838 } 829 }
839 } 830 }
840 831
841
842 /// Request for a [StreamQueue.lookAhead] call. 832 /// Request for a [StreamQueue.lookAhead] call.
843 class _LookAheadRequest<T> extends _ListRequest<T> { 833 class _LookAheadRequest<T> extends _ListRequest<T> {
844 _LookAheadRequest(int eventsToTake) : super(eventsToTake); 834 _LookAheadRequest(int eventsToTake) : super(eventsToTake);
845 835
846 bool update(QueueList<Result<T>> events, bool isDone) { 836 bool update(QueueList<Result<T>> events, bool isDone) {
847 while (_list.length < _eventsToTake) { 837 while (_list.length < _eventsToTake) {
848 if (events.length == _list.length) { 838 if (events.length == _list.length) {
849 if (isDone) break; 839 if (isDone) break;
850 return false; 840 return false;
851 } 841 }
852 var event = events.elementAt(_list.length); 842 var event = events.elementAt(_list.length);
853 if (event.isError) { 843 if (event.isError) {
854 event.asError.complete(_completer); 844 event.asError.complete(_completer);
855 return true; 845 return true;
856 } 846 }
857 _list.add(event.asValue.value); 847 _list.add(event.asValue.value);
858 } 848 }
859 _completer.complete(_list); 849 _completer.complete(_list);
860 return true; 850 return true;
861 } 851 }
862 } 852 }
863 853
864
865 /// Request for a [StreamQueue.cancel] call. 854 /// Request for a [StreamQueue.cancel] call.
866 /// 855 ///
867 /// The request needs no events, it just waits in the request queue 856 /// The request needs no events, it just waits in the request queue
868 /// until all previous events are fulfilled, then it cancels the stream queue 857 /// until all previous events are fulfilled, then it cancels the stream queue
869 /// source subscription. 858 /// source subscription.
870 class _CancelRequest<T> implements _EventRequest<T> { 859 class _CancelRequest<T> implements _EventRequest<T> {
871 /// Completer for the future returned by the `cancel` call. 860 /// Completer for the future returned by the `cancel` call.
872 final _completer = new Completer(); 861 final _completer = new Completer();
862
873 /// 863 ///
874 /// When the event is completed, it needs to cancel the active subscription 864 /// When the event is completed, it needs to cancel the active subscription
875 /// of the `StreamQueue` object, if any. 865 /// of the `StreamQueue` object, if any.
876 final StreamQueue _streamQueue; 866 final StreamQueue _streamQueue;
877 867
878 _CancelRequest(this._streamQueue); 868 _CancelRequest(this._streamQueue);
879 869
880 /// The future completed when the cancel request is completed. 870 /// The future completed when the cancel request is completed.
881 Future get future => _completer.future; 871 Future get future => _completer.future;
882 872
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
918 } else { 908 } else {
919 _completer.setSourceStream(_streamQueue._extractStream()); 909 _completer.setSourceStream(_streamQueue._extractStream());
920 } 910 }
921 } else { 911 } else {
922 // There are prefetched events which needs to be added before the 912 // There are prefetched events which needs to be added before the
923 // remaining stream. 913 // remaining stream.
924 var controller = new StreamController<T>(); 914 var controller = new StreamController<T>();
925 for (var event in events) { 915 for (var event in events) {
926 event.addTo(controller); 916 event.addTo(controller);
927 } 917 }
928 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) 918 controller
929 .whenComplete(controller.close); 919 .addStream(_streamQueue._extractStream(), cancelOnError: false)
920 .whenComplete(controller.close);
930 _completer.setSourceStream(controller.stream); 921 _completer.setSourceStream(controller.stream);
931 } 922 }
932 return true; 923 return true;
933 } 924 }
934 } 925 }
935 926
936 /// Request for a [StreamQueue.hasNext] call. 927 /// Request for a [StreamQueue.hasNext] call.
937 /// 928 ///
938 /// Completes the [future] with `true` if it sees any event, 929 /// Completes the [future] with `true` if it sees any event,
939 /// but doesn't consume the event. 930 /// but doesn't consume the event.
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
979 } 970 }
980 971
981 bool update(QueueList<Result<T>> events, bool isDone) { 972 bool update(QueueList<Result<T>> events, bool isDone) {
982 while (_eventsSent < events.length) { 973 while (_eventsSent < events.length) {
983 events[_eventsSent++].addTo(_controller); 974 events[_eventsSent++].addTo(_controller);
984 } 975 }
985 if (isDone && !_controller.isClosed) _controller.close(); 976 if (isDone && !_controller.isClosed) _controller.close();
986 return false; 977 return false;
987 } 978 }
988 } 979 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698