OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 329 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
340 bool first = true; | 340 bool first = true; |
341 subscription = this.listen( | 341 subscription = this.listen( |
342 (T element) { | 342 (T element) { |
343 if (!first) { | 343 if (!first) { |
344 buffer.write(separator); | 344 buffer.write(separator); |
345 } | 345 } |
346 first = false; | 346 first = false; |
347 try { | 347 try { |
348 buffer.write(element); | 348 buffer.write(element); |
349 } catch (e, s) { | 349 } catch (e, s) { |
350 subscription.cancel(); | 350 subscription.cancel().whenComplete(() { |
floitsch
2013/07/12 16:42:34
Why `whenComplete` if `cancel` is guaranteed not t
Lasse Reichstein Nielsen
2013/07/16 12:03:31
Why would that cost more?
floitsch
2013/07/16 12:47:17
My mistake.
| |
351 result._setError(_asyncError(e, s)); | 351 result._setError(_asyncError(e, s)); |
352 }); | |
352 } | 353 } |
353 }, | 354 }, |
354 onError: (e) { | 355 onError: (e) { |
355 result._setError(e); | 356 result._setError(e); |
356 }, | 357 }, |
357 onDone: () { | 358 onDone: () { |
358 result._setValue(buffer.toString()); | 359 result._setValue(buffer.toString()); |
359 }, | 360 }, |
360 cancelOnError: true); | 361 cancelOnError: true); |
361 return result; | 362 return result; |
362 } | 363 } |
363 | 364 |
364 /** | 365 /** |
365 * Checks whether [needle] occurs in the elements provided by this stream. | 366 * Checks whether [needle] occurs in the elements provided by this stream. |
366 * | 367 * |
367 * Completes the [Future] when the answer is known. | 368 * Completes the [Future] when the answer is known. |
368 * If this stream reports an error, the [Future] will report that error. | 369 * If this stream reports an error, the [Future] will report that error. |
369 */ | 370 */ |
370 Future<bool> contains(Object needle) { | 371 Future<bool> contains(Object needle) { |
371 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 372 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
372 StreamSubscription subscription; | 373 StreamSubscription subscription; |
373 subscription = this.listen( | 374 subscription = this.listen( |
374 (T element) { | 375 (T element) { |
375 _runUserCode( | 376 _runUserCode( |
376 () => (element == needle), | 377 () => (element == needle), |
377 (bool isMatch) { | 378 (bool isMatch) { |
378 if (isMatch) { | 379 if (isMatch) { |
379 subscription.cancel(); | 380 subscription.cancel().whenComplete(() { |
380 future._setValue(true); | 381 future._setValue(true); |
382 }); | |
381 } | 383 } |
382 }, | 384 }, |
383 _cancelAndError(subscription, future) | 385 _cancelAndError(subscription, future) |
384 ); | 386 ); |
385 }, | 387 }, |
386 onError: future._setError, | 388 onError: future._setError, |
387 onDone: () { | 389 onDone: () { |
388 future._setValue(false); | 390 future._setValue(false); |
389 }, | 391 }, |
390 cancelOnError: true); | 392 cancelOnError: true); |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
425 */ | 427 */ |
426 Future<bool> every(bool test(T element)) { | 428 Future<bool> every(bool test(T element)) { |
427 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 429 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
428 StreamSubscription subscription; | 430 StreamSubscription subscription; |
429 subscription = this.listen( | 431 subscription = this.listen( |
430 (T element) { | 432 (T element) { |
431 _runUserCode( | 433 _runUserCode( |
432 () => test(element), | 434 () => test(element), |
433 (bool isMatch) { | 435 (bool isMatch) { |
434 if (!isMatch) { | 436 if (!isMatch) { |
435 subscription.cancel(); | 437 subscription.cancel().whenComplete(() { |
436 future._setValue(false); | 438 future._setValue(false); |
439 }); | |
437 } | 440 } |
438 }, | 441 }, |
439 _cancelAndError(subscription, future) | 442 _cancelAndError(subscription, future) |
440 ); | 443 ); |
441 }, | 444 }, |
442 onError: future._setError, | 445 onError: future._setError, |
443 onDone: () { | 446 onDone: () { |
444 future._setValue(true); | 447 future._setValue(true); |
445 }, | 448 }, |
446 cancelOnError: true); | 449 cancelOnError: true); |
447 return future; | 450 return future; |
448 } | 451 } |
449 | 452 |
450 /** | 453 /** |
451 * Checks whether [test] accepts any element provided by this stream. | 454 * Checks whether [test] accepts any element provided by this stream. |
452 * | 455 * |
453 * Completes the [Future] when the answer is known. | 456 * Completes the [Future] when the answer is known. |
454 * If this stream reports an error, the [Future] will report that error. | 457 * If this stream reports an error, the [Future] will report that error. |
455 */ | 458 */ |
456 Future<bool> any(bool test(T element)) { | 459 Future<bool> any(bool test(T element)) { |
457 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 460 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
458 StreamSubscription subscription; | 461 StreamSubscription subscription; |
459 subscription = this.listen( | 462 subscription = this.listen( |
460 (T element) { | 463 (T element) { |
461 _runUserCode( | 464 _runUserCode( |
462 () => test(element), | 465 () => test(element), |
463 (bool isMatch) { | 466 (bool isMatch) { |
464 if (isMatch) { | 467 if (isMatch) { |
465 subscription.cancel(); | 468 subscription.cancel().whenComplete(() { |
466 future._setValue(true); | 469 future._setValue(true); |
470 }); | |
467 } | 471 } |
468 }, | 472 }, |
469 _cancelAndError(subscription, future) | 473 _cancelAndError(subscription, future) |
470 ); | 474 ); |
471 }, | 475 }, |
472 onError: future._setError, | 476 onError: future._setError, |
473 onDone: () { | 477 onDone: () { |
474 future._setValue(false); | 478 future._setValue(false); |
475 }, | 479 }, |
476 cancelOnError: true); | 480 cancelOnError: true); |
(...skipping 14 matching lines...) Expand all Loading... | |
491 cancelOnError: true); | 495 cancelOnError: true); |
492 return future; | 496 return future; |
493 } | 497 } |
494 | 498 |
495 /** Reports whether this stream contains any elements. */ | 499 /** Reports whether this stream contains any elements. */ |
496 Future<bool> get isEmpty { | 500 Future<bool> get isEmpty { |
497 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 501 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
498 StreamSubscription subscription; | 502 StreamSubscription subscription; |
499 subscription = this.listen( | 503 subscription = this.listen( |
500 (_) { | 504 (_) { |
501 subscription.cancel(); | 505 subscription.cancel().whenComplete(() { |
502 future._setValue(false); | 506 future._setValue(false); |
507 }); | |
503 }, | 508 }, |
504 onError: future._setError, | 509 onError: future._setError, |
505 onDone: () { | 510 onDone: () { |
506 future._setValue(true); | 511 future._setValue(true); |
507 }, | 512 }, |
508 cancelOnError: true); | 513 cancelOnError: true); |
509 return future; | 514 return future; |
510 } | 515 } |
511 | 516 |
512 /** Collects the data of this stream in a [List]. */ | 517 /** Collects the data of this stream in a [List]. */ |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
616 * Returns the first element. | 621 * Returns the first element. |
617 * | 622 * |
618 * If [this] is empty throws a [StateError]. Otherwise this method is | 623 * If [this] is empty throws a [StateError]. Otherwise this method is |
619 * equivalent to [:this.elementAt(0):] | 624 * equivalent to [:this.elementAt(0):] |
620 */ | 625 */ |
621 Future<T> get first { | 626 Future<T> get first { |
622 _FutureImpl<T> future = new _FutureImpl<T>(); | 627 _FutureImpl<T> future = new _FutureImpl<T>(); |
623 StreamSubscription subscription; | 628 StreamSubscription subscription; |
624 subscription = this.listen( | 629 subscription = this.listen( |
625 (T value) { | 630 (T value) { |
626 subscription.cancel(); | 631 subscription.cancel().whenComplete(() { |
627 future._setValue(value); | 632 future._setValue(value); |
628 return; | 633 }); |
629 }, | 634 }, |
630 onError: future._setError, | 635 onError: future._setError, |
631 onDone: () { | 636 onDone: () { |
632 future._setError(new StateError("No elements")); | 637 future._setError(new StateError("No elements")); |
633 }, | 638 }, |
634 cancelOnError: true); | 639 cancelOnError: true); |
635 return future; | 640 return future; |
636 } | 641 } |
637 | 642 |
638 /** | 643 /** |
(...skipping 29 matching lines...) Expand all Loading... | |
668 * If [this] is empty or has more than one element throws a [StateError]. | 673 * If [this] is empty or has more than one element throws a [StateError]. |
669 */ | 674 */ |
670 Future<T> get single { | 675 Future<T> get single { |
671 _FutureImpl<T> future = new _FutureImpl<T>(); | 676 _FutureImpl<T> future = new _FutureImpl<T>(); |
672 T result = null; | 677 T result = null; |
673 bool foundResult = false; | 678 bool foundResult = false; |
674 StreamSubscription subscription; | 679 StreamSubscription subscription; |
675 subscription = this.listen( | 680 subscription = this.listen( |
676 (T value) { | 681 (T value) { |
677 if (foundResult) { | 682 if (foundResult) { |
678 subscription.cancel(); | 683 subscription.cancel().whenComplete(() { |
679 // This is the second element we get. | 684 // This is the second element we get. |
680 Error error = new StateError("More than one element"); | 685 Error error = new StateError("More than one element"); |
681 future._setError(error); | 686 future._setError(error); |
687 }); | |
682 return; | 688 return; |
683 } | 689 } |
684 foundResult = true; | 690 foundResult = true; |
685 result = value; | 691 result = value; |
686 }, | 692 }, |
687 onError: future._setError, | 693 onError: future._setError, |
688 onDone: () { | 694 onDone: () { |
689 if (foundResult) { | 695 if (foundResult) { |
690 future._setValue(result); | 696 future._setValue(result); |
691 return; | 697 return; |
(...skipping 20 matching lines...) Expand all Loading... | |
712 */ | 718 */ |
713 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 719 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
714 _FutureImpl<dynamic> future = new _FutureImpl(); | 720 _FutureImpl<dynamic> future = new _FutureImpl(); |
715 StreamSubscription subscription; | 721 StreamSubscription subscription; |
716 subscription = this.listen( | 722 subscription = this.listen( |
717 (T value) { | 723 (T value) { |
718 _runUserCode( | 724 _runUserCode( |
719 () => test(value), | 725 () => test(value), |
720 (bool isMatch) { | 726 (bool isMatch) { |
721 if (isMatch) { | 727 if (isMatch) { |
722 subscription.cancel(); | 728 subscription.cancel().whenComplete(() { |
723 future._setValue(value); | 729 future._setValue(value); |
730 }); | |
724 } | 731 } |
725 }, | 732 }, |
726 _cancelAndError(subscription, future) | 733 _cancelAndError(subscription, future) |
727 ); | 734 ); |
728 }, | 735 }, |
729 onError: future._setError, | 736 onError: future._setError, |
730 onDone: () { | 737 onDone: () { |
731 if (defaultValue != null) { | 738 if (defaultValue != null) { |
732 _runUserCode(defaultValue, future._setValue, future._setError); | 739 _runUserCode(defaultValue, future._setValue, future._setError); |
733 return; | 740 return; |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
790 T result = null; | 797 T result = null; |
791 bool foundResult = false; | 798 bool foundResult = false; |
792 StreamSubscription subscription; | 799 StreamSubscription subscription; |
793 subscription = this.listen( | 800 subscription = this.listen( |
794 (T value) { | 801 (T value) { |
795 _runUserCode( | 802 _runUserCode( |
796 () => true == test(value), | 803 () => true == test(value), |
797 (bool isMatch) { | 804 (bool isMatch) { |
798 if (isMatch) { | 805 if (isMatch) { |
799 if (foundResult) { | 806 if (foundResult) { |
800 subscription.cancel(); | 807 subscription.cancel().whenComplete(() { |
801 future._setError( | 808 future._setError( |
802 new StateError('Multiple matches for "single"')); | 809 new StateError('Multiple matches for "single"')); |
810 }); | |
803 return; | 811 return; |
804 } | 812 } |
805 foundResult = true; | 813 foundResult = true; |
806 result = value; | 814 result = value; |
807 } | 815 } |
808 }, | 816 }, |
809 _cancelAndError(subscription, future) | 817 _cancelAndError(subscription, future) |
810 ); | 818 ); |
811 }, | 819 }, |
812 onError: future._setError, | 820 onError: future._setError, |
(...skipping 16 matching lines...) Expand all Loading... | |
829 * If this stream provides fewer than [index] elements before closing, | 837 * If this stream provides fewer than [index] elements before closing, |
830 * an error is reported. | 838 * an error is reported. |
831 */ | 839 */ |
832 Future<T> elementAt(int index) { | 840 Future<T> elementAt(int index) { |
833 if (index is! int || index < 0) throw new ArgumentError(index); | 841 if (index is! int || index < 0) throw new ArgumentError(index); |
834 _FutureImpl<T> future = new _FutureImpl<T>(); | 842 _FutureImpl<T> future = new _FutureImpl<T>(); |
835 StreamSubscription subscription; | 843 StreamSubscription subscription; |
836 subscription = this.listen( | 844 subscription = this.listen( |
837 (T value) { | 845 (T value) { |
838 if (index == 0) { | 846 if (index == 0) { |
839 subscription.cancel(); | 847 subscription.cancel().whenComplete(() { |
840 future._setValue(value); | 848 future._setValue(value); |
849 }); | |
841 return; | 850 return; |
842 } | 851 } |
843 index -= 1; | 852 index -= 1; |
844 }, | 853 }, |
845 onError: future._setError, | 854 onError: future._setError, |
846 onDone: () { | 855 onDone: () { |
847 future._setError(new StateError("Not enough elements for elementAt")); | 856 future._setError(new StateError("Not enough elements for elementAt")); |
848 }, | 857 }, |
849 cancelOnError: true); | 858 cancelOnError: true); |
850 return future; | 859 return future; |
851 } | 860 } |
852 } | 861 } |
853 | 862 |
854 /** | 863 /** |
855 * A control object for the subscription on a [Stream]. | 864 * A control object for the subscription on a [Stream]. |
856 * | 865 * |
857 * When you subscribe on a [Stream] using [Stream.listen], | 866 * When you subscribe on a [Stream] using [Stream.listen], |
858 * a [StreamSubscription] object is returned. This object | 867 * a [StreamSubscription] object is returned. This object |
859 * is used to later unsubscribe again, or to temporarily pause | 868 * is used to later unsubscribe again, or to temporarily pause |
860 * the stream's events. | 869 * the stream's events. |
861 */ | 870 */ |
862 abstract class StreamSubscription<T> { | 871 abstract class StreamSubscription<T> { |
863 /** | 872 /** |
864 * Cancels this subscription. It will no longer receive events. | 873 * Cancels this subscription. It will no longer receive events. |
865 * | 874 * |
866 * If an event is currently firing, this unsubscription will only | 875 * If an event is currently firing, this unsubscription will only |
867 * take effect after all subscribers have received the current event. | 876 * take effect after all subscribers have received the current event. |
877 * | |
878 * In case the cancel is asynchronous, the returned [Future] can be used to | |
floitsch
2013/07/12 16:42:34
Returns a future that is completed when the cancel
Lasse Reichstein Nielsen
2013/07/17 07:28:46
Agree, if it always returns a future, then there i
| |
879 * wait for the operation to complete. If in doubt, wait for it. | |
868 */ | 880 */ |
869 void cancel(); | 881 Future cancel(); |
870 | 882 |
871 /** Set or override the data event handler of this subscription. */ | 883 /** Set or override the data event handler of this subscription. */ |
872 void onData(void handleData(T data)); | 884 void onData(void handleData(T data)); |
873 | 885 |
874 /** Set or override the error event handler of this subscription. */ | 886 /** Set or override the error event handler of this subscription. */ |
875 void onError(void handleError(error)); | 887 void onError(void handleError(error)); |
876 | 888 |
877 /** Set or override the done event handler of this subscription. */ | 889 /** Set or override the done event handler of this subscription. */ |
878 void onDone(void handleDone()); | 890 void onDone(void handleDone()); |
879 | 891 |
(...skipping 377 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1257 * The stream iterator is automatically canceled if the [moveNext] future | 1269 * The stream iterator is automatically canceled if the [moveNext] future |
1258 * completes with either `false` or an error. | 1270 * completes with either `false` or an error. |
1259 * | 1271 * |
1260 * If a [moveNext] call has been made, it will complete with `false` as value, | 1272 * If a [moveNext] call has been made, it will complete with `false` as value, |
1261 * as will all further calls to [moveNext]. | 1273 * as will all further calls to [moveNext]. |
1262 * | 1274 * |
1263 * If you need to stop listening for values before the stream iterator is | 1275 * If you need to stop listening for values before the stream iterator is |
1264 * automatically closed, you must call [cancel] to ensure that the stream | 1276 * automatically closed, you must call [cancel] to ensure that the stream |
1265 * is properly closed. | 1277 * is properly closed. |
1266 */ | 1278 */ |
1267 void cancel(); | 1279 Future cancel(); |
1268 } | 1280 } |
OLD | NEW |