Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index f770e16e78c5f1e580bc097b58f9b65190ba8ceb..65b5f547d2e283daaa362bd08eb01a6142204329 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -122,6 +122,22 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
+ |
+ /// Look at the next [count] data events without consuming them. |
nweiz
2017/01/24 22:37:59
"Look at" -> "Returns"
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
|
+ /// |
+ /// Works like [take] except that the events are left in the queue. |
+ /// If one of the next [count] events is an error, the returned future |
+ /// completes with this error, and the error is still left in the queue. |
+ Future<List<T>> lookAhead(int count) { |
+ if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
nweiz
2017/01/24 22:37:59
`RangeError.checkNotNegative()`?
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
|
+ if (!_isClosed) { |
+ var request = new _LookAheadRequest<T>(count); |
+ _addRequest(request); |
+ return request.future; |
+ } |
+ throw _failClosed(); |
+ } |
nweiz
2017/01/24 22:37:59
Why not just implement these in terms of transacti
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Because it would also be a lot more expensive.
I p
nweiz
2017/01/25 22:41:18
Do you have benchmarks where the extra processing
Lasse Reichstein Nielsen
2017/01/26 12:13:43
I've added a benchmark below. It shows that `retur
|
+ |
/// Requests the next (yet unrequested) event from the stream. |
/// |
/// When the requested event arrives, the returned future is completed with |
@@ -145,6 +161,19 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
+ /// Looks at the next (yet unrequested) event from the stream. |
nweiz
2017/01/24 22:37:59
"Looks at" -> "Returns"
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
|
+ /// |
+ /// Like [next] except that the event is not consumed. |
+ /// If the next event is an error event, it stays in the queue. |
+ Future<T> get peek { |
+ if (!_isClosed) { |
+ var nextRequest = new _PeekRequest<T>(); |
+ _addRequest(nextRequest); |
+ return nextRequest.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
/// Returns a stream of all the remaning events of the source stream. |
/// |
/// All requested [next], [skip] or [take] operations are completed |
@@ -344,8 +373,8 @@ abstract class StreamQueue<T> { |
/// `cancel`. |
/// |
/// After calling `cancel`, no further events can be requested. |
- /// None of [next], [rest], [skip], [take] or [cancel] may be |
- /// called again. |
+ /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] |
+ /// may be called again. |
nweiz
2017/01/24 22:37:59
Is it really necessary to explicitly list all the
Lasse Reichstein Nielsen
2017/01/25 07:58:03
It seemed like a great idea at the time ... but it
|
Future cancel({bool immediate: false}) { |
if (_isClosed) throw _failClosed(); |
_isClosed = true; |
@@ -703,15 +732,42 @@ class _NextRequest<T> implements _EventRequest<T> { |
return true; |
} |
if (isDone) { |
- var errorFuture = |
- new Future.sync(() => throw new StateError("No elements")); |
- _completer.complete(errorFuture); |
+ _completer.completeError(new StateError("No elements"), |
+ StackTrace.current); |
return true; |
} |
return false; |
} |
} |
+ |
+/// Request for a [StreamQueue.peek] call. |
+/// |
+/// Completes the returned future when receiving the first event, |
+/// and is then complete, but doesn't consume the event. |
+class _PeekRequest<T> implements _EventRequest<T> { |
+ /// Completer for the future returned by [StreamQueue.next]. |
+ final _completer = new Completer<T>(); |
+ |
+ _PeekRequest(); |
+ |
+ Future<T> get future => _completer.future; |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
+ if (events.isNotEmpty) { |
+ events.first.complete(_completer); |
+ return true; |
+ } |
+ if (isDone) { |
+ _completer.completeError(new StateError("No elements"), |
+ StackTrace.current); |
+ return true; |
+ } |
+ return false; |
+ } |
+} |
+ |
+ |
/// Request for a [StreamQueue.skip] call. |
class _SkipRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the skip call. |
@@ -749,8 +805,8 @@ class _SkipRequest<T> implements _EventRequest<T> { |
} |
} |
-/// Request for a [StreamQueue.take] call. |
-class _TakeRequest<T> implements _EventRequest<T> { |
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
+abstract class _ListRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the take call. |
final _completer = new Completer<List<T>>(); |
@@ -763,10 +819,16 @@ class _TakeRequest<T> implements _EventRequest<T> { |
/// this value. |
final int _eventsToTake; |
- _TakeRequest(this._eventsToTake); |
+ _ListRequest(this._eventsToTake); |
/// The future completed when the correct number of events have been captured. |
Future<List<T>> get future => _completer.future; |
+} |
+ |
+ |
+/// Request for a [StreamQueue.take] call. |
+class _TakeRequest<T> extends _ListRequest<T> { |
+ _TakeRequest(int eventsToTake) : super(eventsToTake); |
bool update(QueueList<Result<T>> events, bool isDone) { |
while (_list.length < _eventsToTake) { |
@@ -777,7 +839,7 @@ class _TakeRequest<T> implements _EventRequest<T> { |
var event = events.removeFirst(); |
if (event.isError) { |
- _completer.completeError(event.asError.error, event.asError.stackTrace); |
+ event.asError.complete(_completer); |
return true; |
} |
_list.add(event.asValue.value); |
@@ -787,6 +849,30 @@ class _TakeRequest<T> implements _EventRequest<T> { |
} |
} |
+ |
+/// Request for a [StreamQueue.lookAhead] call. |
+class _LookAheadRequest<T> extends _ListRequest<T> { |
+ _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
+ while (_list.length < _eventsToTake) { |
+ if (events.length == _list.length) { |
+ if (isDone) break; |
+ return false; |
+ } |
+ var event = events.elementAt(_list.length); |
+ if (event.isError) { |
+ event.asError.complete(_completer); |
+ return true; |
+ } |
+ _list.add(event.asValue.value); |
+ } |
+ _completer.complete(_list); |
+ return true; |
+ } |
+} |
+ |
+ |
/// Request for a [StreamQueue.cancel] call. |
/// |
/// The request needs no events, it just waits in the request queue |