Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(864)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14251013: Rename unsubscribeOnError to cancelOnError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/embedders/openglui/common/gl.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after
167 * Adds a subscription to this stream. 167 * Adds a subscription to this stream.
168 * 168 *
169 * On each data event from this stream, the subscriber's [onData] handler 169 * On each data event from this stream, the subscriber's [onData] handler
170 * is called. If [onData] is null, nothing happens. 170 * is called. If [onData] is null, nothing happens.
171 * 171 *
172 * On errors from this stream, the [onError] handler is given a 172 * On errors from this stream, the [onError] handler is given a
173 * [AsyncError] object describing the error. 173 * [AsyncError] object describing the error.
174 * 174 *
175 * If this stream closes, the [onDone] handler is called. 175 * If this stream closes, the [onDone] handler is called.
176 * 176 *
177 * If [unsubscribeOnError] is true, the subscription is ended when 177 * If [cancelOnError] is true, the subscription is ended when
178 * the first error is reported. The default is false. 178 * the first error is reported. The default is false.
179 */ 179 */
180 StreamSubscription<T> listen(void onData(T event), 180 StreamSubscription<T> listen(void onData(T event),
181 { void onError(AsyncError error), 181 { void onError(AsyncError error),
182 void onDone(), 182 void onDone(),
183 bool unsubscribeOnError}); 183 bool cancelOnError});
184 184
185 /** 185 /**
186 * Creates a new stream from this stream that discards some data events. 186 * Creates a new stream from this stream that discards some data events.
187 * 187 *
188 * The new stream sends the same error and done events as this stream, 188 * The new stream sends the same error and done events as this stream,
189 * but it only sends the data events that satisfy the [test]. 189 * but it only sends the data events that satisfy the [test].
190 */ 190 */
191 Stream<T> where(bool test(T event)) { 191 Stream<T> where(bool test(T event)) {
192 return new _WhereStream<T>(this, test); 192 return new _WhereStream<T>(this, test);
193 } 193 }
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
271 } 271 }
272 }, 272 },
273 onError: result._setError, 273 onError: result._setError,
274 onDone: () { 274 onDone: () {
275 if (!seenFirst) { 275 if (!seenFirst) {
276 result._setError(new AsyncError(new StateError("No elements"))); 276 result._setError(new AsyncError(new StateError("No elements")));
277 } else { 277 } else {
278 result._setValue(value); 278 result._setValue(value);
279 } 279 }
280 }, 280 },
281 unsubscribeOnError: true 281 cancelOnError: true
282 ); 282 );
283 return result; 283 return result;
284 } 284 }
285 285
286 /** Reduces a sequence of values by repeatedly applying [combine]. */ 286 /** Reduces a sequence of values by repeatedly applying [combine]. */
287 Future fold(var initialValue, combine(var previous, T element)) { 287 Future fold(var initialValue, combine(var previous, T element)) {
288 _FutureImpl result = new _FutureImpl(); 288 _FutureImpl result = new _FutureImpl();
289 var value = initialValue; 289 var value = initialValue;
290 StreamSubscription subscription; 290 StreamSubscription subscription;
291 subscription = this.listen( 291 subscription = this.listen(
292 // TODO(ahe): Restore type when feature is implemented in dart2js 292 // TODO(ahe): Restore type when feature is implemented in dart2js
293 // checked mode. http://dartbug.com/7733 293 // checked mode. http://dartbug.com/7733
294 (/*T*/ element) { 294 (/*T*/ element) {
295 _runUserCode( 295 _runUserCode(
296 () => combine(value, element), 296 () => combine(value, element),
297 (newValue) { value = newValue; }, 297 (newValue) { value = newValue; },
298 _cancelAndError(subscription, result) 298 _cancelAndError(subscription, result)
299 ); 299 );
300 }, 300 },
301 onError: (AsyncError e) { 301 onError: (AsyncError e) {
302 result._setError(e); 302 result._setError(e);
303 }, 303 },
304 onDone: () { 304 onDone: () {
305 result._setValue(value); 305 result._setValue(value);
306 }, 306 },
307 unsubscribeOnError: true); 307 cancelOnError: true);
308 return result; 308 return result;
309 } 309 }
310 310
311 /** 311 /**
312 * Checks whether [match] occurs in the elements provided by this stream. 312 * Checks whether [match] occurs in the elements provided by this stream.
313 * 313 *
314 * Completes the [Future] when the answer is known. 314 * Completes the [Future] when the answer is known.
315 * If this stream reports an error, the [Future] will report that error. 315 * If this stream reports an error, the [Future] will report that error.
316 */ 316 */
317 Future<bool> contains(T match) { 317 Future<bool> contains(T match) {
(...skipping 11 matching lines...) Expand all
329 future._setValue(true); 329 future._setValue(true);
330 } 330 }
331 }, 331 },
332 _cancelAndError(subscription, future) 332 _cancelAndError(subscription, future)
333 ); 333 );
334 }, 334 },
335 onError: future._setError, 335 onError: future._setError,
336 onDone: () { 336 onDone: () {
337 future._setValue(false); 337 future._setValue(false);
338 }, 338 },
339 unsubscribeOnError: true); 339 cancelOnError: true);
340 return future; 340 return future;
341 } 341 }
342 342
343 /** 343 /**
344 * Checks whether [test] accepts all elements provided by this stream. 344 * Checks whether [test] accepts all elements provided by this stream.
345 * 345 *
346 * Completes the [Future] when the answer is known. 346 * Completes the [Future] when the answer is known.
347 * If this stream reports an error, the [Future] will report that error. 347 * If this stream reports an error, the [Future] will report that error.
348 */ 348 */
349 Future<bool> every(bool test(T element)) { 349 Future<bool> every(bool test(T element)) {
(...skipping 11 matching lines...) Expand all
361 future._setValue(false); 361 future._setValue(false);
362 } 362 }
363 }, 363 },
364 _cancelAndError(subscription, future) 364 _cancelAndError(subscription, future)
365 ); 365 );
366 }, 366 },
367 onError: future._setError, 367 onError: future._setError,
368 onDone: () { 368 onDone: () {
369 future._setValue(true); 369 future._setValue(true);
370 }, 370 },
371 unsubscribeOnError: true); 371 cancelOnError: true);
372 return future; 372 return future;
373 } 373 }
374 374
375 /** 375 /**
376 * Checks whether [test] accepts any element provided by this stream. 376 * Checks whether [test] accepts any element provided by this stream.
377 * 377 *
378 * Completes the [Future] when the answer is known. 378 * Completes the [Future] when the answer is known.
379 * If this stream reports an error, the [Future] will report that error. 379 * If this stream reports an error, the [Future] will report that error.
380 */ 380 */
381 Future<bool> any(bool test(T element)) { 381 Future<bool> any(bool test(T element)) {
(...skipping 11 matching lines...) Expand all
393 future._setValue(true); 393 future._setValue(true);
394 } 394 }
395 }, 395 },
396 _cancelAndError(subscription, future) 396 _cancelAndError(subscription, future)
397 ); 397 );
398 }, 398 },
399 onError: future._setError, 399 onError: future._setError,
400 onDone: () { 400 onDone: () {
401 future._setValue(false); 401 future._setValue(false);
402 }, 402 },
403 unsubscribeOnError: true); 403 cancelOnError: true);
404 return future; 404 return future;
405 } 405 }
406 406
407 407
408 /** Counts the elements in the stream. */ 408 /** Counts the elements in the stream. */
409 Future<int> get length { 409 Future<int> get length {
410 _FutureImpl<int> future = new _FutureImpl<int>(); 410 _FutureImpl<int> future = new _FutureImpl<int>();
411 int count = 0; 411 int count = 0;
412 this.listen( 412 this.listen(
413 (_) { count++; }, 413 (_) { count++; },
414 onError: future._setError, 414 onError: future._setError,
415 onDone: () { 415 onDone: () {
416 future._setValue(count); 416 future._setValue(count);
417 }, 417 },
418 unsubscribeOnError: true); 418 cancelOnError: true);
419 return future; 419 return future;
420 } 420 }
421 421
422 /** Reports whether this stream contains any elements. */ 422 /** Reports whether this stream contains any elements. */
423 Future<bool> get isEmpty { 423 Future<bool> get isEmpty {
424 _FutureImpl<bool> future = new _FutureImpl<bool>(); 424 _FutureImpl<bool> future = new _FutureImpl<bool>();
425 StreamSubscription subscription; 425 StreamSubscription subscription;
426 subscription = this.listen( 426 subscription = this.listen(
427 (_) { 427 (_) {
428 subscription.cancel(); 428 subscription.cancel();
429 future._setValue(false); 429 future._setValue(false);
430 }, 430 },
431 onError: future._setError, 431 onError: future._setError,
432 onDone: () { 432 onDone: () {
433 future._setValue(true); 433 future._setValue(true);
434 }, 434 },
435 unsubscribeOnError: true); 435 cancelOnError: true);
436 return future; 436 return future;
437 } 437 }
438 438
439 /** Collects the data of this stream in a [List]. */ 439 /** Collects the data of this stream in a [List]. */
440 Future<List<T>> toList() { 440 Future<List<T>> toList() {
441 List<T> result = <T>[]; 441 List<T> result = <T>[];
442 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); 442 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
443 this.listen( 443 this.listen(
444 // TODO(ahe): Restore type when feature is implemented in dart2js 444 // TODO(ahe): Restore type when feature is implemented in dart2js
445 // checked mode. http://dartbug.com/7733 445 // checked mode. http://dartbug.com/7733
446 (/*T*/ data) { 446 (/*T*/ data) {
447 result.add(data); 447 result.add(data);
448 }, 448 },
449 onError: future._setError, 449 onError: future._setError,
450 onDone: () { 450 onDone: () {
451 future._setValue(result); 451 future._setValue(result);
452 }, 452 },
453 unsubscribeOnError: true); 453 cancelOnError: true);
454 return future; 454 return future;
455 } 455 }
456 456
457 /** Collects the data of this stream in a [Set]. */ 457 /** Collects the data of this stream in a [Set]. */
458 Future<Set<T>> toSet() { 458 Future<Set<T>> toSet() {
459 Set<T> result = new Set<T>(); 459 Set<T> result = new Set<T>();
460 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); 460 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
461 this.listen( 461 this.listen(
462 // TODO(ahe): Restore type when feature is implemented in dart2js 462 // TODO(ahe): Restore type when feature is implemented in dart2js
463 // checked mode. http://dartbug.com/7733 463 // checked mode. http://dartbug.com/7733
464 (/*T*/ data) { 464 (/*T*/ data) {
465 result.add(data); 465 result.add(data);
466 }, 466 },
467 onError: future._setError, 467 onError: future._setError,
468 onDone: () { 468 onDone: () {
469 future._setValue(result); 469 future._setValue(result);
470 }, 470 },
471 unsubscribeOnError: true); 471 cancelOnError: true);
472 return future; 472 return future;
473 } 473 }
474 474
475 /** 475 /**
476 * Provides at most the first [n] values of this stream. 476 * Provides at most the first [n] values of this stream.
477 * 477 *
478 * Forwards the first [n] data events of this stream, and all error 478 * Forwards the first [n] data events of this stream, and all error
479 * events, to the returned stream, and ends with a done event. 479 * events, to the returned stream, and ends with a done event.
480 * 480 *
481 * If this stream produces fewer than [count] values before it's done, 481 * If this stream produces fewer than [count] values before it's done,
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
543 // checked mode. http://dartbug.com/7733 543 // checked mode. http://dartbug.com/7733
544 (/*T*/ value) { 544 (/*T*/ value) {
545 subscription.cancel(); 545 subscription.cancel();
546 future._setValue(value); 546 future._setValue(value);
547 return; 547 return;
548 }, 548 },
549 onError: future._setError, 549 onError: future._setError,
550 onDone: () { 550 onDone: () {
551 future._setError(new AsyncError(new StateError("No elements"))); 551 future._setError(new AsyncError(new StateError("No elements")));
552 }, 552 },
553 unsubscribeOnError: true); 553 cancelOnError: true);
554 return future; 554 return future;
555 } 555 }
556 556
557 /** 557 /**
558 * Returns the last element. 558 * Returns the last element.
559 * 559 *
560 * If [this] is empty throws a [StateError]. 560 * If [this] is empty throws a [StateError].
561 */ 561 */
562 Future<T> get last { 562 Future<T> get last {
563 _FutureImpl<T> future = new _FutureImpl<T>(); 563 _FutureImpl<T> future = new _FutureImpl<T>();
564 T result = null; 564 T result = null;
565 bool foundResult = false; 565 bool foundResult = false;
566 StreamSubscription subscription; 566 StreamSubscription subscription;
567 subscription = this.listen( 567 subscription = this.listen(
568 // TODO(ahe): Restore type when feature is implemented in dart2js 568 // TODO(ahe): Restore type when feature is implemented in dart2js
569 // checked mode. http://dartbug.com/7733 569 // checked mode. http://dartbug.com/7733
570 (/*T*/ value) { 570 (/*T*/ value) {
571 foundResult = true; 571 foundResult = true;
572 result = value; 572 result = value;
573 }, 573 },
574 onError: future._setError, 574 onError: future._setError,
575 onDone: () { 575 onDone: () {
576 if (foundResult) { 576 if (foundResult) {
577 future._setValue(result); 577 future._setValue(result);
578 return; 578 return;
579 } 579 }
580 future._setError(new AsyncError(new StateError("No elements"))); 580 future._setError(new AsyncError(new StateError("No elements")));
581 }, 581 },
582 unsubscribeOnError: true); 582 cancelOnError: true);
583 return future; 583 return future;
584 } 584 }
585 585
586 /** 586 /**
587 * Returns the single element. 587 * Returns the single element.
588 * 588 *
589 * If [this] is empty or has more than one element throws a [StateError]. 589 * If [this] is empty or has more than one element throws a [StateError].
590 */ 590 */
591 Future<T> get single { 591 Future<T> get single {
592 _FutureImpl<T> future = new _FutureImpl<T>(); 592 _FutureImpl<T> future = new _FutureImpl<T>();
(...skipping 15 matching lines...) Expand all
608 result = value; 608 result = value;
609 }, 609 },
610 onError: future._setError, 610 onError: future._setError,
611 onDone: () { 611 onDone: () {
612 if (foundResult) { 612 if (foundResult) {
613 future._setValue(result); 613 future._setValue(result);
614 return; 614 return;
615 } 615 }
616 future._setError(new AsyncError(new StateError("No elements"))); 616 future._setError(new AsyncError(new StateError("No elements")));
617 }, 617 },
618 unsubscribeOnError: true); 618 cancelOnError: true);
619 return future; 619 return future;
620 } 620 }
621 621
622 /** 622 /**
623 * Finds the first element of this stream matching [test]. 623 * Finds the first element of this stream matching [test].
624 * 624 *
625 * Returns a future that is filled with the first element of this stream 625 * Returns a future that is filled with the first element of this stream
626 * that [test] returns true for. 626 * that [test] returns true for.
627 * 627 *
628 * If no such element is found before this stream is done, and a 628 * If no such element is found before this stream is done, and a
(...skipping 24 matching lines...) Expand all
653 }, 653 },
654 onError: future._setError, 654 onError: future._setError,
655 onDone: () { 655 onDone: () {
656 if (defaultValue != null) { 656 if (defaultValue != null) {
657 _runUserCode(defaultValue, future._setValue, future._setError); 657 _runUserCode(defaultValue, future._setValue, future._setError);
658 return; 658 return;
659 } 659 }
660 future._setError( 660 future._setError(
661 new AsyncError(new StateError("firstMatch ended without match"))); 661 new AsyncError(new StateError("firstMatch ended without match")));
662 }, 662 },
663 unsubscribeOnError: true); 663 cancelOnError: true);
664 return future; 664 return future;
665 } 665 }
666 666
667 /** 667 /**
668 * Finds the last element in this stream matching [test]. 668 * Finds the last element in this stream matching [test].
669 * 669 *
670 * As [firstWhere], except that the last matching element is found. 670 * As [firstWhere], except that the last matching element is found.
671 * That means that the result cannot be provided before this stream 671 * That means that the result cannot be provided before this stream
672 * is done. 672 * is done.
673 */ 673 */
(...skipping 23 matching lines...) Expand all
697 future._setValue(result); 697 future._setValue(result);
698 return; 698 return;
699 } 699 }
700 if (defaultValue != null) { 700 if (defaultValue != null) {
701 _runUserCode(defaultValue, future._setValue, future._setError); 701 _runUserCode(defaultValue, future._setValue, future._setError);
702 return; 702 return;
703 } 703 }
704 future._setError( 704 future._setError(
705 new AsyncError(new StateError("lastMatch ended without match"))); 705 new AsyncError(new StateError("lastMatch ended without match")));
706 }, 706 },
707 unsubscribeOnError: true); 707 cancelOnError: true);
708 return future; 708 return future;
709 } 709 }
710 710
711 /** 711 /**
712 * Finds the single element in this stream matching [test]. 712 * Finds the single element in this stream matching [test].
713 * 713 *
714 * Like [lastMatch], except that it is an error if more than one 714 * Like [lastMatch], except that it is an error if more than one
715 * matching element occurs in the stream. 715 * matching element occurs in the stream.
716 */ 716 */
717 Future<T> singleWhere(bool test(T value)) { 717 Future<T> singleWhere(bool test(T value)) {
(...skipping 24 matching lines...) Expand all
742 }, 742 },
743 onError: future._setError, 743 onError: future._setError,
744 onDone: () { 744 onDone: () {
745 if (foundResult) { 745 if (foundResult) {
746 future._setValue(result); 746 future._setValue(result);
747 return; 747 return;
748 } 748 }
749 future._setError( 749 future._setError(
750 new AsyncError(new StateError("single ended without match"))); 750 new AsyncError(new StateError("single ended without match")));
751 }, 751 },
752 unsubscribeOnError: true); 752 cancelOnError: true);
753 return future; 753 return future;
754 } 754 }
755 755
756 /** 756 /**
757 * Returns the value of the [index]th data event of this stream. 757 * Returns the value of the [index]th data event of this stream.
758 * 758 *
759 * If an error event occurs, the future will end with this error. 759 * If an error event occurs, the future will end with this error.
760 * 760 *
761 * If this stream provides fewer than [index] elements before closing, 761 * If this stream provides fewer than [index] elements before closing,
762 * an error is reported. 762 * an error is reported.
(...skipping 11 matching lines...) Expand all
774 future._setValue(value); 774 future._setValue(value);
775 return; 775 return;
776 } 776 }
777 index -= 1; 777 index -= 1;
778 }, 778 },
779 onError: future._setError, 779 onError: future._setError,
780 onDone: () { 780 onDone: () {
781 future._setError(new AsyncError( 781 future._setError(new AsyncError(
782 new StateError("Not enough elements for elementAt"))); 782 new StateError("Not enough elements for elementAt")));
783 }, 783 },
784 unsubscribeOnError: true); 784 cancelOnError: true);
785 return future; 785 return future;
786 } 786 }
787 } 787 }
788 788
789 /** 789 /**
790 * A control object for the subscription on a [Stream]. 790 * A control object for the subscription on a [Stream].
791 * 791 *
792 * When you subscribe on a [Stream] using [Stream.listen], 792 * When you subscribe on a [Stream] using [Stream.listen],
793 * a [StreamSubscription] object is returned. This object 793 * a [StreamSubscription] object is returned. This object
794 * is used to later unsubscribe again, or to temporarily pause 794 * is used to later unsubscribe again, or to temporarily pause
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
831 */ 831 */
832 void resume(); 832 void resume();
833 833
834 /** 834 /**
835 * Returns a future that handles the [onDone] and [onError] callbacks. 835 * Returns a future that handles the [onDone] and [onError] callbacks.
836 * 836 *
837 * This method *overwrites* the existing [onDone] and [onError] callbacks 837 * This method *overwrites* the existing [onDone] and [onError] callbacks
838 * with new ones that complete the returned future. 838 * with new ones that complete the returned future.
839 * 839 *
840 * In case of an error the subscription will automatically cancel (even 840 * In case of an error the subscription will automatically cancel (even
841 * when it was listening with `unsubscribeOnError` set to `false`). 841 * when it was listening with `cancelOnError` set to `false`).
842 * 842 *
843 * In case of a `done` event the future completes with the given 843 * In case of a `done` event the future completes with the given
844 * [futureValue]. 844 * [futureValue].
845 */ 845 */
846 Future asFuture([var futureValue]); 846 Future asFuture([var futureValue]);
847 } 847 }
848 848
849 849
850 /** 850 /**
851 * An interface that abstracts creation or handling of [Stream] events. 851 * An interface that abstracts creation or handling of [Stream] events.
(...skipping 14 matching lines...) Expand all
866 866
867 StreamView(this._stream); 867 StreamView(this._stream);
868 868
869 bool get isBroadcast => _stream.isBroadcast; 869 bool get isBroadcast => _stream.isBroadcast;
870 870
871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); 871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream();
872 872
873 StreamSubscription<T> listen(void onData(T value), 873 StreamSubscription<T> listen(void onData(T value),
874 { void onError(AsyncError error), 874 { void onError(AsyncError error),
875 void onDone(), 875 void onDone(),
876 bool unsubscribeOnError }) { 876 bool cancelOnError }) {
877 return _stream.listen(onData, onError: onError, onDone: onDone, 877 return _stream.listen(onData, onError: onError, onDone: onDone,
878 unsubscribeOnError: unsubscribeOnError); 878 cancelOnError: cancelOnError);
879 } 879 }
880 } 880 }
881 881
882 /** 882 /**
883 * [EventSink] wrapper that only exposes the [EventSink] interface. 883 * [EventSink] wrapper that only exposes the [EventSink] interface.
884 */ 884 */
885 class _EventSinkView<T> extends EventSink<T> { 885 class _EventSinkView<T> extends EventSink<T> {
886 final EventSink<T> _sink; 886 final EventSink<T> _sink;
887 887
888 _EventSinkView(this._sink); 888 _EventSinkView(this._sink);
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after
1078 class EventTransformStream<S, T> extends Stream<T> { 1078 class EventTransformStream<S, T> extends Stream<T> {
1079 final Stream<S> _source; 1079 final Stream<S> _source;
1080 final StreamEventTransformer _transformer; 1080 final StreamEventTransformer _transformer;
1081 EventTransformStream(Stream<S> source, 1081 EventTransformStream(Stream<S> source,
1082 StreamEventTransformer<S, T> transformer) 1082 StreamEventTransformer<S, T> transformer)
1083 : _source = source, _transformer = transformer; 1083 : _source = source, _transformer = transformer;
1084 1084
1085 StreamSubscription<T> listen(void onData(T data), 1085 StreamSubscription<T> listen(void onData(T data),
1086 { void onError(AsyncError error), 1086 { void onError(AsyncError error),
1087 void onDone(), 1087 void onDone(),
1088 bool unsubscribeOnError }) { 1088 bool cancelOnError }) {
1089 unsubscribeOnError = identical(true, unsubscribeOnError); 1089 cancelOnError = identical(true, cancelOnError);
1090 return new _EventTransformStreamSubscription(_source, _transformer, 1090 return new _EventTransformStreamSubscription(_source, _transformer,
1091 onData, onError, onDone, 1091 onData, onError, onDone,
1092 unsubscribeOnError); 1092 cancelOnError);
1093 } 1093 }
1094 } 1094 }
1095 1095
1096 class _EventTransformStreamSubscription<S, T> 1096 class _EventTransformStreamSubscription<S, T>
1097 extends _BaseStreamSubscription<T> 1097 extends _BaseStreamSubscription<T>
1098 implements _EventOutputSink<T> { 1098 implements _EventOutputSink<T> {
1099 /** The transformer used to transform events. */ 1099 /** The transformer used to transform events. */
1100 final StreamEventTransformer<S, T> _transformer; 1100 final StreamEventTransformer<S, T> _transformer;
1101 /** Whether to unsubscribe when emitting an error. */ 1101 /** Whether to unsubscribe when emitting an error. */
Lasse Reichstein Nielsen 2013/04/15 16:11:03 unsubscribe -> cancel :)
floitsch 2013/04/15 16:13:52 Please file a bug. I will do it later. ;P
1102 final bool _unsubscribeOnError; 1102 final bool _cancelOnError;
1103 /** Whether this stream has sent a done event. */ 1103 /** Whether this stream has sent a done event. */
1104 bool _isClosed = false; 1104 bool _isClosed = false;
1105 /** Source of incoming events. */ 1105 /** Source of incoming events. */
1106 StreamSubscription<S> _subscription; 1106 StreamSubscription<S> _subscription;
1107 /** Cached EventSink wrapper for this class. */ 1107 /** Cached EventSink wrapper for this class. */
1108 EventSink<T> _sink; 1108 EventSink<T> _sink;
1109 1109
1110 _EventTransformStreamSubscription(Stream<S> source, 1110 _EventTransformStreamSubscription(Stream<S> source,
1111 this._transformer, 1111 this._transformer,
1112 void onData(T data), 1112 void onData(T data),
1113 void onError(AsyncError error), 1113 void onError(AsyncError error),
1114 void onDone(), 1114 void onDone(),
1115 this._unsubscribeOnError) 1115 this._cancelOnError)
1116 : super(onData, onError, onDone) { 1116 : super(onData, onError, onDone) {
1117 _sink = new _EventOutputSinkWrapper<T>(this); 1117 _sink = new _EventOutputSinkWrapper<T>(this);
1118 _subscription = source.listen(_handleData, 1118 _subscription = source.listen(_handleData,
1119 onError: _handleError, 1119 onError: _handleError,
1120 onDone: _handleDone); 1120 onDone: _handleDone);
1121 } 1121 }
1122 1122
1123 /** Whether this subscription is still subscribed to its source. */ 1123 /** Whether this subscription is still subscribed to its source. */
1124 bool get _isSubscribed => _subscription != null; 1124 bool get _isSubscribed => _subscription != null;
1125 1125
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
1167 1167
1168 // EventOutputSink interface. 1168 // EventOutputSink interface.
1169 void _sendData(T data) { 1169 void _sendData(T data) {
1170 if (_isClosed) return; 1170 if (_isClosed) return;
1171 _onData(data); 1171 _onData(data);
1172 } 1172 }
1173 1173
1174 void _sendError(AsyncError error) { 1174 void _sendError(AsyncError error) {
1175 if (_isClosed) return; 1175 if (_isClosed) return;
1176 _onError(error); 1176 _onError(error);
1177 if (_unsubscribeOnError) { 1177 if (_cancelOnError) {
1178 cancel(); 1178 cancel();
1179 } 1179 }
1180 } 1180 }
1181 1181
1182 void _sendDone() { 1182 void _sendDone() {
1183 if (_isClosed) throw new StateError("Already closed."); 1183 if (_isClosed) throw new StateError("Already closed.");
1184 _isClosed = true; 1184 _isClosed = true;
1185 if (_isSubscribed) { 1185 if (_isSubscribed) {
1186 _subscription.cancel(); 1186 _subscription.cancel();
1187 _subscription = null; 1187 _subscription = null;
1188 } 1188 }
1189 _onDone(); 1189 _onDone();
1190 } 1190 }
1191 } 1191 }
1192 1192
1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { 1193 class _EventOutputSinkWrapper<T> extends EventSink<T> {
1194 _EventOutputSink _sink; 1194 _EventOutputSink _sink;
1195 _EventOutputSinkWrapper(this._sink); 1195 _EventOutputSinkWrapper(this._sink);
1196 1196
1197 void add(T data) { _sink._sendData(data); } 1197 void add(T data) { _sink._sendData(data); }
1198 void addError(AsyncError error) { _sink._sendError(error); } 1198 void addError(AsyncError error) { _sink._sendError(error); }
1199 void close() { _sink._sendDone(); } 1199 void close() { _sink._sendDone(); }
1200 } 1200 }
OLDNEW
« no previous file with comments | « runtime/embedders/openglui/common/gl.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698