Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index 7b73eace731963b8187b3a2f0f54e12646cd5e5c..092a58ab893a626910f2de8e4b8db397e97393c7 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -131,6 +131,22 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
+ |
+ /// Returns the next [count] data events without consuming them. |
+ /// |
+ /// Works like [take] except that the events are left in the queue. |
+ /// If one of the next [count] events is an error, the returned future |
+ /// completes with this error, and the error is still left in the queue. |
+ Future<List<T>> lookAhead(int count) { |
+ RangeError.checkNotNegative(count, "count"); |
+ if (!_isClosed) { |
+ var request = new _LookAheadRequest<T>(count); |
+ _addRequest(request); |
+ return request.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
/// Requests the next (yet unrequested) event from the stream. |
/// |
/// When the requested event arrives, the returned future is completed with |
@@ -154,6 +170,19 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
+ /// Returns the next (yet unrequested) event from the stream. |
+ /// |
+ /// Like [next] except that the event is not consumed. |
+ /// If the next event is an error event, it also stays in the queue. |
+ Future<T> get peek { |
+ if (!_isClosed) { |
+ var nextRequest = new _PeekRequest<T>(); |
+ _addRequest(nextRequest); |
+ return nextRequest.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
/// Returns a stream of all the remaning events of the source stream. |
/// |
/// All requested [next], [skip] or [take] operations are completed |
@@ -189,7 +218,7 @@ abstract class StreamQueue<T> { |
/// then all events were succssfully skipped. If the value |
/// is greater than zero then the stream ended early. |
Future<int> skip(int count) { |
- if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
+ RangeError.checkNotNegative(count, "count"); |
if (!_isClosed) { |
var request = new _SkipRequest(count); |
_addRequest(request); |
@@ -214,7 +243,7 @@ abstract class StreamQueue<T> { |
/// of data collected so far. That is, the returned |
/// list may have fewer than [count] elements. |
Future<List<T>> take(int count) { |
- if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
+ RangeError.checkNotNegative(count, "count"); |
if (!_isClosed) { |
var request = new _TakeRequest<T>(count); |
_addRequest(request); |
@@ -352,9 +381,8 @@ abstract class StreamQueue<T> { |
/// The returned future completes with the result of calling |
/// `cancel`. |
/// |
- /// After calling `cancel`, no further events can be requested. |
- /// None of [next], [rest], [skip], [take] or [cancel] may be |
- /// called again. |
+ /// After calling `cancel`, no further events can be requested, |
+ /// so methods like [next] or [peek] may not be called again. |
Future cancel({bool immediate: false}) { |
if (_isClosed) throw _failClosed(); |
_isClosed = true; |
@@ -692,15 +720,42 @@ class _NextRequest<T> implements _EventRequest<T> { |
return true; |
} |
if (isDone) { |
- var errorFuture = |
- new Future.sync(() => throw new StateError("No elements")); |
- _completer.complete(errorFuture); |
+ _completer.completeError(new StateError("No elements"), |
+ StackTrace.current); |
+ return true; |
+ } |
+ return false; |
+ } |
+} |
+ |
+ |
+/// Request for a [StreamQueue.peek] call. |
+/// |
+/// Completes the returned future when receiving the first event, |
+/// and is then complete, but doesn't consume the event. |
+class _PeekRequest<T> implements _EventRequest<T> { |
+ /// Completer for the future returned by [StreamQueue.next]. |
+ final _completer = new Completer<T>(); |
+ |
+ _PeekRequest(); |
+ |
+ Future<T> get future => _completer.future; |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
+ if (events.isNotEmpty) { |
+ events.first.complete(_completer); |
+ return true; |
+ } |
+ if (isDone) { |
+ _completer.completeError(new StateError("No elements"), |
+ StackTrace.current); |
return true; |
} |
return false; |
} |
} |
+ |
/// Request for a [StreamQueue.skip] call. |
class _SkipRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the skip call. |
@@ -738,8 +793,8 @@ class _SkipRequest<T> implements _EventRequest<T> { |
} |
} |
-/// Request for a [StreamQueue.take] call. |
-class _TakeRequest<T> implements _EventRequest<T> { |
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
+abstract class _ListRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the take call. |
final _completer = new Completer<List<T>>(); |
@@ -752,10 +807,16 @@ class _TakeRequest<T> implements _EventRequest<T> { |
/// this value. |
final int _eventsToTake; |
- _TakeRequest(this._eventsToTake); |
+ _ListRequest(this._eventsToTake); |
/// The future completed when the correct number of events have been captured. |
Future<List<T>> get future => _completer.future; |
+} |
+ |
+ |
+/// Request for a [StreamQueue.take] call. |
+class _TakeRequest<T> extends _ListRequest<T> { |
+ _TakeRequest(int eventsToTake) : super(eventsToTake); |
bool update(QueueList<Result<T>> events, bool isDone) { |
while (_list.length < _eventsToTake) { |
@@ -766,7 +827,7 @@ class _TakeRequest<T> implements _EventRequest<T> { |
var event = events.removeFirst(); |
if (event.isError) { |
- _completer.completeError(event.asError.error, event.asError.stackTrace); |
+ event.asError.complete(_completer); |
return true; |
} |
_list.add(event.asValue.value); |
@@ -776,6 +837,30 @@ class _TakeRequest<T> implements _EventRequest<T> { |
} |
} |
+ |
+/// Request for a [StreamQueue.lookAhead] call. |
+class _LookAheadRequest<T> extends _ListRequest<T> { |
+ _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
+ while (_list.length < _eventsToTake) { |
+ if (events.length == _list.length) { |
+ if (isDone) break; |
+ return false; |
+ } |
+ var event = events.elementAt(_list.length); |
+ if (event.isError) { |
+ event.asError.complete(_completer); |
+ return true; |
+ } |
+ _list.add(event.asValue.value); |
+ } |
+ _completer.complete(_list); |
+ return true; |
+ } |
+} |
+ |
+ |
/// Request for a [StreamQueue.cancel] call. |
/// |
/// The request needs no events, it just waits in the request queue |