OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
184 | 184 |
185 /** | 185 /** |
186 * Adds a subscription to this stream. | 186 * Adds a subscription to this stream. |
187 * | 187 * |
188 * On each data event from this stream, the subscriber's [onData] handler | 188 * On each data event from this stream, the subscriber's [onData] handler |
189 * is called. If [onData] is null, nothing happens. | 189 * is called. If [onData] is null, nothing happens. |
190 * | 190 * |
191 * On errors from this stream, the [onError] handler is given a | 191 * On errors from this stream, the [onError] handler is given a |
192 * object describing the error. | 192 * object describing the error. |
193 * | 193 * |
194 * The [onError] callback must be of type `void onError(error)` or | |
195 * `void onError(error, StackTrace stackTrace)`. Depending on the function | |
196 * type the the stream either invokes [onError] with or without a stack | |
197 * trace. The stack trace argument might be `null` if the stream itself | |
198 * received an error without stack trace. | |
199 * | |
194 * If this stream closes, the [onDone] handler is called. | 200 * If this stream closes, the [onDone] handler is called. |
195 * | 201 * |
196 * If [cancelOnError] is true, the subscription is ended when | 202 * If [cancelOnError] is true, the subscription is ended when |
197 * the first error is reported. The default is false. | 203 * the first error is reported. The default is false. |
198 */ | 204 */ |
199 StreamSubscription<T> listen(void onData(T event), | 205 StreamSubscription<T> listen(void onData(T event), |
200 { void onError(error), | 206 { Function onError, |
201 void onDone(), | 207 void onDone(), |
202 bool cancelOnError}); | 208 bool cancelOnError}); |
203 | 209 |
204 /** | 210 /** |
205 * Creates a new stream from this stream that discards some data events. | 211 * Creates a new stream from this stream that discards some data events. |
206 * | 212 * |
207 * The new stream sends the same error and done events as this stream, | 213 * The new stream sends the same error and done events as this stream, |
208 * but it only sends the data events that satisfy the [test]. | 214 * but it only sends the data events that satisfy the [test]. |
209 */ | 215 */ |
210 Stream<T> where(bool test(T event)) { | 216 Stream<T> where(bool test(T event)) { |
211 return new _WhereStream<T>(this, test); | 217 return new _WhereStream<T>(this, test); |
212 } | 218 } |
213 | 219 |
214 /** | 220 /** |
215 * Creates a new stream that converts each element of this stream | 221 * Creates a new stream that converts each element of this stream |
216 * to a new value using the [convert] function. | 222 * to a new value using the [convert] function. |
217 */ | 223 */ |
218 Stream map(convert(T event)) { | 224 Stream map(convert(T event)) { |
219 return new _MapStream<T, dynamic>(this, convert); | 225 return new _MapStream<T, dynamic>(this, convert); |
220 } | 226 } |
221 | 227 |
222 /** | 228 /** |
223 * Creates a wrapper Stream that intercepts some errors from this stream. | 229 * Creates a wrapper Stream that intercepts some errors from this stream. |
224 * | 230 * |
225 * If this stream sends an error that matches [test], then it is intercepted | 231 * If this stream sends an error that matches [test], then it is intercepted |
226 * by the [handle] function. | 232 * by the [handle] function. |
227 * | 233 * |
234 * The [onError] callback must be of type `void onError(error)` or | |
235 * `void onError(error, StackTrace stackTrace)`. Depending on the function | |
236 * type the the stream either invokes [onError] with or without a stack | |
237 * trace. The stack trace argument might be `null` if the stream itself | |
238 * received an error without stack trace. | |
Lasse Reichstein Nielsen
2013/10/04 08:45:17
"without a stack trace".
What happens if onError
floitsch
2013/10/05 18:11:48
That's an error ;)
Lasse Reichstein Nielsen
2013/10/07 11:55:48
Ah, it's an error, but we don't say which error it
| |
239 * | |
228 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns | 240 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns |
229 * true. If [test] is omitted, every error is considered matching. | 241 * true. If [test] is omitted, every error is considered matching. |
230 * | 242 * |
231 * If the error is intercepted, the [handle] function can decide what to do | 243 * If the error is intercepted, the [handle] function can decide what to do |
232 * with it. It can throw if it wants to raise a new (or the same) error, | 244 * with it. It can throw if it wants to raise a new (or the same) error, |
233 * or simply return to make the stream forget the error. | 245 * or simply return to make the stream forget the error. |
234 * | 246 * |
235 * If you need to transform an error into a data event, use the more generic | 247 * If you need to transform an error into a data event, use the more generic |
236 * [Stream.transform] to handle the event by writing a data event to | 248 * [Stream.transform] to handle the event by writing a data event to |
237 * the output sink | 249 * the output sink |
238 */ | 250 */ |
239 Stream<T> handleError(void handle( error), { bool test(error) }) { | 251 Stream<T> handleError(Function onError, { bool test(error) }) { |
240 return new _HandleErrorStream<T>(this, handle, test); | 252 return new _HandleErrorStream<T>(this, onError, test); |
241 } | 253 } |
242 | 254 |
243 /** | 255 /** |
244 * Creates a new stream from this stream that converts each element | 256 * Creates a new stream from this stream that converts each element |
245 * into zero or more events. | 257 * into zero or more events. |
246 * | 258 * |
247 * Each incoming event is converted to an [Iterable] of new events, | 259 * Each incoming event is converted to an [Iterable] of new events, |
248 * and each of these new events are then sent by the returned stream | 260 * and each of these new events are then sent by the returned stream |
249 * in order. | 261 * in order. |
250 */ | 262 */ |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
306 var value = initialValue; | 318 var value = initialValue; |
307 StreamSubscription subscription; | 319 StreamSubscription subscription; |
308 subscription = this.listen( | 320 subscription = this.listen( |
309 (T element) { | 321 (T element) { |
310 _runUserCode( | 322 _runUserCode( |
311 () => combine(value, element), | 323 () => combine(value, element), |
312 (newValue) { value = newValue; }, | 324 (newValue) { value = newValue; }, |
313 _cancelAndError(subscription, result) | 325 _cancelAndError(subscription, result) |
314 ); | 326 ); |
315 }, | 327 }, |
316 onError: (e) { | 328 onError: (e, st) { |
317 result._completeError(e); | 329 result._completeError(e, st); |
318 }, | 330 }, |
319 onDone: () { | 331 onDone: () { |
320 result._complete(value); | 332 result._complete(value); |
321 }, | 333 }, |
322 cancelOnError: true); | 334 cancelOnError: true); |
323 return result; | 335 return result; |
324 } | 336 } |
325 | 337 |
326 /** | 338 /** |
327 * Collects string of data events' string representations. | 339 * Collects string of data events' string representations. |
(...skipping 552 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
880 * | 892 * |
881 * If an event is currently firing, this unsubscription will only | 893 * If an event is currently firing, this unsubscription will only |
882 * take effect after all subscribers have received the current event. | 894 * take effect after all subscribers have received the current event. |
883 */ | 895 */ |
884 void cancel(); | 896 void cancel(); |
885 | 897 |
886 /** Set or override the data event handler of this subscription. */ | 898 /** Set or override the data event handler of this subscription. */ |
887 void onData(void handleData(T data)); | 899 void onData(void handleData(T data)); |
888 | 900 |
889 /** Set or override the error event handler of this subscription. */ | 901 /** Set or override the error event handler of this subscription. */ |
890 void onError(void handleError(error)); | 902 void onError(Function handleError); |
891 | 903 |
892 /** Set or override the done event handler of this subscription. */ | 904 /** Set or override the done event handler of this subscription. */ |
893 void onDone(void handleDone()); | 905 void onDone(void handleDone()); |
894 | 906 |
895 /** | 907 /** |
896 * Request that the stream pauses events until further notice. | 908 * Request that the stream pauses events until further notice. |
897 * | 909 * |
898 * If [resumeSignal] is provided, the stream will undo the pause | 910 * If [resumeSignal] is provided, the stream will undo the pause |
899 * when the future completes. If the future completes with an error, | 911 * when the future completes. If the future completes with an error, |
900 * it will not be handled! | 912 * it will not be handled! |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
951 | 963 |
952 StreamView(this._stream); | 964 StreamView(this._stream); |
953 | 965 |
954 bool get isBroadcast => _stream.isBroadcast; | 966 bool get isBroadcast => _stream.isBroadcast; |
955 | 967 |
956 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), | 968 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), |
957 void onCancel(StreamSubscription subscription)}) | 969 void onCancel(StreamSubscription subscription)}) |
958 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | 970 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
959 | 971 |
960 StreamSubscription<T> listen(void onData(T value), | 972 StreamSubscription<T> listen(void onData(T value), |
961 { void onError(error), | 973 { Function onError, |
962 void onDone(), | 974 void onDone(), |
963 bool cancelOnError }) { | 975 bool cancelOnError }) { |
964 return _stream.listen(onData, onError: onError, onDone: onDone, | 976 return _stream.listen(onData, onError: onError, onDone: onDone, |
965 cancelOnError: cancelOnError); | 977 cancelOnError: cancelOnError); |
966 } | 978 } |
967 } | 979 } |
968 | 980 |
969 | 981 |
970 /** | 982 /** |
971 * The target of a [Stream.pipe] call. | 983 * The target of a [Stream.pipe] call. |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1034 * | 1046 * |
1035 * stringStream.transform(new StreamTransformer<String, String>( | 1047 * stringStream.transform(new StreamTransformer<String, String>( |
1036 * handleData: (String value, EventSink<String> sink) { | 1048 * handleData: (String value, EventSink<String> sink) { |
1037 * sink.add(value); | 1049 * sink.add(value); |
1038 * sink.add(value); // Duplicate the incoming events. | 1050 * sink.add(value); // Duplicate the incoming events. |
1039 * })); | 1051 * })); |
1040 * | 1052 * |
1041 */ | 1053 */ |
1042 factory StreamTransformer({ | 1054 factory StreamTransformer({ |
1043 void handleData(S data, EventSink<T> sink), | 1055 void handleData(S data, EventSink<T> sink), |
1044 void handleError(error, EventSink<T> sink), | 1056 Function handleError, |
1045 void handleDone(EventSink<T> sink)}) { | 1057 void handleDone(EventSink<T> sink)}) { |
1046 return new _StreamTransformerImpl<S, T>(handleData, | 1058 return new _StreamTransformerImpl<S, T>(handleData, |
1047 handleError, | 1059 handleError, |
1048 handleDone); | 1060 handleDone); |
1049 } | 1061 } |
1050 | 1062 |
1051 Stream<T> bind(Stream<S> stream); | 1063 Stream<T> bind(Stream<S> stream); |
1052 } | 1064 } |
1053 | 1065 |
1054 | 1066 |
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1127 * events on this stream. | 1139 * events on this stream. |
1128 */ | 1140 */ |
1129 class EventTransformStream<S, T> extends Stream<T> { | 1141 class EventTransformStream<S, T> extends Stream<T> { |
1130 final Stream<S> _source; | 1142 final Stream<S> _source; |
1131 final StreamEventTransformer _transformer; | 1143 final StreamEventTransformer _transformer; |
1132 EventTransformStream(Stream<S> source, | 1144 EventTransformStream(Stream<S> source, |
1133 StreamEventTransformer<S, T> transformer) | 1145 StreamEventTransformer<S, T> transformer) |
1134 : _source = source, _transformer = transformer; | 1146 : _source = source, _transformer = transformer; |
1135 | 1147 |
1136 StreamSubscription<T> listen(void onData(T data), | 1148 StreamSubscription<T> listen(void onData(T data), |
1137 { void onError(error), | 1149 { Function onError, |
1138 void onDone(), | 1150 void onDone(), |
1139 bool cancelOnError }) { | 1151 bool cancelOnError }) { |
1140 if (onData == null) onData = _nullDataHandler; | 1152 if (onData == null) onData = _nullDataHandler; |
1141 if (onError == null) onError = _nullErrorHandler; | 1153 if (onError == null) onError = _nullErrorHandler; |
1142 if (onDone == null) onDone = _nullDoneHandler; | 1154 if (onDone == null) onDone = _nullDoneHandler; |
1143 cancelOnError = identical(true, cancelOnError); | 1155 cancelOnError = identical(true, cancelOnError); |
1144 return new _EventTransformStreamSubscription(_source, _transformer, | 1156 return new _EventTransformStreamSubscription(_source, _transformer, |
1145 onData, onError, onDone, | 1157 onData, onError, onDone, |
1146 cancelOnError); | 1158 cancelOnError); |
1147 } | 1159 } |
1148 } | 1160 } |
1149 | 1161 |
1150 class _EventTransformStreamSubscription<S, T> | 1162 class _EventTransformStreamSubscription<S, T> |
1151 extends _BufferingStreamSubscription<T> { | 1163 extends _BufferingStreamSubscription<T> { |
1152 /** The transformer used to transform events. */ | 1164 /** The transformer used to transform events. */ |
1153 final StreamEventTransformer<S, T> _transformer; | 1165 final StreamEventTransformer<S, T> _transformer; |
1154 | 1166 |
1155 /** Whether this stream has sent a done event. */ | 1167 /** Whether this stream has sent a done event. */ |
1156 bool _isClosed = false; | 1168 bool _isClosed = false; |
1157 | 1169 |
1158 /** Source of incoming events. */ | 1170 /** Source of incoming events. */ |
1159 StreamSubscription<S> _subscription; | 1171 StreamSubscription<S> _subscription; |
1160 | 1172 |
1161 /** Cached EventSink wrapper for this class. */ | 1173 /** Cached EventSink wrapper for this class. */ |
1162 EventSink<T> _sink; | 1174 EventSink<T> _sink; |
1163 | 1175 |
1164 _EventTransformStreamSubscription(Stream<S> source, | 1176 _EventTransformStreamSubscription(Stream<S> source, |
1165 this._transformer, | 1177 this._transformer, |
1166 void onData(T data), | 1178 void onData(T data), |
1167 void onError(error), | 1179 Function onError, |
1168 void onDone(), | 1180 void onDone(), |
1169 bool cancelOnError) | 1181 bool cancelOnError) |
1170 : super(onData, onError, onDone, cancelOnError) { | 1182 : super(onData, onError, onDone, cancelOnError) { |
1171 _sink = new _EventSinkAdapter<T>(this); | 1183 _sink = new _EventSinkAdapter<T>(this); |
1172 _subscription = source.listen(_handleData, | 1184 _subscription = source.listen(_handleData, |
1173 onError: _handleError, | 1185 onError: _handleError, |
1174 onDone: _handleDone); | 1186 onDone: _handleDone); |
1175 } | 1187 } |
1176 | 1188 |
1177 /** Whether this subscription is still subscribed to its source. */ | 1189 /** Whether this subscription is still subscribed to its source. */ |
(...skipping 13 matching lines...) Expand all Loading... | |
1191 _subscription = null; | 1203 _subscription = null; |
1192 subscription.cancel(); | 1204 subscription.cancel(); |
1193 } | 1205 } |
1194 _isClosed = true; | 1206 _isClosed = true; |
1195 } | 1207 } |
1196 | 1208 |
1197 void _handleData(S data) { | 1209 void _handleData(S data) { |
1198 try { | 1210 try { |
1199 _transformer.handleData(data, _sink); | 1211 _transformer.handleData(data, _sink); |
1200 } catch (e, s) { | 1212 } catch (e, s) { |
1201 _addError(_asyncError(e, s)); | 1213 _addError(_asyncError(e, s), s); |
1202 } | 1214 } |
1203 } | 1215 } |
1204 | 1216 |
1205 void _handleError(error) { | 1217 void _handleError(error, [stackTrace]) { |
1206 try { | 1218 try { |
1207 _transformer.handleError(error, _sink); | 1219 _transformer.handleError(error, _sink); |
1208 } catch (e, s) { | 1220 } catch (e, s) { |
1209 _addError(_asyncError(e, s)); | 1221 if (identical(e, error)) { |
1222 _addError(error, stackTrace); | |
1223 } else { | |
1224 _addError(_asyncError(e, s), s); | |
1225 } | |
1210 } | 1226 } |
1211 } | 1227 } |
1212 | 1228 |
1213 void _handleDone() { | 1229 void _handleDone() { |
1214 try { | 1230 try { |
1215 _subscription = null; | 1231 _subscription = null; |
1216 _transformer.handleDone(_sink); | 1232 _transformer.handleDone(_sink); |
1217 } catch (e, s) { | 1233 } catch (e, s) { |
1218 _addError(_asyncError(e, s)); | 1234 _addError(_asyncError(e, s), s); |
1219 } | 1235 } |
1220 } | 1236 } |
1221 } | 1237 } |
1222 | 1238 |
1223 class _EventSinkAdapter<T> implements EventSink<T> { | 1239 class _EventSinkAdapter<T> implements EventSink<T> { |
1224 _EventSink _sink; | 1240 _EventSink _sink; |
1225 _EventSinkAdapter(this._sink); | 1241 _EventSinkAdapter(this._sink); |
1226 | 1242 |
1227 void add(T data) { _sink._add(data); } | 1243 void add(T data) { _sink._add(data); } |
1228 void addError(error) { _sink._addError(error); } | 1244 void addError(error, [StackTrace stackTrace]) { |
1245 _sink._addError(error, stackTrace); | |
1246 } | |
1229 void close() { _sink._close(); } | 1247 void close() { _sink._close(); } |
1230 } | 1248 } |
1231 | 1249 |
1232 | 1250 |
1233 /** | 1251 /** |
1234 * An [Iterable] like interface for the values of a [Stream]. | 1252 * An [Iterable] like interface for the values of a [Stream]. |
1235 * | 1253 * |
1236 * This wraps a [Stream] and a subscription on the stream. It listens | 1254 * This wraps a [Stream] and a subscription on the stream. It listens |
1237 * on the stream, and completes the future returned by [moveNext] when the | 1255 * on the stream, and completes the future returned by [moveNext] when the |
1238 * next value becomes available. | 1256 * next value becomes available. |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1274 * | 1292 * |
1275 * If a [moveNext] call has been made, it will complete with `false` as value, | 1293 * If a [moveNext] call has been made, it will complete with `false` as value, |
1276 * as will all further calls to [moveNext]. | 1294 * as will all further calls to [moveNext]. |
1277 * | 1295 * |
1278 * If you need to stop listening for values before the stream iterator is | 1296 * If you need to stop listening for values before the stream iterator is |
1279 * automatically closed, you must call [cancel] to ensure that the stream | 1297 * automatically closed, you must call [cancel] to ensure that the stream |
1280 * is properly closed. | 1298 * is properly closed. |
1281 */ | 1299 */ |
1282 void cancel(); | 1300 void cancel(); |
1283 } | 1301 } |
OLD | NEW |