Chromium Code Reviews| Index: lib/src/stream_queue.dart |
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
| index f770e16e78c5f1e580bc097b58f9b65190ba8ceb..65b5f547d2e283daaa362bd08eb01a6142204329 100644 |
| --- a/lib/src/stream_queue.dart |
| +++ b/lib/src/stream_queue.dart |
| @@ -122,6 +122,22 @@ abstract class StreamQueue<T> { |
| throw _failClosed(); |
| } |
| + |
| + /// Look at the next [count] data events without consuming them. |
|
nweiz
2017/01/24 22:37:59
"Look at" -> "Returns"
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
|
| + /// |
| + /// Works like [take] except that the events are left in the queue. |
| + /// If one of the next [count] events is an error, the returned future |
| + /// completes with this error, and the error is still left in the queue. |
| + Future<List<T>> lookAhead(int count) { |
| + if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
|
nweiz
2017/01/24 22:37:59
`RangeError.checkNotNegative()`?
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
|
| + if (!_isClosed) { |
| + var request = new _LookAheadRequest<T>(count); |
| + _addRequest(request); |
| + return request.future; |
| + } |
| + throw _failClosed(); |
| + } |
|
nweiz
2017/01/24 22:37:59
Why not just implement these in terms of transacti
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Because it would also be a lot more expensive.
I p
nweiz
2017/01/25 22:41:18
Do you have benchmarks where the extra processing
Lasse Reichstein Nielsen
2017/01/26 12:13:43
I've added a benchmark below. It shows that `retur
|
| + |
| /// Requests the next (yet unrequested) event from the stream. |
| /// |
| /// When the requested event arrives, the returned future is completed with |
| @@ -145,6 +161,19 @@ abstract class StreamQueue<T> { |
| throw _failClosed(); |
| } |
| + /// Looks at the next (yet unrequested) event from the stream. |
|
nweiz
2017/01/24 22:37:59
"Looks at" -> "Returns"
Lasse Reichstein Nielsen
2017/01/25 07:58:03
Done.
|
| + /// |
| + /// Like [next] except that the event is not consumed. |
| + /// If the next event is an error event, it stays in the queue. |
| + Future<T> get peek { |
| + if (!_isClosed) { |
| + var nextRequest = new _PeekRequest<T>(); |
| + _addRequest(nextRequest); |
| + return nextRequest.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| /// Returns a stream of all the remaning events of the source stream. |
| /// |
| /// All requested [next], [skip] or [take] operations are completed |
| @@ -344,8 +373,8 @@ abstract class StreamQueue<T> { |
| /// `cancel`. |
| /// |
| /// After calling `cancel`, no further events can be requested. |
| - /// None of [next], [rest], [skip], [take] or [cancel] may be |
| - /// called again. |
| + /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] |
| + /// may be called again. |
|
nweiz
2017/01/24 22:37:59
Is it really necessary to explicitly list all the
Lasse Reichstein Nielsen
2017/01/25 07:58:03
It seemed like a great idea at the time ... but it
|
| Future cancel({bool immediate: false}) { |
| if (_isClosed) throw _failClosed(); |
| _isClosed = true; |
| @@ -703,15 +732,42 @@ class _NextRequest<T> implements _EventRequest<T> { |
| return true; |
| } |
| if (isDone) { |
| - var errorFuture = |
| - new Future.sync(() => throw new StateError("No elements")); |
| - _completer.complete(errorFuture); |
| + _completer.completeError(new StateError("No elements"), |
| + StackTrace.current); |
| return true; |
| } |
| return false; |
| } |
| } |
| + |
| +/// Request for a [StreamQueue.peek] call. |
| +/// |
| +/// Completes the returned future when receiving the first event, |
| +/// and is then complete, but doesn't consume the event. |
| +class _PeekRequest<T> implements _EventRequest<T> { |
| + /// Completer for the future returned by [StreamQueue.next]. |
| + final _completer = new Completer<T>(); |
| + |
| + _PeekRequest(); |
| + |
| + Future<T> get future => _completer.future; |
| + |
| + bool update(QueueList<Result<T>> events, bool isDone) { |
| + if (events.isNotEmpty) { |
| + events.first.complete(_completer); |
| + return true; |
| + } |
| + if (isDone) { |
| + _completer.completeError(new StateError("No elements"), |
| + StackTrace.current); |
| + return true; |
| + } |
| + return false; |
| + } |
| +} |
| + |
| + |
| /// Request for a [StreamQueue.skip] call. |
| class _SkipRequest<T> implements _EventRequest<T> { |
| /// Completer for the future returned by the skip call. |
| @@ -749,8 +805,8 @@ class _SkipRequest<T> implements _EventRequest<T> { |
| } |
| } |
| -/// Request for a [StreamQueue.take] call. |
| -class _TakeRequest<T> implements _EventRequest<T> { |
| +/// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
| +abstract class _ListRequest<T> implements _EventRequest<T> { |
| /// Completer for the future returned by the take call. |
| final _completer = new Completer<List<T>>(); |
| @@ -763,10 +819,16 @@ class _TakeRequest<T> implements _EventRequest<T> { |
| /// this value. |
| final int _eventsToTake; |
| - _TakeRequest(this._eventsToTake); |
| + _ListRequest(this._eventsToTake); |
| /// The future completed when the correct number of events have been captured. |
| Future<List<T>> get future => _completer.future; |
| +} |
| + |
| + |
| +/// Request for a [StreamQueue.take] call. |
| +class _TakeRequest<T> extends _ListRequest<T> { |
| + _TakeRequest(int eventsToTake) : super(eventsToTake); |
| bool update(QueueList<Result<T>> events, bool isDone) { |
| while (_list.length < _eventsToTake) { |
| @@ -777,7 +839,7 @@ class _TakeRequest<T> implements _EventRequest<T> { |
| var event = events.removeFirst(); |
| if (event.isError) { |
| - _completer.completeError(event.asError.error, event.asError.stackTrace); |
| + event.asError.complete(_completer); |
| return true; |
| } |
| _list.add(event.asValue.value); |
| @@ -787,6 +849,30 @@ class _TakeRequest<T> implements _EventRequest<T> { |
| } |
| } |
| + |
| +/// Request for a [StreamQueue.lookAhead] call. |
| +class _LookAheadRequest<T> extends _ListRequest<T> { |
| + _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
| + |
| + bool update(QueueList<Result<T>> events, bool isDone) { |
| + while (_list.length < _eventsToTake) { |
| + if (events.length == _list.length) { |
| + if (isDone) break; |
| + return false; |
| + } |
| + var event = events.elementAt(_list.length); |
| + if (event.isError) { |
| + event.asError.complete(_completer); |
| + return true; |
| + } |
| + _list.add(event.asValue.value); |
| + } |
| + _completer.complete(_list); |
| + return true; |
| + } |
| +} |
| + |
| + |
| /// Request for a [StreamQueue.cancel] call. |
| /// |
| /// The request needs no events, it just waits in the request queue |