OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 329 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
340 bool first = true; | 340 bool first = true; |
341 subscription = this.listen( | 341 subscription = this.listen( |
342 (T element) { | 342 (T element) { |
343 if (!first) { | 343 if (!first) { |
344 buffer.write(separator); | 344 buffer.write(separator); |
345 } | 345 } |
346 first = false; | 346 first = false; |
347 try { | 347 try { |
348 buffer.write(element); | 348 buffer.write(element); |
349 } catch (e, s) { | 349 } catch (e, s) { |
350 subscription.cancel(); | 350 _cancelAndError(subscription, result)(_asyncError(e, s)); |
floitsch
2013/10/12 18:53:57
It seems wrong to create a closure and then invoke
Lasse Reichstein Nielsen
2013/10/14 11:32:33
Agree.
If it is because _cancelAndError(subscripti
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
351 result._completeError(_asyncError(e, s)); | |
352 } | 351 } |
353 }, | 352 }, |
354 onError: (e) { | 353 onError: (e) { |
355 result._completeError(e); | 354 result._completeError(e); |
356 }, | 355 }, |
357 onDone: () { | 356 onDone: () { |
358 result._complete(buffer.toString()); | 357 result._complete(buffer.toString()); |
359 }, | 358 }, |
360 cancelOnError: true); | 359 cancelOnError: true); |
361 return result; | 360 return result; |
362 } | 361 } |
363 | 362 |
364 /** | 363 /** |
365 * Checks whether [needle] occurs in the elements provided by this stream. | 364 * Checks whether [needle] occurs in the elements provided by this stream. |
366 * | 365 * |
367 * Completes the [Future] when the answer is known. | 366 * Completes the [Future] when the answer is known. |
368 * If this stream reports an error, the [Future] will report that error. | 367 * If this stream reports an error, the [Future] will report that error. |
369 */ | 368 */ |
370 Future<bool> contains(Object needle) { | 369 Future<bool> contains(Object needle) { |
371 _Future<bool> future = new _Future<bool>(); | 370 _Future<bool> future = new _Future<bool>(); |
372 StreamSubscription subscription; | 371 StreamSubscription subscription; |
373 subscription = this.listen( | 372 subscription = this.listen( |
374 (T element) { | 373 (T element) { |
375 _runUserCode( | 374 _runUserCode( |
376 () => (element == needle), | 375 () => (element == needle), |
377 (bool isMatch) { | 376 (bool isMatch) { |
378 if (isMatch) { | 377 if (isMatch) { |
379 subscription.cancel(); | 378 _cancelAndValue(subscription, future)(true); |
380 future._complete(true); | |
381 } | 379 } |
382 }, | 380 }, |
383 _cancelAndError(subscription, future) | 381 _cancelAndError(subscription, future) |
384 ); | 382 ); |
385 }, | 383 }, |
386 onError: future._completeError, | 384 onError: future._completeError, |
387 onDone: () { | 385 onDone: () { |
388 future._complete(false); | 386 future._complete(false); |
389 }, | 387 }, |
390 cancelOnError: true); | 388 cancelOnError: true); |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
425 */ | 423 */ |
426 Future<bool> every(bool test(T element)) { | 424 Future<bool> every(bool test(T element)) { |
427 _Future<bool> future = new _Future<bool>(); | 425 _Future<bool> future = new _Future<bool>(); |
428 StreamSubscription subscription; | 426 StreamSubscription subscription; |
429 subscription = this.listen( | 427 subscription = this.listen( |
430 (T element) { | 428 (T element) { |
431 _runUserCode( | 429 _runUserCode( |
432 () => test(element), | 430 () => test(element), |
433 (bool isMatch) { | 431 (bool isMatch) { |
434 if (!isMatch) { | 432 if (!isMatch) { |
435 subscription.cancel(); | 433 _cancelAndValue(subscription, future)(false); |
436 future._complete(false); | |
437 } | 434 } |
438 }, | 435 }, |
439 _cancelAndError(subscription, future) | 436 _cancelAndError(subscription, future) |
440 ); | 437 ); |
441 }, | 438 }, |
442 onError: future._completeError, | 439 onError: future._completeError, |
443 onDone: () { | 440 onDone: () { |
444 future._complete(true); | 441 future._complete(true); |
445 }, | 442 }, |
446 cancelOnError: true); | 443 cancelOnError: true); |
447 return future; | 444 return future; |
448 } | 445 } |
449 | 446 |
450 /** | 447 /** |
451 * Checks whether [test] accepts any element provided by this stream. | 448 * Checks whether [test] accepts any element provided by this stream. |
452 * | 449 * |
453 * Completes the [Future] when the answer is known. | 450 * Completes the [Future] when the answer is known. |
454 * If this stream reports an error, the [Future] will report that error. | 451 * If this stream reports an error, the [Future] will report that error. |
455 */ | 452 */ |
456 Future<bool> any(bool test(T element)) { | 453 Future<bool> any(bool test(T element)) { |
457 _Future<bool> future = new _Future<bool>(); | 454 _Future<bool> future = new _Future<bool>(); |
458 StreamSubscription subscription; | 455 StreamSubscription subscription; |
459 subscription = this.listen( | 456 subscription = this.listen( |
460 (T element) { | 457 (T element) { |
461 _runUserCode( | 458 _runUserCode( |
462 () => test(element), | 459 () => test(element), |
463 (bool isMatch) { | 460 (bool isMatch) { |
464 if (isMatch) { | 461 if (isMatch) { |
465 subscription.cancel(); | 462 _cancelAndValue(subscription, future)(true); |
466 future._complete(true); | |
467 } | 463 } |
468 }, | 464 }, |
469 _cancelAndError(subscription, future) | 465 _cancelAndError(subscription, future) |
470 ); | 466 ); |
471 }, | 467 }, |
472 onError: future._completeError, | 468 onError: future._completeError, |
473 onDone: () { | 469 onDone: () { |
474 future._complete(false); | 470 future._complete(false); |
475 }, | 471 }, |
476 cancelOnError: true); | 472 cancelOnError: true); |
(...skipping 14 matching lines...) Expand all Loading... | |
491 cancelOnError: true); | 487 cancelOnError: true); |
492 return future; | 488 return future; |
493 } | 489 } |
494 | 490 |
495 /** Reports whether this stream contains any elements. */ | 491 /** Reports whether this stream contains any elements. */ |
496 Future<bool> get isEmpty { | 492 Future<bool> get isEmpty { |
497 _Future<bool> future = new _Future<bool>(); | 493 _Future<bool> future = new _Future<bool>(); |
498 StreamSubscription subscription; | 494 StreamSubscription subscription; |
499 subscription = this.listen( | 495 subscription = this.listen( |
500 (_) { | 496 (_) { |
501 subscription.cancel(); | 497 _cancelAndValue(subscription, future)(false); |
502 future._complete(false); | |
503 }, | 498 }, |
504 onError: future._completeError, | 499 onError: future._completeError, |
505 onDone: () { | 500 onDone: () { |
506 future._complete(true); | 501 future._complete(true); |
507 }, | 502 }, |
508 cancelOnError: true); | 503 cancelOnError: true); |
509 return future; | 504 return future; |
510 } | 505 } |
511 | 506 |
512 /** Collects the data of this stream in a [List]. */ | 507 /** Collects the data of this stream in a [List]. */ |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
624 * the resulting future completes with a [StateError]. | 619 * the resulting future completes with a [StateError]. |
625 * | 620 * |
626 * Except for the type of the error, this method is equivalent to | 621 * Except for the type of the error, this method is equivalent to |
627 * [:this.elementAt(0):]. | 622 * [:this.elementAt(0):]. |
628 */ | 623 */ |
629 Future<T> get first { | 624 Future<T> get first { |
630 _Future<T> future = new _Future<T>(); | 625 _Future<T> future = new _Future<T>(); |
631 StreamSubscription subscription; | 626 StreamSubscription subscription; |
632 subscription = this.listen( | 627 subscription = this.listen( |
633 (T value) { | 628 (T value) { |
634 subscription.cancel(); | 629 _cancelAndValue(subscription, future)(value); |
635 future._complete(value); | |
636 return; | |
637 }, | 630 }, |
638 onError: future._completeError, | 631 onError: future._completeError, |
639 onDone: () { | 632 onDone: () { |
640 future._completeError(new StateError("No elements")); | 633 future._completeError(new StateError("No elements")); |
641 }, | 634 }, |
642 cancelOnError: true); | 635 cancelOnError: true); |
643 return future; | 636 return future; |
644 } | 637 } |
645 | 638 |
646 /** | 639 /** |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
680 * If [this] is empty or has more than one element throws a [StateError]. | 673 * If [this] is empty or has more than one element throws a [StateError]. |
681 */ | 674 */ |
682 Future<T> get single { | 675 Future<T> get single { |
683 _Future<T> future = new _Future<T>(); | 676 _Future<T> future = new _Future<T>(); |
684 T result = null; | 677 T result = null; |
685 bool foundResult = false; | 678 bool foundResult = false; |
686 StreamSubscription subscription; | 679 StreamSubscription subscription; |
687 subscription = this.listen( | 680 subscription = this.listen( |
688 (T value) { | 681 (T value) { |
689 if (foundResult) { | 682 if (foundResult) { |
690 subscription.cancel(); | |
691 // This is the second element we get. | 683 // This is the second element we get. |
692 Error error = new StateError("More than one element"); | 684 Error error = new StateError("More than one element"); |
693 future._completeError(error); | 685 _cancelAndError(subscription, future)(error); |
694 return; | 686 return; |
695 } | 687 } |
696 foundResult = true; | 688 foundResult = true; |
697 result = value; | 689 result = value; |
698 }, | 690 }, |
699 onError: future._completeError, | 691 onError: future._completeError, |
700 onDone: () { | 692 onDone: () { |
701 if (foundResult) { | 693 if (foundResult) { |
702 future._complete(result); | 694 future._complete(result); |
703 return; | 695 return; |
(...skipping 20 matching lines...) Expand all Loading... | |
724 */ | 716 */ |
725 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 717 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
726 _Future<dynamic> future = new _Future(); | 718 _Future<dynamic> future = new _Future(); |
727 StreamSubscription subscription; | 719 StreamSubscription subscription; |
728 subscription = this.listen( | 720 subscription = this.listen( |
729 (T value) { | 721 (T value) { |
730 _runUserCode( | 722 _runUserCode( |
731 () => test(value), | 723 () => test(value), |
732 (bool isMatch) { | 724 (bool isMatch) { |
733 if (isMatch) { | 725 if (isMatch) { |
734 subscription.cancel(); | 726 _cancelAndValue(subscription, future)(value); |
735 future._complete(value); | |
736 } | 727 } |
737 }, | 728 }, |
738 _cancelAndError(subscription, future) | 729 _cancelAndError(subscription, future) |
739 ); | 730 ); |
740 }, | 731 }, |
741 onError: future._completeError, | 732 onError: future._completeError, |
742 onDone: () { | 733 onDone: () { |
743 if (defaultValue != null) { | 734 if (defaultValue != null) { |
744 _runUserCode(defaultValue, future._complete, future._completeError); | 735 _runUserCode(defaultValue, future._complete, future._completeError); |
745 return; | 736 return; |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
802 T result = null; | 793 T result = null; |
803 bool foundResult = false; | 794 bool foundResult = false; |
804 StreamSubscription subscription; | 795 StreamSubscription subscription; |
805 subscription = this.listen( | 796 subscription = this.listen( |
806 (T value) { | 797 (T value) { |
807 _runUserCode( | 798 _runUserCode( |
808 () => true == test(value), | 799 () => true == test(value), |
809 (bool isMatch) { | 800 (bool isMatch) { |
810 if (isMatch) { | 801 if (isMatch) { |
811 if (foundResult) { | 802 if (foundResult) { |
812 subscription.cancel(); | 803 _cancelAndError(subscription, future)( |
813 future._completeError( | |
814 new StateError('Multiple matches for "single"')); | 804 new StateError('Multiple matches for "single"')); |
815 return; | 805 return; |
816 } | 806 } |
817 foundResult = true; | 807 foundResult = true; |
818 result = value; | 808 result = value; |
819 } | 809 } |
820 }, | 810 }, |
821 _cancelAndError(subscription, future) | 811 _cancelAndError(subscription, future) |
822 ); | 812 ); |
823 }, | 813 }, |
(...skipping 20 matching lines...) Expand all Loading... | |
844 * If a done event occurs before the value is found, the future completes | 834 * If a done event occurs before the value is found, the future completes |
845 * with a [RangeError]. | 835 * with a [RangeError]. |
846 */ | 836 */ |
847 Future<T> elementAt(int index) { | 837 Future<T> elementAt(int index) { |
848 if (index is! int || index < 0) throw new ArgumentError(index); | 838 if (index is! int || index < 0) throw new ArgumentError(index); |
849 _Future<T> future = new _Future<T>(); | 839 _Future<T> future = new _Future<T>(); |
850 StreamSubscription subscription; | 840 StreamSubscription subscription; |
851 subscription = this.listen( | 841 subscription = this.listen( |
852 (T value) { | 842 (T value) { |
853 if (index == 0) { | 843 if (index == 0) { |
854 subscription.cancel(); | 844 _cancelAndValue(subscription, future)(value); |
855 future._complete(value); | |
856 return; | 845 return; |
857 } | 846 } |
858 index -= 1; | 847 index -= 1; |
859 }, | 848 }, |
860 onError: future._completeError, | 849 onError: future._completeError, |
861 onDone: () { | 850 onDone: () { |
862 future._completeError(new RangeError.value(index)); | 851 future._completeError(new RangeError.value(index)); |
863 }, | 852 }, |
864 cancelOnError: true); | 853 cancelOnError: true); |
865 return future; | 854 return future; |
866 } | 855 } |
867 } | 856 } |
868 | 857 |
869 /** | 858 /** |
870 * A control object for the subscription on a [Stream]. | 859 * A control object for the subscription on a [Stream]. |
871 * | 860 * |
872 * When you subscribe on a [Stream] using [Stream.listen], | 861 * When you subscribe on a [Stream] using [Stream.listen], |
873 * a [StreamSubscription] object is returned. This object | 862 * a [StreamSubscription] object is returned. This object |
874 * is used to later unsubscribe again, or to temporarily pause | 863 * is used to later unsubscribe again, or to temporarily pause |
875 * the stream's events. | 864 * the stream's events. |
876 */ | 865 */ |
877 abstract class StreamSubscription<T> { | 866 abstract class StreamSubscription<T> { |
878 /** | 867 /** |
879 * Cancels this subscription. It will no longer receive events. | 868 * Cancels this subscription. It will no longer receive events. |
880 * | 869 * |
881 * If an event is currently firing, this unsubscription will only | 870 * If an event is currently firing, this unsubscription will only |
882 * take effect after all subscribers have received the current event. | 871 * take effect after all subscribers have received the current event. |
872 * | |
873 * In case the cancel is asynchronous, the returned [Future] can be used to | |
874 * wait for the operation to complete. If in doubt, wait for it. | |
floitsch
2013/10/12 18:53:57
Note that it can be `null`.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
883 */ | 875 */ |
884 void cancel(); | 876 Future cancel(); |
885 | 877 |
886 /** Set or override the data event handler of this subscription. */ | 878 /** Set or override the data event handler of this subscription. */ |
887 void onData(void handleData(T data)); | 879 void onData(void handleData(T data)); |
888 | 880 |
889 /** Set or override the error event handler of this subscription. */ | 881 /** Set or override the error event handler of this subscription. */ |
890 void onError(void handleError(error)); | 882 void onError(void handleError(error)); |
891 | 883 |
892 /** Set or override the done event handler of this subscription. */ | 884 /** Set or override the done event handler of this subscription. */ |
893 void onDone(void handleDone()); | 885 void onDone(void handleDone()); |
894 | 886 |
(...skipping 375 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1270 * Cancels the stream iterator (and the underlying stream subscription) early. | 1262 * Cancels the stream iterator (and the underlying stream subscription) early. |
1271 * | 1263 * |
1272 * The stream iterator is automatically canceled if the [moveNext] future | 1264 * The stream iterator is automatically canceled if the [moveNext] future |
1273 * completes with either `false` or an error. | 1265 * completes with either `false` or an error. |
1274 * | 1266 * |
1275 * If a [moveNext] call has been made, it will complete with `false` as value, | 1267 * If a [moveNext] call has been made, it will complete with `false` as value, |
1276 * as will all further calls to [moveNext]. | 1268 * as will all further calls to [moveNext]. |
1277 * | 1269 * |
1278 * If you need to stop listening for values before the stream iterator is | 1270 * If you need to stop listening for values before the stream iterator is |
1279 * automatically closed, you must call [cancel] to ensure that the stream | 1271 * automatically closed, you must call [cancel] to ensure that the stream |
1280 * is properly closed. | 1272 * is properly closed. |
floitsch
2013/10/12 18:53:57
Add comment (even if it is just pointing to the St
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
1281 */ | 1273 */ |
1282 void cancel(); | 1274 Future cancel(); |
1283 } | 1275 } |
OLD | NEW |