OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import 'dart:async'; |
| 6 |
| 7 import 'package:collection/collection.dart'; |
| 8 |
| 9 import '../utils.dart'; |
| 10 import 'stream_subscription.dart'; |
| 11 |
| 12 class TypeSafeStream<T> implements Stream<T> { |
| 13 final Stream _stream; |
| 14 |
| 15 Future<T> get first async => (await _stream.first) as T; |
| 16 Future<T> get last async => (await _stream.last) as T; |
| 17 Future<T> get single async => (await _stream.single) as T; |
| 18 |
| 19 bool get isBroadcast => _stream.isBroadcast; |
| 20 Future<bool> get isEmpty => _stream.isEmpty; |
| 21 Future<int> get length => _stream.length; |
| 22 |
| 23 TypeSafeStream(this._stream); |
| 24 |
| 25 Stream<T> asBroadcastStream( |
| 26 {void onListen(StreamSubscription<T> subscription), |
| 27 void onCancel(StreamSubscription<T> subscription)}) { |
| 28 return new TypeSafeStream<T>(_stream.asBroadcastStream( |
| 29 onListen: onListen == null |
| 30 ? null |
| 31 : (subscription) => |
| 32 onListen(new TypeSafeStreamSubscription<T>(subscription)), |
| 33 onCancel: onCancel == null |
| 34 ? null |
| 35 : (subscription) => |
| 36 onCancel(new TypeSafeStreamSubscription<T>(subscription)))); |
| 37 } |
| 38 |
| 39 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
| 40 Stream asyncExpand(Stream convert(T event)) => |
| 41 _stream.asyncExpand(_validateType(convert)); |
| 42 |
| 43 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
| 44 Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert)); |
| 45 |
| 46 Stream<T> distinct([bool equals(T previous, T next)]) => |
| 47 new TypeSafeStream<T>(_stream.distinct(equals == null |
| 48 ? null |
| 49 : (previous, next) => equals(previous as T, next as T))); |
| 50 |
| 51 // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed. |
| 52 Future drain([futureValue]) => _stream.drain(futureValue); |
| 53 |
| 54 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) => |
| 55 _stream.expand(_validateType(convert)); |
| 56 |
| 57 Future firstWhere(bool test(T element), {Object defaultValue()}) => |
| 58 _stream.firstWhere(_validateType(test), defaultValue: defaultValue); |
| 59 |
| 60 Future lastWhere(bool test(T element), {Object defaultValue()}) => |
| 61 _stream.lastWhere(_validateType(test), defaultValue: defaultValue); |
| 62 |
| 63 Future<T> singleWhere(bool test(T element)) async => |
| 64 (await _stream.singleWhere(_validateType(test))) as T; |
| 65 |
| 66 Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue, |
| 67 /*=S*/ combine(/*=S*/ previous, T element)) => |
| 68 _stream.fold(initialValue, |
| 69 (previous, element) => combine(previous, element as T)); |
| 70 |
| 71 Future forEach(void action(T element)) => |
| 72 _stream.forEach(_validateType(action)); |
| 73 |
| 74 Stream<T> handleError(Function onError, {bool test(error)}) => |
| 75 new TypeSafeStream<T>(_stream.handleError(onError, test: test)); |
| 76 |
| 77 StreamSubscription<T> listen(void onData(T value), |
| 78 {Function onError, void onDone(), bool cancelOnError}) => |
| 79 new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData), |
| 80 onError: onError, onDone: onDone, cancelOnError: cancelOnError)); |
| 81 |
| 82 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) => |
| 83 _stream.map(_validateType(convert)); |
| 84 |
| 85 // Don't forward to `_stream.pipe` because we want the consumer to see the |
| 86 // type-asserted stream. |
| 87 Future pipe(StreamConsumer<T> consumer) => |
| 88 consumer.addStream(this).then((_) => consumer.close()); |
| 89 |
| 90 Future<T> reduce(T combine(T previous, T element)) async { |
| 91 var result = await _stream.reduce( |
| 92 (previous, element) => combine(previous as T, element as T)); |
| 93 return result as T; |
| 94 } |
| 95 |
| 96 Stream<T> skipWhile(bool test(T element)) => |
| 97 new TypeSafeStream<T>(_stream.skipWhile(_validateType(test))); |
| 98 |
| 99 Stream<T> takeWhile(bool test(T element)) => |
| 100 new TypeSafeStream<T>(_stream.takeWhile(_validateType(test))); |
| 101 |
| 102 Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) => |
| 103 _stream.timeout(timeLimit, onTimeout: onTimeout); |
| 104 |
| 105 Future<List<T>> toList() async => |
| 106 DelegatingList.typed/*<T>*/(await _stream.toList()); |
| 107 |
| 108 Future<Set<T>> toSet() async => |
| 109 DelegatingSet.typed/*<T>*/(await _stream.toSet()); |
| 110 |
| 111 // Don't forward to `_stream.transform` because we want the transformer to see |
| 112 // the type-asserted stream. |
| 113 Stream/*<S>*/ transform/*<S>*/( |
| 114 StreamTransformer<T, dynamic/*=S*/> transformer) => |
| 115 transformer.bind(this); |
| 116 |
| 117 Stream<T> where(bool test(T element)) => |
| 118 new TypeSafeStream<T>(_stream.where(_validateType(test))); |
| 119 |
| 120 Future<bool> every(bool test(T element)) => |
| 121 _stream.every(_validateType(test)); |
| 122 |
| 123 Future<bool> any(bool test(T element)) => _stream.any(_validateType(test)); |
| 124 Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count)); |
| 125 Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count)); |
| 126 Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T; |
| 127 Future<bool> contains(Object needle) => _stream.contains(needle); |
| 128 Future<String> join([String separator = ""]) => _stream.join(separator); |
| 129 String toString() => _stream.toString(); |
| 130 |
| 131 /// Returns a version of [function] that asserts that its argument is an |
| 132 /// instance of `T`. |
| 133 UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/( |
| 134 /*=S*/ function(T value)) => |
| 135 function == null ? null : (value) => function(value as T); |
| 136 } |
OLD | NEW |