Index: lib/src/typed/stream.dart |
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart |
index 3db9f69932e22376a305297981d88b2f67ffaaa6..afa4462d9a019f0f6843ac1938e69e0d8a5fc06d 100644 |
--- a/lib/src/typed/stream.dart |
+++ b/lib/src/typed/stream.dart |
@@ -8,6 +8,7 @@ import 'package:collection/collection.dart'; |
import '../utils.dart'; |
import 'stream_subscription.dart'; |
+import '../delegate/event_sink.dart'; |
class TypeSafeStream<T> implements Stream<T> { |
final Stream _stream; |
@@ -36,20 +37,19 @@ class TypeSafeStream<T> implements Stream<T> { |
onCancel(new TypeSafeStreamSubscription<T>(subscription)))); |
} |
- // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
- Stream asyncExpand(Stream convert(T event)) => |
+ Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) => |
_stream.asyncExpand(_validateType(convert)); |
- // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
- Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert)); |
+ Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) => |
+ _stream.asyncMap(_validateType(convert)); |
Stream<T> distinct([bool equals(T previous, T next)]) => |
new TypeSafeStream<T>(_stream.distinct(equals == null |
? null |
: (previous, next) => equals(previous as T, next as T))); |
- // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
- Future drain([futureValue]) => _stream.drain(futureValue); |
+ Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) => |
+ _stream.drain(futureValue); |
Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) => |
_stream.expand(_validateType(convert)); |
@@ -99,8 +99,10 @@ class TypeSafeStream<T> implements Stream<T> { |
Stream<T> takeWhile(bool test(T element)) => |
new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); |
- Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) => |
- _stream.timeout(timeLimit, onTimeout: onTimeout); |
+ Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) => |
+ new TypeSafeStream<T>(_stream.timeout( |
+ timeLimit, |
+ onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink)))); |
Future<List<T>> toList() async => |
DelegatingList.typed/*<T>*/(await _stream.toList()); |