Index: lib/src/typed/stream.dart |
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3db9f69932e22376a305297981d88b2f67ffaaa6 |
--- /dev/null |
+++ b/lib/src/typed/stream.dart |
@@ -0,0 +1,136 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import 'package:collection/collection.dart'; |
+ |
+import '../utils.dart'; |
+import 'stream_subscription.dart'; |
+ |
+class TypeSafeStream<T> implements Stream<T> { |
+ final Stream _stream; |
+ |
+ Future<T> get first async => (await _stream.first) as T; |
+ Future<T> get last async => (await _stream.last) as T; |
+ Future<T> get single async => (await _stream.single) as T; |
+ |
+ bool get isBroadcast => _stream.isBroadcast; |
+ Future<bool> get isEmpty => _stream.isEmpty; |
+ Future<int> get length => _stream.length; |
+ |
+ TypeSafeStream(this._stream); |
+ |
+ Stream<T> asBroadcastStream( |
+ {void onListen(StreamSubscription<T> subscription), |
+ void onCancel(StreamSubscription<T> subscription)}) { |
+ return new TypeSafeStream<T>(_stream.asBroadcastStream( |
+ onListen: onListen == null |
+ ? null |
+ : (subscription) => |
+ onListen(new TypeSafeStreamSubscription<T>(subscription)), |
+ onCancel: onCancel == null |
+ ? null |
+ : (subscription) => |
+ onCancel(new TypeSafeStreamSubscription<T>(subscription)))); |
+ } |
+ |
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
+ Stream asyncExpand(Stream convert(T event)) => |
+ _stream.asyncExpand(_validateType(convert)); |
+ |
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
+ Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert)); |
+ |
+ Stream<T> distinct([bool equals(T previous, T next)]) => |
+ new TypeSafeStream<T>(_stream.distinct(equals == null |
+ ? null |
+ : (previous, next) => equals(previous as T, next as T))); |
+ |
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
+ Future drain([futureValue]) => _stream.drain(futureValue); |
+ |
+ Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) => |
+ _stream.expand(_validateType(convert)); |
+ |
+ Future firstWhere(bool test(T element), {Object defaultValue()}) => |
+ _stream.firstWhere(_validateType(test), defaultValue: defaultValue); |
+ |
+ Future lastWhere(bool test(T element), {Object defaultValue()}) => |
+ _stream.lastWhere(_validateType(test), defaultValue: defaultValue); |
+ |
+ Future<T> singleWhere(bool test(T element)) async => |
+ (await _stream.singleWhere(_validateType(test))) as T; |
+ |
+ Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue, |
+ /*=S*/ combine(/*=S*/ previous, T element)) => |
+ _stream.fold(initialValue, |
+ (previous, element) => combine(previous, element as T)); |
+ |
+ Future forEach(void action(T element)) => |
+ _stream.forEach(_validateType(action)); |
+ |
+ Stream<T> handleError(Function onError, {bool test(error)}) => |
+ new TypeSafeStream<T>(_stream.handleError(onError, test: test)); |
+ |
+ StreamSubscription<T> listen(void onData(T value), |
+ {Function onError, void onDone(), bool cancelOnError}) => |
+ new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData), |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError)); |
+ |
+ Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) => |
+ _stream.map(_validateType(convert)); |
+ |
+ // Don't forward to `_stream.pipe` because we want the consumer to see the |
+ // type-asserted stream. |
+ Future pipe(StreamConsumer<T> consumer) => |
+ consumer.addStream(this).then((_) => consumer.close()); |
+ |
+ Future<T> reduce(T combine(T previous, T element)) async { |
+ var result = await _stream.reduce( |
+ (previous, element) => combine(previous as T, element as T)); |
+ return result as T; |
+ } |
+ |
+ Stream<T> skipWhile(bool test(T element)) => |
+ new TypeSafeStream<T>(_stream.skipWhile(_validateType(test))); |
+ |
+ Stream<T> takeWhile(bool test(T element)) => |
+ new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); |
+ |
+ Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) => |
+ _stream.timeout(timeLimit, onTimeout: onTimeout); |
+ |
+ Future<List<T>> toList() async => |
+ DelegatingList.typed/*<T>*/(await _stream.toList()); |
+ |
+ Future<Set<T>> toSet() async => |
+ DelegatingSet.typed/*<T>*/(await _stream.toSet()); |
+ |
+ // Don't forward to `_stream.transform` because we want the transformer to see |
+ // the type-asserted stream. |
+ Stream/*<S>*/ transform/*<S>*/( |
+ StreamTransformer<T, dynamic/*=S*/> transformer) => |
+ transformer.bind(this); |
+ |
+ Stream<T> where(bool test(T element)) => |
+ new TypeSafeStream<T>(_stream.where(_validateType(test))); |
+ |
+ Future<bool> every(bool test(T element)) => |
+ _stream.every(_validateType(test)); |
+ |
+ Future<bool> any(bool test(T element)) => _stream.any(_validateType(test)); |
+ Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count)); |
+ Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count)); |
+ Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T; |
+ Future<bool> contains(Object needle) => _stream.contains(needle); |
+ Future<String> join([String separator = ""]) => _stream.join(separator); |
+ String toString() => _stream.toString(); |
+ |
+ /// Returns a version of [function] that asserts that its argument is an |
+ /// instance of `T`. |
+ UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/( |
+ /*=S*/ function(T value)) => |
+ function == null ? null : (value) => function(value as T); |
+} |