Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1094)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14251006: Remove AsyncError with Expando. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebuild DOM and rebase. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 return new _SingleStreamMultiplexer<T>(this); 163 return new _SingleStreamMultiplexer<T>(this);
164 } 164 }
165 165
166 /** 166 /**
167 * Adds a subscription to this stream. 167 * Adds a subscription to this stream.
168 * 168 *
169 * On each data event from this stream, the subscriber's [onData] handler 169 * On each data event from this stream, the subscriber's [onData] handler
170 * is called. If [onData] is null, nothing happens. 170 * is called. If [onData] is null, nothing happens.
171 * 171 *
172 * On errors from this stream, the [onError] handler is given a 172 * On errors from this stream, the [onError] handler is given a
173 * [AsyncError] object describing the error. 173 * object describing the error.
174 * 174 *
175 * If this stream closes, the [onDone] handler is called. 175 * If this stream closes, the [onDone] handler is called.
176 * 176 *
177 * If [unsubscribeOnError] is true, the subscription is ended when 177 * If [unsubscribeOnError] is true, the subscription is ended when
178 * the first error is reported. The default is false. 178 * the first error is reported. The default is false.
179 */ 179 */
180 StreamSubscription<T> listen(void onData(T event), 180 StreamSubscription<T> listen(void onData(T event),
181 { void onError(AsyncError error), 181 { void onError(error),
182 void onDone(), 182 void onDone(),
183 bool unsubscribeOnError}); 183 bool unsubscribeOnError});
184 184
185 /** 185 /**
186 * Creates a new stream from this stream that discards some data events. 186 * Creates a new stream from this stream that discards some data events.
187 * 187 *
188 * The new stream sends the same error and done events as this stream, 188 * The new stream sends the same error and done events as this stream,
189 * but it only sends the data events that satisfy the [test]. 189 * but it only sends the data events that satisfy the [test].
190 */ 190 */
191 Stream<T> where(bool test(T event)) { 191 Stream<T> where(bool test(T event)) {
(...skipping 18 matching lines...) Expand all
210 * true. If [test] is omitted, every error is considered matching. 210 * true. If [test] is omitted, every error is considered matching.
211 * 211 *
212 * If the error is intercepted, the [handle] function can decide what to do 212 * If the error is intercepted, the [handle] function can decide what to do
213 * with it. It can throw if it wants to raise a new (or the same) error, 213 * with it. It can throw if it wants to raise a new (or the same) error,
214 * or simply return to make the stream forget the error. 214 * or simply return to make the stream forget the error.
215 * 215 *
216 * If you need to transform an error into a data event, use the more generic 216 * If you need to transform an error into a data event, use the more generic
217 * [Stream.transformEvent] to handle the event by writing a data event to 217 * [Stream.transformEvent] to handle the event by writing a data event to
218 * the output sink 218 * the output sink
219 */ 219 */
220 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { 220 Stream<T> handleError(void handle( error), { bool test(error) }) {
221 return new _HandleErrorStream<T>(this, handle, test); 221 return new _HandleErrorStream<T>(this, handle, test);
222 } 222 }
223 223
224 /** 224 /**
225 * Creates a new stream from this stream that converts each element 225 * Creates a new stream from this stream that converts each element
226 * into zero or more events. 226 * into zero or more events.
227 * 227 *
228 * Each incoming event is converted to an [Iterable] of new events, 228 * Each incoming event is converted to an [Iterable] of new events,
229 * and each of these new events are then sent by the returned stream 229 * and each of these new events are then sent by the returned stream
230 * in order. 230 * in order.
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
266 (T newValue) { value = newValue; }, 266 (T newValue) { value = newValue; },
267 _cancelAndError(subscription, result)); 267 _cancelAndError(subscription, result));
268 } else { 268 } else {
269 value = element; 269 value = element;
270 seenFirst = true; 270 seenFirst = true;
271 } 271 }
272 }, 272 },
273 onError: result._setError, 273 onError: result._setError,
274 onDone: () { 274 onDone: () {
275 if (!seenFirst) { 275 if (!seenFirst) {
276 result._setError(new AsyncError(new StateError("No elements"))); 276 result._setError(new StateError("No elements"));
277 } else { 277 } else {
278 result._setValue(value); 278 result._setValue(value);
279 } 279 }
280 }, 280 },
281 unsubscribeOnError: true 281 unsubscribeOnError: true
282 ); 282 );
283 return result; 283 return result;
284 } 284 }
285 285
286 /** Reduces a sequence of values by repeatedly applying [combine]. */ 286 /** Reduces a sequence of values by repeatedly applying [combine]. */
287 Future fold(var initialValue, combine(var previous, T element)) { 287 Future fold(var initialValue, combine(var previous, T element)) {
288 _FutureImpl result = new _FutureImpl(); 288 _FutureImpl result = new _FutureImpl();
289 var value = initialValue; 289 var value = initialValue;
290 StreamSubscription subscription; 290 StreamSubscription subscription;
291 subscription = this.listen( 291 subscription = this.listen(
292 // TODO(ahe): Restore type when feature is implemented in dart2js 292 // TODO(ahe): Restore type when feature is implemented in dart2js
293 // checked mode. http://dartbug.com/7733 293 // checked mode. http://dartbug.com/7733
294 (/*T*/ element) { 294 (/*T*/ element) {
295 _runUserCode( 295 _runUserCode(
296 () => combine(value, element), 296 () => combine(value, element),
297 (newValue) { value = newValue; }, 297 (newValue) { value = newValue; },
298 _cancelAndError(subscription, result) 298 _cancelAndError(subscription, result)
299 ); 299 );
300 }, 300 },
301 onError: (AsyncError e) { 301 onError: (e) {
302 result._setError(e); 302 result._setError(e);
303 }, 303 },
304 onDone: () { 304 onDone: () {
305 result._setValue(value); 305 result._setValue(value);
306 }, 306 },
307 unsubscribeOnError: true); 307 unsubscribeOnError: true);
308 return result; 308 return result;
309 } 309 }
310 310
311 /** 311 /**
(...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after
541 subscription = this.listen( 541 subscription = this.listen(
542 // TODO(ahe): Restore type when feature is implemented in dart2js 542 // TODO(ahe): Restore type when feature is implemented in dart2js
543 // checked mode. http://dartbug.com/7733 543 // checked mode. http://dartbug.com/7733
544 (/*T*/ value) { 544 (/*T*/ value) {
545 subscription.cancel(); 545 subscription.cancel();
546 future._setValue(value); 546 future._setValue(value);
547 return; 547 return;
548 }, 548 },
549 onError: future._setError, 549 onError: future._setError,
550 onDone: () { 550 onDone: () {
551 future._setError(new AsyncError(new StateError("No elements"))); 551 future._setError(new StateError("No elements"));
552 }, 552 },
553 unsubscribeOnError: true); 553 unsubscribeOnError: true);
554 return future; 554 return future;
555 } 555 }
556 556
557 /** 557 /**
558 * Returns the last element. 558 * Returns the last element.
559 * 559 *
560 * If [this] is empty throws a [StateError]. 560 * If [this] is empty throws a [StateError].
561 */ 561 */
562 Future<T> get last { 562 Future<T> get last {
563 _FutureImpl<T> future = new _FutureImpl<T>(); 563 _FutureImpl<T> future = new _FutureImpl<T>();
564 T result = null; 564 T result = null;
565 bool foundResult = false; 565 bool foundResult = false;
566 StreamSubscription subscription; 566 StreamSubscription subscription;
567 subscription = this.listen( 567 subscription = this.listen(
568 // TODO(ahe): Restore type when feature is implemented in dart2js 568 // TODO(ahe): Restore type when feature is implemented in dart2js
569 // checked mode. http://dartbug.com/7733 569 // checked mode. http://dartbug.com/7733
570 (/*T*/ value) { 570 (/*T*/ value) {
571 foundResult = true; 571 foundResult = true;
572 result = value; 572 result = value;
573 }, 573 },
574 onError: future._setError, 574 onError: future._setError,
575 onDone: () { 575 onDone: () {
576 if (foundResult) { 576 if (foundResult) {
577 future._setValue(result); 577 future._setValue(result);
578 return; 578 return;
579 } 579 }
580 future._setError(new AsyncError(new StateError("No elements"))); 580 future._setError(new StateError("No elements"));
581 }, 581 },
582 unsubscribeOnError: true); 582 unsubscribeOnError: true);
583 return future; 583 return future;
584 } 584 }
585 585
586 /** 586 /**
587 * Returns the single element. 587 * Returns the single element.
588 * 588 *
589 * If [this] is empty or has more than one element throws a [StateError]. 589 * If [this] is empty or has more than one element throws a [StateError].
590 */ 590 */
591 Future<T> get single { 591 Future<T> get single {
592 _FutureImpl<T> future = new _FutureImpl<T>(); 592 _FutureImpl<T> future = new _FutureImpl<T>();
593 T result = null; 593 T result = null;
594 bool foundResult = false; 594 bool foundResult = false;
595 StreamSubscription subscription; 595 StreamSubscription subscription;
596 subscription = this.listen( 596 subscription = this.listen(
597 // TODO(ahe): Restore type when feature is implemented in dart2js 597 // TODO(ahe): Restore type when feature is implemented in dart2js
598 // checked mode. http://dartbug.com/7733 598 // checked mode. http://dartbug.com/7733
599 (/*T*/ value) { 599 (/*T*/ value) {
600 if (foundResult) { 600 if (foundResult) {
601 subscription.cancel(); 601 subscription.cancel();
602 // This is the second element we get. 602 // This is the second element we get.
603 Error error = new StateError("More than one element"); 603 Error error = new StateError("More than one element");
604 future._setError(new AsyncError(error)); 604 future._setError(error);
605 return; 605 return;
606 } 606 }
607 foundResult = true; 607 foundResult = true;
608 result = value; 608 result = value;
609 }, 609 },
610 onError: future._setError, 610 onError: future._setError,
611 onDone: () { 611 onDone: () {
612 if (foundResult) { 612 if (foundResult) {
613 future._setValue(result); 613 future._setValue(result);
614 return; 614 return;
615 } 615 }
616 future._setError(new AsyncError(new StateError("No elements"))); 616 future._setError(new StateError("No elements"));
617 }, 617 },
618 unsubscribeOnError: true); 618 unsubscribeOnError: true);
619 return future; 619 return future;
620 } 620 }
621 621
622 /** 622 /**
623 * Finds the first element of this stream matching [test]. 623 * Finds the first element of this stream matching [test].
624 * 624 *
625 * Returns a future that is filled with the first element of this stream 625 * Returns a future that is filled with the first element of this stream
626 * that [test] returns true for. 626 * that [test] returns true for.
(...skipping 23 matching lines...) Expand all
650 }, 650 },
651 _cancelAndError(subscription, future) 651 _cancelAndError(subscription, future)
652 ); 652 );
653 }, 653 },
654 onError: future._setError, 654 onError: future._setError,
655 onDone: () { 655 onDone: () {
656 if (defaultValue != null) { 656 if (defaultValue != null) {
657 _runUserCode(defaultValue, future._setValue, future._setError); 657 _runUserCode(defaultValue, future._setValue, future._setError);
658 return; 658 return;
659 } 659 }
660 future._setError( 660 future._setError(new StateError("firstMatch ended without match"));
661 new AsyncError(new StateError("firstMatch ended without match")));
662 }, 661 },
663 unsubscribeOnError: true); 662 unsubscribeOnError: true);
664 return future; 663 return future;
665 } 664 }
666 665
667 /** 666 /**
668 * Finds the last element in this stream matching [test]. 667 * Finds the last element in this stream matching [test].
669 * 668 *
670 * As [firstWhere], except that the last matching element is found. 669 * As [firstWhere], except that the last matching element is found.
671 * That means that the result cannot be provided before this stream 670 * That means that the result cannot be provided before this stream
(...skipping 22 matching lines...) Expand all
694 onError: future._setError, 693 onError: future._setError,
695 onDone: () { 694 onDone: () {
696 if (foundResult) { 695 if (foundResult) {
697 future._setValue(result); 696 future._setValue(result);
698 return; 697 return;
699 } 698 }
700 if (defaultValue != null) { 699 if (defaultValue != null) {
701 _runUserCode(defaultValue, future._setValue, future._setError); 700 _runUserCode(defaultValue, future._setValue, future._setError);
702 return; 701 return;
703 } 702 }
704 future._setError( 703 future._setError(new StateError("lastMatch ended without match"));
705 new AsyncError(new StateError("lastMatch ended without match")));
706 }, 704 },
707 unsubscribeOnError: true); 705 unsubscribeOnError: true);
708 return future; 706 return future;
709 } 707 }
710 708
711 /** 709 /**
712 * Finds the single element in this stream matching [test]. 710 * Finds the single element in this stream matching [test].
713 * 711 *
714 * Like [lastMatch], except that it is an error if more than one 712 * Like [lastMatch], except that it is an error if more than one
715 * matching element occurs in the stream. 713 * matching element occurs in the stream.
716 */ 714 */
717 Future<T> singleWhere(bool test(T value)) { 715 Future<T> singleWhere(bool test(T value)) {
718 _FutureImpl<T> future = new _FutureImpl<T>(); 716 _FutureImpl<T> future = new _FutureImpl<T>();
719 T result = null; 717 T result = null;
720 bool foundResult = false; 718 bool foundResult = false;
721 StreamSubscription subscription; 719 StreamSubscription subscription;
722 subscription = this.listen( 720 subscription = this.listen(
723 // TODO(ahe): Restore type when feature is implemented in dart2js 721 // TODO(ahe): Restore type when feature is implemented in dart2js
724 // checked mode. http://dartbug.com/7733 722 // checked mode. http://dartbug.com/7733
725 (/*T*/ value) { 723 (/*T*/ value) {
726 _runUserCode( 724 _runUserCode(
727 () => true == test(value), 725 () => true == test(value),
728 (bool isMatch) { 726 (bool isMatch) {
729 if (isMatch) { 727 if (isMatch) {
730 if (foundResult) { 728 if (foundResult) {
731 subscription.cancel(); 729 subscription.cancel();
732 future._setError(new AsyncError( 730 future._setError(
733 new StateError('Multiple matches for "single"'))); 731 new StateError('Multiple matches for "single"'));
734 return; 732 return;
735 } 733 }
736 foundResult = true; 734 foundResult = true;
737 result = value; 735 result = value;
738 } 736 }
739 }, 737 },
740 _cancelAndError(subscription, future) 738 _cancelAndError(subscription, future)
741 ); 739 );
742 }, 740 },
743 onError: future._setError, 741 onError: future._setError,
744 onDone: () { 742 onDone: () {
745 if (foundResult) { 743 if (foundResult) {
746 future._setValue(result); 744 future._setValue(result);
747 return; 745 return;
748 } 746 }
749 future._setError( 747 future._setError(new StateError("single ended without match"));
750 new AsyncError(new StateError("single ended without match")));
751 }, 748 },
752 unsubscribeOnError: true); 749 unsubscribeOnError: true);
753 return future; 750 return future;
754 } 751 }
755 752
756 /** 753 /**
757 * Returns the value of the [index]th data event of this stream. 754 * Returns the value of the [index]th data event of this stream.
758 * 755 *
759 * If an error event occurs, the future will end with this error. 756 * If an error event occurs, the future will end with this error.
760 * 757 *
(...skipping 10 matching lines...) Expand all
771 (/*T*/ value) { 768 (/*T*/ value) {
772 if (index == 0) { 769 if (index == 0) {
773 subscription.cancel(); 770 subscription.cancel();
774 future._setValue(value); 771 future._setValue(value);
775 return; 772 return;
776 } 773 }
777 index -= 1; 774 index -= 1;
778 }, 775 },
779 onError: future._setError, 776 onError: future._setError,
780 onDone: () { 777 onDone: () {
781 future._setError(new AsyncError( 778 future._setError(new StateError("Not enough elements for elementAt"));
782 new StateError("Not enough elements for elementAt")));
783 }, 779 },
784 unsubscribeOnError: true); 780 unsubscribeOnError: true);
785 return future; 781 return future;
786 } 782 }
787 } 783 }
788 784
789 /** 785 /**
790 * A control object for the subscription on a [Stream]. 786 * A control object for the subscription on a [Stream].
791 * 787 *
792 * When you subscribe on a [Stream] using [Stream.listen], 788 * When you subscribe on a [Stream] using [Stream.listen],
793 * a [StreamSubscription] object is returned. This object 789 * a [StreamSubscription] object is returned. This object
794 * is used to later unsubscribe again, or to temporarily pause 790 * is used to later unsubscribe again, or to temporarily pause
795 * the stream's events. 791 * the stream's events.
796 */ 792 */
797 abstract class StreamSubscription<T> { 793 abstract class StreamSubscription<T> {
798 /** 794 /**
799 * Cancels this subscription. It will no longer receive events. 795 * Cancels this subscription. It will no longer receive events.
800 * 796 *
801 * If an event is currently firing, this unsubscription will only 797 * If an event is currently firing, this unsubscription will only
802 * take effect after all subscribers have received the current event. 798 * take effect after all subscribers have received the current event.
803 */ 799 */
804 void cancel(); 800 void cancel();
805 801
806 /** Set or override the data event handler of this subscription. */ 802 /** Set or override the data event handler of this subscription. */
807 void onData(void handleData(T data)); 803 void onData(void handleData(T data));
808 804
809 /** Set or override the error event handler of this subscription. */ 805 /** Set or override the error event handler of this subscription. */
810 void onError(void handleError(AsyncError error)); 806 void onError(void handleError(error));
811 807
812 /** Set or override the done event handler of this subscription. */ 808 /** Set or override the done event handler of this subscription. */
813 void onDone(void handleDone()); 809 void onDone(void handleDone());
814 810
815 /** 811 /**
816 * Request that the stream pauses events until further notice. 812 * Request that the stream pauses events until further notice.
817 * 813 *
818 * If [resumeSignal] is provided, the stream will undo the pause 814 * If [resumeSignal] is provided, the stream will undo the pause
819 * when the future completes. If the future completes with an error, 815 * when the future completes. If the future completes with an error,
820 * it will not be handled! 816 * it will not be handled!
(...skipping 26 matching lines...) Expand all
847 } 843 }
848 844
849 845
850 /** 846 /**
851 * An interface that abstracts creation or handling of [Stream] events. 847 * An interface that abstracts creation or handling of [Stream] events.
852 */ 848 */
853 abstract class EventSink<T> { 849 abstract class EventSink<T> {
854 /** Create a data event */ 850 /** Create a data event */
855 void add(T event); 851 void add(T event);
856 /** Create an async error. */ 852 /** Create an async error. */
857 void addError(AsyncError errorEvent); 853 void addError(errorEvent);
858 /** Request a stream to close. */ 854 /** Request a stream to close. */
859 void close(); 855 void close();
860 } 856 }
861 857
862 858
863 /** [Stream] wrapper that only exposes the [Stream] interface. */ 859 /** [Stream] wrapper that only exposes the [Stream] interface. */
864 class StreamView<T> extends Stream<T> { 860 class StreamView<T> extends Stream<T> {
865 Stream<T> _stream; 861 Stream<T> _stream;
866 862
867 StreamView(this._stream); 863 StreamView(this._stream);
868 864
869 bool get isBroadcast => _stream.isBroadcast; 865 bool get isBroadcast => _stream.isBroadcast;
870 866
871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); 867 Stream<T> asBroadcastStream() => _stream.asBroadcastStream();
872 868
873 StreamSubscription<T> listen(void onData(T value), 869 StreamSubscription<T> listen(void onData(T value),
874 { void onError(AsyncError error), 870 { void onError(error),
875 void onDone(), 871 void onDone(),
876 bool unsubscribeOnError }) { 872 bool unsubscribeOnError }) {
877 return _stream.listen(onData, onError: onError, onDone: onDone, 873 return _stream.listen(onData, onError: onError, onDone: onDone,
878 unsubscribeOnError: unsubscribeOnError); 874 unsubscribeOnError: unsubscribeOnError);
879 } 875 }
880 } 876 }
881 877
882 /** 878 /**
883 * [EventSink] wrapper that only exposes the [EventSink] interface. 879 * [EventSink] wrapper that only exposes the [EventSink] interface.
884 */ 880 */
885 class _EventSinkView<T> extends EventSink<T> { 881 class _EventSinkView<T> extends EventSink<T> {
886 final EventSink<T> _sink; 882 final EventSink<T> _sink;
887 883
888 _EventSinkView(this._sink); 884 _EventSinkView(this._sink);
889 885
890 void add(T value) { _sink.add(value); } 886 void add(T value) { _sink.add(value); }
891 void addError(AsyncError error) { _sink.addError(error); } 887 void addError(error) { _sink.addError(error); }
892 void close() { _sink.close(); } 888 void close() { _sink.close(); }
893 } 889 }
894 890
895 891
896 /** 892 /**
897 * The target of a [Stream.pipe] call. 893 * The target of a [Stream.pipe] call.
898 * 894 *
899 * The [Stream.pipe] call will pass itself to this object, and then return 895 * The [Stream.pipe] call will pass itself to this object, and then return
900 * the resulting [Future]. The pipe should complete the future when it's 896 * the resulting [Future]. The pipe should complete the future when it's
901 * done. 897 * done.
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
960 * 956 *
961 * stringStream.transform(new StreamTransformer<String, String>( 957 * stringStream.transform(new StreamTransformer<String, String>(
962 * handleData: (Strung value, EventSink<String> sink) { 958 * handleData: (Strung value, EventSink<String> sink) {
963 * sink.add(value); 959 * sink.add(value);
964 * sink.add(value); // Duplicate the incoming events. 960 * sink.add(value); // Duplicate the incoming events.
965 * })); 961 * }));
966 * 962 *
967 */ 963 */
968 factory StreamTransformer({ 964 factory StreamTransformer({
969 void handleData(S data, EventSink<T> sink), 965 void handleData(S data, EventSink<T> sink),
970 void handleError(AsyncError error, EventSink<T> sink), 966 void handleError(error, EventSink<T> sink),
971 void handleDone(EventSink<T> sink)}) { 967 void handleDone(EventSink<T> sink)}) {
972 return new _StreamTransformerImpl<S, T>(handleData, 968 return new _StreamTransformerImpl<S, T>(handleData,
973 handleError, 969 handleError,
974 handleDone); 970 handleDone);
975 } 971 }
976 972
977 Stream<T> bind(Stream<S> stream); 973 Stream<T> bind(Stream<S> stream);
978 } 974 }
979 975
980 976
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
1044 var data = event; 1040 var data = event;
1045 sink.add(data); 1041 sink.add(data);
1046 } 1042 }
1047 1043
1048 /** 1044 /**
1049 * Act on incoming error event. 1045 * Act on incoming error event.
1050 * 1046 *
1051 * The method may generate any number of events on the sink, but should 1047 * The method may generate any number of events on the sink, but should
1052 * not throw. 1048 * not throw.
1053 */ 1049 */
1054 void handleError(AsyncError error, EventSink<T> sink) { 1050 void handleError(error, EventSink<T> sink) {
1055 sink.addError(error); 1051 sink.addError(error);
1056 } 1052 }
1057 1053
1058 /** 1054 /**
1059 * Act on incoming done event. 1055 * Act on incoming done event.
1060 * 1056 *
1061 * The method may generate any number of events on the sink, but should 1057 * The method may generate any number of events on the sink, but should
1062 * not throw. 1058 * not throw.
1063 */ 1059 */
1064 void handleDone(EventSink<T> sink){ 1060 void handleDone(EventSink<T> sink){
(...skipping 11 matching lines...) Expand all
1076 * events on this stream. 1072 * events on this stream.
1077 */ 1073 */
1078 class EventTransformStream<S, T> extends Stream<T> { 1074 class EventTransformStream<S, T> extends Stream<T> {
1079 final Stream<S> _source; 1075 final Stream<S> _source;
1080 final StreamEventTransformer _transformer; 1076 final StreamEventTransformer _transformer;
1081 EventTransformStream(Stream<S> source, 1077 EventTransformStream(Stream<S> source,
1082 StreamEventTransformer<S, T> transformer) 1078 StreamEventTransformer<S, T> transformer)
1083 : _source = source, _transformer = transformer; 1079 : _source = source, _transformer = transformer;
1084 1080
1085 StreamSubscription<T> listen(void onData(T data), 1081 StreamSubscription<T> listen(void onData(T data),
1086 { void onError(AsyncError error), 1082 { void onError(error),
1087 void onDone(), 1083 void onDone(),
1088 bool unsubscribeOnError }) { 1084 bool unsubscribeOnError }) {
1089 unsubscribeOnError = identical(true, unsubscribeOnError); 1085 unsubscribeOnError = identical(true, unsubscribeOnError);
1090 return new _EventTransformStreamSubscription(_source, _transformer, 1086 return new _EventTransformStreamSubscription(_source, _transformer,
1091 onData, onError, onDone, 1087 onData, onError, onDone,
1092 unsubscribeOnError); 1088 unsubscribeOnError);
1093 } 1089 }
1094 } 1090 }
1095 1091
1096 class _EventTransformStreamSubscription<S, T> 1092 class _EventTransformStreamSubscription<S, T>
1097 extends _BaseStreamSubscription<T> 1093 extends _BaseStreamSubscription<T>
1098 implements _EventOutputSink<T> { 1094 implements _EventOutputSink<T> {
1099 /** The transformer used to transform events. */ 1095 /** The transformer used to transform events. */
1100 final StreamEventTransformer<S, T> _transformer; 1096 final StreamEventTransformer<S, T> _transformer;
1101 /** Whether to unsubscribe when emitting an error. */ 1097 /** Whether to unsubscribe when emitting an error. */
1102 final bool _unsubscribeOnError; 1098 final bool _unsubscribeOnError;
1103 /** Whether this stream has sent a done event. */ 1099 /** Whether this stream has sent a done event. */
1104 bool _isClosed = false; 1100 bool _isClosed = false;
1105 /** Source of incoming events. */ 1101 /** Source of incoming events. */
1106 StreamSubscription<S> _subscription; 1102 StreamSubscription<S> _subscription;
1107 /** Cached EventSink wrapper for this class. */ 1103 /** Cached EventSink wrapper for this class. */
1108 EventSink<T> _sink; 1104 EventSink<T> _sink;
1109 1105
1110 _EventTransformStreamSubscription(Stream<S> source, 1106 _EventTransformStreamSubscription(Stream<S> source,
1111 this._transformer, 1107 this._transformer,
1112 void onData(T data), 1108 void onData(T data),
1113 void onError(AsyncError error), 1109 void onError(error),
1114 void onDone(), 1110 void onDone(),
1115 this._unsubscribeOnError) 1111 this._unsubscribeOnError)
1116 : super(onData, onError, onDone) { 1112 : super(onData, onError, onDone) {
1117 _sink = new _EventOutputSinkWrapper<T>(this); 1113 _sink = new _EventOutputSinkWrapper<T>(this);
1118 _subscription = source.listen(_handleData, 1114 _subscription = source.listen(_handleData,
1119 onError: _handleError, 1115 onError: _handleError,
1120 onDone: _handleDone); 1116 onDone: _handleDone);
1121 } 1117 }
1122 1118
1123 /** Whether this subscription is still subscribed to its source. */ 1119 /** Whether this subscription is still subscribed to its source. */
(...skipping 17 matching lines...) Expand all
1141 } 1137 }
1142 1138
1143 void _handleData(S data) { 1139 void _handleData(S data) {
1144 try { 1140 try {
1145 _transformer.handleData(data, _sink); 1141 _transformer.handleData(data, _sink);
1146 } catch (e, s) { 1142 } catch (e, s) {
1147 _sendError(_asyncError(e, s)); 1143 _sendError(_asyncError(e, s));
1148 } 1144 }
1149 } 1145 }
1150 1146
1151 void _handleError(AsyncError error) { 1147 void _handleError(error) {
1152 try { 1148 try {
1153 _transformer.handleError(error, _sink); 1149 _transformer.handleError(error, _sink);
1154 } catch (e, s) { 1150 } catch (e, s) {
1155 _sendError(_asyncError(e, s, error)); 1151 _sendError(_asyncError(e, s));
1156 } 1152 }
1157 } 1153 }
1158 1154
1159 void _handleDone() { 1155 void _handleDone() {
1160 try { 1156 try {
1161 _subscription = null; 1157 _subscription = null;
1162 _transformer.handleDone(_sink); 1158 _transformer.handleDone(_sink);
1163 } catch (e, s) { 1159 } catch (e, s) {
1164 _sendError(_asyncError(e, s)); 1160 _sendError(_asyncError(e, s));
1165 } 1161 }
1166 } 1162 }
1167 1163
1168 // EventOutputSink interface. 1164 // EventOutputSink interface.
1169 void _sendData(T data) { 1165 void _sendData(T data) {
1170 if (_isClosed) return; 1166 if (_isClosed) return;
1171 _onData(data); 1167 _onData(data);
1172 } 1168 }
1173 1169
1174 void _sendError(AsyncError error) { 1170 void _sendError(error) {
1175 if (_isClosed) return; 1171 if (_isClosed) return;
1176 _onError(error); 1172 _onError(error);
1177 if (_unsubscribeOnError) { 1173 if (_unsubscribeOnError) {
1178 cancel(); 1174 cancel();
1179 } 1175 }
1180 } 1176 }
1181 1177
1182 void _sendDone() { 1178 void _sendDone() {
1183 if (_isClosed) throw new StateError("Already closed."); 1179 if (_isClosed) throw new StateError("Already closed.");
1184 _isClosed = true; 1180 _isClosed = true;
1185 if (_isSubscribed) { 1181 if (_isSubscribed) {
1186 _subscription.cancel(); 1182 _subscription.cancel();
1187 _subscription = null; 1183 _subscription = null;
1188 } 1184 }
1189 _onDone(); 1185 _onDone();
1190 } 1186 }
1191 } 1187 }
1192 1188
1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { 1189 class _EventOutputSinkWrapper<T> extends EventSink<T> {
1194 _EventOutputSink _sink; 1190 _EventOutputSink _sink;
1195 _EventOutputSinkWrapper(this._sink); 1191 _EventOutputSinkWrapper(this._sink);
1196 1192
1197 void add(T data) { _sink._sendData(data); } 1193 void add(T data) { _sink._sendData(data); }
1198 void addError(AsyncError error) { _sink._sendError(error); } 1194 void addError(error) { _sink._sendError(error); }
1199 void close() { _sink._sendDone(); } 1195 void close() { _sink._sendDone(); }
1200 } 1196 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698