Index: lib/src/stream_subscription_transformer.dart |
diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart |
index c13341cc3774deee59a6ed526d4de840ab278b11..1443b183203adda77f3bd44c7baf1cc6a7e955ef 100644 |
--- a/lib/src/stream_subscription_transformer.dart |
+++ b/lib/src/stream_subscription_transformer.dart |
@@ -27,20 +27,22 @@ typedef void _VoidHandler<T>(StreamSubscription<T> inner); |
/// synchronously call the corresponding method** on the inner |
/// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] |
/// must call `pause()`, and [handleResume] must call `resume()`. |
-StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/( |
- {Future handleCancel(StreamSubscription/*<T>*/ inner), |
- void handlePause(StreamSubscription/*<T>*/ inner), |
- void handleResume(StreamSubscription/*<T>*/ inner)}) { |
+StreamTransformer<T, T> subscriptionTransformer<T>( |
+ {Future handleCancel(StreamSubscription<T> inner), |
+ void handlePause(StreamSubscription<T> inner), |
+ void handleResume(StreamSubscription<T> inner)}) { |
return new StreamTransformer((stream, cancelOnError) { |
return new _TransformedSubscription( |
stream.listen(null, cancelOnError: cancelOnError), |
handleCancel ?? (inner) => inner.cancel(), |
- handlePause ?? (inner) { |
- inner.pause(); |
- }, |
- handleResume ?? (inner) { |
- inner.resume(); |
- }); |
+ handlePause ?? |
+ (inner) { |
+ inner.pause(); |
+ }, |
+ handleResume ?? |
+ (inner) { |
+ inner.resume(); |
+ }); |
}); |
} |
@@ -61,8 +63,8 @@ class _TransformedSubscription<T> implements StreamSubscription<T> { |
bool get isPaused => _inner?.isPaused ?? false; |
- _TransformedSubscription(this._inner, this._handleCancel, this._handlePause, |
- this._handleResume); |
+ _TransformedSubscription( |
+ this._inner, this._handleCancel, this._handlePause, this._handleResume); |
void onData(void handleData(T data)) { |
_inner?.onData(handleData); |
@@ -77,15 +79,15 @@ class _TransformedSubscription<T> implements StreamSubscription<T> { |
} |
Future cancel() => _cancelMemoizer.runOnce(() { |
- var inner = _inner; |
- _inner.onData(null); |
- _inner.onDone(null); |
- |
- // Setting onError to null will cause errors to be top-leveled. |
- _inner.onError((_, __) {}); |
- _inner = null; |
- return _handleCancel(inner); |
- }); |
+ var inner = _inner; |
+ _inner.onData(null); |
+ _inner.onDone(null); |
+ |
+ // Setting onError to null will cause errors to be top-leveled. |
+ _inner.onError((_, __) {}); |
+ _inner = null; |
+ return _handleCancel(inner); |
+ }); |
final _cancelMemoizer = new AsyncMemoizer(); |
void pause([Future resumeFuture]) { |
@@ -99,6 +101,6 @@ class _TransformedSubscription<T> implements StreamSubscription<T> { |
_handleResume(_inner); |
} |
- Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) => |
- _inner?.asFuture(futureValue) ?? new Completer/*<E>*/().future; |
+ Future<E> asFuture<E>([E futureValue]) => |
+ _inner?.asFuture(futureValue) ?? new Completer<E>().future; |
} |