 Chromium Code Reviews
 Chromium Code Reviews Issue 2649033006:
  Add `peek` and `lookAhead` to `StreamQueue`.  (Closed)
    
  
    Issue 2649033006:
  Add `peek` and `lookAhead` to `StreamQueue`.  (Closed) 
  | OLD | NEW | 
|---|---|
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a | 
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 import 'dart:async'; | 5 import 'dart:async'; | 
| 6 import 'dart:collection'; | 6 import 'dart:collection'; | 
| 7 | 7 | 
| 8 import 'package:collection/collection.dart'; | 8 import 'package:collection/collection.dart'; | 
| 9 | 9 | 
| 10 import "cancelable_operation.dart"; | 10 import "cancelable_operation.dart"; | 
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 115 /// one events. | 115 /// one events. | 
| 116 Future<bool> get hasNext { | 116 Future<bool> get hasNext { | 
| 117 if (!_isClosed) { | 117 if (!_isClosed) { | 
| 118 var hasNextRequest = new _HasNextRequest(); | 118 var hasNextRequest = new _HasNextRequest(); | 
| 119 _addRequest(hasNextRequest); | 119 _addRequest(hasNextRequest); | 
| 120 return hasNextRequest.future; | 120 return hasNextRequest.future; | 
| 121 } | 121 } | 
| 122 throw _failClosed(); | 122 throw _failClosed(); | 
| 123 } | 123 } | 
| 124 | 124 | 
| 125 | |
| 126 /// Look at the next [count] data events without consuming them. | |
| 
nweiz
2017/01/24 22:37:59
"Look at" -> "Returns"
 
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
 | |
| 127 /// | |
| 128 /// Works like [take] except that the events are left in the queue. | |
| 129 /// If one of the next [count] events is an error, the returned future | |
| 130 /// completes with this error, and the error is still left in the queue. | |
| 131 Future<List<T>> lookAhead(int count) { | |
| 132 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
| 
nweiz
2017/01/24 22:37:59
`RangeError.checkNotNegative()`?
 
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
 | |
| 133 if (!_isClosed) { | |
| 134 var request = new _LookAheadRequest<T>(count); | |
| 135 _addRequest(request); | |
| 136 return request.future; | |
| 137 } | |
| 138 throw _failClosed(); | |
| 139 } | |
| 
nweiz
2017/01/24 22:37:59
Why not just implement these in terms of transacti
 
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Because it would also be a lot more expensive.
I p
 
nweiz
2017/01/25 22:41:18
Do you have benchmarks where the extra processing
 
Lasse Reichstein Nielsen
2017/01/26 12:13:43
I've added a benchmark below. It shows that `retur
 | |
| 140 | |
| 125 /// Requests the next (yet unrequested) event from the stream. | 141 /// Requests the next (yet unrequested) event from the stream. | 
| 126 /// | 142 /// | 
| 127 /// When the requested event arrives, the returned future is completed with | 143 /// When the requested event arrives, the returned future is completed with | 
| 128 /// the event. | 144 /// the event. | 
| 129 /// If the event is a data event, the returned future completes | 145 /// If the event is a data event, the returned future completes | 
| 130 /// with its value. | 146 /// with its value. | 
| 131 /// If the event is an error event, the returned future completes with | 147 /// If the event is an error event, the returned future completes with | 
| 132 /// its error and stack trace. | 148 /// its error and stack trace. | 
| 133 /// If the stream closes before an event arrives, the returned future | 149 /// If the stream closes before an event arrives, the returned future | 
| 134 /// completes with a [StateError]. | 150 /// completes with a [StateError]. | 
| 135 /// | 151 /// | 
| 136 /// It's possible to have several pending [next] calls (or other requests), | 152 /// It's possible to have several pending [next] calls (or other requests), | 
| 137 /// and they will be completed in the order they were requested, by the | 153 /// and they will be completed in the order they were requested, by the | 
| 138 /// first events that were not consumed by previous requeusts. | 154 /// first events that were not consumed by previous requeusts. | 
| 139 Future<T> get next { | 155 Future<T> get next { | 
| 140 if (!_isClosed) { | 156 if (!_isClosed) { | 
| 141 var nextRequest = new _NextRequest<T>(); | 157 var nextRequest = new _NextRequest<T>(); | 
| 142 _addRequest(nextRequest); | 158 _addRequest(nextRequest); | 
| 143 return nextRequest.future; | 159 return nextRequest.future; | 
| 144 } | 160 } | 
| 145 throw _failClosed(); | 161 throw _failClosed(); | 
| 146 } | 162 } | 
| 147 | 163 | 
| 164 /// Looks at the next (yet unrequested) event from the stream. | |
| 
nweiz
2017/01/24 22:37:59
"Looks at" -> "Returns"
 
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
 | |
