OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import 'dart:async'; | |
6 | |
7 import 'package:collection/collection.dart'; | |
8 | |
9 import '../utils.dart'; | |
10 import 'stream_subscription.dart'; | |
11 | |
12 class TypeSafeStream<T> implements Stream<T> { | |
13 final Stream _stream; | |
14 | |
15 Future<T> get first async => (await _stream.first) as T; | |
16 Future<T> get last async => (await _stream.last) as T; | |
17 Future<T> get single async => (await _stream.single) as T; | |
18 | |
19 bool get isBroadcast => _stream.isBroadcast; | |
20 Future<bool> get isEmpty => _stream.isEmpty; | |
21 Future<int> get length => _stream.length; | |
22 | |
23 TypeSafeStream(this._stream); | |
24 | |
25 Stream<T> asBroadcastStream( | |
26 {void onListen(StreamSubscription<T> subscription), | |
27 void onCancel(StreamSubscription<T> subscription)}) { | |
28 return new TypeSafeStream<T>(_stream.asBroadcastStream( | |
29 onListen: onListen == null | |
30 ? null | |
31 : (subscription) => | |
32 onListen(new TypeSafeStreamSubscription<T>(subscription)), | |
33 onCancel: onCancel == null | |
34 ? null | |
35 : (subscription) => | |
36 onCancel(new TypeSafeStreamSubscription<T>(subscription)))); | |
37 } | |
38 | |
39 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. | |
40 Stream asyncExpand(Stream convert(T event)) => | |
41 _stream.asyncExpand(_validateType(convert)); | |
42 | |
43 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. | |
44 Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert)); | |
45 | |
46 Stream<T> distinct([bool equals(T previous, T next)]) => | |
47 new TypeSafeStream<T>(_stream.distinct(equals == null | |
48 ? null | |
49 : (previous, next) => equals(previous as T, next as T))); | |
50 | |
51 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. | |
52 Future drain([futureValue]) => _stream.drain(futureValue); | |
Lasse Reichstein Nielsen
2016/04/08 09:20:29
I'm guessing this could just be:
Future<S> drain
nweiz
2016/04/11 20:26:16
That's basically what the TODO is about. Unfortuna
| |
53 | |
54 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) => | |
55 _stream.expand(_validateType(convert)); | |
56 | |
57 Future firstWhere(bool test(T element), {Object defaultValue()}) => | |
58 _stream.firstWhere(_validateType(test), defaultValue: defaultValue); | |
59 | |
60 Future lastWhere(bool test(T element), {Object defaultValue()}) => | |
61 _stream.lastWhere(_validateType(test), defaultValue: defaultValue); | |
62 | |
63 Future<T> singleWhere(bool test(T element)) async => | |
64 (await _stream.singleWhere(_validateType(test))) as T; | |
65 | |
66 Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue, | |
67 /*=S*/ combine(/*=S*/ previous, T element)) => | |
68 _stream.fold(initialValue, | |
69 (previous, element) => combine(previous, element as T)); | |
70 | |
71 Future forEach(void action(T element)) => | |
72 _stream.forEach(_validateType(action)); | |
73 | |
74 Stream<T> handleError(Function onError, {bool test(error)}) => | |
75 new TypeSafeStream<T>(_stream.handleError(onError, test: test)); | |
76 | |
77 StreamSubscription<T> listen(void onData(T value), | |
78 {Function onError, void onDone(), bool cancelOnError}) => | |
79 new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData), | |
80 onError: onError, onDone: onDone, cancelOnError: cancelOnError)); | |
81 | |
82 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) => | |
83 _stream.map(_validateType(convert)); | |
84 | |
85 // Don't forward to `_stream.pipe` because we want the consumer to see the | |
86 // type-asserted stream. | |
87 Future pipe(StreamConsumer<T> consumer) => | |
88 consumer.addStream(this).then((_) => consumer.close()); | |
89 | |
90 Future<T> reduce(T combine(T previous, T element)) async { | |
91 var result = await _stream.reduce( | |
92 (previous, element) => combine(previous as T, element as T)); | |
93 return result as T; | |
94 } | |
95 | |
96 Stream<T> skipWhile(bool test(T element)) => | |
97 new TypeSafeStream<T>(_stream.skipWhile(_validateType(test))); | |
98 | |
99 Stream<T> takeWhile(bool test(T element)) => | |
100 new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); | |
101 | |
102 Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) => | |
103 _stream.timeout(timeLimit, onTimeout: onTimeout); | |
104 | |
105 Future<List<T>> toList() async => | |
106 DelegatingList.typed/*<T>*/(await _stream.toList()); | |
107 | |
108 Future<Set<T>> toSet() async => | |
109 DelegatingSet.typed/*<T>*/(await _stream.toSet()); | |
110 | |
111 // Don't forward to `_stream.transform` because we want the transformer to see | |
112 // the type-asserted stream. | |
113 Stream/*<S>*/ transform/*<S>*/( | |
114 StreamTransformer<T, dynamic/*=S*/> transformer) => | |
115 transformer.bind(this); | |
116 | |
117 Stream<T> where(bool test(T element)) => | |
118 new TypeSafeStream<T>(_stream.where(_validateType(test))); | |
119 | |
120 Future<bool> every(bool test(T element)) => | |
121 _stream.every(_validateType(test)); | |
122 | |
123 Future<bool> any(bool test(T element)) => _stream.any(_validateType(test)); | |
124 Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count)); | |
125 Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count)); | |
126 Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T; | |
127 Future<bool> contains(Object needle) => _stream.contains(needle); | |
128 Future<String> join([String separator = ""]) => _stream.join(separator); | |
129 String toString() => _stream.toString(); | |
130 | |
131 /// Returns a version of [function] that asserts that its argument is an | |
132 /// instance of `T`. | |
133 UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/( | |
Lasse Reichstein Nielsen
2016/04/08 09:20:29
_validateParameterType?
nweiz
2016/04/11 20:26:16
I think that's more verbose than necessary.
| |
134 /*=S*/ function(T value)) => | |
135 function == null ? null : (value) => function(value as T); | |
136 } | |
OLD | NEW |