Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(603)

Unified Diff: lib/src/typed/stream.dart

Issue 1870543004: Add typed wrapper functions to delegate classes. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/typed/stream.dart
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
new file mode 100644
index 0000000000000000000000000000000000000000..3db9f69932e22376a305297981d88b2f67ffaaa6
--- /dev/null
+++ b/lib/src/typed/stream.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:collection/collection.dart';
+
+import '../utils.dart';
+import 'stream_subscription.dart';
+
+class TypeSafeStream<T> implements Stream<T> {
+ final Stream _stream;
+
+ Future<T> get first async => (await _stream.first) as T;
+ Future<T> get last async => (await _stream.last) as T;
+ Future<T> get single async => (await _stream.single) as T;
+
+ bool get isBroadcast => _stream.isBroadcast;
+ Future<bool> get isEmpty => _stream.isEmpty;
+ Future<int> get length => _stream.length;
+
+ TypeSafeStream(this._stream);
+
+ Stream<T> asBroadcastStream(
+ {void onListen(StreamSubscription<T> subscription),
+ void onCancel(StreamSubscription<T> subscription)}) {
+ return new TypeSafeStream<T>(_stream.asBroadcastStream(
+ onListen: onListen == null
+ ? null
+ : (subscription) =>
+ onListen(new TypeSafeStreamSubscription<T>(subscription)),
+ onCancel: onCancel == null
+ ? null
+ : (subscription) =>
+ onCancel(new TypeSafeStreamSubscription<T>(subscription))));
+ }
+
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+ Stream asyncExpand(Stream convert(T event)) =>
+ _stream.asyncExpand(_validateType(convert));
+
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+ Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert));
+
+ Stream<T> distinct([bool equals(T previous, T next)]) =>
+ new TypeSafeStream<T>(_stream.distinct(equals == null
+ ? null
+ : (previous, next) => equals(previous as T, next as T)));
+
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+ Future drain([futureValue]) => _stream.drain(futureValue);
+
+ Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) =>
+ _stream.expand(_validateType(convert));
+
+ Future firstWhere(bool test(T element), {Object defaultValue()}) =>
+ _stream.firstWhere(_validateType(test), defaultValue: defaultValue);
+
+ Future lastWhere(bool test(T element), {Object defaultValue()}) =>
+ _stream.lastWhere(_validateType(test), defaultValue: defaultValue);
+
+ Future<T> singleWhere(bool test(T element)) async =>
+ (await _stream.singleWhere(_validateType(test))) as T;
+
+ Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue,
+ /*=S*/ combine(/*=S*/ previous, T element)) =>
+ _stream.fold(initialValue,
+ (previous, element) => combine(previous, element as T));
+
+ Future forEach(void action(T element)) =>
+ _stream.forEach(_validateType(action));
+
+ Stream<T> handleError(Function onError, {bool test(error)}) =>
+ new TypeSafeStream<T>(_stream.handleError(onError, test: test));
+
+ StreamSubscription<T> listen(void onData(T value),
+ {Function onError, void onDone(), bool cancelOnError}) =>
+ new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData),
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError));
+
+ Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) =>
+ _stream.map(_validateType(convert));
+
+ // Don't forward to `_stream.pipe` because we want the consumer to see the
+ // type-asserted stream.
+ Future pipe(StreamConsumer<T> consumer) =>
+ consumer.addStream(this).then((_) => consumer.close());
+
+ Future<T> reduce(T combine(T previous, T element)) async {
+ var result = await _stream.reduce(
+ (previous, element) => combine(previous as T, element as T));
+ return result as T;
+ }
+
+ Stream<T> skipWhile(bool test(T element)) =>
+ new TypeSafeStream<T>(_stream.skipWhile(_validateType(test)));
+
+ Stream<T> takeWhile(bool test(T element)) =>
+ new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
+
+ Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) =>
+ _stream.timeout(timeLimit, onTimeout: onTimeout);
+
+ Future<List<T>> toList() async =>
+ DelegatingList.typed/*<T>*/(await _stream.toList());
+
+ Future<Set<T>> toSet() async =>
+ DelegatingSet.typed/*<T>*/(await _stream.toSet());
+
+ // Don't forward to `_stream.transform` because we want the transformer to see
+ // the type-asserted stream.
+ Stream/*<S>*/ transform/*<S>*/(
+ StreamTransformer<T, dynamic/*=S*/> transformer) =>
+ transformer.bind(this);
+
+ Stream<T> where(bool test(T element)) =>
+ new TypeSafeStream<T>(_stream.where(_validateType(test)));
+
+ Future<bool> every(bool test(T element)) =>
+ _stream.every(_validateType(test));
+
+ Future<bool> any(bool test(T element)) => _stream.any(_validateType(test));
+ Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count));
+ Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count));
+ Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T;
+ Future<bool> contains(Object needle) => _stream.contains(needle);
+ Future<String> join([String separator = ""]) => _stream.join(separator);
+ String toString() => _stream.toString();
+
+ /// Returns a version of [function] that asserts that its argument is an
+ /// instance of `T`.
+ UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/(
+ /*=S*/ function(T value)) =>
+ function == null ? null : (value) => function(value as T);
+}

Powered by Google App Engine
This is Rietveld 408576698