OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import 'package:collection/collection.dart'; | 7 import 'package:collection/collection.dart'; |
8 | 8 |
9 import '../utils.dart'; | 9 import '../utils.dart'; |
10 import 'stream_subscription.dart'; | 10 import 'stream_subscription.dart'; |
| 11 import '../delegate/event_sink.dart'; |
11 | 12 |
12 class TypeSafeStream<T> implements Stream<T> { | 13 class TypeSafeStream<T> implements Stream<T> { |
13 final Stream _stream; | 14 final Stream _stream; |
14 | 15 |
15 Future<T> get first async => (await _stream.first) as T; | 16 Future<T> get first async => (await _stream.first) as T; |
16 Future<T> get last async => (await _stream.last) as T; | 17 Future<T> get last async => (await _stream.last) as T; |
17 Future<T> get single async => (await _stream.single) as T; | 18 Future<T> get single async => (await _stream.single) as T; |
18 | 19 |
19 bool get isBroadcast => _stream.isBroadcast; | 20 bool get isBroadcast => _stream.isBroadcast; |
20 Future<bool> get isEmpty => _stream.isEmpty; | 21 Future<bool> get isEmpty => _stream.isEmpty; |
21 Future<int> get length => _stream.length; | 22 Future<int> get length => _stream.length; |
22 | 23 |
23 TypeSafeStream(this._stream); | 24 TypeSafeStream(this._stream); |
24 | 25 |
25 Stream<T> asBroadcastStream( | 26 Stream<T> asBroadcastStream( |
26 {void onListen(StreamSubscription<T> subscription), | 27 {void onListen(StreamSubscription<T> subscription), |
27 void onCancel(StreamSubscription<T> subscription)}) { | 28 void onCancel(StreamSubscription<T> subscription)}) { |
28 return new TypeSafeStream<T>(_stream.asBroadcastStream( | 29 return new TypeSafeStream<T>(_stream.asBroadcastStream( |
29 onListen: onListen == null | 30 onListen: onListen == null |
30 ? null | 31 ? null |
31 : (subscription) => | 32 : (subscription) => |
32 onListen(new TypeSafeStreamSubscription<T>(subscription)), | 33 onListen(new TypeSafeStreamSubscription<T>(subscription)), |
33 onCancel: onCancel == null | 34 onCancel: onCancel == null |
34 ? null | 35 ? null |
35 : (subscription) => | 36 : (subscription) => |
36 onCancel(new TypeSafeStreamSubscription<T>(subscription)))); | 37 onCancel(new TypeSafeStreamSubscription<T>(subscription)))); |
37 } | 38 } |
38 | 39 |
39 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. | 40 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) => |
40 Stream asyncExpand(Stream convert(T event)) => | |
41 _stream.asyncExpand(_validateType(convert)); | 41 _stream.asyncExpand(_validateType(convert)); |
42 | 42 |
43 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. | 43 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) => |
44 Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert)); | 44 _stream.asyncMap(_validateType(convert)); |
45 | 45 |
46 Stream<T> distinct([bool equals(T previous, T next)]) => | 46 Stream<T> distinct([bool equals(T previous, T next)]) => |
47 new TypeSafeStream<T>(_stream.distinct(equals == null | 47 new TypeSafeStream<T>(_stream.distinct(equals == null |
48 ? null | 48 ? null |
49 : (previous, next) => equals(previous as T, next as T))); | 49 : (previous, next) => equals(previous as T, next as T))); |
50 | 50 |
51 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. | 51 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) => |
52 Future drain([futureValue]) => _stream.drain(futureValue); | 52 _stream.drain(futureValue); |
53 | 53 |
54 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) => | 54 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) => |
55 _stream.expand(_validateType(convert)); | 55 _stream.expand(_validateType(convert)); |
56 | 56 |
57 Future firstWhere(bool test(T element), {Object defaultValue()}) => | 57 Future firstWhere(bool test(T element), {Object defaultValue()}) => |
58 _stream.firstWhere(_validateType(test), defaultValue: defaultValue); | 58 _stream.firstWhere(_validateType(test), defaultValue: defaultValue); |
59 | 59 |
60 Future lastWhere(bool test(T element), {Object defaultValue()}) => | 60 Future lastWhere(bool test(T element), {Object defaultValue()}) => |
61 _stream.lastWhere(_validateType(test), defaultValue: defaultValue); | 61 _stream.lastWhere(_validateType(test), defaultValue: defaultValue); |
62 | 62 |
(...skipping 29 matching lines...) Expand all Loading... |
92 (previous, element) => combine(previous as T, element as T)); | 92 (previous, element) => combine(previous as T, element as T)); |
93 return result as T; | 93 return result as T; |
94 } | 94 } |
95 | 95 |
96 Stream<T> skipWhile(bool test(T element)) => | 96 Stream<T> skipWhile(bool test(T element)) => |
97 new TypeSafeStream<T>(_stream.skipWhile(_validateType(test))); | 97 new TypeSafeStream<T>(_stream.skipWhile(_validateType(test))); |
98 | 98 |
99 Stream<T> takeWhile(bool test(T element)) => | 99 Stream<T> takeWhile(bool test(T element)) => |
100 new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); | 100 new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); |
101 | 101 |
102 Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) => | 102 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) => |
103 _stream.timeout(timeLimit, onTimeout: onTimeout); | 103 new TypeSafeStream<T>(_stream.timeout( |
| 104 timeLimit, |
| 105 onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink)))); |
104 | 106 |
105 Future<List<T>> toList() async => | 107 Future<List<T>> toList() async => |
106 DelegatingList.typed/*<T>*/(await _stream.toList()); | 108 DelegatingList.typed/*<T>*/(await _stream.toList()); |
107 | 109 |
108 Future<Set<T>> toSet() async => | 110 Future<Set<T>> toSet() async => |
109 DelegatingSet.typed/*<T>*/(await _stream.toSet()); | 111 DelegatingSet.typed/*<T>*/(await _stream.toSet()); |
110 | 112 |
111 // Don't forward to `_stream.transform` because we want the transformer to see | 113 // Don't forward to `_stream.transform` because we want the transformer to see |
112 // the type-asserted stream. | 114 // the type-asserted stream. |
113 Stream/*<S>*/ transform/*<S>*/( | 115 Stream/*<S>*/ transform/*<S>*/( |
(...skipping 13 matching lines...) Expand all Loading... |
127 Future<bool> contains(Object needle) => _stream.contains(needle); | 129 Future<bool> contains(Object needle) => _stream.contains(needle); |
128 Future<String> join([String separator = ""]) => _stream.join(separator); | 130 Future<String> join([String separator = ""]) => _stream.join(separator); |
129 String toString() => _stream.toString(); | 131 String toString() => _stream.toString(); |
130 | 132 |
131 /// Returns a version of [function] that asserts that its argument is an | 133 /// Returns a version of [function] that asserts that its argument is an |
132 /// instance of `T`. | 134 /// instance of `T`. |
133 UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/( | 135 UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/( |
134 /*=S*/ function(T value)) => | 136 /*=S*/ function(T value)) => |
135 function == null ? null : (value) => function(value as T); | 137 function == null ? null : (value) => function(value as T); |
136 } | 138 } |
OLD | NEW |