Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: lib/src/stream_queue.dart

Issue 2649033006: Add `peek` and `lookAhead` to `StreamQueue`. (Closed)
Patch Set: Address comments. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:collection'; 6 import 'dart:collection';
7 7
8 import 'package:collection/collection.dart'; 8 import 'package:collection/collection.dart';
9 9
10 import "cancelable_operation.dart"; 10 import "cancelable_operation.dart";
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after
124 /// one events. 124 /// one events.
125 Future<bool> get hasNext { 125 Future<bool> get hasNext {
126 if (!_isClosed) { 126 if (!_isClosed) {
127 var hasNextRequest = new _HasNextRequest(); 127 var hasNextRequest = new _HasNextRequest();
128 _addRequest(hasNextRequest); 128 _addRequest(hasNextRequest);
129 return hasNextRequest.future; 129 return hasNextRequest.future;
130 } 130 }
131 throw _failClosed(); 131 throw _failClosed();
132 } 132 }
133 133
134
135 /// Returns the next [count] data events without consuming them.
136 ///
137 /// Works like [take] except that the events are left in the queue.
138 /// If one of the next [count] events is an error, the returned future
139 /// completes with this error, and the error is still left in the queue.
140 Future<List<T>> lookAhead(int count) {
141 RangeError.checkNotNegative(count, "count");
142 if (!_isClosed) {
143 var request = new _LookAheadRequest<T>(count);
144 _addRequest(request);
145 return request.future;
146 }
147 throw _failClosed();
148 }
149
134 /// Requests the next (yet unrequested) event from the stream. 150 /// Requests the next (yet unrequested) event from the stream.
135 /// 151 ///
136 /// When the requested event arrives, the returned future is completed with 152 /// When the requested event arrives, the returned future is completed with
137 /// the event. 153 /// the event.
138 /// If the event is a data event, the returned future completes 154 /// If the event is a data event, the returned future completes
139 /// with its value. 155 /// with its value.
140 /// If the event is an error event, the returned future completes with 156 /// If the event is an error event, the returned future completes with
141 /// its error and stack trace. 157 /// its error and stack trace.
142 /// If the stream closes before an event arrives, the returned future 158 /// If the stream closes before an event arrives, the returned future
143 /// completes with a [StateError]. 159 /// completes with a [StateError].
144 /// 160 ///
145 /// It's possible to have several pending [next] calls (or other requests), 161 /// It's possible to have several pending [next] calls (or other requests),
146 /// and they will be completed in the order they were requested, by the 162 /// and they will be completed in the order they were requested, by the
147 /// first events that were not consumed by previous requeusts. 163 /// first events that were not consumed by previous requeusts.
148 Future<T> get next { 164 Future<T> get next {
149 if (!_isClosed) { 165 if (!_isClosed) {
150 var nextRequest = new _NextRequest<T>(); 166 var nextRequest = new _NextRequest<T>();
151 _addRequest(nextRequest); 167 _addRequest(nextRequest);
152 return nextRequest.future; 168 return nextRequest.future;
153 } 169 }
154 throw _failClosed(); 170 throw _failClosed();
155 } 171 }
156 172
173 /// Returns the next (yet unrequested) event from the stream.
174 ///
175 /// Like [next] except that the event is not consumed.
176 /// If the next event is an error event, it also stays in the queue.
177 Future<T> get peek {
178 if (!_isClosed) {
179 var nextRequest = new _PeekRequest<T>();
180 _addRequest(nextRequest);
181 return nextRequest.future;
182 }
183 throw _failClosed();
184 }
185
157 /// Returns a stream of all the remaning events of the source stream. 186 /// Returns a stream of all the remaning events of the source stream.
158 /// 187 ///
159 /// All requested [next], [skip] or [take] operations are completed 188 /// All requested [next], [skip] or [take] operations are completed
160 /// first, and then any remaining events are provided as events of 189 /// first, and then any remaining events are provided as events of
161 /// the returned stream. 190 /// the returned stream.
162 /// 191 ///
163 /// Using `rest` closes this stream queue. After getting the 192 /// Using `rest` closes this stream queue. After getting the
164 /// `rest` the caller may no longer request other events, like 193 /// `rest` the caller may no longer request other events, like
165 /// after calling [cancel]. 194 /// after calling [cancel].
166 Stream<T> get rest { 195 Stream<T> get rest {
(...skipping 15 matching lines...) Expand all
182 /// 211 ///
183 /// If an error occurs before `count` data events have been skipped, 212 /// If an error occurs before `count` data events have been skipped,
184 /// the returned future completes with that error instead. 213 /// the returned future completes with that error instead.
185 /// 214 ///
186 /// If the stream closes before `count` data events, 215 /// If the stream closes before `count` data events,
187 /// the remaining unskipped event count is returned. 216 /// the remaining unskipped event count is returned.
188 /// If the returned future completes with the integer `0`, 217 /// If the returned future completes with the integer `0`,
189 /// then all events were succssfully skipped. If the value 218 /// then all events were succssfully skipped. If the value
190 /// is greater than zero then the stream ended early. 219 /// is greater than zero then the stream ended early.
191 Future<int> skip(int count) { 220 Future<int> skip(int count) {
192 if (count < 0) throw new RangeError.range(count, 0, null, "count"); 221 RangeError.checkNotNegative(count, "count");
193 if (!_isClosed) { 222 if (!_isClosed) {
194 var request = new _SkipRequest(count); 223 var request = new _SkipRequest(count);
195 _addRequest(request); 224 _addRequest(request);
196 return request.future; 225 return request.future;
197 } 226 }
198 throw _failClosed(); 227 throw _failClosed();
199 } 228 }
200 229
201 /// Requests the next [count] data events as a list. 230 /// Requests the next [count] data events as a list.
202 /// 231 ///
203 /// The [count] must be non-negative. 232 /// The [count] must be non-negative.
204 /// 233 ///
205 /// Equivalent to calling [next] `count` times and 234 /// Equivalent to calling [next] `count` times and
206 /// storing the data values in a list. 235 /// storing the data values in a list.
207 /// 236 ///
208 /// If an error occurs before `count` data events has 237 /// If an error occurs before `count` data events has
209 /// been collected, the returned future completes with 238 /// been collected, the returned future completes with
210 /// that error instead. 239 /// that error instead.
211 /// 240 ///
212 /// If the stream closes before `count` data events, 241 /// If the stream closes before `count` data events,
213 /// the returned future completes with the list 242 /// the returned future completes with the list
214 /// of data collected so far. That is, the returned 243 /// of data collected so far. That is, the returned
215 /// list may have fewer than [count] elements. 244 /// list may have fewer than [count] elements.
216 Future<List<T>> take(int count) { 245 Future<List<T>> take(int count) {
217 if (count < 0) throw new RangeError.range(count, 0, null, "count"); 246 RangeError.checkNotNegative(count, "count");
218 if (!_isClosed) { 247 if (!_isClosed) {
219 var request = new _TakeRequest<T>(count); 248 var request = new _TakeRequest<T>(count);
220 _addRequest(request); 249 _addRequest(request);
221 return request.future; 250 return request.future;
222 } 251 }
223 throw _failClosed(); 252 throw _failClosed();
224 } 253 }
225 254
226 /// Requests a transaction that can conditionally consume events. 255 /// Requests a transaction that can conditionally consume events.
227 /// 256 ///
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
345 /// all previously requested events have been processed, then it cancels the 374 /// all previously requested events have been processed, then it cancels the
346 /// subscription providing the events. 375 /// subscription providing the events.
347 /// 376 ///
348 /// If [immediate] is `true`, the source is instead canceled 377 /// If [immediate] is `true`, the source is instead canceled
349 /// immediately. Any pending events are completed as though the underlying 378 /// immediately. Any pending events are completed as though the underlying
350 /// stream had closed. 379 /// stream had closed.
351 /// 380 ///
352 /// The returned future completes with the result of calling 381 /// The returned future completes with the result of calling
353 /// `cancel`. 382 /// `cancel`.
354 /// 383 ///
355 /// After calling `cancel`, no further events can be requested. 384 /// After calling `cancel`, no further events can be requested,
356 /// None of [next], [rest], [skip], [take] or [cancel] may be 385 /// so methods like [next] or [peek] may not be called again.
357 /// called again.
358 Future cancel({bool immediate: false}) { 386 Future cancel({bool immediate: false}) {
359 if (_isClosed) throw _failClosed(); 387 if (_isClosed) throw _failClosed();
360 _isClosed = true; 388 _isClosed = true;
361 389
362 if (!immediate) { 390 if (!immediate) {
363 var request = new _CancelRequest(this); 391 var request = new _CancelRequest(this);
364 _addRequest(request); 392 _addRequest(request);
365 return request.future; 393 return request.future;
366 } 394 }
367 395
(...skipping 317 matching lines...) Expand 10 before | Expand all | Expand 10 after
685 _NextRequest(); 713 _NextRequest();
686 714
687 Future<T> get future => _completer.future; 715 Future<T> get future => _completer.future;
688 716
689 bool update(QueueList<Result<T>> events, bool isDone) { 717 bool update(QueueList<Result<T>> events, bool isDone) {
690 if (events.isNotEmpty) { 718 if (events.isNotEmpty) {
691 events.removeFirst().complete(_completer); 719 events.removeFirst().complete(_completer);
692 return true; 720 return true;
693 } 721 }
694 if (isDone) { 722 if (isDone) {
695 var errorFuture = 723 _completer.completeError(new StateError("No elements"),
696 new Future.sync(() => throw new StateError("No elements")); 724 StackTrace.current);
697 _completer.complete(errorFuture);
698 return true; 725 return true;
699 } 726 }
700 return false; 727 return false;
701 } 728 }
702 } 729 }
703 730
731
732 /// Request for a [StreamQueue.peek] call.
733 ///
734 /// Completes the returned future when receiving the first event,
735 /// and is then complete, but doesn't consume the event.
736 class _PeekRequest<T> implements _EventRequest<T> {
737 /// Completer for the future returned by [StreamQueue.next].
738 final _completer = new Completer<T>();
739
740 _PeekRequest();
741
742 Future<T> get future => _completer.future;
743
744 bool update(QueueList<Result<T>> events, bool isDone) {
745 if (events.isNotEmpty) {
746 events.first.complete(_completer);
747 return true;
748 }
749 if (isDone) {
750 _completer.completeError(new StateError("No elements"),
751 StackTrace.current);
752 return true;
753 }
754 return false;
755 }
756 }
757
758
704 /// Request for a [StreamQueue.skip] call. 759 /// Request for a [StreamQueue.skip] call.
705 class _SkipRequest<T> implements _EventRequest<T> { 760 class _SkipRequest<T> implements _EventRequest<T> {
706 /// Completer for the future returned by the skip call. 761 /// Completer for the future returned by the skip call.
707 final _completer = new Completer<int>(); 762 final _completer = new Completer<int>();
708 763
709 /// Number of remaining events to skip. 764 /// Number of remaining events to skip.
710 /// 765 ///
711 /// The request [isComplete] when the values reaches zero. 766 /// The request [isComplete] when the values reaches zero.
712 /// 767 ///
713 /// Decremented when an event is seen. 768 /// Decremented when an event is seen.
(...skipping 17 matching lines...) Expand all
731 if (event.isError) { 786 if (event.isError) {
732 _completer.completeError(event.asError.error, event.asError.stackTrace); 787 _completer.completeError(event.asError.error, event.asError.stackTrace);
733 return true; 788 return true;
734 } 789 }
735 } 790 }
736 _completer.complete(_eventsToSkip); 791 _completer.complete(_eventsToSkip);
737 return true; 792 return true;
738 } 793 }
739 } 794 }
740 795
741 /// Request for a [StreamQueue.take] call. 796 /// Common superclass for [_TakeRequest] and [_LookAheadRequest].
742 class _TakeRequest<T> implements _EventRequest<T> { 797 abstract class _ListRequest<T> implements _EventRequest<T> {
743 /// Completer for the future returned by the take call. 798 /// Completer for the future returned by the take call.
744 final _completer = new Completer<List<T>>(); 799 final _completer = new Completer<List<T>>();
745 800
746 /// List collecting events until enough have been seen. 801 /// List collecting events until enough have been seen.
747 final _list = <T>[]; 802 final _list = <T>[];
748 803
749 /// Number of events to capture. 804 /// Number of events to capture.
750 /// 805 ///
751 /// The request [isComplete] when the length of [_list] reaches 806 /// The request [isComplete] when the length of [_list] reaches
752 /// this value. 807 /// this value.
753 final int _eventsToTake; 808 final int _eventsToTake;
754 809
755 _TakeRequest(this._eventsToTake); 810 _ListRequest(this._eventsToTake);
756 811
757 /// The future completed when the correct number of events have been captured. 812 /// The future completed when the correct number of events have been captured.
758 Future<List<T>> get future => _completer.future; 813 Future<List<T>> get future => _completer.future;
814 }
815
816
817 /// Request for a [StreamQueue.take] call.
818 class _TakeRequest<T> extends _ListRequest<T> {
819 _TakeRequest(int eventsToTake) : super(eventsToTake);
759 820
760 bool update(QueueList<Result<T>> events, bool isDone) { 821 bool update(QueueList<Result<T>> events, bool isDone) {
761 while (_list.length < _eventsToTake) { 822 while (_list.length < _eventsToTake) {
762 if (events.isEmpty) { 823 if (events.isEmpty) {
763 if (isDone) break; 824 if (isDone) break;
764 return false; 825 return false;
765 } 826 }
766 827
767 var event = events.removeFirst(); 828 var event = events.removeFirst();
768 if (event.isError) { 829 if (event.isError) {
769 _completer.completeError(event.asError.error, event.asError.stackTrace); 830 event.asError.complete(_completer);
770 return true; 831 return true;
771 } 832 }
772 _list.add(event.asValue.value); 833 _list.add(event.asValue.value);
773 } 834 }
774 _completer.complete(_list); 835 _completer.complete(_list);
775 return true; 836 return true;
776 } 837 }
777 } 838 }
778 839
840
841 /// Request for a [StreamQueue.lookAhead] call.
842 class _LookAheadRequest<T> extends _ListRequest<T> {
843 _LookAheadRequest(int eventsToTake) : super(eventsToTake);
844
845 bool update(QueueList<Result<T>> events, bool isDone) {
846 while (_list.length < _eventsToTake) {
847 if (events.length == _list.length) {
848 if (isDone) break;
849 return false;
850 }
851 var event = events.elementAt(_list.length);
852 if (event.isError) {
853 event.asError.complete(_completer);
854 return true;
855 }
856 _list.add(event.asValue.value);
857 }
858 _completer.complete(_list);
859 return true;
860 }
861 }
862
863
779 /// Request for a [StreamQueue.cancel] call. 864 /// Request for a [StreamQueue.cancel] call.
780 /// 865 ///
781 /// The request needs no events, it just waits in the request queue 866 /// The request needs no events, it just waits in the request queue
782 /// until all previous events are fulfilled, then it cancels the stream queue 867 /// until all previous events are fulfilled, then it cancels the stream queue
783 /// source subscription. 868 /// source subscription.
784 class _CancelRequest<T> implements _EventRequest<T> { 869 class _CancelRequest<T> implements _EventRequest<T> {
785 /// Completer for the future returned by the `cancel` call. 870 /// Completer for the future returned by the `cancel` call.
786 final _completer = new Completer(); 871 final _completer = new Completer();
787 /// 872 ///
788 /// When the event is completed, it needs to cancel the active subscription 873 /// When the event is completed, it needs to cancel the active subscription
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
893 } 978 }
894 979
895 bool update(QueueList<Result<T>> events, bool isDone) { 980 bool update(QueueList<Result<T>> events, bool isDone) {
896 while (_eventsSent < events.length) { 981 while (_eventsSent < events.length) {
897 events[_eventsSent++].addTo(_controller); 982 events[_eventsSent++].addTo(_controller);
898 } 983 }
899 if (isDone && !_controller.isClosed) _controller.close(); 984 if (isDone && !_controller.isClosed) _controller.close();
900 return false; 985 return false;
901 } 986 }
902 } 987 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698