| Index: lib/src/typed/stream.dart
|
| diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
|
| index 3db9f69932e22376a305297981d88b2f67ffaaa6..afa4462d9a019f0f6843ac1938e69e0d8a5fc06d 100644
|
| --- a/lib/src/typed/stream.dart
|
| +++ b/lib/src/typed/stream.dart
|
| @@ -8,6 +8,7 @@ import 'package:collection/collection.dart';
|
|
|
| import '../utils.dart';
|
| import 'stream_subscription.dart';
|
| +import '../delegate/event_sink.dart';
|
|
|
| class TypeSafeStream<T> implements Stream<T> {
|
| final Stream _stream;
|
| @@ -36,20 +37,19 @@ class TypeSafeStream<T> implements Stream<T> {
|
| onCancel(new TypeSafeStreamSubscription<T>(subscription))));
|
| }
|
|
|
| - // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
|
| - Stream asyncExpand(Stream convert(T event)) =>
|
| + Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) =>
|
| _stream.asyncExpand(_validateType(convert));
|
|
|
| - // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
|
| - Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert));
|
| + Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) =>
|
| + _stream.asyncMap(_validateType(convert));
|
|
|
| Stream<T> distinct([bool equals(T previous, T next)]) =>
|
| new TypeSafeStream<T>(_stream.distinct(equals == null
|
| ? null
|
| : (previous, next) => equals(previous as T, next as T)));
|
|
|
| - // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
|
| - Future drain([futureValue]) => _stream.drain(futureValue);
|
| + Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) =>
|
| + _stream.drain(futureValue);
|
|
|
| Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) =>
|
| _stream.expand(_validateType(convert));
|
| @@ -99,8 +99,10 @@ class TypeSafeStream<T> implements Stream<T> {
|
| Stream<T> takeWhile(bool test(T element)) =>
|
| new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
|
|
|
| - Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) =>
|
| - _stream.timeout(timeLimit, onTimeout: onTimeout);
|
| + Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) =>
|
| + new TypeSafeStream<T>(_stream.timeout(
|
| + timeLimit,
|
| + onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink))));
|
|
|
| Future<List<T>> toList() async =>
|
| DelegatingList.typed/*<T>*/(await _stream.toList());
|
|
|