| Index: lib/src/stream_queue.dart
|
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
|
| index 14eb7a0a66f6b29e48b2da2bed9010c235561fd4..756f08312bbfa8a49f30392f5565c690b27f2257 100644
|
| --- a/lib/src/stream_queue.dart
|
| +++ b/lib/src/stream_queue.dart
|
| @@ -131,7 +131,6 @@ abstract class StreamQueue<T> {
|
| throw _failClosed();
|
| }
|
|
|
| -
|
| /// Look at the next [count] data events without consuming them.
|
| ///
|
| /// Works like [take] except that the events are left in the queue.
|
| @@ -353,10 +352,10 @@ abstract class StreamQueue<T> {
|
| /// CancelableOperation<String> nextStdinLine() =>
|
| /// _stdinQueue.cancelable((queue) => queue.next);
|
| /// ```
|
| - CancelableOperation/*<S>*/ cancelable/*<S>*/(
|
| - Future/*<S>*/ callback(StreamQueue<T> queue)) {
|
| + CancelableOperation<S> cancelable<S>(
|
| + Future<S> callback(StreamQueue<T> queue)) {
|
| var transaction = startTransaction();
|
| - var completer = new CancelableCompleter/*<S>*/(onCancel: () {
|
| + var completer = new CancelableCompleter<S>(onCancel: () {
|
| transaction.reject();
|
| });
|
|
|
| @@ -494,7 +493,6 @@ abstract class StreamQueue<T> {
|
| }
|
| }
|
|
|
| -
|
| /// The default implementation of [StreamQueue].
|
| ///
|
| /// This queue gets its events from a stream which is listened
|
| @@ -522,18 +520,14 @@ class _StreamQueue<T> extends StreamQueue<T> {
|
| void _ensureListening() {
|
| if (_isDone) return;
|
| if (_subscription == null) {
|
| - _subscription =
|
| - _sourceStream.listen(
|
| - (data) {
|
| - _addResult(new Result.value(data));
|
| - },
|
| - onError: (error, StackTrace stackTrace) {
|
| - _addResult(new Result.error(error, stackTrace));
|
| - },
|
| - onDone: () {
|
| - _subscription = null;
|
| - this._close();
|
| - });
|
| + _subscription = _sourceStream.listen((data) {
|
| + _addResult(new Result.value(data));
|
| + }, onError: (error, StackTrace stackTrace) {
|
| + _addResult(new Result.error(error, stackTrace));
|
| + }, onDone: () {
|
| + _subscription = null;
|
| + this._close();
|
| + });
|
| } else {
|
| _subscription.resume();
|
| }
|
| @@ -649,8 +643,8 @@ class StreamQueueTransaction<T> {
|
| queue._cancel();
|
| }
|
|
|
| - assert((_parent._requestQueue.first as _TransactionRequest)
|
| - .transaction == this);
|
| + assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
|
| + this);
|
| _parent._requestQueue.removeFirst();
|
| _parent._updateRequests();
|
| }
|
| @@ -721,15 +715,14 @@ class _NextRequest<T> implements _EventRequest<T> {
|
| return true;
|
| }
|
| if (isDone) {
|
| - _completer.completeError(new StateError("No elements"),
|
| - StackTrace.current);
|
| + _completer.completeError(
|
| + new StateError("No elements"), StackTrace.current);
|
| return true;
|
| }
|
| return false;
|
| }
|
| }
|
|
|
| -
|
| /// Request for a [StreamQueue.peek] call.
|
| ///
|
| /// Completes the returned future when receiving the first event,
|
| @@ -748,15 +741,14 @@ class _PeekRequest<T> implements _EventRequest<T> {
|
| return true;
|
| }
|
| if (isDone) {
|
| - _completer.completeError(new StateError("No elements"),
|
| - StackTrace.current);
|
| + _completer.completeError(
|
| + new StateError("No elements"), StackTrace.current);
|
| return true;
|
| }
|
| return false;
|
| }
|
| }
|
|
|
| -
|
| /// Request for a [StreamQueue.skip] call.
|
| class _SkipRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the skip call.
|
| @@ -814,7 +806,6 @@ abstract class _ListRequest<T> implements _EventRequest<T> {
|
| Future<List<T>> get future => _completer.future;
|
| }
|
|
|
| -
|
| /// Request for a [StreamQueue.take] call.
|
| class _TakeRequest<T> extends _ListRequest<T> {
|
| _TakeRequest(int eventsToTake) : super(eventsToTake);
|
| @@ -838,7 +829,6 @@ class _TakeRequest<T> extends _ListRequest<T> {
|
| }
|
| }
|
|
|
| -
|
| /// Request for a [StreamQueue.lookAhead] call.
|
| class _LookAheadRequest<T> extends _ListRequest<T> {
|
| _LookAheadRequest(int eventsToTake) : super(eventsToTake);
|
| @@ -861,7 +851,6 @@ class _LookAheadRequest<T> extends _ListRequest<T> {
|
| }
|
| }
|
|
|
| -
|
| /// Request for a [StreamQueue.cancel] call.
|
| ///
|
| /// The request needs no events, it just waits in the request queue
|
| @@ -870,6 +859,7 @@ class _LookAheadRequest<T> extends _ListRequest<T> {
|
| class _CancelRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the `cancel` call.
|
| final _completer = new Completer();
|
| +
|
| ///
|
| /// When the event is completed, it needs to cancel the active subscription
|
| /// of the `StreamQueue` object, if any.
|
| @@ -925,8 +915,9 @@ class _RestRequest<T> implements _EventRequest<T> {
|
| for (var event in events) {
|
| event.addTo(controller);
|
| }
|
| - controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
|
| - .whenComplete(controller.close);
|
| + controller
|
| + .addStream(_streamQueue._extractStream(), cancelOnError: false)
|
| + .whenComplete(controller.close);
|
| _completer.setSourceStream(controller.stream);
|
| }
|
| return true;
|
|
|