Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(124)

Side by Side Diff: lib/src/typed/stream.dart

Issue 1870543004: Add typed wrapper functions to delegate classes. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import 'package:collection/collection.dart';
8
9 import '../utils.dart';
10 import 'stream_subscription.dart';
11
12 class TypeSafeStream<T> implements Stream<T> {
13 final Stream _stream;
14
15 Future<T> get first async => (await _stream.first) as T;
16 Future<T> get last async => (await _stream.last) as T;
17 Future<T> get single async => (await _stream.single) as T;
18
19 bool get isBroadcast => _stream.isBroadcast;
20 Future<bool> get isEmpty => _stream.isEmpty;
21 Future<int> get length => _stream.length;
22
23 TypeSafeStream(this._stream);
24
25 Stream<T> asBroadcastStream(
26 {void onListen(StreamSubscription<T> subscription),
27 void onCancel(StreamSubscription<T> subscription)}) {
28 return new TypeSafeStream<T>(_stream.asBroadcastStream(
29 onListen: onListen == null
30 ? null
31 : (subscription) =>
32 onListen(new TypeSafeStreamSubscription<T>(subscription)),
33 onCancel: onCancel == null
34 ? null
35 : (subscription) =>
36 onCancel(new TypeSafeStreamSubscription<T>(subscription))));
37 }
38
39 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
40 Stream asyncExpand(Stream convert(T event)) =>
41 _stream.asyncExpand(_validateType(convert));
42
43 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
44 Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert));
45
46 Stream<T> distinct([bool equals(T previous, T next)]) =>
47 new TypeSafeStream<T>(_stream.distinct(equals == null
48 ? null
49 : (previous, next) => equals(previous as T, next as T)));
50
51 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
52 Future drain([futureValue]) => _stream.drain(futureValue);
53
54 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) =>
55 _stream.expand(_validateType(convert));
56
57 Future firstWhere(bool test(T element), {Object defaultValue()}) =>
58 _stream.firstWhere(_validateType(test), defaultValue: defaultValue);
59
60 Future lastWhere(bool test(T element), {Object defaultValue()}) =>
61 _stream.lastWhere(_validateType(test), defaultValue: defaultValue);
62
63 Future<T> singleWhere(bool test(T element)) async =>
64 (await _stream.singleWhere(_validateType(test))) as T;
65
66 Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue,
67 /*=S*/ combine(/*=S*/ previous, T element)) =>
68 _stream.fold(initialValue,
69 (previous, element) => combine(previous, element as T));
70
71 Future forEach(void action(T element)) =>
72 _stream.forEach(_validateType(action));
73
74 Stream<T> handleError(Function onError, {bool test(error)}) =>
75 new TypeSafeStream<T>(_stream.handleError(onError, test: test));
76
77 StreamSubscription<T> listen(void onData(T value),
78 {Function onError, void onDone(), bool cancelOnError}) =>
79 new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData),
80 onError: onError, onDone: onDone, cancelOnError: cancelOnError));
81
82 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) =>
83 _stream.map(_validateType(convert));
84
85 // Don't forward to `_stream.pipe` because we want the consumer to see the
86 // type-asserted stream.
87 Future pipe(StreamConsumer<T> consumer) =>
88 consumer.addStream(this).then((_) => consumer.close());
89
90 Future<T> reduce(T combine(T previous, T element)) async {
91 var result = await _stream.reduce(
92 (previous, element) => combine(previous as T, element as T));
93 return result as T;
94 }
95
96 Stream<T> skipWhile(bool test(T element)) =>
97 new TypeSafeStream<T>(_stream.skipWhile(_validateType(test)));
98
99 Stream<T> takeWhile(bool test(T element)) =>
100 new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
101
102 Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) =>
103 _stream.timeout(timeLimit, onTimeout: onTimeout);
104
105 Future<List<T>> toList() async =>
106 DelegatingList.typed/*<T>*/(await _stream.toList());
107
108 Future<Set<T>> toSet() async =>
109 DelegatingSet.typed/*<T>*/(await _stream.toSet());
110
111 // Don't forward to `_stream.transform` because we want the transformer to see
112 // the type-asserted stream.
113 Stream/*<S>*/ transform/*<S>*/(
114 StreamTransformer<T, dynamic/*=S*/> transformer) =>
115 transformer.bind(this);
116
117 Stream<T> where(bool test(T element)) =>
118 new TypeSafeStream<T>(_stream.where(_validateType(test)));
119
120 Future<bool> every(bool test(T element)) =>
121 _stream.every(_validateType(test));
122
123 Future<bool> any(bool test(T element)) => _stream.any(_validateType(test));
124 Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count));
125 Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count));
126 Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T;
127 Future<bool> contains(Object needle) => _stream.contains(needle);
128 Future<String> join([String separator = ""]) => _stream.join(separator);
129 String toString() => _stream.toString();
130
131 /// Returns a version of [function] that asserts that its argument is an
132 /// instance of `T`.
133 UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/(
134 /*=S*/ function(T value)) =>
135 function == null ? null : (value) => function(value as T);
136 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698