OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
184 | 184 |
185 /** | 185 /** |
186 * Adds a subscription to this stream. | 186 * Adds a subscription to this stream. |
187 * | 187 * |
188 * On each data event from this stream, the subscriber's [onData] handler | 188 * On each data event from this stream, the subscriber's [onData] handler |
189 * is called. If [onData] is null, nothing happens. | 189 * is called. If [onData] is null, nothing happens. |
190 * | 190 * |
191 * On errors from this stream, the [onError] handler is given a | 191 * On errors from this stream, the [onError] handler is given a |
192 * object describing the error. | 192 * object describing the error. |
193 * | 193 * |
| 194 * The [onError] callback must be of type `void onError(error)` or |
| 195 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts |
| 196 * two arguments it is called with the stack trace (which could be `null` if |
| 197 * the stream itself received an error without stack trace). |
| 198 * Otherwise it is called with just the error object. |
| 199 * |
194 * If this stream closes, the [onDone] handler is called. | 200 * If this stream closes, the [onDone] handler is called. |
195 * | 201 * |
196 * If [cancelOnError] is true, the subscription is ended when | 202 * If [cancelOnError] is true, the subscription is ended when |
197 * the first error is reported. The default is false. | 203 * the first error is reported. The default is false. |
198 */ | 204 */ |
199 StreamSubscription<T> listen(void onData(T event), | 205 StreamSubscription<T> listen(void onData(T event), |
200 { void onError(error), | 206 { Function onError, |
201 void onDone(), | 207 void onDone(), |
202 bool cancelOnError}); | 208 bool cancelOnError}); |
203 | 209 |
204 /** | 210 /** |
205 * Creates a new stream from this stream that discards some data events. | 211 * Creates a new stream from this stream that discards some data events. |
206 * | 212 * |
207 * The new stream sends the same error and done events as this stream, | 213 * The new stream sends the same error and done events as this stream, |
208 * but it only sends the data events that satisfy the [test]. | 214 * but it only sends the data events that satisfy the [test]. |
209 */ | 215 */ |
210 Stream<T> where(bool test(T event)) { | 216 Stream<T> where(bool test(T event)) { |
211 return new _WhereStream<T>(this, test); | 217 return new _WhereStream<T>(this, test); |
212 } | 218 } |
213 | 219 |
214 /** | 220 /** |
215 * Creates a new stream that converts each element of this stream | 221 * Creates a new stream that converts each element of this stream |
216 * to a new value using the [convert] function. | 222 * to a new value using the [convert] function. |
217 */ | 223 */ |
218 Stream map(convert(T event)) { | 224 Stream map(convert(T event)) { |
219 return new _MapStream<T, dynamic>(this, convert); | 225 return new _MapStream<T, dynamic>(this, convert); |
220 } | 226 } |
221 | 227 |
222 /** | 228 /** |
223 * Creates a wrapper Stream that intercepts some errors from this stream. | 229 * Creates a wrapper Stream that intercepts some errors from this stream. |
224 * | 230 * |
225 * If this stream sends an error that matches [test], then it is intercepted | 231 * If this stream sends an error that matches [test], then it is intercepted |
226 * by the [handle] function. | 232 * by the [handle] function. |
227 * | 233 * |
| 234 * The [onError] callback must be of type `void onError(error)` or |
| 235 * `void onError(error, StackTrace stackTrace)`. Depending on the function |
| 236 * type the the stream either invokes [onError] with or without a stack |
| 237 * trace. The stack trace argument might be `null` if the stream itself |
| 238 * received an error without stack trace. |
| 239 * |
228 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns | 240 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns |
229 * true. If [test] is omitted, every error is considered matching. | 241 * true. If [test] is omitted, every error is considered matching. |
230 * | 242 * |
231 * If the error is intercepted, the [handle] function can decide what to do | 243 * If the error is intercepted, the [handle] function can decide what to do |
232 * with it. It can throw if it wants to raise a new (or the same) error, | 244 * with it. It can throw if it wants to raise a new (or the same) error, |
233 * or simply return to make the stream forget the error. | 245 * or simply return to make the stream forget the error. |
234 * | 246 * |
235 * If you need to transform an error into a data event, use the more generic | 247 * If you need to transform an error into a data event, use the more generic |
236 * [Stream.transform] to handle the event by writing a data event to | 248 * [Stream.transform] to handle the event by writing a data event to |
237 * the output sink | 249 * the output sink |
238 */ | 250 */ |
239 Stream<T> handleError(void handle( error), { bool test(error) }) { | 251 Stream<T> handleError(Function onError, { bool test(error) }) { |
240 return new _HandleErrorStream<T>(this, handle, test); | 252 return new _HandleErrorStream<T>(this, onError, test); |
241 } | 253 } |
242 | 254 |
243 /** | 255 /** |
244 * Creates a new stream from this stream that converts each element | 256 * Creates a new stream from this stream that converts each element |
245 * into zero or more events. | 257 * into zero or more events. |
246 * | 258 * |
247 * Each incoming event is converted to an [Iterable] of new events, | 259 * Each incoming event is converted to an [Iterable] of new events, |
248 * and each of these new events are then sent by the returned stream | 260 * and each of these new events are then sent by the returned stream |
249 * in order. | 261 * in order. |
250 */ | 262 */ |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
306 var value = initialValue; | 318 var value = initialValue; |
307 StreamSubscription subscription; | 319 StreamSubscription subscription; |
308 subscription = this.listen( | 320 subscription = this.listen( |
309 (T element) { | 321 (T element) { |
310 _runUserCode( | 322 _runUserCode( |
311 () => combine(value, element), | 323 () => combine(value, element), |
312 (newValue) { value = newValue; }, | 324 (newValue) { value = newValue; }, |
313 _cancelAndError(subscription, result) | 325 _cancelAndError(subscription, result) |
314 ); | 326 ); |
315 }, | 327 }, |
316 onError: (e) { | 328 onError: (e, st) { |
317 result._completeError(e); | 329 result._completeError(e, st); |
318 }, | 330 }, |
319 onDone: () { | 331 onDone: () { |
320 result._complete(value); | 332 result._complete(value); |
321 }, | 333 }, |
322 cancelOnError: true); | 334 cancelOnError: true); |
323 return result; | 335 return result; |
324 } | 336 } |
325 | 337 |
326 /** | 338 /** |
327 * Collects string of data events' string representations. | 339 * Collects string of data events' string representations. |
(...skipping 551 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
879 * Cancels this subscription. It will no longer receive events. | 891 * Cancels this subscription. It will no longer receive events. |
880 * | 892 * |
881 * If an event is currently firing, this unsubscription will only | 893 * If an event is currently firing, this unsubscription will only |
882 * take effect after all subscribers have received the current event. | 894 * take effect after all subscribers have received the current event. |
883 */ | 895 */ |
884 void cancel(); | 896 void cancel(); |
885 | 897 |
886 /** Set or override the data event handler of this subscription. */ | 898 /** Set or override the data event handler of this subscription. */ |
887 void onData(void handleData(T data)); | 899 void onData(void handleData(T data)); |
888 | 900 |
889 /** Set or override the error event handler of this subscription. */ | 901 /** |
890 void onError(void handleError(error)); | 902 * Set or override the error event handler of this subscription. |
| 903 * |
| 904 * This method overrides the handler that has been set at the invocation of |
| 905 * [Stream.listen]. |
| 906 */ |
| 907 void onError(Function handleError); |
891 | 908 |
892 /** Set or override the done event handler of this subscription. */ | 909 /** Set or override the done event handler of this subscription. */ |
893 void onDone(void handleDone()); | 910 void onDone(void handleDone()); |
894 | 911 |
895 /** | 912 /** |
896 * Request that the stream pauses events until further notice. | 913 * Request that the stream pauses events until further notice. |
897 * | 914 * |
898 * If [resumeSignal] is provided, the stream will undo the pause | 915 * If [resumeSignal] is provided, the stream will undo the pause |
899 * when the future completes. If the future completes with an error, | 916 * when the future completes. If the future completes with an error, |
900 * it will not be handled! | 917 * it will not be handled! |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
951 | 968 |
952 StreamView(this._stream); | 969 StreamView(this._stream); |
953 | 970 |
954 bool get isBroadcast => _stream.isBroadcast; | 971 bool get isBroadcast => _stream.isBroadcast; |
955 | 972 |
956 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), | 973 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), |
957 void onCancel(StreamSubscription subscription)}) | 974 void onCancel(StreamSubscription subscription)}) |
958 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | 975 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
959 | 976 |
960 StreamSubscription<T> listen(void onData(T value), | 977 StreamSubscription<T> listen(void onData(T value), |
961 { void onError(error), | 978 { Function onError, |
962 void onDone(), | 979 void onDone(), |
963 bool cancelOnError }) { | 980 bool cancelOnError }) { |
964 return _stream.listen(onData, onError: onError, onDone: onDone, | 981 return _stream.listen(onData, onError: onError, onDone: onDone, |
965 cancelOnError: cancelOnError); | 982 cancelOnError: cancelOnError); |
966 } | 983 } |
967 } | 984 } |
968 | 985 |
969 | 986 |
970 /** | 987 /** |
971 * The target of a [Stream.pipe] call. | 988 * The target of a [Stream.pipe] call. |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1034 * | 1051 * |
1035 * stringStream.transform(new StreamTransformer<String, String>( | 1052 * stringStream.transform(new StreamTransformer<String, String>( |
1036 * handleData: (String value, EventSink<String> sink) { | 1053 * handleData: (String value, EventSink<String> sink) { |
1037 * sink.add(value); | 1054 * sink.add(value); |
1038 * sink.add(value); // Duplicate the incoming events. | 1055 * sink.add(value); // Duplicate the incoming events. |
1039 * })); | 1056 * })); |
1040 * | 1057 * |
1041 */ | 1058 */ |
1042 factory StreamTransformer({ | 1059 factory StreamTransformer({ |
1043 void handleData(S data, EventSink<T> sink), | 1060 void handleData(S data, EventSink<T> sink), |
1044 void handleError(error, EventSink<T> sink), | 1061 Function handleError, |
1045 void handleDone(EventSink<T> sink)}) { | 1062 void handleDone(EventSink<T> sink)}) { |
1046 return new _StreamTransformerImpl<S, T>(handleData, | 1063 return new _StreamTransformerImpl<S, T>(handleData, |
1047 handleError, | 1064 handleError, |
1048 handleDone); | 1065 handleDone); |
1049 } | 1066 } |
1050 | 1067 |
1051 Stream<T> bind(Stream<S> stream); | 1068 Stream<T> bind(Stream<S> stream); |
1052 } | 1069 } |
1053 | 1070 |
1054 | 1071 |
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1127 * events on this stream. | 1144 * events on this stream. |
1128 */ | 1145 */ |
1129 class EventTransformStream<S, T> extends Stream<T> { | 1146 class EventTransformStream<S, T> extends Stream<T> { |
1130 final Stream<S> _source; | 1147 final Stream<S> _source; |
1131 final StreamEventTransformer _transformer; | 1148 final StreamEventTransformer _transformer; |
1132 EventTransformStream(Stream<S> source, | 1149 EventTransformStream(Stream<S> source, |
1133 StreamEventTransformer<S, T> transformer) | 1150 StreamEventTransformer<S, T> transformer) |
1134 : _source = source, _transformer = transformer; | 1151 : _source = source, _transformer = transformer; |
1135 | 1152 |
1136 StreamSubscription<T> listen(void onData(T data), | 1153 StreamSubscription<T> listen(void onData(T data), |
1137 { void onError(error), | 1154 { Function onError, |
1138 void onDone(), | 1155 void onDone(), |
1139 bool cancelOnError }) { | 1156 bool cancelOnError }) { |
1140 if (onData == null) onData = _nullDataHandler; | 1157 if (onData == null) onData = _nullDataHandler; |
1141 if (onError == null) onError = _nullErrorHandler; | 1158 if (onError == null) onError = _nullErrorHandler; |
1142 if (onDone == null) onDone = _nullDoneHandler; | 1159 if (onDone == null) onDone = _nullDoneHandler; |
1143 cancelOnError = identical(true, cancelOnError); | 1160 cancelOnError = identical(true, cancelOnError); |
1144 return new _EventTransformStreamSubscription(_source, _transformer, | 1161 return new _EventTransformStreamSubscription(_source, _transformer, |
1145 onData, onError, onDone, | 1162 onData, onError, onDone, |
1146 cancelOnError); | 1163 cancelOnError); |
1147 } | 1164 } |
1148 } | 1165 } |
1149 | 1166 |
1150 class _EventTransformStreamSubscription<S, T> | 1167 class _EventTransformStreamSubscription<S, T> |
1151 extends _BufferingStreamSubscription<T> { | 1168 extends _BufferingStreamSubscription<T> { |
1152 /** The transformer used to transform events. */ | 1169 /** The transformer used to transform events. */ |
1153 final StreamEventTransformer<S, T> _transformer; | 1170 final StreamEventTransformer<S, T> _transformer; |
1154 | 1171 |
1155 /** Whether this stream has sent a done event. */ | 1172 /** Whether this stream has sent a done event. */ |
1156 bool _isClosed = false; | 1173 bool _isClosed = false; |
1157 | 1174 |
1158 /** Source of incoming events. */ | 1175 /** Source of incoming events. */ |
1159 StreamSubscription<S> _subscription; | 1176 StreamSubscription<S> _subscription; |
1160 | 1177 |
1161 /** Cached EventSink wrapper for this class. */ | 1178 /** Cached EventSink wrapper for this class. */ |
1162 EventSink<T> _sink; | 1179 EventSink<T> _sink; |
1163 | 1180 |
1164 _EventTransformStreamSubscription(Stream<S> source, | 1181 _EventTransformStreamSubscription(Stream<S> source, |
1165 this._transformer, | 1182 this._transformer, |
1166 void onData(T data), | 1183 void onData(T data), |
1167 void onError(error), | 1184 Function onError, |
1168 void onDone(), | 1185 void onDone(), |
1169 bool cancelOnError) | 1186 bool cancelOnError) |
1170 : super(onData, onError, onDone, cancelOnError) { | 1187 : super(onData, onError, onDone, cancelOnError) { |
1171 _sink = new _EventSinkAdapter<T>(this); | 1188 _sink = new _EventSinkAdapter<T>(this); |
1172 _subscription = source.listen(_handleData, | 1189 _subscription = source.listen(_handleData, |
1173 onError: _handleError, | 1190 onError: _handleError, |
1174 onDone: _handleDone); | 1191 onDone: _handleDone); |
1175 } | 1192 } |
1176 | 1193 |
1177 /** Whether this subscription is still subscribed to its source. */ | 1194 /** Whether this subscription is still subscribed to its source. */ |
(...skipping 13 matching lines...) Expand all Loading... |
1191 _subscription = null; | 1208 _subscription = null; |
1192 subscription.cancel(); | 1209 subscription.cancel(); |
1193 } | 1210 } |
1194 _isClosed = true; | 1211 _isClosed = true; |
1195 } | 1212 } |
1196 | 1213 |
1197 void _handleData(S data) { | 1214 void _handleData(S data) { |
1198 try { | 1215 try { |
1199 _transformer.handleData(data, _sink); | 1216 _transformer.handleData(data, _sink); |
1200 } catch (e, s) { | 1217 } catch (e, s) { |
1201 _addError(_asyncError(e, s)); | 1218 _addError(_asyncError(e, s), s); |
1202 } | 1219 } |
1203 } | 1220 } |
1204 | 1221 |
1205 void _handleError(error) { | 1222 void _handleError(error, [stackTrace]) { |
1206 try { | 1223 try { |
1207 _transformer.handleError(error, _sink); | 1224 _transformer.handleError(error, _sink); |
1208 } catch (e, s) { | 1225 } catch (e, s) { |
1209 _addError(_asyncError(e, s)); | 1226 if (identical(e, error)) { |
| 1227 _addError(error, stackTrace); |
| 1228 } else { |
| 1229 _addError(_asyncError(e, s), s); |
| 1230 } |
1210 } | 1231 } |
1211 } | 1232 } |
1212 | 1233 |
1213 void _handleDone() { | 1234 void _handleDone() { |
1214 try { | 1235 try { |
1215 _subscription = null; | 1236 _subscription = null; |
1216 _transformer.handleDone(_sink); | 1237 _transformer.handleDone(_sink); |
1217 } catch (e, s) { | 1238 } catch (e, s) { |
1218 _addError(_asyncError(e, s)); | 1239 _addError(_asyncError(e, s), s); |
1219 } | 1240 } |
1220 } | 1241 } |
1221 } | 1242 } |
1222 | 1243 |
1223 class _EventSinkAdapter<T> implements EventSink<T> { | 1244 class _EventSinkAdapter<T> implements EventSink<T> { |
1224 _EventSink _sink; | 1245 _EventSink _sink; |
1225 _EventSinkAdapter(this._sink); | 1246 _EventSinkAdapter(this._sink); |
1226 | 1247 |
1227 void add(T data) { _sink._add(data); } | 1248 void add(T data) { _sink._add(data); } |
1228 void addError(error) { _sink._addError(error); } | 1249 void addError(error, [StackTrace stackTrace]) { |
| 1250 _sink._addError(error, stackTrace); |
| 1251 } |
1229 void close() { _sink._close(); } | 1252 void close() { _sink._close(); } |
1230 } | 1253 } |
1231 | 1254 |
1232 | 1255 |
1233 /** | 1256 /** |
1234 * An [Iterable] like interface for the values of a [Stream]. | 1257 * An [Iterable] like interface for the values of a [Stream]. |
1235 * | 1258 * |
1236 * This wraps a [Stream] and a subscription on the stream. It listens | 1259 * This wraps a [Stream] and a subscription on the stream. It listens |
1237 * on the stream, and completes the future returned by [moveNext] when the | 1260 * on the stream, and completes the future returned by [moveNext] when the |
1238 * next value becomes available. | 1261 * next value becomes available. |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1274 * | 1297 * |
1275 * If a [moveNext] call has been made, it will complete with `false` as value, | 1298 * If a [moveNext] call has been made, it will complete with `false` as value, |
1276 * as will all further calls to [moveNext]. | 1299 * as will all further calls to [moveNext]. |
1277 * | 1300 * |
1278 * If you need to stop listening for values before the stream iterator is | 1301 * If you need to stop listening for values before the stream iterator is |
1279 * automatically closed, you must call [cancel] to ensure that the stream | 1302 * automatically closed, you must call [cancel] to ensure that the stream |
1280 * is properly closed. | 1303 * is properly closed. |
1281 */ | 1304 */ |
1282 void cancel(); | 1305 void cancel(); |
1283 } | 1306 } |
OLD | NEW |