| Index: lib/src/stream_subscription_transformer.dart
|
| diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart
|
| index c13341cc3774deee59a6ed526d4de840ab278b11..1443b183203adda77f3bd44c7baf1cc6a7e955ef 100644
|
| --- a/lib/src/stream_subscription_transformer.dart
|
| +++ b/lib/src/stream_subscription_transformer.dart
|
| @@ -27,20 +27,22 @@ typedef void _VoidHandler<T>(StreamSubscription<T> inner);
|
| /// synchronously call the corresponding method** on the inner
|
| /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
|
| /// must call `pause()`, and [handleResume] must call `resume()`.
|
| -StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/(
|
| - {Future handleCancel(StreamSubscription/*<T>*/ inner),
|
| - void handlePause(StreamSubscription/*<T>*/ inner),
|
| - void handleResume(StreamSubscription/*<T>*/ inner)}) {
|
| +StreamTransformer<T, T> subscriptionTransformer<T>(
|
| + {Future handleCancel(StreamSubscription<T> inner),
|
| + void handlePause(StreamSubscription<T> inner),
|
| + void handleResume(StreamSubscription<T> inner)}) {
|
| return new StreamTransformer((stream, cancelOnError) {
|
| return new _TransformedSubscription(
|
| stream.listen(null, cancelOnError: cancelOnError),
|
| handleCancel ?? (inner) => inner.cancel(),
|
| - handlePause ?? (inner) {
|
| - inner.pause();
|
| - },
|
| - handleResume ?? (inner) {
|
| - inner.resume();
|
| - });
|
| + handlePause ??
|
| + (inner) {
|
| + inner.pause();
|
| + },
|
| + handleResume ??
|
| + (inner) {
|
| + inner.resume();
|
| + });
|
| });
|
| }
|
|
|
| @@ -61,8 +63,8 @@ class _TransformedSubscription<T> implements StreamSubscription<T> {
|
|
|
| bool get isPaused => _inner?.isPaused ?? false;
|
|
|
| - _TransformedSubscription(this._inner, this._handleCancel, this._handlePause,
|
| - this._handleResume);
|
| + _TransformedSubscription(
|
| + this._inner, this._handleCancel, this._handlePause, this._handleResume);
|
|
|
| void onData(void handleData(T data)) {
|
| _inner?.onData(handleData);
|
| @@ -77,15 +79,15 @@ class _TransformedSubscription<T> implements StreamSubscription<T> {
|
| }
|
|
|
| Future cancel() => _cancelMemoizer.runOnce(() {
|
| - var inner = _inner;
|
| - _inner.onData(null);
|
| - _inner.onDone(null);
|
| -
|
| - // Setting onError to null will cause errors to be top-leveled.
|
| - _inner.onError((_, __) {});
|
| - _inner = null;
|
| - return _handleCancel(inner);
|
| - });
|
| + var inner = _inner;
|
| + _inner.onData(null);
|
| + _inner.onDone(null);
|
| +
|
| + // Setting onError to null will cause errors to be top-leveled.
|
| + _inner.onError((_, __) {});
|
| + _inner = null;
|
| + return _handleCancel(inner);
|
| + });
|
| final _cancelMemoizer = new AsyncMemoizer();
|
|
|
| void pause([Future resumeFuture]) {
|
| @@ -99,6 +101,6 @@ class _TransformedSubscription<T> implements StreamSubscription<T> {
|
| _handleResume(_inner);
|
| }
|
|
|
| - Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) =>
|
| - _inner?.asFuture(futureValue) ?? new Completer/*<E>*/().future;
|
| + Future<E> asFuture<E>([E futureValue]) =>
|
| + _inner?.asFuture(futureValue) ?? new Completer<E>().future;
|
| }
|
|
|