| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** | |
| 8 * Utility function to attach a stack trace to an [error] if it doesn't have | |
| 9 * one already. | |
| 10 */ | |
| 11 _asyncError(Object error, StackTrace stackTrace) { | |
| 12 if (stackTrace == null) return error; | |
| 13 if (getAttachedStackTrace(error) != null) return error; | |
| 14 _attachStackTrace(error, stackTrace); | |
| 15 return error; | |
| 16 } | |
| 17 | |
| 18 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
| 19 _runUserCode(userCode(), | 8 _runUserCode(userCode(), |
| 20 onSuccess(value), | 9 onSuccess(value), |
| 21 onError(error, StackTrace stackTrace)) { | 10 onError(error, StackTrace stackTrace)) { |
| 22 try { | 11 try { |
| 23 onSuccess(userCode()); | 12 onSuccess(userCode()); |
| 24 } catch (e, s) { | 13 } catch (e, s) { |
| 25 onError(_asyncError(e, s), s); | 14 onError(e, s); |
| 26 } | 15 } |
| 27 } | 16 } |
| 28 | 17 |
| 29 /** Helper function to cancel a subscription and wait for the potential future, | 18 /** Helper function to cancel a subscription and wait for the potential future, |
| 30 before completing with an error. */ | 19 before completing with an error. */ |
| 31 void _cancelAndError(StreamSubscription subscription, | 20 void _cancelAndError(StreamSubscription subscription, |
| 32 _Future future, | 21 _Future future, |
| 33 error, | 22 error, |
| 34 StackTrace stackTrace) { | 23 StackTrace stackTrace) { |
| 35 var cancelFuture = subscription.cancel(); | 24 var cancelFuture = subscription.cancel(); |
| (...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 180 final _Predicate<T> _test; | 169 final _Predicate<T> _test; |
| 181 | 170 |
| 182 _WhereStream(Stream<T> source, bool test(T value)) | 171 _WhereStream(Stream<T> source, bool test(T value)) |
| 183 : _test = test, super(source); | 172 : _test = test, super(source); |
| 184 | 173 |
| 185 void _handleData(T inputEvent, _EventSink<T> sink) { | 174 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 186 bool satisfies; | 175 bool satisfies; |
| 187 try { | 176 try { |
| 188 satisfies = _test(inputEvent); | 177 satisfies = _test(inputEvent); |
| 189 } catch (e, s) { | 178 } catch (e, s) { |
| 190 sink._addError(_asyncError(e, s), s); | 179 sink._addError(e, s); |
| 191 return; | 180 return; |
| 192 } | 181 } |
| 193 if (satisfies) { | 182 if (satisfies) { |
| 194 sink._add(inputEvent); | 183 sink._add(inputEvent); |
| 195 } | 184 } |
| 196 } | 185 } |
| 197 } | 186 } |
| 198 | 187 |
| 199 | 188 |
| 200 typedef T _Transformation<S, T>(S value); | 189 typedef T _Transformation<S, T>(S value); |
| 201 | 190 |
| 202 /** | 191 /** |
| 203 * A stream pipe that converts data events before passing them on. | 192 * A stream pipe that converts data events before passing them on. |
| 204 */ | 193 */ |
| 205 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 194 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| 206 final _Transformation _transform; | 195 final _Transformation _transform; |
| 207 | 196 |
| 208 _MapStream(Stream<S> source, T transform(S event)) | 197 _MapStream(Stream<S> source, T transform(S event)) |
| 209 : this._transform = transform, super(source); | 198 : this._transform = transform, super(source); |
| 210 | 199 |
| 211 void _handleData(S inputEvent, _EventSink<T> sink) { | 200 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 212 T outputEvent; | 201 T outputEvent; |
| 213 try { | 202 try { |
| 214 outputEvent = _transform(inputEvent); | 203 outputEvent = _transform(inputEvent); |
| 215 } catch (e, s) { | 204 } catch (e, s) { |
| 216 sink._addError(_asyncError(e, s), s); | 205 sink._addError(e, s); |
| 217 return; | 206 return; |
| 218 } | 207 } |
| 219 sink._add(outputEvent); | 208 sink._add(outputEvent); |
| 220 } | 209 } |
| 221 } | 210 } |
| 222 | 211 |
| 223 /** | 212 /** |
| 224 * A stream pipe that converts data events before passing them on. | 213 * A stream pipe that converts data events before passing them on. |
| 225 */ | 214 */ |
| 226 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 215 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| 227 final _Transformation<S, Iterable<T>> _expand; | 216 final _Transformation<S, Iterable<T>> _expand; |
| 228 | 217 |
| 229 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 218 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
| 230 : this._expand = expand, super(source); | 219 : this._expand = expand, super(source); |
| 231 | 220 |
| 232 void _handleData(S inputEvent, _EventSink<T> sink) { | 221 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 233 try { | 222 try { |
| 234 for (T value in _expand(inputEvent)) { | 223 for (T value in _expand(inputEvent)) { |
| 235 sink._add(value); | 224 sink._add(value); |
| 236 } | 225 } |
| 237 } catch (e, s) { | 226 } catch (e, s) { |
| 238 // If either _expand or iterating the generated iterator throws, | 227 // If either _expand or iterating the generated iterator throws, |
| 239 // we abort the iteration. | 228 // we abort the iteration. |
| 240 sink._addError(_asyncError(e, s), s); | 229 sink._addError(e, s); |
| 241 } | 230 } |
| 242 } | 231 } |
| 243 } | 232 } |
| 244 | 233 |
| 245 | 234 |
| 246 typedef bool _ErrorTest(error); | 235 typedef bool _ErrorTest(error); |
| 247 | 236 |
| 248 /** | 237 /** |
| 249 * A stream pipe that converts or disposes error events | 238 * A stream pipe that converts or disposes error events |
| 250 * before passing them on. | 239 * before passing them on. |
| 251 */ | 240 */ |
| 252 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 241 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 253 final Function _transform; | 242 final Function _transform; |
| 254 final _ErrorTest _test; | 243 final _ErrorTest _test; |
| 255 | 244 |
| 256 _HandleErrorStream(Stream<T> source, | 245 _HandleErrorStream(Stream<T> source, |
| 257 Function onError, | 246 Function onError, |
| 258 bool test(error)) | 247 bool test(error)) |
| 259 : this._transform = onError, this._test = test, super(source); | 248 : this._transform = onError, this._test = test, super(source); |
| 260 | 249 |
| 261 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | 250 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
| 262 bool matches = true; | 251 bool matches = true; |
| 263 if (_test != null) { | 252 if (_test != null) { |
| 264 try { | 253 try { |
| 265 matches = _test(error); | 254 matches = _test(error); |
| 266 } catch (e, s) { | 255 } catch (e, s) { |
| 267 sink._addError(_asyncError(e, s), s); | 256 sink._addError(e, s); |
| 268 return; | 257 return; |
| 269 } | 258 } |
| 270 } | 259 } |
| 271 if (matches) { | 260 if (matches) { |
| 272 try { | 261 try { |
| 273 _invokeErrorHandler(_transform, error, stackTrace); | 262 _invokeErrorHandler(_transform, error, stackTrace); |
| 274 } catch (e, s) { | 263 } catch (e, s) { |
| 275 if (identical(e, error)) { | 264 if (identical(e, error)) { |
| 276 sink._addError(error, stackTrace); | 265 sink._addError(error, stackTrace); |
| 277 } else { | 266 } else { |
| 278 sink._addError(_asyncError(e, s), s); | 267 sink._addError(e, s); |
| 279 } | 268 } |
| 280 return; | 269 return; |
| 281 } | 270 } |
| 282 } else { | 271 } else { |
| 283 sink._addError(error, stackTrace); | 272 sink._addError(error, stackTrace); |
| 284 } | 273 } |
| 285 } | 274 } |
| 286 } | 275 } |
| 287 | 276 |
| 288 | 277 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 314 final _Predicate<T> _test; | 303 final _Predicate<T> _test; |
| 315 | 304 |
| 316 _TakeWhileStream(Stream<T> source, bool test(T value)) | 305 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 317 : this._test = test, super(source); | 306 : this._test = test, super(source); |
| 318 | 307 |
| 319 void _handleData(T inputEvent, _EventSink<T> sink) { | 308 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 320 bool satisfies; | 309 bool satisfies; |
| 321 try { | 310 try { |
| 322 satisfies = _test(inputEvent); | 311 satisfies = _test(inputEvent); |
| 323 } catch (e, s) { | 312 } catch (e, s) { |
| 324 sink._addError(_asyncError(e, s), s); | 313 sink._addError(e, s); |
| 325 // The test didn't say true. Didn't say false either, but we stop anyway. | 314 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 326 sink._close(); | 315 sink._close(); |
| 327 return; | 316 return; |
| 328 } | 317 } |
| 329 if (satisfies) { | 318 if (satisfies) { |
| 330 sink._add(inputEvent); | 319 sink._add(inputEvent); |
| 331 } else { | 320 } else { |
| 332 sink._close(); | 321 sink._close(); |
| 333 } | 322 } |
| 334 } | 323 } |
| (...skipping 27 matching lines...) Expand all Loading... |
| 362 | 351 |
| 363 void _handleData(T inputEvent, _EventSink<T> sink) { | 352 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 364 if (_hasFailed) { | 353 if (_hasFailed) { |
| 365 sink._add(inputEvent); | 354 sink._add(inputEvent); |
| 366 return; | 355 return; |
| 367 } | 356 } |
| 368 bool satisfies; | 357 bool satisfies; |
| 369 try { | 358 try { |
| 370 satisfies = _test(inputEvent); | 359 satisfies = _test(inputEvent); |
| 371 } catch (e, s) { | 360 } catch (e, s) { |
| 372 sink._addError(_asyncError(e, s), s); | 361 sink._addError(e, s); |
| 373 // A failure to return a boolean is considered "not matching". | 362 // A failure to return a boolean is considered "not matching". |
| 374 _hasFailed = true; | 363 _hasFailed = true; |
| 375 return; | 364 return; |
| 376 } | 365 } |
| 377 if (!satisfies) { | 366 if (!satisfies) { |
| 378 _hasFailed = true; | 367 _hasFailed = true; |
| 379 sink._add(inputEvent); | 368 sink._add(inputEvent); |
| 380 } | 369 } |
| 381 } | 370 } |
| 382 } | 371 } |
| (...skipping 15 matching lines...) Expand all Loading... |
| 398 return sink._add(inputEvent); | 387 return sink._add(inputEvent); |
| 399 } else { | 388 } else { |
| 400 bool isEqual; | 389 bool isEqual; |
| 401 try { | 390 try { |
| 402 if (_equals == null) { | 391 if (_equals == null) { |
| 403 isEqual = (_previous == inputEvent); | 392 isEqual = (_previous == inputEvent); |
| 404 } else { | 393 } else { |
| 405 isEqual = _equals(_previous, inputEvent); | 394 isEqual = _equals(_previous, inputEvent); |
| 406 } | 395 } |
| 407 } catch (e, s) { | 396 } catch (e, s) { |
| 408 sink._addError(_asyncError(e, s), s); | 397 sink._addError(e, s); |
| 409 return null; | 398 return null; |
| 410 } | 399 } |
| 411 if (!isEqual) { | 400 if (!isEqual) { |
| 412 sink._add(inputEvent); | 401 sink._add(inputEvent); |
| 413 _previous = inputEvent; | 402 _previous = inputEvent; |
| 414 } | 403 } |
| 415 } | 404 } |
| 416 } | 405 } |
| 417 } | 406 } |
| OLD | NEW |