Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1060)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebase Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 329 matching lines...) Expand 10 before | Expand all | Expand 10 after
340 bool first = true; 340 bool first = true;
341 subscription = this.listen( 341 subscription = this.listen(
342 (T element) { 342 (T element) {
343 if (!first) { 343 if (!first) {
344 buffer.write(separator); 344 buffer.write(separator);
345 } 345 }
346 first = false; 346 first = false;
347 try { 347 try {
348 buffer.write(element); 348 buffer.write(element);
349 } catch (e, s) { 349 } catch (e, s) {
350 subscription.cancel(); 350 subscription.cancel().whenComplete(() {
floitsch 2013/07/12 16:42:34 Why `whenComplete` if `cancel` is guaranteed not t
Lasse Reichstein Nielsen 2013/07/16 12:03:31 Why would that cost more?
floitsch 2013/07/16 12:47:17 My mistake.
351 result._setError(_asyncError(e, s)); 351 result._setError(_asyncError(e, s));
352 });
352 } 353 }
353 }, 354 },
354 onError: (e) { 355 onError: (e) {
355 result._setError(e); 356 result._setError(e);
356 }, 357 },
357 onDone: () { 358 onDone: () {
358 result._setValue(buffer.toString()); 359 result._setValue(buffer.toString());
359 }, 360 },
360 cancelOnError: true); 361 cancelOnError: true);
361 return result; 362 return result;
362 } 363 }
363 364
364 /** 365 /**
365 * Checks whether [needle] occurs in the elements provided by this stream. 366 * Checks whether [needle] occurs in the elements provided by this stream.
366 * 367 *
367 * Completes the [Future] when the answer is known. 368 * Completes the [Future] when the answer is known.
368 * If this stream reports an error, the [Future] will report that error. 369 * If this stream reports an error, the [Future] will report that error.
369 */ 370 */
370 Future<bool> contains(Object needle) { 371 Future<bool> contains(Object needle) {
371 _FutureImpl<bool> future = new _FutureImpl<bool>(); 372 _FutureImpl<bool> future = new _FutureImpl<bool>();
372 StreamSubscription subscription; 373 StreamSubscription subscription;
373 subscription = this.listen( 374 subscription = this.listen(
374 (T element) { 375 (T element) {
375 _runUserCode( 376 _runUserCode(
376 () => (element == needle), 377 () => (element == needle),
377 (bool isMatch) { 378 (bool isMatch) {
378 if (isMatch) { 379 if (isMatch) {
379 subscription.cancel(); 380 subscription.cancel().whenComplete(() {
380 future._setValue(true); 381 future._setValue(true);
382 });
381 } 383 }
382 }, 384 },
383 _cancelAndError(subscription, future) 385 _cancelAndError(subscription, future)
384 ); 386 );
385 }, 387 },
386 onError: future._setError, 388 onError: future._setError,
387 onDone: () { 389 onDone: () {
388 future._setValue(false); 390 future._setValue(false);
389 }, 391 },
390 cancelOnError: true); 392 cancelOnError: true);
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
425 */ 427 */
426 Future<bool> every(bool test(T element)) { 428 Future<bool> every(bool test(T element)) {
427 _FutureImpl<bool> future = new _FutureImpl<bool>(); 429 _FutureImpl<bool> future = new _FutureImpl<bool>();
428 StreamSubscription subscription; 430 StreamSubscription subscription;
429 subscription = this.listen( 431 subscription = this.listen(
430 (T element) { 432 (T element) {
431 _runUserCode( 433 _runUserCode(
432 () => test(element), 434 () => test(element),
433 (bool isMatch) { 435 (bool isMatch) {
434 if (!isMatch) { 436 if (!isMatch) {
435 subscription.cancel(); 437 subscription.cancel().whenComplete(() {
436 future._setValue(false); 438 future._setValue(false);
439 });
437 } 440 }
438 }, 441 },
439 _cancelAndError(subscription, future) 442 _cancelAndError(subscription, future)
440 ); 443 );
441 }, 444 },
442 onError: future._setError, 445 onError: future._setError,
443 onDone: () { 446 onDone: () {
444 future._setValue(true); 447 future._setValue(true);
445 }, 448 },
446 cancelOnError: true); 449 cancelOnError: true);
447 return future; 450 return future;
448 } 451 }
449 452
450 /** 453 /**
451 * Checks whether [test] accepts any element provided by this stream. 454 * Checks whether [test] accepts any element provided by this stream.
452 * 455 *
453 * Completes the [Future] when the answer is known. 456 * Completes the [Future] when the answer is known.
454 * If this stream reports an error, the [Future] will report that error. 457 * If this stream reports an error, the [Future] will report that error.
455 */ 458 */
456 Future<bool> any(bool test(T element)) { 459 Future<bool> any(bool test(T element)) {
457 _FutureImpl<bool> future = new _FutureImpl<bool>(); 460 _FutureImpl<bool> future = new _FutureImpl<bool>();
458 StreamSubscription subscription; 461 StreamSubscription subscription;
459 subscription = this.listen( 462 subscription = this.listen(
460 (T element) { 463 (T element) {
461 _runUserCode( 464 _runUserCode(
462 () => test(element), 465 () => test(element),
463 (bool isMatch) { 466 (bool isMatch) {
464 if (isMatch) { 467 if (isMatch) {
465 subscription.cancel(); 468 subscription.cancel().whenComplete(() {
466 future._setValue(true); 469 future._setValue(true);
470 });
467 } 471 }
468 }, 472 },
469 _cancelAndError(subscription, future) 473 _cancelAndError(subscription, future)
470 ); 474 );
471 }, 475 },
472 onError: future._setError, 476 onError: future._setError,
473 onDone: () { 477 onDone: () {
474 future._setValue(false); 478 future._setValue(false);
475 }, 479 },
476 cancelOnError: true); 480 cancelOnError: true);
(...skipping 14 matching lines...) Expand all
491 cancelOnError: true); 495 cancelOnError: true);
492 return future; 496 return future;
493 } 497 }
494 498
495 /** Reports whether this stream contains any elements. */ 499 /** Reports whether this stream contains any elements. */
496 Future<bool> get isEmpty { 500 Future<bool> get isEmpty {
497 _FutureImpl<bool> future = new _FutureImpl<bool>(); 501 _FutureImpl<bool> future = new _FutureImpl<bool>();
498 StreamSubscription subscription; 502 StreamSubscription subscription;
499 subscription = this.listen( 503 subscription = this.listen(
500 (_) { 504 (_) {
501 subscription.cancel(); 505 subscription.cancel().whenComplete(() {
502 future._setValue(false); 506 future._setValue(false);
507 });
503 }, 508 },
504 onError: future._setError, 509 onError: future._setError,
505 onDone: () { 510 onDone: () {
506 future._setValue(true); 511 future._setValue(true);
507 }, 512 },
508 cancelOnError: true); 513 cancelOnError: true);
509 return future; 514 return future;
510 } 515 }
511 516
512 /** Collects the data of this stream in a [List]. */ 517 /** Collects the data of this stream in a [List]. */
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
616 * Returns the first element. 621 * Returns the first element.
617 * 622 *
618 * If [this] is empty throws a [StateError]. Otherwise this method is 623 * If [this] is empty throws a [StateError]. Otherwise this method is
619 * equivalent to [:this.elementAt(0):] 624 * equivalent to [:this.elementAt(0):]
620 */ 625 */
621 Future<T> get first { 626 Future<T> get first {
622 _FutureImpl<T> future = new _FutureImpl<T>(); 627 _FutureImpl<T> future = new _FutureImpl<T>();
623 StreamSubscription subscription; 628 StreamSubscription subscription;
624 subscription = this.listen( 629 subscription = this.listen(
625 (T value) { 630 (T value) {
626 subscription.cancel(); 631 subscription.cancel().whenComplete(() {
627 future._setValue(value); 632 future._setValue(value);
628 return; 633 });
629 }, 634 },
630 onError: future._setError, 635 onError: future._setError,
631 onDone: () { 636 onDone: () {
632 future._setError(new StateError("No elements")); 637 future._setError(new StateError("No elements"));
633 }, 638 },
634 cancelOnError: true); 639 cancelOnError: true);
635 return future; 640 return future;
636 } 641 }
637 642
638 /** 643 /**
(...skipping 29 matching lines...) Expand all
668 * If [this] is empty or has more than one element throws a [StateError]. 673 * If [this] is empty or has more than one element throws a [StateError].
669 */ 674 */
670 Future<T> get single { 675 Future<T> get single {
671 _FutureImpl<T> future = new _FutureImpl<T>(); 676 _FutureImpl<T> future = new _FutureImpl<T>();
672 T result = null; 677 T result = null;
673 bool foundResult = false; 678 bool foundResult = false;
674 StreamSubscription subscription; 679 StreamSubscription subscription;
675 subscription = this.listen( 680 subscription = this.listen(
676 (T value) { 681 (T value) {
677 if (foundResult) { 682 if (foundResult) {
678 subscription.cancel(); 683 subscription.cancel().whenComplete(() {
679 // This is the second element we get. 684 // This is the second element we get.
680 Error error = new StateError("More than one element"); 685 Error error = new StateError("More than one element");
681 future._setError(error); 686 future._setError(error);
687 });
682 return; 688 return;
683 } 689 }
684 foundResult = true; 690 foundResult = true;
685 result = value; 691 result = value;
686 }, 692 },
687 onError: future._setError, 693 onError: future._setError,
688 onDone: () { 694 onDone: () {
689 if (foundResult) { 695 if (foundResult) {
690 future._setValue(result); 696 future._setValue(result);
691 return; 697 return;
(...skipping 20 matching lines...) Expand all
712 */ 718 */
713 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { 719 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
714 _FutureImpl<dynamic> future = new _FutureImpl(); 720 _FutureImpl<dynamic> future = new _FutureImpl();
715 StreamSubscription subscription; 721 StreamSubscription subscription;
716 subscription = this.listen( 722 subscription = this.listen(
717 (T value) { 723 (T value) {
718 _runUserCode( 724 _runUserCode(
719 () => test(value), 725 () => test(value),
720 (bool isMatch) { 726 (bool isMatch) {
721 if (isMatch) { 727 if (isMatch) {
722 subscription.cancel(); 728 subscription.cancel().whenComplete(() {
723 future._setValue(value); 729 future._setValue(value);
730 });
724 } 731 }
725 }, 732 },
726 _cancelAndError(subscription, future) 733 _cancelAndError(subscription, future)
727 ); 734 );
728 }, 735 },
729 onError: future._setError, 736 onError: future._setError,
730 onDone: () { 737 onDone: () {
731 if (defaultValue != null) { 738 if (defaultValue != null) {
732 _runUserCode(defaultValue, future._setValue, future._setError); 739 _runUserCode(defaultValue, future._setValue, future._setError);
733 return; 740 return;
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
790 T result = null; 797 T result = null;
791 bool foundResult = false; 798 bool foundResult = false;
792 StreamSubscription subscription; 799 StreamSubscription subscription;
793 subscription = this.listen( 800 subscription = this.listen(
794 (T value) { 801 (T value) {
795 _runUserCode( 802 _runUserCode(
796 () => true == test(value), 803 () => true == test(value),
797 (bool isMatch) { 804 (bool isMatch) {
798 if (isMatch) { 805 if (isMatch) {
799 if (foundResult) { 806 if (foundResult) {
800 subscription.cancel(); 807 subscription.cancel().whenComplete(() {
801 future._setError( 808 future._setError(
802 new StateError('Multiple matches for "single"')); 809 new StateError('Multiple matches for "single"'));
810 });
803 return; 811 return;
804 } 812 }
805 foundResult = true; 813 foundResult = true;
806 result = value; 814 result = value;
807 } 815 }
808 }, 816 },
809 _cancelAndError(subscription, future) 817 _cancelAndError(subscription, future)
810 ); 818 );
811 }, 819 },
812 onError: future._setError, 820 onError: future._setError,
(...skipping 16 matching lines...) Expand all
829 * If this stream provides fewer than [index] elements before closing, 837 * If this stream provides fewer than [index] elements before closing,
830 * an error is reported. 838 * an error is reported.
831 */ 839 */
832 Future<T> elementAt(int index) { 840 Future<T> elementAt(int index) {
833 if (index is! int || index < 0) throw new ArgumentError(index); 841 if (index is! int || index < 0) throw new ArgumentError(index);
834 _FutureImpl<T> future = new _FutureImpl<T>(); 842 _FutureImpl<T> future = new _FutureImpl<T>();
835 StreamSubscription subscription; 843 StreamSubscription subscription;
836 subscription = this.listen( 844 subscription = this.listen(
837 (T value) { 845 (T value) {
838 if (index == 0) { 846 if (index == 0) {
839 subscription.cancel(); 847 subscription.cancel().whenComplete(() {
840 future._setValue(value); 848 future._setValue(value);
849 });
841 return; 850 return;
842 } 851 }
843 index -= 1; 852 index -= 1;
844 }, 853 },
845 onError: future._setError, 854 onError: future._setError,
846 onDone: () { 855 onDone: () {
847 future._setError(new StateError("Not enough elements for elementAt")); 856 future._setError(new StateError("Not enough elements for elementAt"));
848 }, 857 },
849 cancelOnError: true); 858 cancelOnError: true);
850 return future; 859 return future;
851 } 860 }
852 } 861 }
853 862
854 /** 863 /**
855 * A control object for the subscription on a [Stream]. 864 * A control object for the subscription on a [Stream].
856 * 865 *
857 * When you subscribe on a [Stream] using [Stream.listen], 866 * When you subscribe on a [Stream] using [Stream.listen],
858 * a [StreamSubscription] object is returned. This object 867 * a [StreamSubscription] object is returned. This object
859 * is used to later unsubscribe again, or to temporarily pause 868 * is used to later unsubscribe again, or to temporarily pause
860 * the stream's events. 869 * the stream's events.
861 */ 870 */
862 abstract class StreamSubscription<T> { 871 abstract class StreamSubscription<T> {
863 /** 872 /**
864 * Cancels this subscription. It will no longer receive events. 873 * Cancels this subscription. It will no longer receive events.
865 * 874 *
866 * If an event is currently firing, this unsubscription will only 875 * If an event is currently firing, this unsubscription will only
867 * take effect after all subscribers have received the current event. 876 * take effect after all subscribers have received the current event.
877 *
878 * In case the cancel is asynchronous, the returned [Future] can be used to
floitsch 2013/07/12 16:42:34 Returns a future that is completed when the cancel
Lasse Reichstein Nielsen 2013/07/17 07:28:46 Agree, if it always returns a future, then there i
879 * wait for the operation to complete. If in doubt, wait for it.
868 */ 880 */
869 void cancel(); 881 Future cancel();
870 882
871 /** Set or override the data event handler of this subscription. */ 883 /** Set or override the data event handler of this subscription. */
872 void onData(void handleData(T data)); 884 void onData(void handleData(T data));
873 885
874 /** Set or override the error event handler of this subscription. */ 886 /** Set or override the error event handler of this subscription. */
875 void onError(void handleError(error)); 887 void onError(void handleError(error));
876 888
877 /** Set or override the done event handler of this subscription. */ 889 /** Set or override the done event handler of this subscription. */
878 void onDone(void handleDone()); 890 void onDone(void handleDone());
879 891
(...skipping 377 matching lines...) Expand 10 before | Expand all | Expand 10 after
1257 * The stream iterator is automatically canceled if the [moveNext] future 1269 * The stream iterator is automatically canceled if the [moveNext] future
1258 * completes with either `false` or an error. 1270 * completes with either `false` or an error.
1259 * 1271 *
1260 * If a [moveNext] call has been made, it will complete with `false` as value, 1272 * If a [moveNext] call has been made, it will complete with `false` as value,
1261 * as will all further calls to [moveNext]. 1273 * as will all further calls to [moveNext].
1262 * 1274 *
1263 * If you need to stop listening for values before the stream iterator is 1275 * If you need to stop listening for values before the stream iterator is
1264 * automatically closed, you must call [cancel] to ensure that the stream 1276 * automatically closed, you must call [cancel] to ensure that the stream
1265 * is properly closed. 1277 * is properly closed.
1266 */ 1278 */
1267 void cancel(); 1279 Future cancel();
1268 } 1280 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | sdk/lib/async/stream_controller.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698