OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import 'package:collection/collection.dart'; | 7 import 'package:collection/collection.dart'; |
8 | 8 |
9 import '../utils.dart'; | 9 import '../utils.dart'; |
10 import 'stream_subscription.dart'; | 10 import 'stream_subscription.dart'; |
(...skipping 19 matching lines...) Expand all Loading... |
30 onListen: onListen == null | 30 onListen: onListen == null |
31 ? null | 31 ? null |
32 : (subscription) => | 32 : (subscription) => |
33 onListen(new TypeSafeStreamSubscription<T>(subscription)), | 33 onListen(new TypeSafeStreamSubscription<T>(subscription)), |
34 onCancel: onCancel == null | 34 onCancel: onCancel == null |
35 ? null | 35 ? null |
36 : (subscription) => | 36 : (subscription) => |
37 onCancel(new TypeSafeStreamSubscription<T>(subscription)))); | 37 onCancel(new TypeSafeStreamSubscription<T>(subscription)))); |
38 } | 38 } |
39 | 39 |
40 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) => | 40 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) => |
41 _stream.asyncExpand(_validateType(convert)); | 41 _stream.asyncExpand(_validateType(convert)); |
42 | 42 |
43 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) => | 43 Stream<E> asyncMap<E>(convert(T event)) => |
44 _stream.asyncMap(_validateType(convert)); | 44 _stream.asyncMap(_validateType(convert)); |
45 | 45 |
46 Stream<T> distinct([bool equals(T previous, T next)]) => | 46 Stream<T> distinct([bool equals(T previous, T next)]) => |
47 new TypeSafeStream<T>(_stream.distinct(equals == null | 47 new TypeSafeStream<T>(_stream.distinct(equals == null |
48 ? null | 48 ? null |
49 : (previous, next) => equals(previous as T, next as T))); | 49 : (previous, next) => equals(previous as T, next as T))); |
50 | 50 |
51 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) => | 51 Future<E> drain<E>([E futureValue]) => |
52 _stream.drain(futureValue); | 52 _stream.drain(futureValue); |
53 | 53 |
54 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) => | 54 Stream<S> expand<S>(Iterable<S> convert(T value)) => |
55 _stream.expand(_validateType(convert)); | 55 _stream.expand(_validateType(convert)); |
56 | 56 |
57 Future firstWhere(bool test(T element), {Object defaultValue()}) => | 57 Future firstWhere(bool test(T element), {Object defaultValue()}) => |
58 _stream.firstWhere(_validateType(test), defaultValue: defaultValue); | 58 _stream.firstWhere(_validateType(test), defaultValue: defaultValue); |
59 | 59 |
60 Future lastWhere(bool test(T element), {Object defaultValue()}) => | 60 Future lastWhere(bool test(T element), {Object defaultValue()}) => |
61 _stream.lastWhere(_validateType(test), defaultValue: defaultValue); | 61 _stream.lastWhere(_validateType(test), defaultValue: defaultValue); |
62 | 62 |
63 Future<T> singleWhere(bool test(T element)) async => | 63 Future<T> singleWhere(bool test(T element)) async => |
64 (await _stream.singleWhere(_validateType(test))) as T; | 64 (await _stream.singleWhere(_validateType(test))) as T; |
65 | 65 |
66 Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue, | 66 Future<S> fold<S>(S initialValue, |
67 /*=S*/ combine(/*=S*/ previous, T element)) => | 67 S combine(S previous, T element)) => |
68 _stream.fold(initialValue, | 68 _stream.fold(initialValue, |
69 (previous, element) => combine(previous, element as T)); | 69 (previous, element) => combine(previous, element as T)); |
70 | 70 |
71 Future forEach(void action(T element)) => | 71 Future forEach(void action(T element)) => |
72 _stream.forEach(_validateType(action)); | 72 _stream.forEach(_validateType(action)); |
73 | 73 |
74 Stream<T> handleError(Function onError, {bool test(error)}) => | 74 Stream<T> handleError(Function onError, {bool test(error)}) => |
75 new TypeSafeStream<T>(_stream.handleError(onError, test: test)); | 75 new TypeSafeStream<T>(_stream.handleError(onError, test: test)); |
76 | 76 |
77 StreamSubscription<T> listen(void onData(T value), | 77 StreamSubscription<T> listen(void onData(T value), |
78 {Function onError, void onDone(), bool cancelOnError}) => | 78 {Function onError, void onDone(), bool cancelOnError}) => |
79 new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData), | 79 new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData), |
80 onError: onError, onDone: onDone, cancelOnError: cancelOnError)); | 80 onError: onError, onDone: onDone, cancelOnError: cancelOnError)); |
81 | 81 |
82 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) => | 82 Stream<S> map<S>(S convert(T event)) => |
83 _stream.map(_validateType(convert)); | 83 _stream.map(_validateType(convert)); |
84 | 84 |
85 // Don't forward to `_stream.pipe` because we want the consumer to see the | 85 // Don't forward to `_stream.pipe` because we want the consumer to see the |
86 // type-asserted stream. | 86 // type-asserted stream. |
87 Future pipe(StreamConsumer<T> consumer) => | 87 Future pipe(StreamConsumer<T> consumer) => |
88 consumer.addStream(this).then((_) => consumer.close()); | 88 consumer.addStream(this).then((_) => consumer.close()); |
89 | 89 |
90 Future<T> reduce(T combine(T previous, T element)) async { | 90 Future<T> reduce(T combine(T previous, T element)) async { |
91 var result = await _stream.reduce( | 91 var result = await _stream.reduce( |
92 (previous, element) => combine(previous as T, element as T)); | 92 (previous, element) => combine(previous as T, element as T)); |
93 return result as T; | 93 return result as T; |
94 } | 94 } |
95 | 95 |
96 Stream<T> skipWhile(bool test(T element)) => | 96 Stream<T> skipWhile(bool test(T element)) => |
97 new TypeSafeStream<T>(_stream.skipWhile(_validateType(test))); | 97 new TypeSafeStream<T>(_stream.skipWhile(_validateType(test))); |
98 | 98 |
99 Stream<T> takeWhile(bool test(T element)) => | 99 Stream<T> takeWhile(bool test(T element)) => |
100 new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); | 100 new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); |
101 | 101 |
102 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) => | 102 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) => |
103 new TypeSafeStream<T>(_stream.timeout( | 103 new TypeSafeStream<T>(_stream.timeout( |
104 timeLimit, | 104 timeLimit, |
105 onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink)))); | 105 onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink)))); |
106 | 106 |
107 Future<List<T>> toList() async => | 107 Future<List<T>> toList() async => |
108 DelegatingList.typed/*<T>*/(await _stream.toList()); | 108 DelegatingList.typed<T>(await _stream.toList()); |
109 | 109 |
110 Future<Set<T>> toSet() async => | 110 Future<Set<T>> toSet() async => |
111 DelegatingSet.typed/*<T>*/(await _stream.toSet()); | 111 DelegatingSet.typed<T>(await _stream.toSet()); |
112 | 112 |
113 // Don't forward to `_stream.transform` because we want the transformer to see | 113 // Don't forward to `_stream.transform` because we want the transformer to see |
114 // the type-asserted stream. | 114 // the type-asserted stream. |
115 Stream/*<S>*/ transform/*<S>*/( | 115 Stream<S> transform<S>( |
116 StreamTransformer<T, dynamic/*=S*/> transformer) => | 116 StreamTransformer<T, S> transformer) => |
117 transformer.bind(this); | 117 transformer.bind(this); |
118 | 118 |
119 Stream<T> where(bool test(T element)) => | 119 Stream<T> where(bool test(T element)) => |
120 new TypeSafeStream<T>(_stream.where(_validateType(test))); | 120 new TypeSafeStream<T>(_stream.where(_validateType(test))); |
121 | 121 |
122 Future<bool> every(bool test(T element)) => | 122 Future<bool> every(bool test(T element)) => |
123 _stream.every(_validateType(test)); | 123 _stream.every(_validateType(test)); |
124 | 124 |
125 Future<bool> any(bool test(T element)) => _stream.any(_validateType(test)); | 125 Future<bool> any(bool test(T element)) => _stream.any(_validateType(test)); |
126 Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count)); | 126 Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count)); |
127 Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count)); | 127 Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count)); |
128 Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T; | 128 Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T; |
129 Future<bool> contains(Object needle) => _stream.contains(needle); | 129 Future<bool> contains(Object needle) => _stream.contains(needle); |
130 Future<String> join([String separator = ""]) => _stream.join(separator); | 130 Future<String> join([String separator = ""]) => _stream.join(separator); |
131 String toString() => _stream.toString(); | 131 String toString() => _stream.toString(); |
132 | 132 |
133 /// Returns a version of [function] that asserts that its argument is an | 133 /// Returns a version of [function] that asserts that its argument is an |
134 /// instance of `T`. | 134 /// instance of `T`. |
135 UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/( | 135 UnaryFunction<dynamic, S> _validateType<S>( |
136 /*=S*/ function(T value)) => | 136 S function(T value)) => |
137 function == null ? null : (value) => function(value as T); | 137 function == null ? null : (value) => function(value as T); |
138 } | 138 } |
OLD | NEW |