| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 184 | 184 |
| 185 /** | 185 /** |
| 186 * Adds a subscription to this stream. | 186 * Adds a subscription to this stream. |
| 187 * | 187 * |
| 188 * On each data event from this stream, the subscriber's [onData] handler | 188 * On each data event from this stream, the subscriber's [onData] handler |
| 189 * is called. If [onData] is null, nothing happens. | 189 * is called. If [onData] is null, nothing happens. |
| 190 * | 190 * |
| 191 * On errors from this stream, the [onError] handler is given a | 191 * On errors from this stream, the [onError] handler is given a |
| 192 * object describing the error. | 192 * object describing the error. |
| 193 * | 193 * |
| 194 * The [onError] callback must be of type `void onError(error)` or |
| 195 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts |
| 196 * two arguments it is called with the stack trace (which could be `null` if |
| 197 * the stream itself received an error without stack trace). |
| 198 * Otherwise it is called with just the error object. |
| 199 * |
| 194 * If this stream closes, the [onDone] handler is called. | 200 * If this stream closes, the [onDone] handler is called. |
| 195 * | 201 * |
| 196 * If [cancelOnError] is true, the subscription is ended when | 202 * If [cancelOnError] is true, the subscription is ended when |
| 197 * the first error is reported. The default is false. | 203 * the first error is reported. The default is false. |
| 198 */ | 204 */ |
| 199 StreamSubscription<T> listen(void onData(T event), | 205 StreamSubscription<T> listen(void onData(T event), |
| 200 { void onError(error), | 206 { Function onError, |
| 201 void onDone(), | 207 void onDone(), |
| 202 bool cancelOnError}); | 208 bool cancelOnError}); |
| 203 | 209 |
| 204 /** | 210 /** |
| 205 * Creates a new stream from this stream that discards some data events. | 211 * Creates a new stream from this stream that discards some data events. |
| 206 * | 212 * |
| 207 * The new stream sends the same error and done events as this stream, | 213 * The new stream sends the same error and done events as this stream, |
| 208 * but it only sends the data events that satisfy the [test]. | 214 * but it only sends the data events that satisfy the [test]. |
| 209 */ | 215 */ |
| 210 Stream<T> where(bool test(T event)) { | 216 Stream<T> where(bool test(T event)) { |
| 211 return new _WhereStream<T>(this, test); | 217 return new _WhereStream<T>(this, test); |
| 212 } | 218 } |
| 213 | 219 |
| 214 /** | 220 /** |
| 215 * Creates a new stream that converts each element of this stream | 221 * Creates a new stream that converts each element of this stream |
| 216 * to a new value using the [convert] function. | 222 * to a new value using the [convert] function. |
| 217 */ | 223 */ |
| 218 Stream map(convert(T event)) { | 224 Stream map(convert(T event)) { |
| 219 return new _MapStream<T, dynamic>(this, convert); | 225 return new _MapStream<T, dynamic>(this, convert); |
| 220 } | 226 } |
| 221 | 227 |
| 222 /** | 228 /** |
| 223 * Creates a wrapper Stream that intercepts some errors from this stream. | 229 * Creates a wrapper Stream that intercepts some errors from this stream. |
| 224 * | 230 * |
| 225 * If this stream sends an error that matches [test], then it is intercepted | 231 * If this stream sends an error that matches [test], then it is intercepted |
| 226 * by the [handle] function. | 232 * by the [handle] function. |
| 227 * | 233 * |
| 234 * The [onError] callback must be of type `void onError(error)` or |
| 235 * `void onError(error, StackTrace stackTrace)`. Depending on the function |
| 236 * type the the stream either invokes [onError] with or without a stack |
| 237 * trace. The stack trace argument might be `null` if the stream itself |
| 238 * received an error without stack trace. |
| 239 * |
| 228 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns | 240 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns |
| 229 * true. If [test] is omitted, every error is considered matching. | 241 * true. If [test] is omitted, every error is considered matching. |
| 230 * | 242 * |
| 231 * If the error is intercepted, the [handle] function can decide what to do | 243 * If the error is intercepted, the [handle] function can decide what to do |
| 232 * with it. It can throw if it wants to raise a new (or the same) error, | 244 * with it. It can throw if it wants to raise a new (or the same) error, |
| 233 * or simply return to make the stream forget the error. | 245 * or simply return to make the stream forget the error. |
| 234 * | 246 * |
| 235 * If you need to transform an error into a data event, use the more generic | 247 * If you need to transform an error into a data event, use the more generic |
| 236 * [Stream.transform] to handle the event by writing a data event to | 248 * [Stream.transform] to handle the event by writing a data event to |
| 237 * the output sink | 249 * the output sink |
| 238 */ | 250 */ |
| 239 Stream<T> handleError(void handle( error), { bool test(error) }) { | 251 Stream<T> handleError(Function onError, { bool test(error) }) { |
| 240 return new _HandleErrorStream<T>(this, handle, test); | 252 return new _HandleErrorStream<T>(this, onError, test); |
| 241 } | 253 } |
| 242 | 254 |
| 243 /** | 255 /** |
| 244 * Creates a new stream from this stream that converts each element | 256 * Creates a new stream from this stream that converts each element |
| 245 * into zero or more events. | 257 * into zero or more events. |
| 246 * | 258 * |
| 247 * Each incoming event is converted to an [Iterable] of new events, | 259 * Each incoming event is converted to an [Iterable] of new events, |
| 248 * and each of these new events are then sent by the returned stream | 260 * and each of these new events are then sent by the returned stream |
| 249 * in order. | 261 * in order. |
| 250 */ | 262 */ |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 306 var value = initialValue; | 318 var value = initialValue; |
| 307 StreamSubscription subscription; | 319 StreamSubscription subscription; |
| 308 subscription = this.listen( | 320 subscription = this.listen( |
| 309 (T element) { | 321 (T element) { |
| 310 _runUserCode( | 322 _runUserCode( |
| 311 () => combine(value, element), | 323 () => combine(value, element), |
| 312 (newValue) { value = newValue; }, | 324 (newValue) { value = newValue; }, |
| 313 _cancelAndError(subscription, result) | 325 _cancelAndError(subscription, result) |
| 314 ); | 326 ); |
| 315 }, | 327 }, |
| 316 onError: (e) { | 328 onError: (e, st) { |
| 317 result._completeError(e); | 329 result._completeError(e, st); |
| 318 }, | 330 }, |
| 319 onDone: () { | 331 onDone: () { |
| 320 result._complete(value); | 332 result._complete(value); |
| 321 }, | 333 }, |
| 322 cancelOnError: true); | 334 cancelOnError: true); |
| 323 return result; | 335 return result; |
| 324 } | 336 } |
| 325 | 337 |
| 326 /** | 338 /** |
| 327 * Collects string of data events' string representations. | 339 * Collects string of data events' string representations. |
| (...skipping 551 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 879 * Cancels this subscription. It will no longer receive events. | 891 * Cancels this subscription. It will no longer receive events. |
| 880 * | 892 * |
| 881 * If an event is currently firing, this unsubscription will only | 893 * If an event is currently firing, this unsubscription will only |
| 882 * take effect after all subscribers have received the current event. | 894 * take effect after all subscribers have received the current event. |
| 883 */ | 895 */ |
| 884 void cancel(); | 896 void cancel(); |
| 885 | 897 |
| 886 /** Set or override the data event handler of this subscription. */ | 898 /** Set or override the data event handler of this subscription. */ |
| 887 void onData(void handleData(T data)); | 899 void onData(void handleData(T data)); |
| 888 | 900 |
| 889 /** Set or override the error event handler of this subscription. */ | 901 /** |
| 890 void onError(void handleError(error)); | 902 * Set or override the error event handler of this subscription. |
| 903 * |
| 904 * This method overrides the handler that has been set at the invocation of |
| 905 * [Stream.listen]. |
| 906 */ |
| 907 void onError(Function handleError); |
| 891 | 908 |
| 892 /** Set or override the done event handler of this subscription. */ | 909 /** Set or override the done event handler of this subscription. */ |
| 893 void onDone(void handleDone()); | 910 void onDone(void handleDone()); |
| 894 | 911 |
| 895 /** | 912 /** |
| 896 * Request that the stream pauses events until further notice. | 913 * Request that the stream pauses events until further notice. |
| 897 * | 914 * |
| 898 * If [resumeSignal] is provided, the stream will undo the pause | 915 * If [resumeSignal] is provided, the stream will undo the pause |
| 899 * when the future completes. If the future completes with an error, | 916 * when the future completes. If the future completes with an error, |
| 900 * it will not be handled! | 917 * it will not be handled! |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 951 | 968 |
| 952 StreamView(this._stream); | 969 StreamView(this._stream); |
| 953 | 970 |
| 954 bool get isBroadcast => _stream.isBroadcast; | 971 bool get isBroadcast => _stream.isBroadcast; |
| 955 | 972 |
| 956 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), | 973 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), |
| 957 void onCancel(StreamSubscription subscription)}) | 974 void onCancel(StreamSubscription subscription)}) |
| 958 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | 975 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
| 959 | 976 |
| 960 StreamSubscription<T> listen(void onData(T value), | 977 StreamSubscription<T> listen(void onData(T value), |
| 961 { void onError(error), | 978 { Function onError, |
| 962 void onDone(), | 979 void onDone(), |
| 963 bool cancelOnError }) { | 980 bool cancelOnError }) { |
| 964 return _stream.listen(onData, onError: onError, onDone: onDone, | 981 return _stream.listen(onData, onError: onError, onDone: onDone, |
| 965 cancelOnError: cancelOnError); | 982 cancelOnError: cancelOnError); |
| 966 } | 983 } |
| 967 } | 984 } |
| 968 | 985 |
| 969 | 986 |
| 970 /** | 987 /** |
| 971 * The target of a [Stream.pipe] call. | 988 * The target of a [Stream.pipe] call. |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1034 * | 1051 * |
| 1035 * stringStream.transform(new StreamTransformer<String, String>( | 1052 * stringStream.transform(new StreamTransformer<String, String>( |
| 1036 * handleData: (String value, EventSink<String> sink) { | 1053 * handleData: (String value, EventSink<String> sink) { |
| 1037 * sink.add(value); | 1054 * sink.add(value); |
| 1038 * sink.add(value); // Duplicate the incoming events. | 1055 * sink.add(value); // Duplicate the incoming events. |
| 1039 * })); | 1056 * })); |
| 1040 * | 1057 * |
| 1041 */ | 1058 */ |
| 1042 factory StreamTransformer({ | 1059 factory StreamTransformer({ |
| 1043 void handleData(S data, EventSink<T> sink), | 1060 void handleData(S data, EventSink<T> sink), |
| 1044 void handleError(error, EventSink<T> sink), | 1061 Function handleError, |
| 1045 void handleDone(EventSink<T> sink)}) { | 1062 void handleDone(EventSink<T> sink)}) { |
| 1046 return new _StreamTransformerImpl<S, T>(handleData, | 1063 return new _StreamTransformerImpl<S, T>(handleData, |
| 1047 handleError, | 1064 handleError, |
| 1048 handleDone); | 1065 handleDone); |
| 1049 } | 1066 } |
| 1050 | 1067 |
| 1051 Stream<T> bind(Stream<S> stream); | 1068 Stream<T> bind(Stream<S> stream); |
| 1052 } | 1069 } |
| 1053 | 1070 |
| 1054 | 1071 |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1127 * events on this stream. | 1144 * events on this stream. |
| 1128 */ | 1145 */ |
| 1129 class EventTransformStream<S, T> extends Stream<T> { | 1146 class EventTransformStream<S, T> extends Stream<T> { |
| 1130 final Stream<S> _source; | 1147 final Stream<S> _source; |
| 1131 final StreamEventTransformer _transformer; | 1148 final StreamEventTransformer _transformer; |
| 1132 EventTransformStream(Stream<S> source, | 1149 EventTransformStream(Stream<S> source, |
| 1133 StreamEventTransformer<S, T> transformer) | 1150 StreamEventTransformer<S, T> transformer) |
| 1134 : _source = source, _transformer = transformer; | 1151 : _source = source, _transformer = transformer; |
| 1135 | 1152 |
| 1136 StreamSubscription<T> listen(void onData(T data), | 1153 StreamSubscription<T> listen(void onData(T data), |
| 1137 { void onError(error), | 1154 { Function onError, |
| 1138 void onDone(), | 1155 void onDone(), |
| 1139 bool cancelOnError }) { | 1156 bool cancelOnError }) { |
| 1140 if (onData == null) onData = _nullDataHandler; | 1157 if (onData == null) onData = _nullDataHandler; |
| 1141 if (onError == null) onError = _nullErrorHandler; | 1158 if (onError == null) onError = _nullErrorHandler; |
| 1142 if (onDone == null) onDone = _nullDoneHandler; | 1159 if (onDone == null) onDone = _nullDoneHandler; |
| 1143 cancelOnError = identical(true, cancelOnError); | 1160 cancelOnError = identical(true, cancelOnError); |
| 1144 return new _EventTransformStreamSubscription(_source, _transformer, | 1161 return new _EventTransformStreamSubscription(_source, _transformer, |
| 1145 onData, onError, onDone, | 1162 onData, onError, onDone, |
| 1146 cancelOnError); | 1163 cancelOnError); |
| 1147 } | 1164 } |
| 1148 } | 1165 } |
| 1149 | 1166 |
| 1150 class _EventTransformStreamSubscription<S, T> | 1167 class _EventTransformStreamSubscription<S, T> |
| 1151 extends _BufferingStreamSubscription<T> { | 1168 extends _BufferingStreamSubscription<T> { |
| 1152 /** The transformer used to transform events. */ | 1169 /** The transformer used to transform events. */ |
| 1153 final StreamEventTransformer<S, T> _transformer; | 1170 final StreamEventTransformer<S, T> _transformer; |
| 1154 | 1171 |
| 1155 /** Whether this stream has sent a done event. */ | 1172 /** Whether this stream has sent a done event. */ |
| 1156 bool _isClosed = false; | 1173 bool _isClosed = false; |
| 1157 | 1174 |
| 1158 /** Source of incoming events. */ | 1175 /** Source of incoming events. */ |
| 1159 StreamSubscription<S> _subscription; | 1176 StreamSubscription<S> _subscription; |
| 1160 | 1177 |
| 1161 /** Cached EventSink wrapper for this class. */ | 1178 /** Cached EventSink wrapper for this class. */ |
| 1162 EventSink<T> _sink; | 1179 EventSink<T> _sink; |
| 1163 | 1180 |
| 1164 _EventTransformStreamSubscription(Stream<S> source, | 1181 _EventTransformStreamSubscription(Stream<S> source, |
| 1165 this._transformer, | 1182 this._transformer, |
| 1166 void onData(T data), | 1183 void onData(T data), |
| 1167 void onError(error), | 1184 Function onError, |
| 1168 void onDone(), | 1185 void onDone(), |
| 1169 bool cancelOnError) | 1186 bool cancelOnError) |
| 1170 : super(onData, onError, onDone, cancelOnError) { | 1187 : super(onData, onError, onDone, cancelOnError) { |
| 1171 _sink = new _EventSinkAdapter<T>(this); | 1188 _sink = new _EventSinkAdapter<T>(this); |
| 1172 _subscription = source.listen(_handleData, | 1189 _subscription = source.listen(_handleData, |
| 1173 onError: _handleError, | 1190 onError: _handleError, |
| 1174 onDone: _handleDone); | 1191 onDone: _handleDone); |
| 1175 } | 1192 } |
| 1176 | 1193 |
| 1177 /** Whether this subscription is still subscribed to its source. */ | 1194 /** Whether this subscription is still subscribed to its source. */ |
| (...skipping 13 matching lines...) Expand all Loading... |
| 1191 _subscription = null; | 1208 _subscription = null; |
| 1192 subscription.cancel(); | 1209 subscription.cancel(); |
| 1193 } | 1210 } |
| 1194 _isClosed = true; | 1211 _isClosed = true; |
| 1195 } | 1212 } |
| 1196 | 1213 |
| 1197 void _handleData(S data) { | 1214 void _handleData(S data) { |
| 1198 try { | 1215 try { |
| 1199 _transformer.handleData(data, _sink); | 1216 _transformer.handleData(data, _sink); |
| 1200 } catch (e, s) { | 1217 } catch (e, s) { |
| 1201 _addError(_asyncError(e, s)); | 1218 _addError(_asyncError(e, s), s); |
| 1202 } | 1219 } |
| 1203 } | 1220 } |
| 1204 | 1221 |
| 1205 void _handleError(error) { | 1222 void _handleError(error, [stackTrace]) { |
| 1206 try { | 1223 try { |
| 1207 _transformer.handleError(error, _sink); | 1224 _transformer.handleError(error, _sink); |
| 1208 } catch (e, s) { | 1225 } catch (e, s) { |
| 1209 _addError(_asyncError(e, s)); | 1226 if (identical(e, error)) { |
| 1227 _addError(error, stackTrace); |
| 1228 } else { |
| 1229 _addError(_asyncError(e, s), s); |
| 1230 } |
| 1210 } | 1231 } |
| 1211 } | 1232 } |
| 1212 | 1233 |
| 1213 void _handleDone() { | 1234 void _handleDone() { |
| 1214 try { | 1235 try { |
| 1215 _subscription = null; | 1236 _subscription = null; |
| 1216 _transformer.handleDone(_sink); | 1237 _transformer.handleDone(_sink); |
| 1217 } catch (e, s) { | 1238 } catch (e, s) { |
| 1218 _addError(_asyncError(e, s)); | 1239 _addError(_asyncError(e, s), s); |
| 1219 } | 1240 } |
| 1220 } | 1241 } |
| 1221 } | 1242 } |
| 1222 | 1243 |
| 1223 class _EventSinkAdapter<T> implements EventSink<T> { | 1244 class _EventSinkAdapter<T> implements EventSink<T> { |
| 1224 _EventSink _sink; | 1245 _EventSink _sink; |
| 1225 _EventSinkAdapter(this._sink); | 1246 _EventSinkAdapter(this._sink); |
| 1226 | 1247 |
| 1227 void add(T data) { _sink._add(data); } | 1248 void add(T data) { _sink._add(data); } |
| 1228 void addError(error) { _sink._addError(error); } | 1249 void addError(error, [StackTrace stackTrace]) { |
| 1250 _sink._addError(error, stackTrace); |
| 1251 } |
| 1229 void close() { _sink._close(); } | 1252 void close() { _sink._close(); } |
| 1230 } | 1253 } |
| 1231 | 1254 |
| 1232 | 1255 |
| 1233 /** | 1256 /** |
| 1234 * An [Iterable] like interface for the values of a [Stream]. | 1257 * An [Iterable] like interface for the values of a [Stream]. |
| 1235 * | 1258 * |
| 1236 * This wraps a [Stream] and a subscription on the stream. It listens | 1259 * This wraps a [Stream] and a subscription on the stream. It listens |
| 1237 * on the stream, and completes the future returned by [moveNext] when the | 1260 * on the stream, and completes the future returned by [moveNext] when the |
| 1238 * next value becomes available. | 1261 * next value becomes available. |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1274 * | 1297 * |
| 1275 * If a [moveNext] call has been made, it will complete with `false` as value, | 1298 * If a [moveNext] call has been made, it will complete with `false` as value, |
| 1276 * as will all further calls to [moveNext]. | 1299 * as will all further calls to [moveNext]. |
| 1277 * | 1300 * |
| 1278 * If you need to stop listening for values before the stream iterator is | 1301 * If you need to stop listening for values before the stream iterator is |
| 1279 * automatically closed, you must call [cancel] to ensure that the stream | 1302 * automatically closed, you must call [cancel] to ensure that the stream |
| 1280 * is properly closed. | 1303 * is properly closed. |
| 1281 */ | 1304 */ |
| 1282 void cancel(); | 1305 void cancel(); |
| 1283 } | 1306 } |
| OLD | NEW |