| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
| 8 _runUserCode(userCode(), | 8 _runUserCode(userCode(), |
| 9 onSuccess(value), | 9 onSuccess(value), |
| 10 onError(error, StackTrace stackTrace)) { | 10 onError(error, StackTrace stackTrace)) { |
| (...skipping 29 matching lines...) Expand all Loading... |
| 40 _Future future, | 40 _Future future, |
| 41 error, StackTrace stackTrace) { | 41 error, StackTrace stackTrace) { |
| 42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
| 43 if (replacement != null) { | 43 if (replacement != null) { |
| 44 error = _nonNullError(replacement.error); | 44 error = _nonNullError(replacement.error); |
| 45 stackTrace = replacement.stackTrace; | 45 stackTrace = replacement.stackTrace; |
| 46 } | 46 } |
| 47 _cancelAndError(subscription, future, error, stackTrace); | 47 _cancelAndError(subscription, future, error, stackTrace); |
| 48 } | 48 } |
| 49 | 49 |
| 50 typedef void _ErrorCallback(error, StackTrace stackTrace); |
| 51 |
| 50 /** Helper function to make an onError argument to [_runUserCode]. */ | 52 /** Helper function to make an onError argument to [_runUserCode]. */ |
| 51 _cancelAndErrorClosure(StreamSubscription subscription, _Future future) => | 53 _ErrorCallback _cancelAndErrorClosure( |
| 52 ((error, StackTrace stackTrace) => _cancelAndError( | 54 StreamSubscription subscription, _Future future) { |
| 53 subscription, future, error, stackTrace)); | 55 return (error, StackTrace stackTrace) { |
| 56 _cancelAndError(subscription, future, error, stackTrace); |
| 57 }; |
| 58 } |
| 54 | 59 |
| 55 /** Helper function to cancel a subscription and wait for the potential future, | 60 /** Helper function to cancel a subscription and wait for the potential future, |
| 56 before completing with a value. */ | 61 before completing with a value. */ |
| 57 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | 62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { |
| 58 var cancelFuture = subscription.cancel(); | 63 var cancelFuture = subscription.cancel(); |
| 59 if (cancelFuture is Future) { | 64 if (cancelFuture is Future) { |
| 60 cancelFuture.whenComplete(() => future._complete(value)); | 65 cancelFuture.whenComplete(() => future._complete(value)); |
| 61 } else { | 66 } else { |
| 62 future._complete(value); | 67 future._complete(value); |
| 63 } | 68 } |
| (...skipping 29 matching lines...) Expand all Loading... |
| 93 Function onError, | 98 Function onError, |
| 94 void onDone(), | 99 void onDone(), |
| 95 bool cancelOnError) { | 100 bool cancelOnError) { |
| 96 return new _ForwardingStreamSubscription<S, T>( | 101 return new _ForwardingStreamSubscription<S, T>( |
| 97 this, onData, onError, onDone, cancelOnError); | 102 this, onData, onError, onDone, cancelOnError); |
| 98 } | 103 } |
| 99 | 104 |
| 100 // Override the following methods in subclasses to change the behavior. | 105 // Override the following methods in subclasses to change the behavior. |
| 101 | 106 |
| 102 void _handleData(S data, _EventSink<T> sink) { | 107 void _handleData(S data, _EventSink<T> sink) { |
| 103 dynamic outputData = data; | 108 sink._add(data as Object /*=T*/); |
| 104 sink._add(outputData); | |
| 105 } | 109 } |
| 106 | 110 |
| 107 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | 111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
| 108 sink._addError(error, stackTrace); | 112 sink._addError(error, stackTrace); |
| 109 } | 113 } |
| 110 | 114 |
| 111 void _handleDone(_EventSink<T> sink) { | 115 void _handleDone(_EventSink<T> sink) { |
| 112 sink._close(); | 116 sink._close(); |
| 113 } | 117 } |
| 114 } | 118 } |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 154 | 158 |
| 155 void _onResume() { | 159 void _onResume() { |
| 156 if (_subscription == null) return; | 160 if (_subscription == null) return; |
| 157 _subscription.resume(); | 161 _subscription.resume(); |
| 158 } | 162 } |
| 159 | 163 |
| 160 Future _onCancel() { | 164 Future _onCancel() { |
| 161 if (_subscription != null) { | 165 if (_subscription != null) { |
| 162 StreamSubscription subscription = _subscription; | 166 StreamSubscription subscription = _subscription; |
| 163 _subscription = null; | 167 _subscription = null; |
| 164 subscription.cancel(); | 168 return subscription.cancel(); |
| 165 } | 169 } |
| 166 return null; | 170 return null; |
| 167 } | 171 } |
| 168 | 172 |
| 169 // Methods used as listener on source subscription. | 173 // Methods used as listener on source subscription. |
| 170 | 174 |
| 171 void _handleData(S data) { | 175 void _handleData(S data) { |
| 172 _stream._handleData(data, this); | 176 _stream._handleData(data, this); |
| 173 } | 177 } |
| 174 | 178 |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 217 } | 221 } |
| 218 } | 222 } |
| 219 | 223 |
| 220 | 224 |
| 221 typedef T _Transformation<S, T>(S value); | 225 typedef T _Transformation<S, T>(S value); |
| 222 | 226 |
| 223 /** | 227 /** |
| 224 * A stream pipe that converts data events before passing them on. | 228 * A stream pipe that converts data events before passing them on. |
| 225 */ | 229 */ |
| 226 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 230 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| 227 final _Transformation _transform; | 231 final _Transformation<S, T> _transform; |
| 228 | 232 |
| 229 _MapStream(Stream<S> source, T transform(S event)) | 233 _MapStream(Stream<S> source, T transform(S event)) |
| 230 : this._transform = transform, super(source); | 234 : this._transform = transform, super(source); |
| 231 | 235 |
| 232 void _handleData(S inputEvent, _EventSink<T> sink) { | 236 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 233 T outputEvent; | 237 T outputEvent; |
| 234 try { | 238 try { |
| 235 outputEvent = _transform(inputEvent); | 239 outputEvent = _transform(inputEvent); |
| 236 } catch (e, s) { | 240 } catch (e, s) { |
| 237 _addErrorWithReplacement(sink, e, s); | 241 _addErrorWithReplacement(sink, e, s); |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 301 return; | 305 return; |
| 302 } | 306 } |
| 303 } else { | 307 } else { |
| 304 sink._addError(error, stackTrace); | 308 sink._addError(error, stackTrace); |
| 305 } | 309 } |
| 306 } | 310 } |
| 307 } | 311 } |
| 308 | 312 |
| 309 | 313 |
| 310 class _TakeStream<T> extends _ForwardingStream<T, T> { | 314 class _TakeStream<T> extends _ForwardingStream<T, T> { |
| 311 int _remaining; | 315 final int _count; |
| 312 | 316 |
| 313 _TakeStream(Stream<T> source, int count) | 317 _TakeStream(Stream<T> source, int count) |
| 314 : this._remaining = count, super(source) { | 318 : this._count = count, super(source) { |
| 315 // This test is done early to avoid handling an async error | 319 // This test is done early to avoid handling an async error |
| 316 // in the _handleData method. | 320 // in the _handleData method. |
| 317 if (count is! int) throw new ArgumentError(count); | 321 if (count is! int) throw new ArgumentError(count); |
| 318 } | 322 } |
| 319 | 323 |
| 324 StreamSubscription<T> _createSubscription( |
| 325 void onData(T data), |
| 326 Function onError, |
| 327 void onDone(), |
| 328 bool cancelOnError) { |
| 329 return new _StateStreamSubscription<T>( |
| 330 this, onData, onError, onDone, cancelOnError, _count); |
| 331 } |
| 332 |
| 320 void _handleData(T inputEvent, _EventSink<T> sink) { | 333 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 321 if (_remaining > 0) { | 334 _StateStreamSubscription<T> subscription = sink; |
| 335 int count = subscription._count; |
| 336 if (count > 0) { |
| 322 sink._add(inputEvent); | 337 sink._add(inputEvent); |
| 323 _remaining -= 1; | 338 count -= 1; |
| 324 if (_remaining == 0) { | 339 subscription._count = count; |
| 340 if (count == 0) { |
| 325 // Closing also unsubscribes all subscribers, which unsubscribes | 341 // Closing also unsubscribes all subscribers, which unsubscribes |
| 326 // this from source. | 342 // this from source. |
| 327 sink._close(); | 343 sink._close(); |
| 328 } | 344 } |
| 329 } | 345 } |
| 330 } | 346 } |
| 331 } | 347 } |
| 332 | 348 |
| 349 /** |
| 350 * A [_ForwardingStreamSubscription] with one extra state field. |
| 351 * |
| 352 * Use by several different classes, some storing an integer, others a bool. |
| 353 */ |
| 354 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
| 355 // Raw state field. Typed access provided by getters and setters below. |
| 356 var _sharedState; |
| 357 |
| 358 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
| 359 Function onError, void onDone(), |
| 360 bool cancelOnError, this._sharedState) |
| 361 : super(stream, onData, onError, onDone, cancelOnError); |
| 362 |
| 363 bool get _flag => _sharedState; |
| 364 void set _flag(bool flag) { _sharedState = flag; } |
| 365 int get _count => _sharedState; |
| 366 void set _count(int count) { _sharedState = count; } |
| 367 } |
| 368 |
| 333 | 369 |
| 334 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 370 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| 335 final _Predicate<T> _test; | 371 final _Predicate<T> _test; |
| 336 | 372 |
| 337 _TakeWhileStream(Stream<T> source, bool test(T value)) | 373 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 338 : this._test = test, super(source); | 374 : this._test = test, super(source); |
| 339 | 375 |
| 340 void _handleData(T inputEvent, _EventSink<T> sink) { | 376 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 341 bool satisfies; | 377 bool satisfies; |
| 342 try { | 378 try { |
| 343 satisfies = _test(inputEvent); | 379 satisfies = _test(inputEvent); |
| 344 } catch (e, s) { | 380 } catch (e, s) { |
| 345 _addErrorWithReplacement(sink, e, s); | 381 _addErrorWithReplacement(sink, e, s); |
| 346 // The test didn't say true. Didn't say false either, but we stop anyway. | 382 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 347 sink._close(); | 383 sink._close(); |
| 348 return; | 384 return; |
| 349 } | 385 } |
| 350 if (satisfies) { | 386 if (satisfies) { |
| 351 sink._add(inputEvent); | 387 sink._add(inputEvent); |
| 352 } else { | 388 } else { |
| 353 sink._close(); | 389 sink._close(); |
| 354 } | 390 } |
| 355 } | 391 } |
| 356 } | 392 } |
| 357 | 393 |
| 358 class _SkipStream<T> extends _ForwardingStream<T, T> { | 394 class _SkipStream<T> extends _ForwardingStream<T, T> { |
| 359 int _remaining; | 395 final int _count; |
| 360 | 396 |
| 361 _SkipStream(Stream<T> source, int count) | 397 _SkipStream(Stream<T> source, int count) |
| 362 : this._remaining = count, super(source) { | 398 : this._count = count, super(source) { |
| 363 // This test is done early to avoid handling an async error | 399 // This test is done early to avoid handling an async error |
| 364 // in the _handleData method. | 400 // in the _handleData method. |
| 365 if (count is! int || count < 0) throw new ArgumentError(count); | 401 if (count is! int || count < 0) throw new ArgumentError(count); |
| 366 } | 402 } |
| 367 | 403 |
| 404 StreamSubscription<T> _createSubscription( |
| 405 void onData(T data), |
| 406 Function onError, |
| 407 void onDone(), |
| 408 bool cancelOnError) { |
| 409 return new _StateStreamSubscription<T>( |
| 410 this, onData, onError, onDone, cancelOnError, _count); |
| 411 } |
| 412 |
| 368 void _handleData(T inputEvent, _EventSink<T> sink) { | 413 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 369 if (_remaining > 0) { | 414 _StateStreamSubscription<T> subscription = sink; |
| 370 _remaining--; | 415 int count = subscription._count; |
| 416 if (count > 0) { |
| 417 subscription._count = count - 1; |
| 371 return; | 418 return; |
| 372 } | 419 } |
| 373 sink._add(inputEvent); | 420 sink._add(inputEvent); |
| 374 } | 421 } |
| 375 } | 422 } |
| 376 | 423 |
| 377 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 424 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
| 378 final _Predicate<T> _test; | 425 final _Predicate<T> _test; |
| 379 bool _hasFailed = false; | |
| 380 | 426 |
| 381 _SkipWhileStream(Stream<T> source, bool test(T value)) | 427 _SkipWhileStream(Stream<T> source, bool test(T value)) |
| 382 : this._test = test, super(source); | 428 : this._test = test, super(source); |
| 383 | 429 |
| 430 StreamSubscription<T> _createSubscription( |
| 431 void onData(T data), |
| 432 Function onError, |
| 433 void onDone(), |
| 434 bool cancelOnError) { |
| 435 return new _StateStreamSubscription<T>( |
| 436 this, onData, onError, onDone, cancelOnError, false); |
| 437 } |
| 438 |
| 384 void _handleData(T inputEvent, _EventSink<T> sink) { | 439 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 385 if (_hasFailed) { | 440 _StateStreamSubscription<T> subscription = sink; |
| 441 bool hasFailed = subscription._flag; |
| 442 if (hasFailed) { |
| 386 sink._add(inputEvent); | 443 sink._add(inputEvent); |
| 387 return; | 444 return; |
| 388 } | 445 } |
| 389 bool satisfies; | 446 bool satisfies; |
| 390 try { | 447 try { |
| 391 satisfies = _test(inputEvent); | 448 satisfies = _test(inputEvent); |
| 392 } catch (e, s) { | 449 } catch (e, s) { |
| 393 _addErrorWithReplacement(sink, e, s); | 450 _addErrorWithReplacement(sink, e, s); |
| 394 // A failure to return a boolean is considered "not matching". | 451 // A failure to return a boolean is considered "not matching". |
| 395 _hasFailed = true; | 452 subscription._flag = true; |
| 396 return; | 453 return; |
| 397 } | 454 } |
| 398 if (!satisfies) { | 455 if (!satisfies) { |
| 399 _hasFailed = true; | 456 subscription._flag = true; |
| 400 sink._add(inputEvent); | 457 sink._add(inputEvent); |
| 401 } | 458 } |
| 402 } | 459 } |
| 403 } | 460 } |
| 404 | 461 |
| 405 typedef bool _Equality<T>(T a, T b); | 462 typedef bool _Equality<T>(T a, T b); |
| 406 | 463 |
| 407 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 464 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| 408 static var _SENTINEL = new Object(); | 465 static var _SENTINEL = new Object(); |
| 409 | 466 |
| 410 _Equality<T> _equals; | 467 _Equality<T> _equals; |
| 411 var _previous = _SENTINEL; | 468 var _previous = _SENTINEL; |
| 412 | 469 |
| 413 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 470 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
| 414 : _equals = equals, super(source); | 471 : _equals = equals, super(source); |
| 415 | 472 |
| 416 void _handleData(T inputEvent, _EventSink<T> sink) { | 473 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 417 if (identical(_previous, _SENTINEL)) { | 474 if (identical(_previous, _SENTINEL)) { |
| 418 _previous = inputEvent; | 475 _previous = inputEvent; |
| 419 return sink._add(inputEvent); | 476 return sink._add(inputEvent); |
| 420 } else { | 477 } else { |
| 421 bool isEqual; | 478 bool isEqual; |
| 422 try { | 479 try { |
| 423 if (_equals == null) { | 480 if (_equals == null) { |
| 424 isEqual = (_previous == inputEvent); | 481 isEqual = (_previous == inputEvent); |
| 425 } else { | 482 } else { |
| 426 isEqual = _equals(_previous, inputEvent); | 483 isEqual = _equals(_previous as Object /*=T*/, inputEvent); |
| 427 } | 484 } |
| 428 } catch (e, s) { | 485 } catch (e, s) { |
| 429 _addErrorWithReplacement(sink, e, s); | 486 _addErrorWithReplacement(sink, e, s); |
| 430 return null; | 487 return null; |
| 431 } | 488 } |
| 432 if (!isEqual) { | 489 if (!isEqual) { |
| 433 sink._add(inputEvent); | 490 sink._add(inputEvent); |
| 434 _previous = inputEvent; | 491 _previous = inputEvent; |
| 435 } | 492 } |
| 436 } | 493 } |
| 437 } | 494 } |
| 438 } | 495 } |
| OLD | NEW |