Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 329 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 340 bool first = true; | 340 bool first = true; |
| 341 subscription = this.listen( | 341 subscription = this.listen( |
| 342 (T element) { | 342 (T element) { |
| 343 if (!first) { | 343 if (!first) { |
| 344 buffer.write(separator); | 344 buffer.write(separator); |
| 345 } | 345 } |
| 346 first = false; | 346 first = false; |
| 347 try { | 347 try { |
| 348 buffer.write(element); | 348 buffer.write(element); |
| 349 } catch (e, s) { | 349 } catch (e, s) { |
| 350 subscription.cancel(); | 350 _cancelAndError(subscription, result)(_asyncError(e, s)); |
|
floitsch
2013/10/12 18:53:57
It seems wrong to create a closure and then invoke
Lasse Reichstein Nielsen
2013/10/14 11:32:33
Agree.
If it is because _cancelAndError(subscripti
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 351 result._completeError(_asyncError(e, s)); | |
| 352 } | 351 } |
| 353 }, | 352 }, |
| 354 onError: (e) { | 353 onError: (e) { |
| 355 result._completeError(e); | 354 result._completeError(e); |
| 356 }, | 355 }, |
| 357 onDone: () { | 356 onDone: () { |
| 358 result._complete(buffer.toString()); | 357 result._complete(buffer.toString()); |
| 359 }, | 358 }, |
| 360 cancelOnError: true); | 359 cancelOnError: true); |
| 361 return result; | 360 return result; |
| 362 } | 361 } |
| 363 | 362 |
| 364 /** | 363 /** |
| 365 * Checks whether [needle] occurs in the elements provided by this stream. | 364 * Checks whether [needle] occurs in the elements provided by this stream. |
| 366 * | 365 * |
| 367 * Completes the [Future] when the answer is known. | 366 * Completes the [Future] when the answer is known. |
| 368 * If this stream reports an error, the [Future] will report that error. | 367 * If this stream reports an error, the [Future] will report that error. |
| 369 */ | 368 */ |
| 370 Future<bool> contains(Object needle) { | 369 Future<bool> contains(Object needle) { |
| 371 _Future<bool> future = new _Future<bool>(); | 370 _Future<bool> future = new _Future<bool>(); |
| 372 StreamSubscription subscription; | 371 StreamSubscription subscription; |
| 373 subscription = this.listen( | 372 subscription = this.listen( |
| 374 (T element) { | 373 (T element) { |
| 375 _runUserCode( | 374 _runUserCode( |
| 376 () => (element == needle), | 375 () => (element == needle), |
| 377 (bool isMatch) { | 376 (bool isMatch) { |
| 378 if (isMatch) { | 377 if (isMatch) { |
| 379 subscription.cancel(); | 378 _cancelAndValue(subscription, future)(true); |
| 380 future._complete(true); | |
| 381 } | 379 } |
| 382 }, | 380 }, |
| 383 _cancelAndError(subscription, future) | 381 _cancelAndError(subscription, future) |
| 384 ); | 382 ); |
| 385 }, | 383 }, |
| 386 onError: future._completeError, | 384 onError: future._completeError, |
| 387 onDone: () { | 385 onDone: () { |
| 388 future._complete(false); | 386 future._complete(false); |
| 389 }, | 387 }, |
| 390 cancelOnError: true); | 388 cancelOnError: true); |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 425 */ | 423 */ |
| 426 Future<bool> every(bool test(T element)) { | 424 Future<bool> every(bool test(T element)) { |
| 427 _Future<bool> future = new _Future<bool>(); | 425 _Future<bool> future = new _Future<bool>(); |
| 428 StreamSubscription subscription; | 426 StreamSubscription subscription; |
| 429 subscription = this.listen( | 427 subscription = this.listen( |
| 430 (T element) { | 428 (T element) { |
| 431 _runUserCode( | 429 _runUserCode( |
| 432 () => test(element), | 430 () => test(element), |
| 433 (bool isMatch) { | 431 (bool isMatch) { |
| 434 if (!isMatch) { | 432 if (!isMatch) { |
| 435 subscription.cancel(); | 433 _cancelAndValue(subscription, future)(false); |
| 436 future._complete(false); | |
| 437 } | 434 } |
| 438 }, | 435 }, |
| 439 _cancelAndError(subscription, future) | 436 _cancelAndError(subscription, future) |
| 440 ); | 437 ); |
| 441 }, | 438 }, |
| 442 onError: future._completeError, | 439 onError: future._completeError, |
| 443 onDone: () { | 440 onDone: () { |
| 444 future._complete(true); | 441 future._complete(true); |
| 445 }, | 442 }, |
| 446 cancelOnError: true); | 443 cancelOnError: true); |
| 447 return future; | 444 return future; |
| 448 } | 445 } |
| 449 | 446 |
| 450 /** | 447 /** |
| 451 * Checks whether [test] accepts any element provided by this stream. | 448 * Checks whether [test] accepts any element provided by this stream. |
| 452 * | 449 * |
| 453 * Completes the [Future] when the answer is known. | 450 * Completes the [Future] when the answer is known. |
| 454 * If this stream reports an error, the [Future] will report that error. | 451 * If this stream reports an error, the [Future] will report that error. |
| 455 */ | 452 */ |
| 456 Future<bool> any(bool test(T element)) { | 453 Future<bool> any(bool test(T element)) { |
| 457 _Future<bool> future = new _Future<bool>(); | 454 _Future<bool> future = new _Future<bool>(); |
| 458 StreamSubscription subscription; | 455 StreamSubscription subscription; |
| 459 subscription = this.listen( | 456 subscription = this.listen( |
| 460 (T element) { | 457 (T element) { |
| 461 _runUserCode( | 458 _runUserCode( |
| 462 () => test(element), | 459 () => test(element), |
| 463 (bool isMatch) { | 460 (bool isMatch) { |
| 464 if (isMatch) { | 461 if (isMatch) { |
| 465 subscription.cancel(); | 462 _cancelAndValue(subscription, future)(true); |
| 466 future._complete(true); | |
| 467 } | 463 } |
| 468 }, | 464 }, |
| 469 _cancelAndError(subscription, future) | 465 _cancelAndError(subscription, future) |
| 470 ); | 466 ); |
| 471 }, | 467 }, |
| 472 onError: future._completeError, | 468 onError: future._completeError, |
| 473 onDone: () { | 469 onDone: () { |
| 474 future._complete(false); | 470 future._complete(false); |
| 475 }, | 471 }, |
| 476 cancelOnError: true); | 472 cancelOnError: true); |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 491 cancelOnError: true); | 487 cancelOnError: true); |
| 492 return future; | 488 return future; |
| 493 } | 489 } |
| 494 | 490 |
| 495 /** Reports whether this stream contains any elements. */ | 491 /** Reports whether this stream contains any elements. */ |
| 496 Future<bool> get isEmpty { | 492 Future<bool> get isEmpty { |
| 497 _Future<bool> future = new _Future<bool>(); | 493 _Future<bool> future = new _Future<bool>(); |
| 498 StreamSubscription subscription; | 494 StreamSubscription subscription; |
| 499 subscription = this.listen( | 495 subscription = this.listen( |
| 500 (_) { | 496 (_) { |
| 501 subscription.cancel(); | 497 _cancelAndValue(subscription, future)(false); |
| 502 future._complete(false); | |
| 503 }, | 498 }, |
| 504 onError: future._completeError, | 499 onError: future._completeError, |
| 505 onDone: () { | 500 onDone: () { |
| 506 future._complete(true); | 501 future._complete(true); |
| 507 }, | 502 }, |
| 508 cancelOnError: true); | 503 cancelOnError: true); |
| 509 return future; | 504 return future; |
| 510 } | 505 } |
| 511 | 506 |
| 512 /** Collects the data of this stream in a [List]. */ | 507 /** Collects the data of this stream in a [List]. */ |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 624 * the resulting future completes with a [StateError]. | 619 * the resulting future completes with a [StateError]. |
| 625 * | 620 * |
| 626 * Except for the type of the error, this method is equivalent to | 621 * Except for the type of the error, this method is equivalent to |
| 627 * [:this.elementAt(0):]. | 622 * [:this.elementAt(0):]. |
| 628 */ | 623 */ |
| 629 Future<T> get first { | 624 Future<T> get first { |
| 630 _Future<T> future = new _Future<T>(); | 625 _Future<T> future = new _Future<T>(); |
| 631 StreamSubscription subscription; | 626 StreamSubscription subscription; |
| 632 subscription = this.listen( | 627 subscription = this.listen( |
| 633 (T value) { | 628 (T value) { |
| 634 subscription.cancel(); | 629 _cancelAndValue(subscription, future)(value); |
| 635 future._complete(value); | |
| 636 return; | |
| 637 }, | 630 }, |
| 638 onError: future._completeError, | 631 onError: future._completeError, |
| 639 onDone: () { | 632 onDone: () { |
| 640 future._completeError(new StateError("No elements")); | 633 future._completeError(new StateError("No elements")); |
| 641 }, | 634 }, |
| 642 cancelOnError: true); | 635 cancelOnError: true); |
| 643 return future; | 636 return future; |
| 644 } | 637 } |
| 645 | 638 |
| 646 /** | 639 /** |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 680 * If [this] is empty or has more than one element throws a [StateError]. | 673 * If [this] is empty or has more than one element throws a [StateError]. |
| 681 */ | 674 */ |
| 682 Future<T> get single { | 675 Future<T> get single { |
| 683 _Future<T> future = new _Future<T>(); | 676 _Future<T> future = new _Future<T>(); |
| 684 T result = null; | 677 T result = null; |
| 685 bool foundResult = false; | 678 bool foundResult = false; |
| 686 StreamSubscription subscription; | 679 StreamSubscription subscription; |
| 687 subscription = this.listen( | 680 subscription = this.listen( |
| 688 (T value) { | 681 (T value) { |
| 689 if (foundResult) { | 682 if (foundResult) { |
| 690 subscription.cancel(); | |
| 691 // This is the second element we get. | 683 // This is the second element we get. |
| 692 Error error = new StateError("More than one element"); | 684 Error error = new StateError("More than one element"); |
| 693 future._completeError(error); | 685 _cancelAndError(subscription, future)(error); |
| 694 return; | 686 return; |
| 695 } | 687 } |
| 696 foundResult = true; | 688 foundResult = true; |
| 697 result = value; | 689 result = value; |
| 698 }, | 690 }, |
| 699 onError: future._completeError, | 691 onError: future._completeError, |
| 700 onDone: () { | 692 onDone: () { |
| 701 if (foundResult) { | 693 if (foundResult) { |
| 702 future._complete(result); | 694 future._complete(result); |
| 703 return; | 695 return; |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 724 */ | 716 */ |
| 725 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 717 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
| 726 _Future<dynamic> future = new _Future(); | 718 _Future<dynamic> future = new _Future(); |
| 727 StreamSubscription subscription; | 719 StreamSubscription subscription; |
| 728 subscription = this.listen( | 720 subscription = this.listen( |
| 729 (T value) { | 721 (T value) { |
| 730 _runUserCode( | 722 _runUserCode( |
| 731 () => test(value), | 723 () => test(value), |
| 732 (bool isMatch) { | 724 (bool isMatch) { |
| 733 if (isMatch) { | 725 if (isMatch) { |
| 734 subscription.cancel(); | 726 _cancelAndValue(subscription, future)(value); |
| 735 future._complete(value); | |
| 736 } | 727 } |
| 737 }, | 728 }, |
| 738 _cancelAndError(subscription, future) | 729 _cancelAndError(subscription, future) |
| 739 ); | 730 ); |
| 740 }, | 731 }, |
| 741 onError: future._completeError, | 732 onError: future._completeError, |
| 742 onDone: () { | 733 onDone: () { |
| 743 if (defaultValue != null) { | 734 if (defaultValue != null) { |
| 744 _runUserCode(defaultValue, future._complete, future._completeError); | 735 _runUserCode(defaultValue, future._complete, future._completeError); |
| 745 return; | 736 return; |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 802 T result = null; | 793 T result = null; |
| 803 bool foundResult = false; | 794 bool foundResult = false; |
| 804 StreamSubscription subscription; | 795 StreamSubscription subscription; |
| 805 subscription = this.listen( | 796 subscription = this.listen( |
| 806 (T value) { | 797 (T value) { |
| 807 _runUserCode( | 798 _runUserCode( |
| 808 () => true == test(value), | 799 () => true == test(value), |
| 809 (bool isMatch) { | 800 (bool isMatch) { |
| 810 if (isMatch) { | 801 if (isMatch) { |
| 811 if (foundResult) { | 802 if (foundResult) { |
| 812 subscription.cancel(); | 803 _cancelAndError(subscription, future)( |
| 813 future._completeError( | |
| 814 new StateError('Multiple matches for "single"')); | 804 new StateError('Multiple matches for "single"')); |
| 815 return; | 805 return; |
| 816 } | 806 } |
| 817 foundResult = true; | 807 foundResult = true; |
| 818 result = value; | 808 result = value; |
| 819 } | 809 } |
| 820 }, | 810 }, |
| 821 _cancelAndError(subscription, future) | 811 _cancelAndError(subscription, future) |
| 822 ); | 812 ); |
| 823 }, | 813 }, |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 844 * If a done event occurs before the value is found, the future completes | 834 * If a done event occurs before the value is found, the future completes |
| 845 * with a [RangeError]. | 835 * with a [RangeError]. |
| 846 */ | 836 */ |
| 847 Future<T> elementAt(int index) { | 837 Future<T> elementAt(int index) { |
| 848 if (index is! int || index < 0) throw new ArgumentError(index); | 838 if (index is! int || index < 0) throw new ArgumentError(index); |
| 849 _Future<T> future = new _Future<T>(); | 839 _Future<T> future = new _Future<T>(); |
| 850 StreamSubscription subscription; | 840 StreamSubscription subscription; |
| 851 subscription = this.listen( | 841 subscription = this.listen( |
| 852 (T value) { | 842 (T value) { |
| 853 if (index == 0) { | 843 if (index == 0) { |
| 854 subscription.cancel(); | 844 _cancelAndValue(subscription, future)(value); |
| 855 future._complete(value); | |
| 856 return; | 845 return; |
| 857 } | 846 } |
| 858 index -= 1; | 847 index -= 1; |
| 859 }, | 848 }, |
| 860 onError: future._completeError, | 849 onError: future._completeError, |
| 861 onDone: () { | 850 onDone: () { |
| 862 future._completeError(new RangeError.value(index)); | 851 future._completeError(new RangeError.value(index)); |
| 863 }, | 852 }, |
| 864 cancelOnError: true); | 853 cancelOnError: true); |
| 865 return future; | 854 return future; |
| 866 } | 855 } |
| 867 } | 856 } |
| 868 | 857 |
| 869 /** | 858 /** |
| 870 * A control object for the subscription on a [Stream]. | 859 * A control object for the subscription on a [Stream]. |
| 871 * | 860 * |
| 872 * When you subscribe on a [Stream] using [Stream.listen], | 861 * When you subscribe on a [Stream] using [Stream.listen], |
| 873 * a [StreamSubscription] object is returned. This object | 862 * a [StreamSubscription] object is returned. This object |
| 874 * is used to later unsubscribe again, or to temporarily pause | 863 * is used to later unsubscribe again, or to temporarily pause |
| 875 * the stream's events. | 864 * the stream's events. |
| 876 */ | 865 */ |
| 877 abstract class StreamSubscription<T> { | 866 abstract class StreamSubscription<T> { |
| 878 /** | 867 /** |
| 879 * Cancels this subscription. It will no longer receive events. | 868 * Cancels this subscription. It will no longer receive events. |
| 880 * | 869 * |
| 881 * If an event is currently firing, this unsubscription will only | 870 * If an event is currently firing, this unsubscription will only |
| 882 * take effect after all subscribers have received the current event. | 871 * take effect after all subscribers have received the current event. |
| 872 * | |
| 873 * In case the cancel is asynchronous, the returned [Future] can be used to | |
| 874 * wait for the operation to complete. If in doubt, wait for it. | |
|
floitsch
2013/10/12 18:53:57
Note that it can be `null`.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 883 */ | 875 */ |
| 884 void cancel(); | 876 Future cancel(); |
| 885 | 877 |
| 886 /** Set or override the data event handler of this subscription. */ | 878 /** Set or override the data event handler of this subscription. */ |
| 887 void onData(void handleData(T data)); | 879 void onData(void handleData(T data)); |
| 888 | 880 |
| 889 /** Set or override the error event handler of this subscription. */ | 881 /** Set or override the error event handler of this subscription. */ |
| 890 void onError(void handleError(error)); | 882 void onError(void handleError(error)); |
| 891 | 883 |
| 892 /** Set or override the done event handler of this subscription. */ | 884 /** Set or override the done event handler of this subscription. */ |
| 893 void onDone(void handleDone()); | 885 void onDone(void handleDone()); |
| 894 | 886 |
| (...skipping 375 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1270 * Cancels the stream iterator (and the underlying stream subscription) early. | 1262 * Cancels the stream iterator (and the underlying stream subscription) early. |
| 1271 * | 1263 * |
| 1272 * The stream iterator is automatically canceled if the [moveNext] future | 1264 * The stream iterator is automatically canceled if the [moveNext] future |
| 1273 * completes with either `false` or an error. | 1265 * completes with either `false` or an error. |
| 1274 * | 1266 * |
| 1275 * If a [moveNext] call has been made, it will complete with `false` as value, | 1267 * If a [moveNext] call has been made, it will complete with `false` as value, |
| 1276 * as will all further calls to [moveNext]. | 1268 * as will all further calls to [moveNext]. |
| 1277 * | 1269 * |
| 1278 * If you need to stop listening for values before the stream iterator is | 1270 * If you need to stop listening for values before the stream iterator is |
| 1279 * automatically closed, you must call [cancel] to ensure that the stream | 1271 * automatically closed, you must call [cancel] to ensure that the stream |
| 1280 * is properly closed. | 1272 * is properly closed. |
|
floitsch
2013/10/12 18:53:57
Add comment (even if it is just pointing to the St
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 1281 */ | 1273 */ |
| 1282 void cancel(); | 1274 Future cancel(); |
| 1283 } | 1275 } |
| OLD | NEW |