Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 329 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 340 bool first = true; | 340 bool first = true; |
| 341 subscription = this.listen( | 341 subscription = this.listen( |
| 342 (T element) { | 342 (T element) { |
| 343 if (!first) { | 343 if (!first) { |
| 344 buffer.write(separator); | 344 buffer.write(separator); |
| 345 } | 345 } |
| 346 first = false; | 346 first = false; |
| 347 try { | 347 try { |
| 348 buffer.write(element); | 348 buffer.write(element); |
| 349 } catch (e, s) { | 349 } catch (e, s) { |
| 350 subscription.cancel(); | 350 subscription.cancel().whenComplete(() { |
|
floitsch
2013/07/12 16:42:34
Why `whenComplete` if `cancel` is guaranteed not t
Lasse Reichstein Nielsen
2013/07/16 12:03:31
Why would that cost more?
floitsch
2013/07/16 12:47:17
My mistake.
| |
| 351 result._setError(_asyncError(e, s)); | 351 result._setError(_asyncError(e, s)); |
| 352 }); | |
| 352 } | 353 } |
| 353 }, | 354 }, |
| 354 onError: (e) { | 355 onError: (e) { |
| 355 result._setError(e); | 356 result._setError(e); |
| 356 }, | 357 }, |
| 357 onDone: () { | 358 onDone: () { |
| 358 result._setValue(buffer.toString()); | 359 result._setValue(buffer.toString()); |
| 359 }, | 360 }, |
| 360 cancelOnError: true); | 361 cancelOnError: true); |
| 361 return result; | 362 return result; |
| 362 } | 363 } |
| 363 | 364 |
| 364 /** | 365 /** |
| 365 * Checks whether [needle] occurs in the elements provided by this stream. | 366 * Checks whether [needle] occurs in the elements provided by this stream. |
| 366 * | 367 * |
| 367 * Completes the [Future] when the answer is known. | 368 * Completes the [Future] when the answer is known. |
| 368 * If this stream reports an error, the [Future] will report that error. | 369 * If this stream reports an error, the [Future] will report that error. |
| 369 */ | 370 */ |
| 370 Future<bool> contains(Object needle) { | 371 Future<bool> contains(Object needle) { |
| 371 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 372 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 372 StreamSubscription subscription; | 373 StreamSubscription subscription; |
| 373 subscription = this.listen( | 374 subscription = this.listen( |
| 374 (T element) { | 375 (T element) { |
| 375 _runUserCode( | 376 _runUserCode( |
| 376 () => (element == needle), | 377 () => (element == needle), |
| 377 (bool isMatch) { | 378 (bool isMatch) { |
| 378 if (isMatch) { | 379 if (isMatch) { |
| 379 subscription.cancel(); | 380 subscription.cancel().whenComplete(() { |
| 380 future._setValue(true); | 381 future._setValue(true); |
| 382 }); | |
| 381 } | 383 } |
| 382 }, | 384 }, |
| 383 _cancelAndError(subscription, future) | 385 _cancelAndError(subscription, future) |
| 384 ); | 386 ); |
| 385 }, | 387 }, |
| 386 onError: future._setError, | 388 onError: future._setError, |
| 387 onDone: () { | 389 onDone: () { |
| 388 future._setValue(false); | 390 future._setValue(false); |
| 389 }, | 391 }, |
| 390 cancelOnError: true); | 392 cancelOnError: true); |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 425 */ | 427 */ |
| 426 Future<bool> every(bool test(T element)) { | 428 Future<bool> every(bool test(T element)) { |
| 427 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 429 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 428 StreamSubscription subscription; | 430 StreamSubscription subscription; |
| 429 subscription = this.listen( | 431 subscription = this.listen( |
| 430 (T element) { | 432 (T element) { |
| 431 _runUserCode( | 433 _runUserCode( |
| 432 () => test(element), | 434 () => test(element), |
| 433 (bool isMatch) { | 435 (bool isMatch) { |
| 434 if (!isMatch) { | 436 if (!isMatch) { |
| 435 subscription.cancel(); | 437 subscription.cancel().whenComplete(() { |
| 436 future._setValue(false); | 438 future._setValue(false); |
| 439 }); | |
| 437 } | 440 } |
| 438 }, | 441 }, |
| 439 _cancelAndError(subscription, future) | 442 _cancelAndError(subscription, future) |
| 440 ); | 443 ); |
| 441 }, | 444 }, |
| 442 onError: future._setError, | 445 onError: future._setError, |
| 443 onDone: () { | 446 onDone: () { |
| 444 future._setValue(true); | 447 future._setValue(true); |
| 445 }, | 448 }, |
| 446 cancelOnError: true); | 449 cancelOnError: true); |
| 447 return future; | 450 return future; |
| 448 } | 451 } |
| 449 | 452 |
| 450 /** | 453 /** |
| 451 * Checks whether [test] accepts any element provided by this stream. | 454 * Checks whether [test] accepts any element provided by this stream. |
| 452 * | 455 * |
| 453 * Completes the [Future] when the answer is known. | 456 * Completes the [Future] when the answer is known. |
| 454 * If this stream reports an error, the [Future] will report that error. | 457 * If this stream reports an error, the [Future] will report that error. |
| 455 */ | 458 */ |
| 456 Future<bool> any(bool test(T element)) { | 459 Future<bool> any(bool test(T element)) { |
| 457 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 460 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 458 StreamSubscription subscription; | 461 StreamSubscription subscription; |
| 459 subscription = this.listen( | 462 subscription = this.listen( |
| 460 (T element) { | 463 (T element) { |
| 461 _runUserCode( | 464 _runUserCode( |
| 462 () => test(element), | 465 () => test(element), |
| 463 (bool isMatch) { | 466 (bool isMatch) { |
| 464 if (isMatch) { | 467 if (isMatch) { |
| 465 subscription.cancel(); | 468 subscription.cancel().whenComplete(() { |
| 466 future._setValue(true); | 469 future._setValue(true); |
| 470 }); | |
| 467 } | 471 } |
| 468 }, | 472 }, |
| 469 _cancelAndError(subscription, future) | 473 _cancelAndError(subscription, future) |
| 470 ); | 474 ); |
| 471 }, | 475 }, |
| 472 onError: future._setError, | 476 onError: future._setError, |
| 473 onDone: () { | 477 onDone: () { |
| 474 future._setValue(false); | 478 future._setValue(false); |
| 475 }, | 479 }, |
| 476 cancelOnError: true); | 480 cancelOnError: true); |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 491 cancelOnError: true); | 495 cancelOnError: true); |
| 492 return future; | 496 return future; |
| 493 } | 497 } |
| 494 | 498 |
| 495 /** Reports whether this stream contains any elements. */ | 499 /** Reports whether this stream contains any elements. */ |
| 496 Future<bool> get isEmpty { | 500 Future<bool> get isEmpty { |
| 497 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 501 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 498 StreamSubscription subscription; | 502 StreamSubscription subscription; |
| 499 subscription = this.listen( | 503 subscription = this.listen( |
| 500 (_) { | 504 (_) { |
| 501 subscription.cancel(); | 505 subscription.cancel().whenComplete(() { |
| 502 future._setValue(false); | 506 future._setValue(false); |
| 507 }); | |
| 503 }, | 508 }, |
| 504 onError: future._setError, | 509 onError: future._setError, |
| 505 onDone: () { | 510 onDone: () { |
| 506 future._setValue(true); | 511 future._setValue(true); |
| 507 }, | 512 }, |
| 508 cancelOnError: true); | 513 cancelOnError: true); |
| 509 return future; | 514 return future; |
| 510 } | 515 } |
| 511 | 516 |
| 512 /** Collects the data of this stream in a [List]. */ | 517 /** Collects the data of this stream in a [List]. */ |
| (...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 616 * Returns the first element. | 621 * Returns the first element. |
| 617 * | 622 * |
| 618 * If [this] is empty throws a [StateError]. Otherwise this method is | 623 * If [this] is empty throws a [StateError]. Otherwise this method is |
| 619 * equivalent to [:this.elementAt(0):] | 624 * equivalent to [:this.elementAt(0):] |
| 620 */ | 625 */ |
| 621 Future<T> get first { | 626 Future<T> get first { |
| 622 _FutureImpl<T> future = new _FutureImpl<T>(); | 627 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 623 StreamSubscription subscription; | 628 StreamSubscription subscription; |
| 624 subscription = this.listen( | 629 subscription = this.listen( |
| 625 (T value) { | 630 (T value) { |
| 626 subscription.cancel(); | 631 subscription.cancel().whenComplete(() { |
| 627 future._setValue(value); | 632 future._setValue(value); |
| 628 return; | 633 }); |
| 629 }, | 634 }, |
| 630 onError: future._setError, | 635 onError: future._setError, |
| 631 onDone: () { | 636 onDone: () { |
| 632 future._setError(new StateError("No elements")); | 637 future._setError(new StateError("No elements")); |
| 633 }, | 638 }, |
| 634 cancelOnError: true); | 639 cancelOnError: true); |
| 635 return future; | 640 return future; |
| 636 } | 641 } |
| 637 | 642 |
| 638 /** | 643 /** |
| (...skipping 29 matching lines...) Expand all Loading... | |
| 668 * If [this] is empty or has more than one element throws a [StateError]. | 673 * If [this] is empty or has more than one element throws a [StateError]. |
| 669 */ | 674 */ |
| 670 Future<T> get single { | 675 Future<T> get single { |
| 671 _FutureImpl<T> future = new _FutureImpl<T>(); | 676 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 672 T result = null; | 677 T result = null; |
| 673 bool foundResult = false; | 678 bool foundResult = false; |
| 674 StreamSubscription subscription; | 679 StreamSubscription subscription; |
| 675 subscription = this.listen( | 680 subscription = this.listen( |
| 676 (T value) { | 681 (T value) { |
| 677 if (foundResult) { | 682 if (foundResult) { |
| 678 subscription.cancel(); | 683 subscription.cancel().whenComplete(() { |
| 679 // This is the second element we get. | 684 // This is the second element we get. |
| 680 Error error = new StateError("More than one element"); | 685 Error error = new StateError("More than one element"); |
| 681 future._setError(error); | 686 future._setError(error); |
| 687 }); | |
| 682 return; | 688 return; |
| 683 } | 689 } |
| 684 foundResult = true; | 690 foundResult = true; |
| 685 result = value; | 691 result = value; |
| 686 }, | 692 }, |
| 687 onError: future._setError, | 693 onError: future._setError, |
| 688 onDone: () { | 694 onDone: () { |
| 689 if (foundResult) { | 695 if (foundResult) { |
| 690 future._setValue(result); | 696 future._setValue(result); |
| 691 return; | 697 return; |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 712 */ | 718 */ |
| 713 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 719 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
| 714 _FutureImpl<dynamic> future = new _FutureImpl(); | 720 _FutureImpl<dynamic> future = new _FutureImpl(); |
| 715 StreamSubscription subscription; | 721 StreamSubscription subscription; |
| 716 subscription = this.listen( | 722 subscription = this.listen( |
| 717 (T value) { | 723 (T value) { |
| 718 _runUserCode( | 724 _runUserCode( |
| 719 () => test(value), | 725 () => test(value), |
| 720 (bool isMatch) { | 726 (bool isMatch) { |
| 721 if (isMatch) { | 727 if (isMatch) { |
| 722 subscription.cancel(); | 728 subscription.cancel().whenComplete(() { |
| 723 future._setValue(value); | 729 future._setValue(value); |
| 730 }); | |
| 724 } | 731 } |
| 725 }, | 732 }, |
| 726 _cancelAndError(subscription, future) | 733 _cancelAndError(subscription, future) |
| 727 ); | 734 ); |
| 728 }, | 735 }, |
| 729 onError: future._setError, | 736 onError: future._setError, |
| 730 onDone: () { | 737 onDone: () { |
| 731 if (defaultValue != null) { | 738 if (defaultValue != null) { |
| 732 _runUserCode(defaultValue, future._setValue, future._setError); | 739 _runUserCode(defaultValue, future._setValue, future._setError); |
| 733 return; | 740 return; |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 790 T result = null; | 797 T result = null; |
| 791 bool foundResult = false; | 798 bool foundResult = false; |
| 792 StreamSubscription subscription; | 799 StreamSubscription subscription; |
| 793 subscription = this.listen( | 800 subscription = this.listen( |
| 794 (T value) { | 801 (T value) { |
| 795 _runUserCode( | 802 _runUserCode( |
| 796 () => true == test(value), | 803 () => true == test(value), |
| 797 (bool isMatch) { | 804 (bool isMatch) { |
| 798 if (isMatch) { | 805 if (isMatch) { |
| 799 if (foundResult) { | 806 if (foundResult) { |
| 800 subscription.cancel(); | 807 subscription.cancel().whenComplete(() { |
| 801 future._setError( | 808 future._setError( |
| 802 new StateError('Multiple matches for "single"')); | 809 new StateError('Multiple matches for "single"')); |
| 810 }); | |
| 803 return; | 811 return; |
| 804 } | 812 } |
| 805 foundResult = true; | 813 foundResult = true; |
| 806 result = value; | 814 result = value; |
| 807 } | 815 } |
| 808 }, | 816 }, |
| 809 _cancelAndError(subscription, future) | 817 _cancelAndError(subscription, future) |
| 810 ); | 818 ); |
| 811 }, | 819 }, |
| 812 onError: future._setError, | 820 onError: future._setError, |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 829 * If this stream provides fewer than [index] elements before closing, | 837 * If this stream provides fewer than [index] elements before closing, |
| 830 * an error is reported. | 838 * an error is reported. |
| 831 */ | 839 */ |
| 832 Future<T> elementAt(int index) { | 840 Future<T> elementAt(int index) { |
| 833 if (index is! int || index < 0) throw new ArgumentError(index); | 841 if (index is! int || index < 0) throw new ArgumentError(index); |
| 834 _FutureImpl<T> future = new _FutureImpl<T>(); | 842 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 835 StreamSubscription subscription; | 843 StreamSubscription subscription; |
| 836 subscription = this.listen( | 844 subscription = this.listen( |
| 837 (T value) { | 845 (T value) { |
| 838 if (index == 0) { | 846 if (index == 0) { |
| 839 subscription.cancel(); | 847 subscription.cancel().whenComplete(() { |
| 840 future._setValue(value); | 848 future._setValue(value); |
| 849 }); | |
| 841 return; | 850 return; |
| 842 } | 851 } |
| 843 index -= 1; | 852 index -= 1; |
| 844 }, | 853 }, |
| 845 onError: future._setError, | 854 onError: future._setError, |
| 846 onDone: () { | 855 onDone: () { |
| 847 future._setError(new StateError("Not enough elements for elementAt")); | 856 future._setError(new StateError("Not enough elements for elementAt")); |
| 848 }, | 857 }, |
| 849 cancelOnError: true); | 858 cancelOnError: true); |
| 850 return future; | 859 return future; |
| 851 } | 860 } |
| 852 } | 861 } |
| 853 | 862 |
| 854 /** | 863 /** |
| 855 * A control object for the subscription on a [Stream]. | 864 * A control object for the subscription on a [Stream]. |
| 856 * | 865 * |
| 857 * When you subscribe on a [Stream] using [Stream.listen], | 866 * When you subscribe on a [Stream] using [Stream.listen], |
| 858 * a [StreamSubscription] object is returned. This object | 867 * a [StreamSubscription] object is returned. This object |
| 859 * is used to later unsubscribe again, or to temporarily pause | 868 * is used to later unsubscribe again, or to temporarily pause |
| 860 * the stream's events. | 869 * the stream's events. |
| 861 */ | 870 */ |
| 862 abstract class StreamSubscription<T> { | 871 abstract class StreamSubscription<T> { |
| 863 /** | 872 /** |
| 864 * Cancels this subscription. It will no longer receive events. | 873 * Cancels this subscription. It will no longer receive events. |
| 865 * | 874 * |
| 866 * If an event is currently firing, this unsubscription will only | 875 * If an event is currently firing, this unsubscription will only |
| 867 * take effect after all subscribers have received the current event. | 876 * take effect after all subscribers have received the current event. |
| 877 * | |
| 878 * In case the cancel is asynchronous, the returned [Future] can be used to | |
|
floitsch
2013/07/12 16:42:34
Returns a future that is completed when the cancel
Lasse Reichstein Nielsen
2013/07/17 07:28:46
Agree, if it always returns a future, then there i
| |
| 879 * wait for the operation to complete. If in doubt, wait for it. | |
| 868 */ | 880 */ |
| 869 void cancel(); | 881 Future cancel(); |
| 870 | 882 |
| 871 /** Set or override the data event handler of this subscription. */ | 883 /** Set or override the data event handler of this subscription. */ |
| 872 void onData(void handleData(T data)); | 884 void onData(void handleData(T data)); |
| 873 | 885 |
| 874 /** Set or override the error event handler of this subscription. */ | 886 /** Set or override the error event handler of this subscription. */ |
| 875 void onError(void handleError(error)); | 887 void onError(void handleError(error)); |
| 876 | 888 |
| 877 /** Set or override the done event handler of this subscription. */ | 889 /** Set or override the done event handler of this subscription. */ |
| 878 void onDone(void handleDone()); | 890 void onDone(void handleDone()); |
| 879 | 891 |
| (...skipping 377 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1257 * The stream iterator is automatically canceled if the [moveNext] future | 1269 * The stream iterator is automatically canceled if the [moveNext] future |
| 1258 * completes with either `false` or an error. | 1270 * completes with either `false` or an error. |
| 1259 * | 1271 * |
| 1260 * If a [moveNext] call has been made, it will complete with `false` as value, | 1272 * If a [moveNext] call has been made, it will complete with `false` as value, |
| 1261 * as will all further calls to [moveNext]. | 1273 * as will all further calls to [moveNext]. |
| 1262 * | 1274 * |
| 1263 * If you need to stop listening for values before the stream iterator is | 1275 * If you need to stop listening for values before the stream iterator is |
| 1264 * automatically closed, you must call [cancel] to ensure that the stream | 1276 * automatically closed, you must call [cancel] to ensure that the stream |
| 1265 * is properly closed. | 1277 * is properly closed. |
| 1266 */ | 1278 */ |
| 1267 void cancel(); | 1279 Future cancel(); |
| 1268 } | 1280 } |
| OLD | NEW |