| Index: lib/src/stream_queue.dart
|
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
|
| index 7b73eace731963b8187b3a2f0f54e12646cd5e5c..092a58ab893a626910f2de8e4b8db397e97393c7 100644
|
| --- a/lib/src/stream_queue.dart
|
| +++ b/lib/src/stream_queue.dart
|
| @@ -131,6 +131,22 @@ abstract class StreamQueue<T> {
|
| throw _failClosed();
|
| }
|
|
|
| +
|
| + /// Returns the next [count] data events without consuming them.
|
| + ///
|
| + /// Works like [take] except that the events are left in the queue.
|
| + /// If one of the next [count] events is an error, the returned future
|
| + /// completes with this error, and the error is still left in the queue.
|
| + Future<List<T>> lookAhead(int count) {
|
| + RangeError.checkNotNegative(count, "count");
|
| + if (!_isClosed) {
|
| + var request = new _LookAheadRequest<T>(count);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| /// Requests the next (yet unrequested) event from the stream.
|
| ///
|
| /// When the requested event arrives, the returned future is completed with
|
| @@ -154,6 +170,19 @@ abstract class StreamQueue<T> {
|
| throw _failClosed();
|
| }
|
|
|
| + /// Returns the next (yet unrequested) event from the stream.
|
| + ///
|
| + /// Like [next] except that the event is not consumed.
|
| + /// If the next event is an error event, it also stays in the queue.
|
| + Future<T> get peek {
|
| + if (!_isClosed) {
|
| + var nextRequest = new _PeekRequest<T>();
|
| + _addRequest(nextRequest);
|
| + return nextRequest.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| /// Returns a stream of all the remaning events of the source stream.
|
| ///
|
| /// All requested [next], [skip] or [take] operations are completed
|
| @@ -189,7 +218,7 @@ abstract class StreamQueue<T> {
|
| /// then all events were succssfully skipped. If the value
|
| /// is greater than zero then the stream ended early.
|
| Future<int> skip(int count) {
|
| - if (count < 0) throw new RangeError.range(count, 0, null, "count");
|
| + RangeError.checkNotNegative(count, "count");
|
| if (!_isClosed) {
|
| var request = new _SkipRequest(count);
|
| _addRequest(request);
|
| @@ -214,7 +243,7 @@ abstract class StreamQueue<T> {
|
| /// of data collected so far. That is, the returned
|
| /// list may have fewer than [count] elements.
|
| Future<List<T>> take(int count) {
|
| - if (count < 0) throw new RangeError.range(count, 0, null, "count");
|
| + RangeError.checkNotNegative(count, "count");
|
| if (!_isClosed) {
|
| var request = new _TakeRequest<T>(count);
|
| _addRequest(request);
|
| @@ -352,9 +381,8 @@ abstract class StreamQueue<T> {
|
| /// The returned future completes with the result of calling
|
| /// `cancel`.
|
| ///
|
| - /// After calling `cancel`, no further events can be requested.
|
| - /// None of [next], [rest], [skip], [take] or [cancel] may be
|
| - /// called again.
|
| + /// After calling `cancel`, no further events can be requested,
|
| + /// so methods like [next] or [peek] may not be called again.
|
| Future cancel({bool immediate: false}) {
|
| if (_isClosed) throw _failClosed();
|
| _isClosed = true;
|
| @@ -692,15 +720,42 @@ class _NextRequest<T> implements _EventRequest<T> {
|
| return true;
|
| }
|
| if (isDone) {
|
| - var errorFuture =
|
| - new Future.sync(() => throw new StateError("No elements"));
|
| - _completer.complete(errorFuture);
|
| + _completer.completeError(new StateError("No elements"),
|
| + StackTrace.current);
|
| + return true;
|
| + }
|
| + return false;
|
| + }
|
| +}
|
| +
|
| +
|
| +/// Request for a [StreamQueue.peek] call.
|
| +///
|
| +/// Completes the returned future when receiving the first event,
|
| +/// and is then complete, but doesn't consume the event.
|
| +class _PeekRequest<T> implements _EventRequest<T> {
|
| + /// Completer for the future returned by [StreamQueue.next].
|
| + final _completer = new Completer<T>();
|
| +
|
| + _PeekRequest();
|
| +
|
| + Future<T> get future => _completer.future;
|
| +
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| + if (events.isNotEmpty) {
|
| + events.first.complete(_completer);
|
| + return true;
|
| + }
|
| + if (isDone) {
|
| + _completer.completeError(new StateError("No elements"),
|
| + StackTrace.current);
|
| return true;
|
| }
|
| return false;
|
| }
|
| }
|
|
|
| +
|
| /// Request for a [StreamQueue.skip] call.
|
| class _SkipRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the skip call.
|
| @@ -738,8 +793,8 @@ class _SkipRequest<T> implements _EventRequest<T> {
|
| }
|
| }
|
|
|
| -/// Request for a [StreamQueue.take] call.
|
| -class _TakeRequest<T> implements _EventRequest<T> {
|
| +/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
|
| +abstract class _ListRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the take call.
|
| final _completer = new Completer<List<T>>();
|
|
|
| @@ -752,10 +807,16 @@ class _TakeRequest<T> implements _EventRequest<T> {
|
| /// this value.
|
| final int _eventsToTake;
|
|
|
| - _TakeRequest(this._eventsToTake);
|
| + _ListRequest(this._eventsToTake);
|
|
|
| /// The future completed when the correct number of events have been captured.
|
| Future<List<T>> get future => _completer.future;
|
| +}
|
| +
|
| +
|
| +/// Request for a [StreamQueue.take] call.
|
| +class _TakeRequest<T> extends _ListRequest<T> {
|
| + _TakeRequest(int eventsToTake) : super(eventsToTake);
|
|
|
| bool update(QueueList<Result<T>> events, bool isDone) {
|
| while (_list.length < _eventsToTake) {
|
| @@ -766,7 +827,7 @@ class _TakeRequest<T> implements _EventRequest<T> {
|
|
|
| var event = events.removeFirst();
|
| if (event.isError) {
|
| - _completer.completeError(event.asError.error, event.asError.stackTrace);
|
| + event.asError.complete(_completer);
|
| return true;
|
| }
|
| _list.add(event.asValue.value);
|
| @@ -776,6 +837,30 @@ class _TakeRequest<T> implements _EventRequest<T> {
|
| }
|
| }
|
|
|
| +
|
| +/// Request for a [StreamQueue.lookAhead] call.
|
| +class _LookAheadRequest<T> extends _ListRequest<T> {
|
| + _LookAheadRequest(int eventsToTake) : super(eventsToTake);
|
| +
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| + while (_list.length < _eventsToTake) {
|
| + if (events.length == _list.length) {
|
| + if (isDone) break;
|
| + return false;
|
| + }
|
| + var event = events.elementAt(_list.length);
|
| + if (event.isError) {
|
| + event.asError.complete(_completer);
|
| + return true;
|
| + }
|
| + _list.add(event.asValue.value);
|
| + }
|
| + _completer.complete(_list);
|
| + return true;
|
| + }
|
| +}
|
| +
|
| +
|
| /// Request for a [StreamQueue.cancel] call.
|
| ///
|
| /// The request needs no events, it just waits in the request queue
|
|
|