Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(305)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Mark failing tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/_internal/pub/pub.status ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 318 matching lines...) Expand 10 before | Expand all | Expand 10 after
329 Future<T> reduce(T combine(T previous, T element)) { 329 Future<T> reduce(T combine(T previous, T element)) {
330 _Future<T> result = new _Future<T>(); 330 _Future<T> result = new _Future<T>();
331 bool seenFirst = false; 331 bool seenFirst = false;
332 T value; 332 T value;
333 StreamSubscription subscription; 333 StreamSubscription subscription;
334 subscription = this.listen( 334 subscription = this.listen(
335 (T element) { 335 (T element) {
336 if (seenFirst) { 336 if (seenFirst) {
337 _runUserCode(() => combine(value, element), 337 _runUserCode(() => combine(value, element),
338 (T newValue) { value = newValue; }, 338 (T newValue) { value = newValue; },
339 _cancelAndError(subscription, result)); 339 _cancelAndErrorClosure(subscription, result));
340 } else { 340 } else {
341 value = element; 341 value = element;
342 seenFirst = true; 342 seenFirst = true;
343 } 343 }
344 }, 344 },
345 onError: result._completeError, 345 onError: result._completeError,
346 onDone: () { 346 onDone: () {
347 if (!seenFirst) { 347 if (!seenFirst) {
348 result._completeError(new StateError("No elements")); 348 result._completeError(new StateError("No elements"));
349 } else { 349 } else {
350 result._complete(value); 350 result._complete(value);
351 } 351 }
352 }, 352 },
353 cancelOnError: true 353 cancelOnError: true
354 ); 354 );
355 return result; 355 return result;
356 } 356 }
357 357
358 /** Reduces a sequence of values by repeatedly applying [combine]. */ 358 /** Reduces a sequence of values by repeatedly applying [combine]. */
359 Future fold(var initialValue, combine(var previous, T element)) { 359 Future fold(var initialValue, combine(var previous, T element)) {
360 _Future result = new _Future(); 360 _Future result = new _Future();
361 var value = initialValue; 361 var value = initialValue;
362 StreamSubscription subscription; 362 StreamSubscription subscription;
363 subscription = this.listen( 363 subscription = this.listen(
364 (T element) { 364 (T element) {
365 _runUserCode( 365 _runUserCode(
366 () => combine(value, element), 366 () => combine(value, element),
367 (newValue) { value = newValue; }, 367 (newValue) { value = newValue; },
368 _cancelAndError(subscription, result) 368 _cancelAndErrorClosure(subscription, result)
369 ); 369 );
370 }, 370 },
371 onError: (e, st) { 371 onError: (e, st) {
372 result._completeError(e, st); 372 result._completeError(e, st);
373 }, 373 },
374 onDone: () { 374 onDone: () {
375 result._complete(value); 375 result._complete(value);
376 }, 376 },
377 cancelOnError: true); 377 cancelOnError: true);
378 return result; 378 return result;
(...skipping 16 matching lines...) Expand all
395 bool first = true; 395 bool first = true;
396 subscription = this.listen( 396 subscription = this.listen(
397 (T element) { 397 (T element) {
398 if (!first) { 398 if (!first) {
399 buffer.write(separator); 399 buffer.write(separator);
400 } 400 }
401 first = false; 401 first = false;
402 try { 402 try {
403 buffer.write(element); 403 buffer.write(element);
404 } catch (e, s) { 404 } catch (e, s) {
405 subscription.cancel(); 405 _cancelAndError(subscription, result, _asyncError(e, s), s);
406 result._completeError(_asyncError(e, s));
407 } 406 }
408 }, 407 },
409 onError: (e) { 408 onError: (e) {
410 result._completeError(e); 409 result._completeError(e);
411 }, 410 },
412 onDone: () { 411 onDone: () {
413 result._complete(buffer.toString()); 412 result._complete(buffer.toString());
414 }, 413 },
415 cancelOnError: true); 414 cancelOnError: true);
416 return result; 415 return result;
417 } 416 }
418 417
419 /** 418 /**
420 * Checks whether [needle] occurs in the elements provided by this stream. 419 * Checks whether [needle] occurs in the elements provided by this stream.
421 * 420 *
422 * Completes the [Future] when the answer is known. 421 * Completes the [Future] when the answer is known.
423 * If this stream reports an error, the [Future] will report that error. 422 * If this stream reports an error, the [Future] will report that error.
424 */ 423 */
425 Future<bool> contains(Object needle) { 424 Future<bool> contains(Object needle) {
426 _Future<bool> future = new _Future<bool>(); 425 _Future<bool> future = new _Future<bool>();
427 StreamSubscription subscription; 426 StreamSubscription subscription;
428 subscription = this.listen( 427 subscription = this.listen(
429 (T element) { 428 (T element) {
430 _runUserCode( 429 _runUserCode(
431 () => (element == needle), 430 () => (element == needle),
432 (bool isMatch) { 431 (bool isMatch) {
433 if (isMatch) { 432 if (isMatch) {
434 subscription.cancel(); 433 _cancelAndValue(subscription, future, true);
435 future._complete(true);
436 } 434 }
437 }, 435 },
438 _cancelAndError(subscription, future) 436 _cancelAndErrorClosure(subscription, future)
439 ); 437 );
440 }, 438 },
441 onError: future._completeError, 439 onError: future._completeError,
442 onDone: () { 440 onDone: () {
443 future._complete(false); 441 future._complete(false);
444 }, 442 },
445 cancelOnError: true); 443 cancelOnError: true);
446 return future; 444 return future;
447 } 445 }
448 446
449 /** 447 /**
450 * Executes [action] on each data event of the stream. 448 * Executes [action] on each data event of the stream.
451 * 449 *
452 * Completes the returned [Future] when all events of the stream 450 * Completes the returned [Future] when all events of the stream
453 * have been processed. Completes the future with an error if the 451 * have been processed. Completes the future with an error if the
454 * stream has an error event, or if [action] throws. 452 * stream has an error event, or if [action] throws.
455 */ 453 */
456 Future forEach(void action(T element)) { 454 Future forEach(void action(T element)) {
457 _Future future = new _Future(); 455 _Future future = new _Future();
458 StreamSubscription subscription; 456 StreamSubscription subscription;
459 subscription = this.listen( 457 subscription = this.listen(
460 (T element) { 458 (T element) {
461 _runUserCode( 459 _runUserCode(
462 () => action(element), 460 () => action(element),
463 (_) {}, 461 (_) {},
464 _cancelAndError(subscription, future) 462 _cancelAndErrorClosure(subscription, future)
465 ); 463 );
466 }, 464 },
467 onError: future._completeError, 465 onError: future._completeError,
468 onDone: () { 466 onDone: () {
469 future._complete(null); 467 future._complete(null);
470 }, 468 },
471 cancelOnError: true); 469 cancelOnError: true);
472 return future; 470 return future;
473 } 471 }
474 472
475 /** 473 /**
476 * Checks whether [test] accepts all elements provided by this stream. 474 * Checks whether [test] accepts all elements provided by this stream.
477 * 475 *
478 * Completes the [Future] when the answer is known. 476 * Completes the [Future] when the answer is known.
479 * If this stream reports an error, the [Future] will report that error. 477 * If this stream reports an error, the [Future] will report that error.
480 */ 478 */
481 Future<bool> every(bool test(T element)) { 479 Future<bool> every(bool test(T element)) {
482 _Future<bool> future = new _Future<bool>(); 480 _Future<bool> future = new _Future<bool>();
483 StreamSubscription subscription; 481 StreamSubscription subscription;
484 subscription = this.listen( 482 subscription = this.listen(
485 (T element) { 483 (T element) {
486 _runUserCode( 484 _runUserCode(
487 () => test(element), 485 () => test(element),
488 (bool isMatch) { 486 (bool isMatch) {
489 if (!isMatch) { 487 if (!isMatch) {
490 subscription.cancel(); 488 _cancelAndValue(subscription, future, false);
491 future._complete(false);
492 } 489 }
493 }, 490 },
494 _cancelAndError(subscription, future) 491 _cancelAndErrorClosure(subscription, future)
495 ); 492 );
496 }, 493 },
497 onError: future._completeError, 494 onError: future._completeError,
498 onDone: () { 495 onDone: () {
499 future._complete(true); 496 future._complete(true);
500 }, 497 },
501 cancelOnError: true); 498 cancelOnError: true);
502 return future; 499 return future;
503 } 500 }
504 501
505 /** 502 /**
506 * Checks whether [test] accepts any element provided by this stream. 503 * Checks whether [test] accepts any element provided by this stream.
507 * 504 *
508 * Completes the [Future] when the answer is known. 505 * Completes the [Future] when the answer is known.
509 * If this stream reports an error, the [Future] will report that error. 506 * If this stream reports an error, the [Future] will report that error.
510 */ 507 */
511 Future<bool> any(bool test(T element)) { 508 Future<bool> any(bool test(T element)) {
512 _Future<bool> future = new _Future<bool>(); 509 _Future<bool> future = new _Future<bool>();
513 StreamSubscription subscription; 510 StreamSubscription subscription;
514 subscription = this.listen( 511 subscription = this.listen(
515 (T element) { 512 (T element) {
516 _runUserCode( 513 _runUserCode(
517 () => test(element), 514 () => test(element),
518 (bool isMatch) { 515 (bool isMatch) {
519 if (isMatch) { 516 if (isMatch) {
520 subscription.cancel(); 517 _cancelAndValue(subscription, future, true);
521 future._complete(true);
522 } 518 }
523 }, 519 },
524 _cancelAndError(subscription, future) 520 _cancelAndErrorClosure(subscription, future)
525 ); 521 );
526 }, 522 },
527 onError: future._completeError, 523 onError: future._completeError,
528 onDone: () { 524 onDone: () {
529 future._complete(false); 525 future._complete(false);
530 }, 526 },
531 cancelOnError: true); 527 cancelOnError: true);
532 return future; 528 return future;
533 } 529 }
534 530
(...skipping 11 matching lines...) Expand all
546 cancelOnError: true); 542 cancelOnError: true);
547 return future; 543 return future;
548 } 544 }
549 545
550 /** Reports whether this stream contains any elements. */ 546 /** Reports whether this stream contains any elements. */
551 Future<bool> get isEmpty { 547 Future<bool> get isEmpty {
552 _Future<bool> future = new _Future<bool>(); 548 _Future<bool> future = new _Future<bool>();
553 StreamSubscription subscription; 549 StreamSubscription subscription;
554 subscription = this.listen( 550 subscription = this.listen(
555 (_) { 551 (_) {
556 subscription.cancel(); 552 _cancelAndValue(subscription, future, false);
557 future._complete(false);
558 }, 553 },
559 onError: future._completeError, 554 onError: future._completeError,
560 onDone: () { 555 onDone: () {
561 future._complete(true); 556 future._complete(true);
562 }, 557 },
563 cancelOnError: true); 558 cancelOnError: true);
564 return future; 559 return future;
565 } 560 }
566 561
567 /** Collects the data of this stream in a [List]. */ 562 /** Collects the data of this stream in a [List]. */
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
679 * the resulting future completes with a [StateError]. 674 * the resulting future completes with a [StateError].
680 * 675 *
681 * Except for the type of the error, this method is equivalent to 676 * Except for the type of the error, this method is equivalent to
682 * [:this.elementAt(0):]. 677 * [:this.elementAt(0):].
683 */ 678 */
684 Future<T> get first { 679 Future<T> get first {
685 _Future<T> future = new _Future<T>(); 680 _Future<T> future = new _Future<T>();
686 StreamSubscription subscription; 681 StreamSubscription subscription;
687 subscription = this.listen( 682 subscription = this.listen(
688 (T value) { 683 (T value) {
689 subscription.cancel(); 684 _cancelAndValue(subscription, future, value);
690 future._complete(value);
691 return;
692 }, 685 },
693 onError: future._completeError, 686 onError: future._completeError,
694 onDone: () { 687 onDone: () {
695 future._completeError(new StateError("No elements")); 688 future._completeError(new StateError("No elements"));
696 }, 689 },
697 cancelOnError: true); 690 cancelOnError: true);
698 return future; 691 return future;
699 } 692 }
700 693
701 /** 694 /**
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
735 * If [this] is empty or has more than one element throws a [StateError]. 728 * If [this] is empty or has more than one element throws a [StateError].
736 */ 729 */
737 Future<T> get single { 730 Future<T> get single {
738 _Future<T> future = new _Future<T>(); 731 _Future<T> future = new _Future<T>();
739 T result = null; 732 T result = null;
740 bool foundResult = false; 733 bool foundResult = false;
741 StreamSubscription subscription; 734 StreamSubscription subscription;
742 subscription = this.listen( 735 subscription = this.listen(
743 (T value) { 736 (T value) {
744 if (foundResult) { 737 if (foundResult) {
745 subscription.cancel();
746 // This is the second element we get. 738 // This is the second element we get.
747 Error error = new StateError("More than one element"); 739 Error error = new StateError("More than one element");
748 future._completeError(error); 740 _cancelAndError(subscription, future, error, null);
749 return; 741 return;
750 } 742 }
751 foundResult = true; 743 foundResult = true;
752 result = value; 744 result = value;
753 }, 745 },
754 onError: future._completeError, 746 onError: future._completeError,
755 onDone: () { 747 onDone: () {
756 if (foundResult) { 748 if (foundResult) {
757 future._complete(result); 749 future._complete(result);
758 return; 750 return;
(...skipping 20 matching lines...) Expand all
779 */ 771 */
780 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { 772 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
781 _Future<dynamic> future = new _Future(); 773 _Future<dynamic> future = new _Future();
782 StreamSubscription subscription; 774 StreamSubscription subscription;
783 subscription = this.listen( 775 subscription = this.listen(
784 (T value) { 776 (T value) {
785 _runUserCode( 777 _runUserCode(
786 () => test(value), 778 () => test(value),
787 (bool isMatch) { 779 (bool isMatch) {
788 if (isMatch) { 780 if (isMatch) {
789 subscription.cancel(); 781 _cancelAndValue(subscription, future, value);
790 future._complete(value);
791 } 782 }
792 }, 783 },
793 _cancelAndError(subscription, future) 784 _cancelAndErrorClosure(subscription, future)
794 ); 785 );
795 }, 786 },
796 onError: future._completeError, 787 onError: future._completeError,
797 onDone: () { 788 onDone: () {
798 if (defaultValue != null) { 789 if (defaultValue != null) {
799 _runUserCode(defaultValue, future._complete, future._completeError); 790 _runUserCode(defaultValue, future._complete, future._completeError);
800 return; 791 return;
801 } 792 }
802 future._completeError(new StateError("firstMatch ended without match")); 793 future._completeError(new StateError("firstMatch ended without match"));
803 }, 794 },
(...skipping 16 matching lines...) Expand all
820 subscription = this.listen( 811 subscription = this.listen(
821 (T value) { 812 (T value) {
822 _runUserCode( 813 _runUserCode(
823 () => true == test(value), 814 () => true == test(value),
824 (bool isMatch) { 815 (bool isMatch) {
825 if (isMatch) { 816 if (isMatch) {
826 foundResult = true; 817 foundResult = true;
827 result = value; 818 result = value;
828 } 819 }
829 }, 820 },
830 _cancelAndError(subscription, future) 821 _cancelAndErrorClosure(subscription, future)
831 ); 822 );
832 }, 823 },
833 onError: future._completeError, 824 onError: future._completeError,
834 onDone: () { 825 onDone: () {
835 if (foundResult) { 826 if (foundResult) {
836 future._complete(result); 827 future._complete(result);
837 return; 828 return;
838 } 829 }
839 if (defaultValue != null) { 830 if (defaultValue != null) {
840 _runUserCode(defaultValue, future._complete, future._completeError); 831 _runUserCode(defaultValue, future._complete, future._completeError);
(...skipping 16 matching lines...) Expand all
857 T result = null; 848 T result = null;
858 bool foundResult = false; 849 bool foundResult = false;
859 StreamSubscription subscription; 850 StreamSubscription subscription;
860 subscription = this.listen( 851 subscription = this.listen(
861 (T value) { 852 (T value) {
862 _runUserCode( 853 _runUserCode(
863 () => true == test(value), 854 () => true == test(value),
864 (bool isMatch) { 855 (bool isMatch) {
865 if (isMatch) { 856 if (isMatch) {
866 if (foundResult) { 857 if (foundResult) {
867 subscription.cancel(); 858 _cancelAndError(
868 future._completeError( 859 subscription,
869 new StateError('Multiple matches for "single"')); 860 future,
861 new StateError('Multiple matches for "single"'),
862 null);
870 return; 863 return;
871 } 864 }
872 foundResult = true; 865 foundResult = true;
873 result = value; 866 result = value;
874 } 867 }
875 }, 868 },
876 _cancelAndError(subscription, future) 869 _cancelAndErrorClosure(subscription, future)
877 ); 870 );
878 }, 871 },
879 onError: future._completeError, 872 onError: future._completeError,
880 onDone: () { 873 onDone: () {
881 if (foundResult) { 874 if (foundResult) {
882 future._complete(result); 875 future._complete(result);
883 return; 876 return;
884 } 877 }
885 future._completeError(new StateError("single ended without match")); 878 future._completeError(new StateError("single ended without match"));
886 }, 879 },
(...skipping 12 matching lines...) Expand all
899 * If a done event occurs before the value is found, the future completes 892 * If a done event occurs before the value is found, the future completes
900 * with a [RangeError]. 893 * with a [RangeError].
901 */ 894 */
902 Future<T> elementAt(int index) { 895 Future<T> elementAt(int index) {
903 if (index is! int || index < 0) throw new ArgumentError(index); 896 if (index is! int || index < 0) throw new ArgumentError(index);
904 _Future<T> future = new _Future<T>(); 897 _Future<T> future = new _Future<T>();
905 StreamSubscription subscription; 898 StreamSubscription subscription;
906 subscription = this.listen( 899 subscription = this.listen(
907 (T value) { 900 (T value) {
908 if (index == 0) { 901 if (index == 0) {
909 subscription.cancel(); 902 _cancelAndValue(subscription, future, value);
910 future._complete(value);
911 return; 903 return;
912 } 904 }
913 index -= 1; 905 index -= 1;
914 }, 906 },
915 onError: future._completeError, 907 onError: future._completeError,
916 onDone: () { 908 onDone: () {
917 future._completeError(new RangeError.value(index)); 909 future._completeError(new RangeError.value(index));
918 }, 910 },
919 cancelOnError: true); 911 cancelOnError: true);
920 return future; 912 return future;
921 } 913 }
922 } 914 }
923 915
924 /** 916 /**
925 * A control object for the subscription on a [Stream]. 917 * A control object for the subscription on a [Stream].
926 * 918 *
927 * When you subscribe on a [Stream] using [Stream.listen], 919 * When you subscribe on a [Stream] using [Stream.listen],
928 * a [StreamSubscription] object is returned. This object 920 * a [StreamSubscription] object is returned. This object
929 * is used to later unsubscribe again, or to temporarily pause 921 * is used to later unsubscribe again, or to temporarily pause
930 * the stream's events. 922 * the stream's events.
931 */ 923 */
932 abstract class StreamSubscription<T> { 924 abstract class StreamSubscription<T> {
933 /** 925 /**
934 * Cancels this subscription. It will no longer receive events. 926 * Cancels this subscription. It will no longer receive events.
935 * 927 *
936 * If an event is currently firing, this unsubscription will only 928 * If an event is currently firing, this unsubscription will only
937 * take effect after all subscribers have received the current event. 929 * take effect after all subscribers have received the current event.
930 *
931 * Returns a future if the cancel-operation is not completed synchronously.
932 * Otherwise returns `null`.
938 */ 933 */
939 void cancel(); 934 Future cancel();
940 935
941 /** Set or override the data event handler of this subscription. */ 936 /** Set or override the data event handler of this subscription. */
942 void onData(void handleData(T data)); 937 void onData(void handleData(T data));
943 938
944 /** 939 /**
945 * Set or override the error event handler of this subscription. 940 * Set or override the error event handler of this subscription.
946 * 941 *
947 * This method overrides the handler that has been set at the invocation of 942 * This method overrides the handler that has been set at the invocation of
948 * [Stream.listen]. 943 * [Stream.listen].
949 */ 944 */
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after
1213 * 1208 *
1214 * The stream iterator is automatically canceled if the [moveNext] future 1209 * The stream iterator is automatically canceled if the [moveNext] future
1215 * completes with either `false` or an error. 1210 * completes with either `false` or an error.
1216 * 1211 *
1217 * If a [moveNext] call has been made, it will complete with `false` as value, 1212 * If a [moveNext] call has been made, it will complete with `false` as value,
1218 * as will all further calls to [moveNext]. 1213 * as will all further calls to [moveNext].
1219 * 1214 *
1220 * If you need to stop listening for values before the stream iterator is 1215 * If you need to stop listening for values before the stream iterator is
1221 * automatically closed, you must call [cancel] to ensure that the stream 1216 * automatically closed, you must call [cancel] to ensure that the stream
1222 * is properly closed. 1217 * is properly closed.
1218 *
1219 * Returns a future if the cancel-operation is not completed synchronously.
1220 * Otherwise returns `null`.
1223 */ 1221 */
1224 void cancel(); 1222 Future cancel();
1225 } 1223 }
OLDNEW
« no previous file with comments | « sdk/lib/_internal/pub/pub.status ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698