Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove dir stuff. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 329 matching lines...) Expand 10 before | Expand all | Expand 10 after
340 bool first = true; 340 bool first = true;
341 subscription = this.listen( 341 subscription = this.listen(
342 (T element) { 342 (T element) {
343 if (!first) { 343 if (!first) {
344 buffer.write(separator); 344 buffer.write(separator);
345 } 345 }
346 first = false; 346 first = false;
347 try { 347 try {
348 buffer.write(element); 348 buffer.write(element);
349 } catch (e, s) { 349 } catch (e, s) {
350 subscription.cancel(); 350 _cancelAndError(subscription, result)(_asyncError(e, s));
floitsch 2013/10/12 18:53:57 It seems wrong to create a closure and then invoke
Lasse Reichstein Nielsen 2013/10/14 11:32:33 Agree. If it is because _cancelAndError(subscripti
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
351 result._completeError(_asyncError(e, s));
352 } 351 }
353 }, 352 },
354 onError: (e) { 353 onError: (e) {
355 result._completeError(e); 354 result._completeError(e);
356 }, 355 },
357 onDone: () { 356 onDone: () {
358 result._complete(buffer.toString()); 357 result._complete(buffer.toString());
359 }, 358 },
360 cancelOnError: true); 359 cancelOnError: true);
361 return result; 360 return result;
362 } 361 }
363 362
364 /** 363 /**
365 * Checks whether [needle] occurs in the elements provided by this stream. 364 * Checks whether [needle] occurs in the elements provided by this stream.
366 * 365 *
367 * Completes the [Future] when the answer is known. 366 * Completes the [Future] when the answer is known.
368 * If this stream reports an error, the [Future] will report that error. 367 * If this stream reports an error, the [Future] will report that error.
369 */ 368 */
370 Future<bool> contains(Object needle) { 369 Future<bool> contains(Object needle) {
371 _Future<bool> future = new _Future<bool>(); 370 _Future<bool> future = new _Future<bool>();
372 StreamSubscription subscription; 371 StreamSubscription subscription;
373 subscription = this.listen( 372 subscription = this.listen(
374 (T element) { 373 (T element) {
375 _runUserCode( 374 _runUserCode(
376 () => (element == needle), 375 () => (element == needle),
377 (bool isMatch) { 376 (bool isMatch) {
378 if (isMatch) { 377 if (isMatch) {
379 subscription.cancel(); 378 _cancelAndValue(subscription, future)(true);
380 future._complete(true);
381 } 379 }
382 }, 380 },
383 _cancelAndError(subscription, future) 381 _cancelAndError(subscription, future)
384 ); 382 );
385 }, 383 },
386 onError: future._completeError, 384 onError: future._completeError,
387 onDone: () { 385 onDone: () {
388 future._complete(false); 386 future._complete(false);
389 }, 387 },
390 cancelOnError: true); 388 cancelOnError: true);
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
425 */ 423 */
426 Future<bool> every(bool test(T element)) { 424 Future<bool> every(bool test(T element)) {
427 _Future<bool> future = new _Future<bool>(); 425 _Future<bool> future = new _Future<bool>();
428 StreamSubscription subscription; 426 StreamSubscription subscription;
429 subscription = this.listen( 427 subscription = this.listen(
430 (T element) { 428 (T element) {
431 _runUserCode( 429 _runUserCode(
432 () => test(element), 430 () => test(element),
433 (bool isMatch) { 431 (bool isMatch) {
434 if (!isMatch) { 432 if (!isMatch) {
435 subscription.cancel(); 433 _cancelAndValue(subscription, future)(false);
436 future._complete(false);
437 } 434 }
438 }, 435 },
439 _cancelAndError(subscription, future) 436 _cancelAndError(subscription, future)
440 ); 437 );
441 }, 438 },
442 onError: future._completeError, 439 onError: future._completeError,
443 onDone: () { 440 onDone: () {
444 future._complete(true); 441 future._complete(true);
445 }, 442 },
446 cancelOnError: true); 443 cancelOnError: true);
447 return future; 444 return future;
448 } 445 }
449 446
450 /** 447 /**
451 * Checks whether [test] accepts any element provided by this stream. 448 * Checks whether [test] accepts any element provided by this stream.
452 * 449 *
453 * Completes the [Future] when the answer is known. 450 * Completes the [Future] when the answer is known.
454 * If this stream reports an error, the [Future] will report that error. 451 * If this stream reports an error, the [Future] will report that error.
455 */ 452 */
456 Future<bool> any(bool test(T element)) { 453 Future<bool> any(bool test(T element)) {
457 _Future<bool> future = new _Future<bool>(); 454 _Future<bool> future = new _Future<bool>();
458 StreamSubscription subscription; 455 StreamSubscription subscription;
459 subscription = this.listen( 456 subscription = this.listen(
460 (T element) { 457 (T element) {
461 _runUserCode( 458 _runUserCode(
462 () => test(element), 459 () => test(element),
463 (bool isMatch) { 460 (bool isMatch) {
464 if (isMatch) { 461 if (isMatch) {
465 subscription.cancel(); 462 _cancelAndValue(subscription, future)(true);
466 future._complete(true);
467 } 463 }
468 }, 464 },
469 _cancelAndError(subscription, future) 465 _cancelAndError(subscription, future)
470 ); 466 );
471 }, 467 },
472 onError: future._completeError, 468 onError: future._completeError,
473 onDone: () { 469 onDone: () {
474 future._complete(false); 470 future._complete(false);
475 }, 471 },
476 cancelOnError: true); 472 cancelOnError: true);
(...skipping 14 matching lines...) Expand all
491 cancelOnError: true); 487 cancelOnError: true);
492 return future; 488 return future;
493 } 489 }
494 490
495 /** Reports whether this stream contains any elements. */ 491 /** Reports whether this stream contains any elements. */
496 Future<bool> get isEmpty { 492 Future<bool> get isEmpty {
497 _Future<bool> future = new _Future<bool>(); 493 _Future<bool> future = new _Future<bool>();
498 StreamSubscription subscription; 494 StreamSubscription subscription;
499 subscription = this.listen( 495 subscription = this.listen(
500 (_) { 496 (_) {
501 subscription.cancel(); 497 _cancelAndValue(subscription, future)(false);
502 future._complete(false);
503 }, 498 },
504 onError: future._completeError, 499 onError: future._completeError,
505 onDone: () { 500 onDone: () {
506 future._complete(true); 501 future._complete(true);
507 }, 502 },
508 cancelOnError: true); 503 cancelOnError: true);
509 return future; 504 return future;
510 } 505 }
511 506
512 /** Collects the data of this stream in a [List]. */ 507 /** Collects the data of this stream in a [List]. */
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
624 * the resulting future completes with a [StateError]. 619 * the resulting future completes with a [StateError].
625 * 620 *
626 * Except for the type of the error, this method is equivalent to 621 * Except for the type of the error, this method is equivalent to
627 * [:this.elementAt(0):]. 622 * [:this.elementAt(0):].
628 */ 623 */
629 Future<T> get first { 624 Future<T> get first {
630 _Future<T> future = new _Future<T>(); 625 _Future<T> future = new _Future<T>();
631 StreamSubscription subscription; 626 StreamSubscription subscription;
632 subscription = this.listen( 627 subscription = this.listen(
633 (T value) { 628 (T value) {
634 subscription.cancel(); 629 _cancelAndValue(subscription, future)(value);
635 future._complete(value);
636 return;
637 }, 630 },
638 onError: future._completeError, 631 onError: future._completeError,
639 onDone: () { 632 onDone: () {
640 future._completeError(new StateError("No elements")); 633 future._completeError(new StateError("No elements"));
641 }, 634 },
642 cancelOnError: true); 635 cancelOnError: true);
643 return future; 636 return future;
644 } 637 }
645 638
646 /** 639 /**
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
680 * If [this] is empty or has more than one element throws a [StateError]. 673 * If [this] is empty or has more than one element throws a [StateError].
681 */ 674 */
682 Future<T> get single { 675 Future<T> get single {
683 _Future<T> future = new _Future<T>(); 676 _Future<T> future = new _Future<T>();
684 T result = null; 677 T result = null;
685 bool foundResult = false; 678 bool foundResult = false;
686 StreamSubscription subscription; 679 StreamSubscription subscription;
687 subscription = this.listen( 680 subscription = this.listen(
688 (T value) { 681 (T value) {
689 if (foundResult) { 682 if (foundResult) {
690 subscription.cancel();
691 // This is the second element we get. 683 // This is the second element we get.
692 Error error = new StateError("More than one element"); 684 Error error = new StateError("More than one element");
693 future._completeError(error); 685 _cancelAndError(subscription, future)(error);
694 return; 686 return;
695 } 687 }
696 foundResult = true; 688 foundResult = true;
697 result = value; 689 result = value;
698 }, 690 },
699 onError: future._completeError, 691 onError: future._completeError,
700 onDone: () { 692 onDone: () {
701 if (foundResult) { 693 if (foundResult) {
702 future._complete(result); 694 future._complete(result);
703 return; 695 return;
(...skipping 20 matching lines...) Expand all
724 */ 716 */
725 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { 717 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
726 _Future<dynamic> future = new _Future(); 718 _Future<dynamic> future = new _Future();
727 StreamSubscription subscription; 719 StreamSubscription subscription;
728 subscription = this.listen( 720 subscription = this.listen(
729 (T value) { 721 (T value) {
730 _runUserCode( 722 _runUserCode(
731 () => test(value), 723 () => test(value),
732 (bool isMatch) { 724 (bool isMatch) {
733 if (isMatch) { 725 if (isMatch) {
734 subscription.cancel(); 726 _cancelAndValue(subscription, future)(value);
735 future._complete(value);
736 } 727 }
737 }, 728 },
738 _cancelAndError(subscription, future) 729 _cancelAndError(subscription, future)
739 ); 730 );
740 }, 731 },
741 onError: future._completeError, 732 onError: future._completeError,
742 onDone: () { 733 onDone: () {
743 if (defaultValue != null) { 734 if (defaultValue != null) {
744 _runUserCode(defaultValue, future._complete, future._completeError); 735 _runUserCode(defaultValue, future._complete, future._completeError);
745 return; 736 return;
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
802 T result = null; 793 T result = null;
803 bool foundResult = false; 794 bool foundResult = false;
804 StreamSubscription subscription; 795 StreamSubscription subscription;
805 subscription = this.listen( 796 subscription = this.listen(
806 (T value) { 797 (T value) {
807 _runUserCode( 798 _runUserCode(
808 () => true == test(value), 799 () => true == test(value),
809 (bool isMatch) { 800 (bool isMatch) {
810 if (isMatch) { 801 if (isMatch) {
811 if (foundResult) { 802 if (foundResult) {
812 subscription.cancel(); 803 _cancelAndError(subscription, future)(
813 future._completeError(
814 new StateError('Multiple matches for "single"')); 804 new StateError('Multiple matches for "single"'));
815 return; 805 return;
816 } 806 }
817 foundResult = true; 807 foundResult = true;
818 result = value; 808 result = value;
819 } 809 }
820 }, 810 },
821 _cancelAndError(subscription, future) 811 _cancelAndError(subscription, future)
822 ); 812 );
823 }, 813 },
(...skipping 20 matching lines...) Expand all
844 * If a done event occurs before the value is found, the future completes 834 * If a done event occurs before the value is found, the future completes
845 * with a [RangeError]. 835 * with a [RangeError].
846 */ 836 */
847 Future<T> elementAt(int index) { 837 Future<T> elementAt(int index) {
848 if (index is! int || index < 0) throw new ArgumentError(index); 838 if (index is! int || index < 0) throw new ArgumentError(index);
849 _Future<T> future = new _Future<T>(); 839 _Future<T> future = new _Future<T>();
850 StreamSubscription subscription; 840 StreamSubscription subscription;
851 subscription = this.listen( 841 subscription = this.listen(
852 (T value) { 842 (T value) {
853 if (index == 0) { 843 if (index == 0) {
854 subscription.cancel(); 844 _cancelAndValue(subscription, future)(value);
855 future._complete(value);
856 return; 845 return;
857 } 846 }
858 index -= 1; 847 index -= 1;
859 }, 848 },
860 onError: future._completeError, 849 onError: future._completeError,
861 onDone: () { 850 onDone: () {
862 future._completeError(new RangeError.value(index)); 851 future._completeError(new RangeError.value(index));
863 }, 852 },
864 cancelOnError: true); 853 cancelOnError: true);
865 return future; 854 return future;
866 } 855 }
867 } 856 }
868 857
869 /** 858 /**
870 * A control object for the subscription on a [Stream]. 859 * A control object for the subscription on a [Stream].
871 * 860 *
872 * When you subscribe on a [Stream] using [Stream.listen], 861 * When you subscribe on a [Stream] using [Stream.listen],
873 * a [StreamSubscription] object is returned. This object 862 * a [StreamSubscription] object is returned. This object
874 * is used to later unsubscribe again, or to temporarily pause 863 * is used to later unsubscribe again, or to temporarily pause
875 * the stream's events. 864 * the stream's events.
876 */ 865 */
877 abstract class StreamSubscription<T> { 866 abstract class StreamSubscription<T> {
878 /** 867 /**
879 * Cancels this subscription. It will no longer receive events. 868 * Cancels this subscription. It will no longer receive events.
880 * 869 *
881 * If an event is currently firing, this unsubscription will only 870 * If an event is currently firing, this unsubscription will only
882 * take effect after all subscribers have received the current event. 871 * take effect after all subscribers have received the current event.
872 *
873 * In case the cancel is asynchronous, the returned [Future] can be used to
874 * wait for the operation to complete. If in doubt, wait for it.
floitsch 2013/10/12 18:53:57 Note that it can be `null`.
Anders Johnsen 2013/10/16 11:52:21 Done.
883 */ 875 */
884 void cancel(); 876 Future cancel();
885 877
886 /** Set or override the data event handler of this subscription. */ 878 /** Set or override the data event handler of this subscription. */
887 void onData(void handleData(T data)); 879 void onData(void handleData(T data));
888 880
889 /** Set or override the error event handler of this subscription. */ 881 /** Set or override the error event handler of this subscription. */
890 void onError(void handleError(error)); 882 void onError(void handleError(error));
891 883
892 /** Set or override the done event handler of this subscription. */ 884 /** Set or override the done event handler of this subscription. */
893 void onDone(void handleDone()); 885 void onDone(void handleDone());
894 886
(...skipping 375 matching lines...) Expand 10 before | Expand all | Expand 10 after
1270 * Cancels the stream iterator (and the underlying stream subscription) early. 1262 * Cancels the stream iterator (and the underlying stream subscription) early.
1271 * 1263 *
1272 * The stream iterator is automatically canceled if the [moveNext] future 1264 * The stream iterator is automatically canceled if the [moveNext] future
1273 * completes with either `false` or an error. 1265 * completes with either `false` or an error.
1274 * 1266 *
1275 * If a [moveNext] call has been made, it will complete with `false` as value, 1267 * If a [moveNext] call has been made, it will complete with `false` as value,
1276 * as will all further calls to [moveNext]. 1268 * as will all further calls to [moveNext].
1277 * 1269 *
1278 * If you need to stop listening for values before the stream iterator is 1270 * If you need to stop listening for values before the stream iterator is
1279 * automatically closed, you must call [cancel] to ensure that the stream 1271 * automatically closed, you must call [cancel] to ensure that the stream
1280 * is properly closed. 1272 * is properly closed.
floitsch 2013/10/12 18:53:57 Add comment (even if it is just pointing to the St
Anders Johnsen 2013/10/16 11:52:21 Done.
1281 */ 1273 */
1282 void cancel(); 1274 Future cancel();
1283 } 1275 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | sdk/lib/async/stream_controller.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698