Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(22)

Unified Diff: lib/src/stream_queue.dart

Issue 2649033006: Add `peek` and `lookAhead` to `StreamQueue`. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« CHANGELOG.md ('K') | « CHANGELOG.md ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/stream_queue.dart
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index f770e16e78c5f1e580bc097b58f9b65190ba8ceb..65b5f547d2e283daaa362bd08eb01a6142204329 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -122,6 +122,22 @@ abstract class StreamQueue<T> {
throw _failClosed();
}
+
+ /// Look at the next [count] data events without consuming them.
nweiz 2017/01/24 22:37:59 "Look at" -> "Returns"
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Done.
+ ///
+ /// Works like [take] except that the events are left in the queue.
+ /// If one of the next [count] events is an error, the returned future
+ /// completes with this error, and the error is still left in the queue.
+ Future<List<T>> lookAhead(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
nweiz 2017/01/24 22:37:59 `RangeError.checkNotNegative()`?
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Done.
+ if (!_isClosed) {
+ var request = new _LookAheadRequest<T>(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
nweiz 2017/01/24 22:37:59 Why not just implement these in terms of transacti
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Because it would also be a lot more expensive. I p
nweiz 2017/01/25 22:41:18 Do you have benchmarks where the extra processing
Lasse Reichstein Nielsen 2017/01/26 12:13:43 I've added a benchmark below. It shows that `retur
+
/// Requests the next (yet unrequested) event from the stream.
///
/// When the requested event arrives, the returned future is completed with
@@ -145,6 +161,19 @@ abstract class StreamQueue<T> {
throw _failClosed();
}
+ /// Looks at the next (yet unrequested) event from the stream.
nweiz 2017/01/24 22:37:59 "Looks at" -> "Returns"
Lasse Reichstein Nielsen 2017/01/25 07:58:03 Done.
+ ///
+ /// Like [next] except that the event is not consumed.
+ /// If the next event is an error event, it stays in the queue.
+ Future<T> get peek {
+ if (!_isClosed) {
+ var nextRequest = new _PeekRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
+ }
+ throw _failClosed();
+ }
+
/// Returns a stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
@@ -344,8 +373,8 @@ abstract class StreamQueue<T> {
/// `cancel`.
///
/// After calling `cancel`, no further events can be requested.
- /// None of [next], [rest], [skip], [take] or [cancel] may be
- /// called again.
+ /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
+ /// may be called again.
nweiz 2017/01/24 22:37:59 Is it really necessary to explicitly list all the
Lasse Reichstein Nielsen 2017/01/25 07:58:03 It seemed like a great idea at the time ... but it
Future cancel({bool immediate: false}) {
if (_isClosed) throw _failClosed();
_isClosed = true;
@@ -703,15 +732,42 @@ class _NextRequest<T> implements _EventRequest<T> {
return true;
}
if (isDone) {
- var errorFuture =
- new Future.sync(() => throw new StateError("No elements"));
- _completer.complete(errorFuture);
+ _completer.completeError(new StateError("No elements"),
+ StackTrace.current);
return true;
}
return false;
}
}
+
+/// Request for a [StreamQueue.peek] call.
+///
+/// Completes the returned future when receiving the first event,
+/// and is then complete, but doesn't consume the event.
+class _PeekRequest<T> implements _EventRequest<T> {
+ /// Completer for the future returned by [StreamQueue.next].
+ final _completer = new Completer<T>();
+
+ _PeekRequest();
+
+ Future<T> get future => _completer.future;
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ if (events.isNotEmpty) {
+ events.first.complete(_completer);
+ return true;
+ }
+ if (isDone) {
+ _completer.completeError(new StateError("No elements"),
+ StackTrace.current);
+ return true;
+ }
+ return false;
+ }
+}
+
+
/// Request for a [StreamQueue.skip] call.
class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
@@ -749,8 +805,8 @@ class _SkipRequest<T> implements _EventRequest<T> {
}
}
-/// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest<T> {
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
+abstract class _ListRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the take call.
final _completer = new Completer<List<T>>();
@@ -763,10 +819,16 @@ class _TakeRequest<T> implements _EventRequest<T> {
/// this value.
final int _eventsToTake;
- _TakeRequest(this._eventsToTake);
+ _ListRequest(this._eventsToTake);
/// The future completed when the correct number of events have been captured.
Future<List<T>> get future => _completer.future;
+}
+
+
+/// Request for a [StreamQueue.take] call.
+class _TakeRequest<T> extends _ListRequest<T> {
+ _TakeRequest(int eventsToTake) : super(eventsToTake);
bool update(QueueList<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
@@ -777,7 +839,7 @@ class _TakeRequest<T> implements _EventRequest<T> {
var event = events.removeFirst();
if (event.isError) {
- _completer.completeError(event.asError.error, event.asError.stackTrace);
+ event.asError.complete(_completer);
return true;
}
_list.add(event.asValue.value);
@@ -787,6 +849,30 @@ class _TakeRequest<T> implements _EventRequest<T> {
}
}
+
+/// Request for a [StreamQueue.lookAhead] call.
+class _LookAheadRequest<T> extends _ListRequest<T> {
+ _LookAheadRequest(int eventsToTake) : super(eventsToTake);
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ while (_list.length < _eventsToTake) {
+ if (events.length == _list.length) {
+ if (isDone) break;
+ return false;
+ }
+ var event = events.elementAt(_list.length);
+ if (event.isError) {
+ event.asError.complete(_completer);
+ return true;
+ }
+ _list.add(event.asValue.value);
+ }
+ _completer.complete(_list);
+ return true;
+ }
+}
+
+
/// Request for a [StreamQueue.cancel] call.
///
/// The request needs no events, it just waits in the request queue
« CHANGELOG.md ('K') | « CHANGELOG.md ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698