OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
140 }); | 140 }); |
141 }, | 141 }, |
142 onCancel: () { | 142 onCancel: () { |
143 if (timer != null) timer.cancel(); | 143 if (timer != null) timer.cancel(); |
144 timer = null; | 144 timer = null; |
145 }); | 145 }); |
146 return controller.stream; | 146 return controller.stream; |
147 } | 147 } |
148 | 148 |
149 /** | 149 /** |
150 * Creates a stream where all events of an existing stream are piped through | |
151 * a sink-transformation. | |
152 * | |
153 * The given [mapSink] closure is invoked when the returned stream is | |
154 * listened to. All events from the [source] are added into the event sink | |
155 * that is returned from the invocation. The transformation puts all | |
156 * transformed events into the sink the [mapSink] closure received during | |
157 * its invocation. Conceptually the [mapSink] creates a transformation pipe | |
158 * with the input sink being the returned [EventSink] and the output sink | |
159 * being the sink it received. | |
160 * | |
161 * This constructor is frequently used to build transformers. | |
162 * | |
163 * Example use for a duplicating transformer: | |
164 * | |
165 * class DuplicationSink implements EventSink<String> { | |
166 * final EventSink<String> _outputSink; | |
167 * DuplicationSink(this._outputSink); | |
168 * | |
169 * void add(String data) { | |
170 * _outputSink.add(data); | |
171 * _outputSink.add(data); | |
172 * } | |
173 * | |
174 * void addError(e, [st]) => _outputSink(e, st); | |
175 * void close() => _outputSink.close(); | |
176 * } | |
177 * | |
178 * class DuplicationTransformer implements StreamTransformer<String, Strin g> { | |
Lasse Reichstein Nielsen
2013/10/04 14:35:53
Long line.
floitsch
2013/10/05 18:47:40
I know, but since the actual code is less than 80
| |
179 * // Some generic types ommitted for brevety. | |
180 * Stream bind(Stream stream) => new Stream<String>.eventTransform( | |
181 * stream, | |
182 * (EventSink sink) => new DuplicationSink(sink)); | |
183 * } | |
184 * | |
185 * stringStream.transform(new DuplicationTransformer()); | |
186 */ | |
187 factory Stream.eventTransformed(Stream source, | |
188 EventSink mapSink(EventSink<T> sink)) { | |
189 return new _BoundSinkStream(source, mapSink); | |
190 } | |
191 | |
192 /** | |
150 * Reports whether this stream is a broadcast stream. | 193 * Reports whether this stream is a broadcast stream. |
151 */ | 194 */ |
152 bool get isBroadcast => false; | 195 bool get isBroadcast => false; |
153 | 196 |
154 /** | 197 /** |
155 * Returns a multi-subscription stream that produces the same events as this. | 198 * Returns a multi-subscription stream that produces the same events as this. |
156 * | 199 * |
157 * If this stream is already a broadcast stream, it is returned unmodified. | 200 * If this stream is already a broadcast stream, it is returned unmodified. |
158 * | 201 * |
159 * If this stream is single-subscription, return a new stream that allows | 202 * If this stream is single-subscription, return a new stream that allows |
(...skipping 784 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
944 } | 987 } |
945 | 988 |
946 | 989 |
947 /** | 990 /** |
948 * An interface that abstracts creation or handling of [Stream] events. | 991 * An interface that abstracts creation or handling of [Stream] events. |
949 */ | 992 */ |
950 abstract class EventSink<T> { | 993 abstract class EventSink<T> { |
951 /** Create a data event */ | 994 /** Create a data event */ |
952 void add(T event); | 995 void add(T event); |
953 /** Create an async error. */ | 996 /** Create an async error. */ |
954 void addError(errorEvent); | 997 void addError(errorEvent, [StackTrace stackTrace]); |
955 /** Request a stream to close. */ | 998 /** Request a stream to close. */ |
956 void close(); | 999 void close(); |
957 } | 1000 } |
958 | 1001 |
959 | 1002 |
960 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1003 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
961 class StreamView<T> extends Stream<T> { | 1004 class StreamView<T> extends Stream<T> { |
962 Stream<T> _stream; | 1005 Stream<T> _stream; |
963 | 1006 |
964 StreamView(this._stream); | 1007 StreamView(this._stream); |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1025 */ | 1068 */ |
1026 Future get done; | 1069 Future get done; |
1027 } | 1070 } |
1028 | 1071 |
1029 | 1072 |
1030 /** | 1073 /** |
1031 * The target of a [Stream.transform] call. | 1074 * The target of a [Stream.transform] call. |
1032 * | 1075 * |
1033 * The [Stream.transform] call will pass itself to this object and then return | 1076 * The [Stream.transform] call will pass itself to this object and then return |
1034 * the resulting stream. | 1077 * the resulting stream. |
1078 * | |
1079 * It is good practice to create transformers that can be used multiple times. | |
1035 */ | 1080 */ |
1036 abstract class StreamTransformer<S, T> { | 1081 abstract class StreamTransformer<S, T> { |
1082 | |
1037 /** | 1083 /** |
1038 * Create a [StreamTransformer] that delegates events to the given functions. | 1084 * Creates a [StreamTransformer]. |
1039 * | 1085 * |
1040 * This is actually a [StreamEventTransformer] where the event handling is | 1086 * The returned instance takes responsibility of binding ([bind]) and only |
1041 * performed by the function arguments. | 1087 * invokes the given closure [transformer] when a user starts listening on |
1042 * If an argument is omitted, it acts as the corresponding default method from | 1088 * the bound stream. At that point the transformer should start listening |
1043 * [StreamEventTransformer]. | 1089 * to the bound stream (which is given as argument) and return a |
1090 * [StreamSubscription]. The bound stream-transformer then sets the handlers | |
1091 * it received as part of the `listen` call. | |
1044 * | 1092 * |
1045 * Example use: | 1093 * There are two common ways to create a StreamSubscription: |
1094 * | |
1095 * 1. by creating a new class that implements [StreamSubscription]. | |
1096 * Note that the subscription should run callbacks in the [Zone] the | |
1097 * stream was listened to. | |
1098 * 2. by allocating a [StreamController] and to return the result of | |
1099 * listening to its stream. | |
1100 * | |
1101 * Example use of a duplicating transformer: | |
1046 * | 1102 * |
1047 * stringStream.transform(new StreamTransformer<String, String>( | 1103 * stringStream.transform(new StreamTransformer<String, String>( |
1104 * (Stream<String> input, bool cancelOnError) { | |
1105 * StreamController<String> controller; | |
1106 * StreamSubscription<String> subscription; | |
1107 * controller = new StreamController<String>( | |
1108 * onListen: () { | |
1109 * subscription = input.listen((data) { | |
1110 * // Duplicate the data. | |
1111 * controller.add(data); | |
1112 * controller.add(data); | |
1113 * }, | |
1114 * onError: controller.addError, | |
1115 * onDone: controller.close, | |
1116 * cancelOnError: cancelOnError); | |
1117 * }, | |
1118 * onPause: subscription.pause, | |
1119 * onResume: subscription.resume, | |
1120 * onCancel: subscription.cancel, | |
1121 * sync: true); // One of the few places, where sync is correct. | |
1122 * return controller.stream.listen(null); | |
1123 * }); | |
1124 */ | |
1125 const factory StreamTransformer( | |
1126 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | |
1127 = _StreamSubscriptionTransformer; | |
1128 | |
1129 /** | |
1130 * Creates a [StreamTransformer] that delegates events to the given functions. | |
1131 * | |
1132 * This constructor returns a transformer that can only be used once. | |
1133 * | |
1134 * Example use of a duplicating transformer: | |
1135 * | |
1136 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( | |
1048 * handleData: (String value, EventSink<String> sink) { | 1137 * handleData: (String value, EventSink<String> sink) { |
1049 * sink.add(value); | 1138 * sink.add(value); |
1050 * sink.add(value); // Duplicate the incoming events. | 1139 * sink.add(value); // Duplicate the incoming events. |
1051 * })); | 1140 * })); |
1052 * | |
1053 */ | 1141 */ |
1054 factory StreamTransformer({ | 1142 factory StreamTransformer.fromHandlers({ |
1055 void handleData(S data, EventSink<T> sink), | 1143 void handleData(S data, EventSink<T> sink), |
1056 Function handleError, | 1144 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1057 void handleDone(EventSink<T> sink)}) { | 1145 void handleDone(EventSink<T> sink)}) |
1058 return new _StreamTransformerImpl<S, T>(handleData, | 1146 = _StreamHandlerTransformer; |
1059 handleError, | |
1060 handleDone); | |
1061 } | |
1062 | 1147 |
1063 Stream<T> bind(Stream<S> stream); | 1148 Stream<T> bind(Stream<S> stream); |
1064 } | 1149 } |
1065 | 1150 |
1066 | |
1067 /** | |
1068 * Base class for transformers that modifies stream events. | |
1069 * | |
1070 * A [StreamEventTransformer] transforms incoming Stream | |
1071 * events of one kind into outgoing events of (possibly) another kind. | |
1072 * | |
1073 * Subscribing on the stream returned by [bind] is the same as subscribing on | |
1074 * the source stream, except that events are passed through the [transformer] | |
1075 * before being emitted. The transformer may generate any number and | |
1076 * types of events for each incoming event. Pauses on the returned | |
1077 * subscription are forwarded to this stream. | |
1078 * | |
1079 * An example that duplicates all data events: | |
1080 * | |
1081 * class DoubleTransformer<T> extends StreamEventTransformer<T, T> { | |
1082 * void handleData(T data, EventSink<T> sink) { | |
1083 * sink.add(value); | |
1084 * sink.add(value); | |
1085 * } | |
1086 * } | |
1087 * someTypeStream.transform(new DoubleTransformer<Type>()); | |
1088 * | |
1089 * The default implementations of the "handle" methods forward | |
1090 * the events unmodified. If using the default [handleData] the generic type [T] | |
1091 * needs to be assignable to [S]. | |
1092 */ | |
1093 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | |
1094 const StreamEventTransformer(); | |
1095 | |
1096 Stream<T> bind(Stream<S> source) { | |
1097 return new EventTransformStream<S, T>(source, this); | |
1098 } | |
1099 | |
1100 /** | |
1101 * Act on incoming data event. | |
1102 * | |
1103 * The method may generate any number of events on the sink, but should | |
1104 * not throw. | |
1105 */ | |
1106 void handleData(S event, EventSink<T> sink) { | |
1107 var data = event; | |
1108 sink.add(data); | |
1109 } | |
1110 | |
1111 /** | |
1112 * Act on incoming error event. | |
1113 * | |
1114 * The method may generate any number of events on the sink, but should | |
1115 * not throw. | |
1116 */ | |
1117 void handleError(error, EventSink<T> sink) { | |
1118 sink.addError(error); | |
1119 } | |
1120 | |
1121 /** | |
1122 * Act on incoming done event. | |
1123 * | |
1124 * The method may generate any number of events on the sink, but should | |
1125 * not throw. | |
1126 */ | |
1127 void handleDone(EventSink<T> sink){ | |
1128 sink.close(); | |
1129 } | |
1130 } | |
1131 | |
1132 | |
1133 /** | |
1134 * Stream that transforms another stream by intercepting and replacing events. | |
1135 * | |
1136 * This [Stream] is a transformation of a source stream. Listening on this | |
1137 * stream is the same as listening on the source stream, except that events | |
1138 * are intercepted and modified by a [StreamEventTransformer] before becoming | |
1139 * events on this stream. | |
1140 */ | |
1141 class EventTransformStream<S, T> extends Stream<T> { | |
1142 final Stream<S> _source; | |
1143 final StreamEventTransformer _transformer; | |
1144 EventTransformStream(Stream<S> source, | |
1145 StreamEventTransformer<S, T> transformer) | |
1146 : _source = source, _transformer = transformer; | |
1147 | |
1148 StreamSubscription<T> listen(void onData(T data), | |
1149 { Function onError, | |
1150 void onDone(), | |
1151 bool cancelOnError }) { | |
1152 if (onData == null) onData = _nullDataHandler; | |
1153 if (onError == null) onError = _nullErrorHandler; | |
1154 if (onDone == null) onDone = _nullDoneHandler; | |
1155 cancelOnError = identical(true, cancelOnError); | |
1156 return new _EventTransformStreamSubscription(_source, _transformer, | |
1157 onData, onError, onDone, | |
1158 cancelOnError); | |
1159 } | |
1160 } | |
1161 | |
1162 class _EventTransformStreamSubscription<S, T> | |
1163 extends _BufferingStreamSubscription<T> { | |
1164 /** The transformer used to transform events. */ | |
1165 final StreamEventTransformer<S, T> _transformer; | |
1166 | |
1167 /** Whether this stream has sent a done event. */ | |
1168 bool _isClosed = false; | |
1169 | |
1170 /** Source of incoming events. */ | |
1171 StreamSubscription<S> _subscription; | |
1172 | |
1173 /** Cached EventSink wrapper for this class. */ | |
1174 EventSink<T> _sink; | |
1175 | |
1176 _EventTransformStreamSubscription(Stream<S> source, | |
1177 this._transformer, | |
1178 void onData(T data), | |
1179 Function onError, | |
1180 void onDone(), | |
1181 bool cancelOnError) | |
1182 : super(onData, onError, onDone, cancelOnError) { | |
1183 _sink = new _EventSinkAdapter<T>(this); | |
1184 _subscription = source.listen(_handleData, | |
1185 onError: _handleError, | |
1186 onDone: _handleDone); | |
1187 } | |
1188 | |
1189 /** Whether this subscription is still subscribed to its source. */ | |
1190 bool get _isSubscribed => _subscription != null; | |
1191 | |
1192 void _onPause() { | |
1193 if (_isSubscribed) _subscription.pause(); | |
1194 } | |
1195 | |
1196 void _onResume() { | |
1197 if (_isSubscribed) _subscription.resume(); | |
1198 } | |
1199 | |
1200 void _onCancel() { | |
1201 if (_isSubscribed) { | |
1202 StreamSubscription subscription = _subscription; | |
1203 _subscription = null; | |
1204 subscription.cancel(); | |
1205 } | |
1206 _isClosed = true; | |
1207 } | |
1208 | |
1209 void _handleData(S data) { | |
1210 try { | |
1211 _transformer.handleData(data, _sink); | |
1212 } catch (e, s) { | |
1213 _addError(_asyncError(e, s), s); | |
1214 } | |
1215 } | |
1216 | |
1217 void _handleError(error, [stackTrace]) { | |
1218 try { | |
1219 _transformer.handleError(error, _sink); | |
1220 } catch (e, s) { | |
1221 if (identical(e, error)) { | |
1222 _addError(error, stackTrace); | |
1223 } else { | |
1224 _addError(_asyncError(e, s), s); | |
1225 } | |
1226 } | |
1227 } | |
1228 | |
1229 void _handleDone() { | |
1230 try { | |
1231 _subscription = null; | |
1232 _transformer.handleDone(_sink); | |
1233 } catch (e, s) { | |
1234 _addError(_asyncError(e, s), s); | |
1235 } | |
1236 } | |
1237 } | |
1238 | |
1239 class _EventSinkAdapter<T> implements EventSink<T> { | |
1240 _EventSink _sink; | |
1241 _EventSinkAdapter(this._sink); | |
1242 | |
1243 void add(T data) { _sink._add(data); } | |
1244 void addError(error, [StackTrace stackTrace]) { | |
1245 _sink._addError(error, stackTrace); | |
1246 } | |
1247 void close() { _sink._close(); } | |
1248 } | |
1249 | |
1250 | |
1251 /** | 1151 /** |
1252 * An [Iterable] like interface for the values of a [Stream]. | 1152 * An [Iterable] like interface for the values of a [Stream]. |
1253 * | 1153 * |
1254 * This wraps a [Stream] and a subscription on the stream. It listens | 1154 * This wraps a [Stream] and a subscription on the stream. It listens |
1255 * on the stream, and completes the future returned by [moveNext] when the | 1155 * on the stream, and completes the future returned by [moveNext] when the |
1256 * next value becomes available. | 1156 * next value becomes available. |
1257 * | 1157 * |
1258 * NOTICE: This is a tentative design. This class may change. | 1158 * NOTICE: This is a tentative design. This class may change. |
1259 */ | 1159 */ |
1260 abstract class StreamIterator<T> { | 1160 abstract class StreamIterator<T> { |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1292 * | 1192 * |
1293 * If a [moveNext] call has been made, it will complete with `false` as value, | 1193 * If a [moveNext] call has been made, it will complete with `false` as value, |
1294 * as will all further calls to [moveNext]. | 1194 * as will all further calls to [moveNext]. |
1295 * | 1195 * |
1296 * If you need to stop listening for values before the stream iterator is | 1196 * If you need to stop listening for values before the stream iterator is |
1297 * automatically closed, you must call [cancel] to ensure that the stream | 1197 * automatically closed, you must call [cancel] to ensure that the stream |
1298 * is properly closed. | 1198 * is properly closed. |
1299 */ | 1199 */ |
1300 void cancel(); | 1200 void cancel(); |
1301 } | 1201 } |
OLD | NEW |