| 165 /// | |
| 166 /// Like [next] except that the event is not consumed. | |
| 167 /// If the next event is an error event, it stays in the queue. | |
| 168 Future<T> get peek { | |
| 169 if (!_isClosed) { | |
| 170 var nextRequest = new _PeekRequest<T>(); | |
| 171 _addRequest(nextRequest); | |
| 172 return nextRequest.future; | |
| 173 } | |
| 174 throw _failClosed(); | |
| 175 } | |
| 176 | |
| 148 /// Returns a stream of all the remaning events of the source stream. | 177 /// Returns a stream of all the remaning events of the source stream. | 
| 149 /// | 178 /// | 
| 150 /// All requested [next], [skip] or [take] operations are completed | 179 /// All requested [next], [skip] or [take] operations are completed | 
| 151 /// first, and then any remaining events are provided as events of | 180 /// first, and then any remaining events are provided as events of | 
| 152 /// the returned stream. | 181 /// the returned stream. | 
| 153 /// | 182 /// | 
| 154 /// Using `rest` closes this stream queue. After getting the | 183 /// Using `rest` closes this stream queue. After getting the | 
| 155 /// `rest` the caller may no longer request other events, like | 184 /// `rest` the caller may no longer request other events, like | 
| 156 /// after calling [cancel]. | 185 /// after calling [cancel]. | 
| 157 Stream<T> get rest { | 186 Stream<T> get rest { | 
| (...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 337 /// subscription providing the events. | 366 /// subscription providing the events. | 
| 338 /// | 367 /// | 
| 339 /// If [immediate] is `true`, the source is instead canceled | 368 /// If [immediate] is `true`, the source is instead canceled | 
| 340 /// immediately. Any pending events are completed as though the underlying | 369 /// immediately. Any pending events are completed as though the underlying | 
| 341 /// stream had closed. | 370 /// stream had closed. | 
| 342 /// | 371 /// | 
| 343 /// The returned future completes with the result of calling | 372 /// The returned future completes with the result of calling | 
| 344 /// `cancel`. | 373 /// `cancel`. | 
| 345 /// | 374 /// | 
| 346 /// After calling `cancel`, no further events can be requested. | 375 /// After calling `cancel`, no further events can be requested. | 
| 347 /// None of [next], [rest], [skip], [take] or [cancel] may be | 376 /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] | 
| 348 /// called again. | 377 /// may be called again. | 
| 
nweiz
2017/01/24 22:37:59
Is it really necessary to explicitly list all the
 
Lasse Reichstein Nielsen
2017/01/25 07:58:03
It seemed like a great idea at the time ... but it
 | |
| 349 Future cancel({bool immediate: false}) { | 378 Future cancel({bool immediate: false}) { | 
| 350 if (_isClosed) throw _failClosed(); | 379 if (_isClosed) throw _failClosed(); | 
| 351 _isClosed = true; | 380 _isClosed = true; | 
| 352 | 381 | 
| 353 if (!immediate) { | 382 if (!immediate) { | 
| 354 var request = new _CancelRequest(this); | 383 var request = new _CancelRequest(this); | 
| 355 _addRequest(request); | 384 _addRequest(request); | 
| 356 return request.future; | 385 return request.future; | 
| 357 } | 386 } | 
| 358 | 387 | 
| (...skipping 337 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 696 _NextRequest(); | 725 _NextRequest(); | 
| 697 | 726 | 
| 698 Future<T> get future => _completer.future; | 727 Future<T> get future => _completer.future; | 
| 699 | 728 | 
| 700 bool update(QueueList<Result<T>> events, bool isDone) { | 729 bool update(QueueList<Result<T>> events, bool isDone) { | 
| 701 if (events.isNotEmpty) { | 730 if (events.isNotEmpty) { | 
| 702 events.removeFirst().complete(_completer); | 731 events.removeFirst().complete(_completer); | 
| 703 return true; | 732 return true; | 
| 704 } | 733 } | 
| 705 if (isDone) { | 734 if (isDone) { | 
| 706 var errorFuture = | 735 _completer.completeError(new StateError("No elements"), | 
| 707 new Future.sync(() => throw new StateError("No elements")); | 736 StackTrace.current); | 
| 708 _completer.complete(errorFuture); | |
| 709 return true; | 737 return true; | 
| 710 } | 738 } | 
| 711 return false; | 739 return false; | 
| 712 } | 740 } | 
| 713 } | 741 } | 
| 714 | 742 | 
| 743 | |
| 744 /// Request for a [StreamQueue.peek] call. | |
| 745 /// | |
| 746 /// Completes the returned future when receiving the first event, | |
| 747 /// and is then complete, but doesn't consume the event. | |
| 748 class _PeekRequest<T> implements _EventRequest<T> { | |
| 749 /// Completer for the future returned by [StreamQueue.next]. | |
| 750 final _completer = new Completer<T>(); | |
| 751 | |
| 752 _PeekRequest(); | |
| 753 | |
| 754 Future<T> get future => _completer.future; | |
| 755 | |
| 756 bool update(QueueList<Result<T>> events, bool isDone) { | |
| 757 if (events.isNotEmpty) { | |
| 758 events.first.complete(_completer); | |
| 759 return true; | |
| 760 } | |
| 761 if (isDone) { | |
| 762 _completer.completeError(new StateError("No elements"), | |
| 763 StackTrace.current); | |
| 764 return true; | |
| 765 } | |
| 766 return false; | |
| 767 } | |
| 768 } | |
| 769 | |
| 770 | |
| 715 /// Request for a [StreamQueue.skip] call. | 771 /// Request for a [StreamQueue.skip] call. | 
| 716 class _SkipRequest<T> implements _EventRequest<T> { | 772 class _SkipRequest<T> implements _EventRequest<T> { | 
| 717 /// Completer for the future returned by the skip call. | 773 /// Completer for the future returned by the skip call. | 
| 718 final _completer = new Completer<int>(); | 774 final _completer = new Completer<int>(); | 
| 719 | 775 | 
| 720 /// Number of remaining events to skip. | 776 /// Number of remaining events to skip. | 
| 721 /// | 777 /// | 
| 722 /// The request [isComplete] when the values reaches zero. | 778 /// The request [isComplete] when the values reaches zero. | 
| 723 /// | 779 /// | 
| 724 /// Decremented when an event is seen. | 780 /// Decremented when an event is seen. | 
| (...skipping 17 matching lines...) Expand all Loading... | |
| 742 if (event.isError) { | 798 if (event.isError) { | 
| 743 _completer.completeError(event.asError.error, event.asError.stackTrace); | 799 _completer.completeError(event.asError.error, event.asError.stackTrace); | 
| 744 return true; | 800 return true; | 
| 745 } | 801 } | 
| 746 } | 802 } | 
| 747 _completer.complete(_eventsToSkip); | 803 _completer.complete(_eventsToSkip); | 
| 748 return true; | 804 return true; | 
| 749 } | 805 } | 
| 750 } | 806 } | 
| 751 | 807 | 
| 752 /// Request for a [StreamQueue.take] call. | 808 /// Common superclass for [_TakeRequest] and [_LookAheadRequest]. | 
| 753 class _TakeRequest<T> implements _EventRequest<T> { | 809 abstract class _ListRequest<T> implements _EventRequest<T> { | 
| 754 /// Completer for the future returned by the take call. | 810 /// Completer for the future returned by the take call. | 
| 755 final _completer = new Completer<List<T>>(); | 811 final _completer = new Completer<List<T>>(); | 
| 756 | 812 | 
| 757 /// List collecting events until enough have been seen. | 813 /// List collecting events until enough have been seen. | 
| 758 final _list = <T>[]; | 814 final _list = <T>[]; | 
| 759 | 815 | 
| 760 /// Number of events to capture. | 816 /// Number of events to capture. | 
| 761 /// | 817 /// | 
| 762 /// The request [isComplete] when the length of [_list] reaches | 818 /// The request [isComplete] when the length of [_list] reaches | 
| 763 /// this value. | 819 /// this value. | 
| 764 final int _eventsToTake; | 820 final int _eventsToTake; | 
| 765 | 821 | 
| 766 _TakeRequest(this._eventsToTake); | 822 _ListRequest(this._eventsToTake); | 
| 767 | 823 | 
| 768 /// The future completed when the correct number of events have been captured. | 824 /// The future completed when the correct number of events have been captured. | 
| 769 Future<List<T>> get future => _completer.future; | 825 Future<List<T>> get future => _completer.future; | 
| 826 } | |
| 827 | |
| 828 | |
| 829 /// Request for a [StreamQueue.take] call. | |
| 830 class _TakeRequest<T> extends _ListRequest<T> { | |
| 831 _TakeRequest(int eventsToTake) : super(eventsToTake); | |
| 770 | 832 | 
| 771 bool update(QueueList<Result<T>> events, bool isDone) { | 833 bool update(QueueList<Result<T>> events, bool isDone) { | 
| 772 while (_list.length < _eventsToTake) { | 834 while (_list.length < _eventsToTake) { | 
| 773 if (events.isEmpty) { | 835 if (events.isEmpty) { | 
| 774 if (isDone) break; | 836 if (isDone) break; | 
| 775 return false; | 837 return false; | 
| 776 } | 838 } | 
| 777 | 839 | 
| 778 var event = events.removeFirst(); | 840 var event = events.removeFirst(); | 
| 779 if (event.isError) { | 841 if (event.isError) { | 
| 780 _completer.completeError(event.asError.error, event.asError.stackTrace); | 842 event.asError.complete(_completer); | 
| 781 return true; | 843 return true; | 
| 782 } | 844 } | 
| 783 _list.add(event.asValue.value); | 845 _list.add(event.asValue.value); | 
| 784 } | 846 } | 
| 785 _completer.complete(_list); | 847 _completer.complete(_list); | 
| 786 return true; | 848 return true; | 
| 787 } | 849 } | 
| 788 } | 850 } | 
| 789 | 851 | 
| 852 | |
| 853 /// Request for a [StreamQueue.lookAhead] call. | |
| 854 class _LookAheadRequest<T> extends _ListRequest<T> { | |
| 855 _LookAheadRequest(int eventsToTake) : super(eventsToTake); | |
| 856 | |
| 857 bool update(QueueList<Result<T>> events, bool isDone) { | |
| 858 while (_list.length < _eventsToTake) { | |
| 859 if (events.length == _list.length) { | |
| 860 if (isDone) break; | |
| 861 return false; | |
| 862 } | |
| 863 var event = events.elementAt(_list.length); | |
| 864 if (event.isError) { | |
| 865 event.asError.complete(_completer); | |
| 866 return true; | |
| 867 } | |
| 868 _list.add(event.asValue.value); | |
| 869 } | |
| 870 _completer.complete(_list); | |
| 871 return true; | |
| 872 } | |
| 873 } | |
| 874 | |
| 875 | |
| 790 /// Request for a [StreamQueue.cancel] call. | 876 /// Request for a [StreamQueue.cancel] call. | 
| 791 /// | 877 /// | 
| 792 /// The request needs no events, it just waits in the request queue | 878 /// The request needs no events, it just waits in the request queue | 
| 793 /// until all previous events are fulfilled, then it cancels the stream queue | 879 /// until all previous events are fulfilled, then it cancels the stream queue | 
| 794 /// source subscription. | 880 /// source subscription. | 
| 795 class _CancelRequest<T> implements _EventRequest<T> { | 881 class _CancelRequest<T> implements _EventRequest<T> { | 
| 796 /// Completer for the future returned by the `cancel` call. | 882 /// Completer for the future returned by the `cancel` call. | 
| 797 final _completer = new Completer(); | 883 final _completer = new Completer(); | 
| 798 /// | 884 /// | 
| 799 /// When the event is completed, it needs to cancel the active subscription | 885 /// When the event is completed, it needs to cancel the active subscription | 
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 904 } | 990 } | 
| 905 | 991 | 
| 906 bool update(QueueList<Result<T>> events, bool isDone) { | 992 bool update(QueueList<Result<T>> events, bool isDone) { | 
| 907 while (_eventsSent < events.length) { | 993 while (_eventsSent < events.length) { | 
| 908 events[_eventsSent++].addTo(_controller); | 994 events[_eventsSent++].addTo(_controller); | 
| 909 } | 995 } | 
| 910 if (isDone && !_controller.isClosed) _controller.close(); | 996 if (isDone && !_controller.isClosed) _controller.close(); | 
| 911 return false; | 997 return false; | 
| 912 } | 998 } | 
| 913 } | 999 } | 
| OLD | NEW |