Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: lib/src/stream_queue.dart

Issue 2649033006: Add `peek` and `lookAhead` to `StreamQueue`. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:collection'; 6 import 'dart:collection';
7 7
8 import 'package:collection/collection.dart'; 8 import 'package:collection/collection.dart';
9 9
10 import "cancelable_operation.dart"; 10 import "cancelable_operation.dart";
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
115 /// one events. 115 /// one events.
116 Future<bool> get hasNext { 116 Future<bool> get hasNext {
117 if (!_isClosed) { 117 if (!_isClosed) {
118 var hasNextRequest = new _HasNextRequest(); 118 var hasNextRequest = new _HasNextRequest();
119 _addRequest(hasNextRequest); 119 _addRequest(hasNextRequest);
120 return hasNextRequest.future; 120 return hasNextRequest.future;
121 } 121 }
122 throw _failClosed(); 122 throw _failClosed();
123 } 123 }
124 124
125
126 /// Look at the next [count] data events without consuming them.
nweiz 2017/01/24 22:37:59 "Look at" -> "Returns"
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Done.
127 ///
128 /// Works like [take] except that the events are left in the queue.
129 /// If one of the next [count] events is an error, the returned future
130 /// completes with this error, and the error is still left in the queue.
131 Future<List<T>> lookAhead(int count) {
132 if (count < 0) throw new RangeError.range(count, 0, null, "count");
nweiz 2017/01/24 22:37:59 `RangeError.checkNotNegative()`?
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Done.
133 if (!_isClosed) {
134 var request = new _LookAheadRequest<T>(count);
135 _addRequest(request);
136 return request.future;
137 }
138 throw _failClosed();
139 }
nweiz 2017/01/24 22:37:59 Why not just implement these in terms of transacti
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Because it would also be a lot more expensive. I p
nweiz 2017/01/25 22:41:18 Do you have benchmarks where the extra processing
Lasse Reichstein Nielsen 2017/01/26 12:13:43 I've added a benchmark below. It shows that `retur
140
125 /// Requests the next (yet unrequested) event from the stream. 141 /// Requests the next (yet unrequested) event from the stream.
126 /// 142 ///
127 /// When the requested event arrives, the returned future is completed with 143 /// When the requested event arrives, the returned future is completed with
128 /// the event. 144 /// the event.
129 /// If the event is a data event, the returned future completes 145 /// If the event is a data event, the returned future completes
130 /// with its value. 146 /// with its value.
131 /// If the event is an error event, the returned future completes with 147 /// If the event is an error event, the returned future completes with
132 /// its error and stack trace. 148 /// its error and stack trace.
133 /// If the stream closes before an event arrives, the returned future 149 /// If the stream closes before an event arrives, the returned future
134 /// completes with a [StateError]. 150 /// completes with a [StateError].
135 /// 151 ///
136 /// It's possible to have several pending [next] calls (or other requests), 152 /// It's possible to have several pending [next] calls (or other requests),
137 /// and they will be completed in the order they were requested, by the 153 /// and they will be completed in the order they were requested, by the
138 /// first events that were not consumed by previous requeusts. 154 /// first events that were not consumed by previous requeusts.
139 Future<T> get next { 155 Future<T> get next {
140 if (!_isClosed) { 156 if (!_isClosed) {
141 var nextRequest = new _NextRequest<T>(); 157 var nextRequest = new _NextRequest<T>();
142 _addRequest(nextRequest); 158 _addRequest(nextRequest);
143 return nextRequest.future; 159 return nextRequest.future;
144 } 160 }
145 throw _failClosed(); 161 throw _failClosed();
146 } 162 }
147 163
164 /// Looks at the next (yet unrequested) event from the stream.
nweiz 2017/01/24 22:37:59 "Looks at" -> "Returns"
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Done.
165 ///
166 /// Like [next] except that the event is not consumed.
167 /// If the next event is an error event, it stays in the queue.
168 Future<T> get peek {
169 if (!_isClosed) {
170 var nextRequest = new _PeekRequest<T>();
171 _addRequest(nextRequest);
172 return nextRequest.future;
173 }
174 throw _failClosed();
175 }
176
148 /// Returns a stream of all the remaning events of the source stream. 177 /// Returns a stream of all the remaning events of the source stream.
149 /// 178 ///
150 /// All requested [next], [skip] or [take] operations are completed 179 /// All requested [next], [skip] or [take] operations are completed
151 /// first, and then any remaining events are provided as events of 180 /// first, and then any remaining events are provided as events of
152 /// the returned stream. 181 /// the returned stream.
153 /// 182 ///
154 /// Using `rest` closes this stream queue. After getting the 183 /// Using `rest` closes this stream queue. After getting the
155 /// `rest` the caller may no longer request other events, like 184 /// `rest` the caller may no longer request other events, like
156 /// after calling [cancel]. 185 /// after calling [cancel].
157 Stream<T> get rest { 186 Stream<T> get rest {
(...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after
337 /// subscription providing the events. 366 /// subscription providing the events.
338 /// 367 ///
339 /// If [immediate] is `true`, the source is instead canceled 368 /// If [immediate] is `true`, the source is instead canceled
340 /// immediately. Any pending events are completed as though the underlying 369 /// immediately. Any pending events are completed as though the underlying
341 /// stream had closed. 370 /// stream had closed.
342 /// 371 ///
343 /// The returned future completes with the result of calling 372 /// The returned future completes with the result of calling
344 /// `cancel`. 373 /// `cancel`.
345 /// 374 ///
346 /// After calling `cancel`, no further events can be requested. 375 /// After calling `cancel`, no further events can be requested.
347 /// None of [next], [rest], [skip], [take] or [cancel] may be 376 /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
348 /// called again. 377 /// may be called again.
nweiz 2017/01/24 22:37:59 Is it really necessary to explicitly list all the
Lasse Reichstein Nielsen 2017/01/25 07:58:03 It seemed like a great idea at the time ... but it
349 Future cancel({bool immediate: false}) { 378 Future cancel({bool immediate: false}) {
350 if (_isClosed) throw _failClosed(); 379 if (_isClosed) throw _failClosed();
351 _isClosed = true; 380 _isClosed = true;
352 381
353 if (!immediate) { 382 if (!immediate) {
354 var request = new _CancelRequest(this); 383 var request = new _CancelRequest(this);
355 _addRequest(request); 384 _addRequest(request);
356 return request.future; 385 return request.future;
357 } 386 }
358 387
(...skipping 337 matching lines...) Expand 10 before | Expand all | Expand 10 after
696 _NextRequest(); 725 _NextRequest();
697 726
698 Future<T> get future => _completer.future; 727 Future<T> get future => _completer.future;
699 728
700 bool update(QueueList<Result<T>> events, bool isDone) { 729 bool update(QueueList<Result<T>> events, bool isDone) {
701 if (events.isNotEmpty) { 730 if (events.isNotEmpty) {
702 events.removeFirst().complete(_completer); 731 events.removeFirst().complete(_completer);
703 return true; 732 return true;
704 } 733 }
705 if (isDone) { 734 if (isDone) {
706 var errorFuture = 735 _completer.completeError(new StateError("No elements"),
707 new Future.sync(() => throw new StateError("No elements")); 736 StackTrace.current);
708 _completer.complete(errorFuture);
709 return true; 737 return true;
710 } 738 }
711 return false; 739 return false;
712 } 740 }
713 } 741 }
714 742
743
744 /// Request for a [StreamQueue.peek] call.
745 ///
746 /// Completes the returned future when receiving the first event,
747 /// and is then complete, but doesn't consume the event.
748 class _PeekRequest<T> implements _EventRequest<T> {
749 /// Completer for the future returned by [StreamQueue.next].
750 final _completer = new Completer<T>();
751
752 _PeekRequest();
753
754 Future<T> get future => _completer.future;
755
756 bool update(QueueList<Result<T>> events, bool isDone) {
757 if (events.isNotEmpty) {
758 events.first.complete(_completer);
759 return true;
760 }
761 if (isDone) {
762 _completer.completeError(new StateError("No elements"),
763 StackTrace.current);
764 return true;
765 }
766 return false;
767 }
768 }
769
770
715 /// Request for a [StreamQueue.skip] call. 771 /// Request for a [StreamQueue.skip] call.
716 class _SkipRequest<T> implements _EventRequest<T> { 772 class _SkipRequest<T> implements _EventRequest<T> {
717 /// Completer for the future returned by the skip call. 773 /// Completer for the future returned by the skip call.
718 final _completer = new Completer<int>(); 774 final _completer = new Completer<int>();
719 775
720 /// Number of remaining events to skip. 776 /// Number of remaining events to skip.
721 /// 777 ///
722 /// The request [isComplete] when the values reaches zero. 778 /// The request [isComplete] when the values reaches zero.
723 /// 779 ///
724 /// Decremented when an event is seen. 780 /// Decremented when an event is seen.
(...skipping 17 matching lines...) Expand all
742 if (event.isError) { 798 if (event.isError) {
743 _completer.completeError(event.asError.error, event.asError.stackTrace); 799 _completer.completeError(event.asError.error, event.asError.stackTrace);
744 return true; 800 return true;
745 } 801 }
746 } 802 }
747 _completer.complete(_eventsToSkip); 803 _completer.complete(_eventsToSkip);
748 return true; 804 return true;
749 } 805 }
750 } 806 }
751 807
752 /// Request for a [StreamQueue.take] call. 808 /// Common superclass for [_TakeRequest] and [_LookAheadRequest].
753 class _TakeRequest<T> implements _EventRequest<T> { 809 abstract class _ListRequest<T> implements _EventRequest<T> {
754 /// Completer for the future returned by the take call. 810 /// Completer for the future returned by the take call.
755 final _completer = new Completer<List<T>>(); 811 final _completer = new Completer<List<T>>();
756 812
757 /// List collecting events until enough have been seen. 813 /// List collecting events until enough have been seen.
758 final _list = <T>[]; 814 final _list = <T>[];
759 815
760 /// Number of events to capture. 816 /// Number of events to capture.
761 /// 817 ///
762 /// The request [isComplete] when the length of [_list] reaches 818 /// The request [isComplete] when the length of [_list] reaches
763 /// this value. 819 /// this value.
764 final int _eventsToTake; 820 final int _eventsToTake;
765 821
766 _TakeRequest(this._eventsToTake); 822 _ListRequest(this._eventsToTake);
767 823
768 /// The future completed when the correct number of events have been captured. 824 /// The future completed when the correct number of events have been captured.
769 Future<List<T>> get future => _completer.future; 825 Future<List<T>> get future => _completer.future;
826 }
827
828
829 /// Request for a [StreamQueue.take] call.
830 class _TakeRequest<T> extends _ListRequest<T> {
831 _TakeRequest(int eventsToTake) : super(eventsToTake);
770 832
771 bool update(QueueList<Result<T>> events, bool isDone) { 833 bool update(QueueList<Result<T>> events, bool isDone) {
772 while (_list.length < _eventsToTake) { 834 while (_list.length < _eventsToTake) {
773 if (events.isEmpty) { 835 if (events.isEmpty) {
774 if (isDone) break; 836 if (isDone) break;
775 return false; 837 return false;
776 } 838 }
777 839
778 var event = events.removeFirst(); 840 var event = events.removeFirst();
779 if (event.isError) { 841 if (event.isError) {
780 _completer.completeError(event.asError.error, event.asError.stackTrace); 842 event.asError.complete(_completer);
781 return true; 843 return true;
782 } 844 }
783 _list.add(event.asValue.value); 845 _list.add(event.asValue.value);
784 } 846 }
785 _completer.complete(_list); 847 _completer.complete(_list);
786 return true; 848 return true;
787 } 849 }
788 } 850 }
789 851
852
853 /// Request for a [StreamQueue.lookAhead] call.
854 class _LookAheadRequest<T> extends _ListRequest<T> {
855 _LookAheadRequest(int eventsToTake) : super(eventsToTake);
856
857 bool update(QueueList<Result<T>> events, bool isDone) {
858 while (_list.length < _eventsToTake) {
859 if (events.length == _list.length) {
860 if (isDone) break;
861 return false;
862 }
863 var event = events.elementAt(_list.length);
864 if (event.isError) {
865 event.asError.complete(_completer);
866 return true;
867 }
868 _list.add(event.asValue.value);
869 }
870 _completer.complete(_list);
871 return true;
872 }
873 }
874
875
790 /// Request for a [StreamQueue.cancel] call. 876 /// Request for a [StreamQueue.cancel] call.
791 /// 877 ///
792 /// The request needs no events, it just waits in the request queue 878 /// The request needs no events, it just waits in the request queue
793 /// until all previous events are fulfilled, then it cancels the stream queue 879 /// until all previous events are fulfilled, then it cancels the stream queue
794 /// source subscription. 880 /// source subscription.
795 class _CancelRequest<T> implements _EventRequest<T> { 881 class _CancelRequest<T> implements _EventRequest<T> {
796 /// Completer for the future returned by the `cancel` call. 882 /// Completer for the future returned by the `cancel` call.
797 final _completer = new Completer(); 883 final _completer = new Completer();
798 /// 884 ///
799 /// When the event is completed, it needs to cancel the active subscription 885 /// When the event is completed, it needs to cancel the active subscription
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
904 } 990 }
905 991
906 bool update(QueueList<Result<T>> events, bool isDone) { 992 bool update(QueueList<Result<T>> events, bool isDone) {
907 while (_eventsSent < events.length) { 993 while (_eventsSent < events.length) {
908 events[_eventsSent++].addTo(_controller); 994 events[_eventsSent++].addTo(_controller);
909 } 995 }
910 if (isDone && !_controller.isClosed) _controller.close(); 996 if (isDone && !_controller.isClosed) _controller.close();
911 return false; 997 return false;
912 } 998 }
913 } 999 }
OLDNEW
« CHANGELOG.md ('K') | « CHANGELOG.md ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698