Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index 14eb7a0a66f6b29e48b2da2bed9010c235561fd4..756f08312bbfa8a49f30392f5565c690b27f2257 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -131,7 +131,6 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
- |
/// Look at the next [count] data events without consuming them. |
/// |
/// Works like [take] except that the events are left in the queue. |
@@ -353,10 +352,10 @@ abstract class StreamQueue<T> { |
/// CancelableOperation<String> nextStdinLine() => |
/// _stdinQueue.cancelable((queue) => queue.next); |
/// ``` |
- CancelableOperation/*<S>*/ cancelable/*<S>*/( |
- Future/*<S>*/ callback(StreamQueue<T> queue)) { |
+ CancelableOperation<S> cancelable<S>( |
+ Future<S> callback(StreamQueue<T> queue)) { |
var transaction = startTransaction(); |
- var completer = new CancelableCompleter/*<S>*/(onCancel: () { |
+ var completer = new CancelableCompleter<S>(onCancel: () { |
transaction.reject(); |
}); |
@@ -494,7 +493,6 @@ abstract class StreamQueue<T> { |
} |
} |
- |
/// The default implementation of [StreamQueue]. |
/// |
/// This queue gets its events from a stream which is listened |
@@ -522,18 +520,14 @@ class _StreamQueue<T> extends StreamQueue<T> { |
void _ensureListening() { |
if (_isDone) return; |
if (_subscription == null) { |
- _subscription = |
- _sourceStream.listen( |
- (data) { |
- _addResult(new Result.value(data)); |
- }, |
- onError: (error, StackTrace stackTrace) { |
- _addResult(new Result.error(error, stackTrace)); |
- }, |
- onDone: () { |
- _subscription = null; |
- this._close(); |
- }); |
+ _subscription = _sourceStream.listen((data) { |
+ _addResult(new Result.value(data)); |
+ }, onError: (error, StackTrace stackTrace) { |
+ _addResult(new Result.error(error, stackTrace)); |
+ }, onDone: () { |
+ _subscription = null; |
+ this._close(); |
+ }); |
} else { |
_subscription.resume(); |
} |
@@ -649,8 +643,8 @@ class StreamQueueTransaction<T> { |
queue._cancel(); |
} |
- assert((_parent._requestQueue.first as _TransactionRequest) |
- .transaction == this); |
+ assert((_parent._requestQueue.first as _TransactionRequest).transaction == |
+ this); |
_parent._requestQueue.removeFirst(); |
_parent._updateRequests(); |
} |
@@ -721,15 +715,14 @@ class _NextRequest<T> implements _EventRequest<T> { |
return true; |
} |
if (isDone) { |
- _completer.completeError(new StateError("No elements"), |
- StackTrace.current); |
+ _completer.completeError( |
+ new StateError("No elements"), StackTrace.current); |
return true; |
} |
return false; |
} |
} |
- |
/// Request for a [StreamQueue.peek] call. |
/// |
/// Completes the returned future when receiving the first event, |
@@ -748,15 +741,14 @@ class _PeekRequest<T> implements _EventRequest<T> { |
return true; |
} |
if (isDone) { |
- _completer.completeError(new StateError("No elements"), |
- StackTrace.current); |
+ _completer.completeError( |
+ new StateError("No elements"), StackTrace.current); |
return true; |
} |
return false; |
} |
} |
- |
/// Request for a [StreamQueue.skip] call. |
class _SkipRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the skip call. |
@@ -814,7 +806,6 @@ abstract class _ListRequest<T> implements _EventRequest<T> { |
Future<List<T>> get future => _completer.future; |
} |
- |
/// Request for a [StreamQueue.take] call. |
class _TakeRequest<T> extends _ListRequest<T> { |
_TakeRequest(int eventsToTake) : super(eventsToTake); |
@@ -838,7 +829,6 @@ class _TakeRequest<T> extends _ListRequest<T> { |
} |
} |
- |
/// Request for a [StreamQueue.lookAhead] call. |
class _LookAheadRequest<T> extends _ListRequest<T> { |
_LookAheadRequest(int eventsToTake) : super(eventsToTake); |
@@ -861,7 +851,6 @@ class _LookAheadRequest<T> extends _ListRequest<T> { |
} |
} |
- |
/// Request for a [StreamQueue.cancel] call. |
/// |
/// The request needs no events, it just waits in the request queue |
@@ -870,6 +859,7 @@ class _LookAheadRequest<T> extends _ListRequest<T> { |
class _CancelRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the `cancel` call. |
final _completer = new Completer(); |
+ |
/// |
/// When the event is completed, it needs to cancel the active subscription |
/// of the `StreamQueue` object, if any. |
@@ -925,8 +915,9 @@ class _RestRequest<T> implements _EventRequest<T> { |
for (var event in events) { |
event.addTo(controller); |
} |
- controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
- .whenComplete(controller.close); |
+ controller |
+ .addStream(_streamQueue._extractStream(), cancelOnError: false) |
+ .whenComplete(controller.close); |
_completer.setSourceStream(controller.stream); |
} |
return true; |