Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Unified Diff: lib/src/stream_queue.dart

Issue 2649033006: Add `peek` and `lookAhead` to `StreamQueue`. (Closed)
Patch Set: Address comments. Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/stream_queue.dart
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 7b73eace731963b8187b3a2f0f54e12646cd5e5c..092a58ab893a626910f2de8e4b8db397e97393c7 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -131,6 +131,22 @@ abstract class StreamQueue<T> {
throw _failClosed();
}
+
+ /// Returns the next [count] data events without consuming them.
+ ///
+ /// Works like [take] except that the events are left in the queue.
+ /// If one of the next [count] events is an error, the returned future
+ /// completes with this error, and the error is still left in the queue.
+ Future<List<T>> lookAhead(int count) {
+ RangeError.checkNotNegative(count, "count");
+ if (!_isClosed) {
+ var request = new _LookAheadRequest<T>(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+
/// Requests the next (yet unrequested) event from the stream.
///
/// When the requested event arrives, the returned future is completed with
@@ -154,6 +170,19 @@ abstract class StreamQueue<T> {
throw _failClosed();
}
+ /// Returns the next (yet unrequested) event from the stream.
+ ///
+ /// Like [next] except that the event is not consumed.
+ /// If the next event is an error event, it also stays in the queue.
+ Future<T> get peek {
+ if (!_isClosed) {
+ var nextRequest = new _PeekRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
+ }
+ throw _failClosed();
+ }
+
/// Returns a stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
@@ -189,7 +218,7 @@ abstract class StreamQueue<T> {
/// then all events were succssfully skipped. If the value
/// is greater than zero then the stream ended early.
Future<int> skip(int count) {
- if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ RangeError.checkNotNegative(count, "count");
if (!_isClosed) {
var request = new _SkipRequest(count);
_addRequest(request);
@@ -214,7 +243,7 @@ abstract class StreamQueue<T> {
/// of data collected so far. That is, the returned
/// list may have fewer than [count] elements.
Future<List<T>> take(int count) {
- if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ RangeError.checkNotNegative(count, "count");
if (!_isClosed) {
var request = new _TakeRequest<T>(count);
_addRequest(request);
@@ -352,9 +381,8 @@ abstract class StreamQueue<T> {
/// The returned future completes with the result of calling
/// `cancel`.
///
- /// After calling `cancel`, no further events can be requested.
- /// None of [next], [rest], [skip], [take] or [cancel] may be
- /// called again.
+ /// After calling `cancel`, no further events can be requested,
+ /// so methods like [next] or [peek] may not be called again.
Future cancel({bool immediate: false}) {
if (_isClosed) throw _failClosed();
_isClosed = true;
@@ -692,15 +720,42 @@ class _NextRequest<T> implements _EventRequest<T> {
return true;
}
if (isDone) {
- var errorFuture =
- new Future.sync(() => throw new StateError("No elements"));
- _completer.complete(errorFuture);
+ _completer.completeError(new StateError("No elements"),
+ StackTrace.current);
+ return true;
+ }
+ return false;
+ }
+}
+
+
+/// Request for a [StreamQueue.peek] call.
+///
+/// Completes the returned future when receiving the first event,
+/// and is then complete, but doesn't consume the event.
+class _PeekRequest<T> implements _EventRequest<T> {
+ /// Completer for the future returned by [StreamQueue.next].
+ final _completer = new Completer<T>();
+
+ _PeekRequest();
+
+ Future<T> get future => _completer.future;
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ if (events.isNotEmpty) {
+ events.first.complete(_completer);
+ return true;
+ }
+ if (isDone) {
+ _completer.completeError(new StateError("No elements"),
+ StackTrace.current);
return true;
}
return false;
}
}
+
/// Request for a [StreamQueue.skip] call.
class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
@@ -738,8 +793,8 @@ class _SkipRequest<T> implements _EventRequest<T> {
}
}
-/// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest<T> {
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
+abstract class _ListRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the take call.
final _completer = new Completer<List<T>>();
@@ -752,10 +807,16 @@ class _TakeRequest<T> implements _EventRequest<T> {
/// this value.
final int _eventsToTake;
- _TakeRequest(this._eventsToTake);
+ _ListRequest(this._eventsToTake);
/// The future completed when the correct number of events have been captured.
Future<List<T>> get future => _completer.future;
+}
+
+
+/// Request for a [StreamQueue.take] call.
+class _TakeRequest<T> extends _ListRequest<T> {
+ _TakeRequest(int eventsToTake) : super(eventsToTake);
bool update(QueueList<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
@@ -766,7 +827,7 @@ class _TakeRequest<T> implements _EventRequest<T> {
var event = events.removeFirst();
if (event.isError) {
- _completer.completeError(event.asError.error, event.asError.stackTrace);
+ event.asError.complete(_completer);
return true;
}
_list.add(event.asValue.value);
@@ -776,6 +837,30 @@ class _TakeRequest<T> implements _EventRequest<T> {
}
}
+
+/// Request for a [StreamQueue.lookAhead] call.
+class _LookAheadRequest<T> extends _ListRequest<T> {
+ _LookAheadRequest(int eventsToTake) : super(eventsToTake);
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ while (_list.length < _eventsToTake) {
+ if (events.length == _list.length) {
+ if (isDone) break;
+ return false;
+ }
+ var event = events.elementAt(_list.length);
+ if (event.isError) {
+ event.asError.complete(_completer);
+ return true;
+ }
+ _list.add(event.asValue.value);
+ }
+ _completer.complete(_list);
+ return true;
+ }
+}
+
+
/// Request for a [StreamQueue.cancel] call.
///
/// The request needs no events, it just waits in the request queue
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698