OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
163 return new _SingleStreamMultiplexer<T>(this); | 163 return new _SingleStreamMultiplexer<T>(this); |
164 } | 164 } |
165 | 165 |
166 /** | 166 /** |
167 * Adds a subscription to this stream. | 167 * Adds a subscription to this stream. |
168 * | 168 * |
169 * On each data event from this stream, the subscriber's [onData] handler | 169 * On each data event from this stream, the subscriber's [onData] handler |
170 * is called. If [onData] is null, nothing happens. | 170 * is called. If [onData] is null, nothing happens. |
171 * | 171 * |
172 * On errors from this stream, the [onError] handler is given a | 172 * On errors from this stream, the [onError] handler is given a |
173 * [AsyncError] object describing the error. | 173 * object describing the error. |
174 * | 174 * |
175 * If this stream closes, the [onDone] handler is called. | 175 * If this stream closes, the [onDone] handler is called. |
176 * | 176 * |
177 * If [unsubscribeOnError] is true, the subscription is ended when | 177 * If [unsubscribeOnError] is true, the subscription is ended when |
178 * the first error is reported. The default is false. | 178 * the first error is reported. The default is false. |
179 */ | 179 */ |
180 StreamSubscription<T> listen(void onData(T event), | 180 StreamSubscription<T> listen(void onData(T event), |
181 { void onError(AsyncError error), | 181 { void onError(error), |
182 void onDone(), | 182 void onDone(), |
183 bool unsubscribeOnError}); | 183 bool unsubscribeOnError}); |
184 | 184 |
185 /** | 185 /** |
186 * Creates a new stream from this stream that discards some data events. | 186 * Creates a new stream from this stream that discards some data events. |
187 * | 187 * |
188 * The new stream sends the same error and done events as this stream, | 188 * The new stream sends the same error and done events as this stream, |
189 * but it only sends the data events that satisfy the [test]. | 189 * but it only sends the data events that satisfy the [test]. |
190 */ | 190 */ |
191 Stream<T> where(bool test(T event)) { | 191 Stream<T> where(bool test(T event)) { |
(...skipping 18 matching lines...) Expand all Loading... |
210 * true. If [test] is omitted, every error is considered matching. | 210 * true. If [test] is omitted, every error is considered matching. |
211 * | 211 * |
212 * If the error is intercepted, the [handle] function can decide what to do | 212 * If the error is intercepted, the [handle] function can decide what to do |
213 * with it. It can throw if it wants to raise a new (or the same) error, | 213 * with it. It can throw if it wants to raise a new (or the same) error, |
214 * or simply return to make the stream forget the error. | 214 * or simply return to make the stream forget the error. |
215 * | 215 * |
216 * If you need to transform an error into a data event, use the more generic | 216 * If you need to transform an error into a data event, use the more generic |
217 * [Stream.transformEvent] to handle the event by writing a data event to | 217 * [Stream.transformEvent] to handle the event by writing a data event to |
218 * the output sink | 218 * the output sink |
219 */ | 219 */ |
220 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { | 220 Stream<T> handleError(void handle( error), { bool test(error) }) { |
221 return new _HandleErrorStream<T>(this, handle, test); | 221 return new _HandleErrorStream<T>(this, handle, test); |
222 } | 222 } |
223 | 223 |
224 /** | 224 /** |
225 * Creates a new stream from this stream that converts each element | 225 * Creates a new stream from this stream that converts each element |
226 * into zero or more events. | 226 * into zero or more events. |
227 * | 227 * |
228 * Each incoming event is converted to an [Iterable] of new events, | 228 * Each incoming event is converted to an [Iterable] of new events, |
229 * and each of these new events are then sent by the returned stream | 229 * and each of these new events are then sent by the returned stream |
230 * in order. | 230 * in order. |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
266 (T newValue) { value = newValue; }, | 266 (T newValue) { value = newValue; }, |
267 _cancelAndError(subscription, result)); | 267 _cancelAndError(subscription, result)); |
268 } else { | 268 } else { |
269 value = element; | 269 value = element; |
270 seenFirst = true; | 270 seenFirst = true; |
271 } | 271 } |
272 }, | 272 }, |
273 onError: result._setError, | 273 onError: result._setError, |
274 onDone: () { | 274 onDone: () { |
275 if (!seenFirst) { | 275 if (!seenFirst) { |
276 result._setError(new AsyncError(new StateError("No elements"))); | 276 result._setError(new StateError("No elements")); |
277 } else { | 277 } else { |
278 result._setValue(value); | 278 result._setValue(value); |
279 } | 279 } |
280 }, | 280 }, |
281 unsubscribeOnError: true | 281 unsubscribeOnError: true |
282 ); | 282 ); |
283 return result; | 283 return result; |
284 } | 284 } |
285 | 285 |
286 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 286 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
287 Future fold(var initialValue, combine(var previous, T element)) { | 287 Future fold(var initialValue, combine(var previous, T element)) { |
288 _FutureImpl result = new _FutureImpl(); | 288 _FutureImpl result = new _FutureImpl(); |
289 var value = initialValue; | 289 var value = initialValue; |
290 StreamSubscription subscription; | 290 StreamSubscription subscription; |
291 subscription = this.listen( | 291 subscription = this.listen( |
292 // TODO(ahe): Restore type when feature is implemented in dart2js | 292 // TODO(ahe): Restore type when feature is implemented in dart2js |
293 // checked mode. http://dartbug.com/7733 | 293 // checked mode. http://dartbug.com/7733 |
294 (/*T*/ element) { | 294 (/*T*/ element) { |
295 _runUserCode( | 295 _runUserCode( |
296 () => combine(value, element), | 296 () => combine(value, element), |
297 (newValue) { value = newValue; }, | 297 (newValue) { value = newValue; }, |
298 _cancelAndError(subscription, result) | 298 _cancelAndError(subscription, result) |
299 ); | 299 ); |
300 }, | 300 }, |
301 onError: (AsyncError e) { | 301 onError: (e) { |
302 result._setError(e); | 302 result._setError(e); |
303 }, | 303 }, |
304 onDone: () { | 304 onDone: () { |
305 result._setValue(value); | 305 result._setValue(value); |
306 }, | 306 }, |
307 unsubscribeOnError: true); | 307 unsubscribeOnError: true); |
308 return result; | 308 return result; |
309 } | 309 } |
310 | 310 |
311 /** | 311 /** |
(...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
541 subscription = this.listen( | 541 subscription = this.listen( |
542 // TODO(ahe): Restore type when feature is implemented in dart2js | 542 // TODO(ahe): Restore type when feature is implemented in dart2js |
543 // checked mode. http://dartbug.com/7733 | 543 // checked mode. http://dartbug.com/7733 |
544 (/*T*/ value) { | 544 (/*T*/ value) { |
545 subscription.cancel(); | 545 subscription.cancel(); |
546 future._setValue(value); | 546 future._setValue(value); |
547 return; | 547 return; |
548 }, | 548 }, |
549 onError: future._setError, | 549 onError: future._setError, |
550 onDone: () { | 550 onDone: () { |
551 future._setError(new AsyncError(new StateError("No elements"))); | 551 future._setError(new StateError("No elements")); |
552 }, | 552 }, |
553 unsubscribeOnError: true); | 553 unsubscribeOnError: true); |
554 return future; | 554 return future; |
555 } | 555 } |
556 | 556 |
557 /** | 557 /** |
558 * Returns the last element. | 558 * Returns the last element. |
559 * | 559 * |
560 * If [this] is empty throws a [StateError]. | 560 * If [this] is empty throws a [StateError]. |
561 */ | 561 */ |
562 Future<T> get last { | 562 Future<T> get last { |
563 _FutureImpl<T> future = new _FutureImpl<T>(); | 563 _FutureImpl<T> future = new _FutureImpl<T>(); |
564 T result = null; | 564 T result = null; |
565 bool foundResult = false; | 565 bool foundResult = false; |
566 StreamSubscription subscription; | 566 StreamSubscription subscription; |
567 subscription = this.listen( | 567 subscription = this.listen( |
568 // TODO(ahe): Restore type when feature is implemented in dart2js | 568 // TODO(ahe): Restore type when feature is implemented in dart2js |
569 // checked mode. http://dartbug.com/7733 | 569 // checked mode. http://dartbug.com/7733 |
570 (/*T*/ value) { | 570 (/*T*/ value) { |
571 foundResult = true; | 571 foundResult = true; |
572 result = value; | 572 result = value; |
573 }, | 573 }, |
574 onError: future._setError, | 574 onError: future._setError, |
575 onDone: () { | 575 onDone: () { |
576 if (foundResult) { | 576 if (foundResult) { |
577 future._setValue(result); | 577 future._setValue(result); |
578 return; | 578 return; |
579 } | 579 } |
580 future._setError(new AsyncError(new StateError("No elements"))); | 580 future._setError(new StateError("No elements")); |
581 }, | 581 }, |
582 unsubscribeOnError: true); | 582 unsubscribeOnError: true); |
583 return future; | 583 return future; |
584 } | 584 } |
585 | 585 |
586 /** | 586 /** |
587 * Returns the single element. | 587 * Returns the single element. |
588 * | 588 * |
589 * If [this] is empty or has more than one element throws a [StateError]. | 589 * If [this] is empty or has more than one element throws a [StateError]. |
590 */ | 590 */ |
591 Future<T> get single { | 591 Future<T> get single { |
592 _FutureImpl<T> future = new _FutureImpl<T>(); | 592 _FutureImpl<T> future = new _FutureImpl<T>(); |
593 T result = null; | 593 T result = null; |
594 bool foundResult = false; | 594 bool foundResult = false; |
595 StreamSubscription subscription; | 595 StreamSubscription subscription; |
596 subscription = this.listen( | 596 subscription = this.listen( |
597 // TODO(ahe): Restore type when feature is implemented in dart2js | 597 // TODO(ahe): Restore type when feature is implemented in dart2js |
598 // checked mode. http://dartbug.com/7733 | 598 // checked mode. http://dartbug.com/7733 |
599 (/*T*/ value) { | 599 (/*T*/ value) { |
600 if (foundResult) { | 600 if (foundResult) { |
601 subscription.cancel(); | 601 subscription.cancel(); |
602 // This is the second element we get. | 602 // This is the second element we get. |
603 Error error = new StateError("More than one element"); | 603 Error error = new StateError("More than one element"); |
604 future._setError(new AsyncError(error)); | 604 future._setError(error); |
605 return; | 605 return; |
606 } | 606 } |
607 foundResult = true; | 607 foundResult = true; |
608 result = value; | 608 result = value; |
609 }, | 609 }, |
610 onError: future._setError, | 610 onError: future._setError, |
611 onDone: () { | 611 onDone: () { |
612 if (foundResult) { | 612 if (foundResult) { |
613 future._setValue(result); | 613 future._setValue(result); |
614 return; | 614 return; |
615 } | 615 } |
616 future._setError(new AsyncError(new StateError("No elements"))); | 616 future._setError(new StateError("No elements")); |
617 }, | 617 }, |
618 unsubscribeOnError: true); | 618 unsubscribeOnError: true); |
619 return future; | 619 return future; |
620 } | 620 } |
621 | 621 |
622 /** | 622 /** |
623 * Finds the first element of this stream matching [test]. | 623 * Finds the first element of this stream matching [test]. |
624 * | 624 * |
625 * Returns a future that is filled with the first element of this stream | 625 * Returns a future that is filled with the first element of this stream |
626 * that [test] returns true for. | 626 * that [test] returns true for. |
(...skipping 23 matching lines...) Expand all Loading... |
650 }, | 650 }, |
651 _cancelAndError(subscription, future) | 651 _cancelAndError(subscription, future) |
652 ); | 652 ); |
653 }, | 653 }, |
654 onError: future._setError, | 654 onError: future._setError, |
655 onDone: () { | 655 onDone: () { |
656 if (defaultValue != null) { | 656 if (defaultValue != null) { |
657 _runUserCode(defaultValue, future._setValue, future._setError); | 657 _runUserCode(defaultValue, future._setValue, future._setError); |
658 return; | 658 return; |
659 } | 659 } |
660 future._setError( | 660 future._setError(new StateError("firstMatch ended without match")); |
661 new AsyncError(new StateError("firstMatch ended without match"))); | |
662 }, | 661 }, |
663 unsubscribeOnError: true); | 662 unsubscribeOnError: true); |
664 return future; | 663 return future; |
665 } | 664 } |
666 | 665 |
667 /** | 666 /** |
668 * Finds the last element in this stream matching [test]. | 667 * Finds the last element in this stream matching [test]. |
669 * | 668 * |
670 * As [firstWhere], except that the last matching element is found. | 669 * As [firstWhere], except that the last matching element is found. |
671 * That means that the result cannot be provided before this stream | 670 * That means that the result cannot be provided before this stream |
(...skipping 22 matching lines...) Expand all Loading... |
694 onError: future._setError, | 693 onError: future._setError, |
695 onDone: () { | 694 onDone: () { |
696 if (foundResult) { | 695 if (foundResult) { |
697 future._setValue(result); | 696 future._setValue(result); |
698 return; | 697 return; |
699 } | 698 } |
700 if (defaultValue != null) { | 699 if (defaultValue != null) { |
701 _runUserCode(defaultValue, future._setValue, future._setError); | 700 _runUserCode(defaultValue, future._setValue, future._setError); |
702 return; | 701 return; |
703 } | 702 } |
704 future._setError( | 703 future._setError(new StateError("lastMatch ended without match")); |
705 new AsyncError(new StateError("lastMatch ended without match"))); | |
706 }, | 704 }, |
707 unsubscribeOnError: true); | 705 unsubscribeOnError: true); |
708 return future; | 706 return future; |
709 } | 707 } |
710 | 708 |
711 /** | 709 /** |
712 * Finds the single element in this stream matching [test]. | 710 * Finds the single element in this stream matching [test]. |
713 * | 711 * |
714 * Like [lastMatch], except that it is an error if more than one | 712 * Like [lastMatch], except that it is an error if more than one |
715 * matching element occurs in the stream. | 713 * matching element occurs in the stream. |
716 */ | 714 */ |
717 Future<T> singleWhere(bool test(T value)) { | 715 Future<T> singleWhere(bool test(T value)) { |
718 _FutureImpl<T> future = new _FutureImpl<T>(); | 716 _FutureImpl<T> future = new _FutureImpl<T>(); |
719 T result = null; | 717 T result = null; |
720 bool foundResult = false; | 718 bool foundResult = false; |
721 StreamSubscription subscription; | 719 StreamSubscription subscription; |
722 subscription = this.listen( | 720 subscription = this.listen( |
723 // TODO(ahe): Restore type when feature is implemented in dart2js | 721 // TODO(ahe): Restore type when feature is implemented in dart2js |
724 // checked mode. http://dartbug.com/7733 | 722 // checked mode. http://dartbug.com/7733 |
725 (/*T*/ value) { | 723 (/*T*/ value) { |
726 _runUserCode( | 724 _runUserCode( |
727 () => true == test(value), | 725 () => true == test(value), |
728 (bool isMatch) { | 726 (bool isMatch) { |
729 if (isMatch) { | 727 if (isMatch) { |
730 if (foundResult) { | 728 if (foundResult) { |
731 subscription.cancel(); | 729 subscription.cancel(); |
732 future._setError(new AsyncError( | 730 future._setError( |
733 new StateError('Multiple matches for "single"'))); | 731 new StateError('Multiple matches for "single"')); |
734 return; | 732 return; |
735 } | 733 } |
736 foundResult = true; | 734 foundResult = true; |
737 result = value; | 735 result = value; |
738 } | 736 } |
739 }, | 737 }, |
740 _cancelAndError(subscription, future) | 738 _cancelAndError(subscription, future) |
741 ); | 739 ); |
742 }, | 740 }, |
743 onError: future._setError, | 741 onError: future._setError, |
744 onDone: () { | 742 onDone: () { |
745 if (foundResult) { | 743 if (foundResult) { |
746 future._setValue(result); | 744 future._setValue(result); |
747 return; | 745 return; |
748 } | 746 } |
749 future._setError( | 747 future._setError(new StateError("single ended without match")); |
750 new AsyncError(new StateError("single ended without match"))); | |
751 }, | 748 }, |
752 unsubscribeOnError: true); | 749 unsubscribeOnError: true); |
753 return future; | 750 return future; |
754 } | 751 } |
755 | 752 |
756 /** | 753 /** |
757 * Returns the value of the [index]th data event of this stream. | 754 * Returns the value of the [index]th data event of this stream. |
758 * | 755 * |
759 * If an error event occurs, the future will end with this error. | 756 * If an error event occurs, the future will end with this error. |
760 * | 757 * |
(...skipping 10 matching lines...) Expand all Loading... |
771 (/*T*/ value) { | 768 (/*T*/ value) { |
772 if (index == 0) { | 769 if (index == 0) { |
773 subscription.cancel(); | 770 subscription.cancel(); |
774 future._setValue(value); | 771 future._setValue(value); |
775 return; | 772 return; |
776 } | 773 } |
777 index -= 1; | 774 index -= 1; |
778 }, | 775 }, |
779 onError: future._setError, | 776 onError: future._setError, |
780 onDone: () { | 777 onDone: () { |
781 future._setError(new AsyncError( | 778 future._setError(new StateError("Not enough elements for elementAt")); |
782 new StateError("Not enough elements for elementAt"))); | |
783 }, | 779 }, |
784 unsubscribeOnError: true); | 780 unsubscribeOnError: true); |
785 return future; | 781 return future; |
786 } | 782 } |
787 } | 783 } |
788 | 784 |
789 /** | 785 /** |
790 * A control object for the subscription on a [Stream]. | 786 * A control object for the subscription on a [Stream]. |
791 * | 787 * |
792 * When you subscribe on a [Stream] using [Stream.listen], | 788 * When you subscribe on a [Stream] using [Stream.listen], |
793 * a [StreamSubscription] object is returned. This object | 789 * a [StreamSubscription] object is returned. This object |
794 * is used to later unsubscribe again, or to temporarily pause | 790 * is used to later unsubscribe again, or to temporarily pause |
795 * the stream's events. | 791 * the stream's events. |
796 */ | 792 */ |
797 abstract class StreamSubscription<T> { | 793 abstract class StreamSubscription<T> { |
798 /** | 794 /** |
799 * Cancels this subscription. It will no longer receive events. | 795 * Cancels this subscription. It will no longer receive events. |
800 * | 796 * |
801 * If an event is currently firing, this unsubscription will only | 797 * If an event is currently firing, this unsubscription will only |
802 * take effect after all subscribers have received the current event. | 798 * take effect after all subscribers have received the current event. |
803 */ | 799 */ |
804 void cancel(); | 800 void cancel(); |
805 | 801 |
806 /** Set or override the data event handler of this subscription. */ | 802 /** Set or override the data event handler of this subscription. */ |
807 void onData(void handleData(T data)); | 803 void onData(void handleData(T data)); |
808 | 804 |
809 /** Set or override the error event handler of this subscription. */ | 805 /** Set or override the error event handler of this subscription. */ |
810 void onError(void handleError(AsyncError error)); | 806 void onError(void handleError(error)); |
811 | 807 |
812 /** Set or override the done event handler of this subscription. */ | 808 /** Set or override the done event handler of this subscription. */ |
813 void onDone(void handleDone()); | 809 void onDone(void handleDone()); |
814 | 810 |
815 /** | 811 /** |
816 * Request that the stream pauses events until further notice. | 812 * Request that the stream pauses events until further notice. |
817 * | 813 * |
818 * If [resumeSignal] is provided, the stream will undo the pause | 814 * If [resumeSignal] is provided, the stream will undo the pause |
819 * when the future completes. If the future completes with an error, | 815 * when the future completes. If the future completes with an error, |
820 * it will not be handled! | 816 * it will not be handled! |
(...skipping 26 matching lines...) Expand all Loading... |
847 } | 843 } |
848 | 844 |
849 | 845 |
850 /** | 846 /** |
851 * An interface that abstracts creation or handling of [Stream] events. | 847 * An interface that abstracts creation or handling of [Stream] events. |
852 */ | 848 */ |
853 abstract class EventSink<T> { | 849 abstract class EventSink<T> { |
854 /** Create a data event */ | 850 /** Create a data event */ |
855 void add(T event); | 851 void add(T event); |
856 /** Create an async error. */ | 852 /** Create an async error. */ |
857 void addError(AsyncError errorEvent); | 853 void addError(errorEvent); |
858 /** Request a stream to close. */ | 854 /** Request a stream to close. */ |
859 void close(); | 855 void close(); |
860 } | 856 } |
861 | 857 |
862 | 858 |
863 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 859 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
864 class StreamView<T> extends Stream<T> { | 860 class StreamView<T> extends Stream<T> { |
865 Stream<T> _stream; | 861 Stream<T> _stream; |
866 | 862 |
867 StreamView(this._stream); | 863 StreamView(this._stream); |
868 | 864 |
869 bool get isBroadcast => _stream.isBroadcast; | 865 bool get isBroadcast => _stream.isBroadcast; |
870 | 866 |
871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); | 867 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); |
872 | 868 |
873 StreamSubscription<T> listen(void onData(T value), | 869 StreamSubscription<T> listen(void onData(T value), |
874 { void onError(AsyncError error), | 870 { void onError(error), |
875 void onDone(), | 871 void onDone(), |
876 bool unsubscribeOnError }) { | 872 bool unsubscribeOnError }) { |
877 return _stream.listen(onData, onError: onError, onDone: onDone, | 873 return _stream.listen(onData, onError: onError, onDone: onDone, |
878 unsubscribeOnError: unsubscribeOnError); | 874 unsubscribeOnError: unsubscribeOnError); |
879 } | 875 } |
880 } | 876 } |
881 | 877 |
882 /** | 878 /** |
883 * [EventSink] wrapper that only exposes the [EventSink] interface. | 879 * [EventSink] wrapper that only exposes the [EventSink] interface. |
884 */ | 880 */ |
885 class _EventSinkView<T> extends EventSink<T> { | 881 class _EventSinkView<T> extends EventSink<T> { |
886 final EventSink<T> _sink; | 882 final EventSink<T> _sink; |
887 | 883 |
888 _EventSinkView(this._sink); | 884 _EventSinkView(this._sink); |
889 | 885 |
890 void add(T value) { _sink.add(value); } | 886 void add(T value) { _sink.add(value); } |
891 void addError(AsyncError error) { _sink.addError(error); } | 887 void addError(error) { _sink.addError(error); } |
892 void close() { _sink.close(); } | 888 void close() { _sink.close(); } |
893 } | 889 } |
894 | 890 |
895 | 891 |
896 /** | 892 /** |
897 * The target of a [Stream.pipe] call. | 893 * The target of a [Stream.pipe] call. |
898 * | 894 * |
899 * The [Stream.pipe] call will pass itself to this object, and then return | 895 * The [Stream.pipe] call will pass itself to this object, and then return |
900 * the resulting [Future]. The pipe should complete the future when it's | 896 * the resulting [Future]. The pipe should complete the future when it's |
901 * done. | 897 * done. |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
960 * | 956 * |
961 * stringStream.transform(new StreamTransformer<String, String>( | 957 * stringStream.transform(new StreamTransformer<String, String>( |
962 * handleData: (Strung value, EventSink<String> sink) { | 958 * handleData: (Strung value, EventSink<String> sink) { |
963 * sink.add(value); | 959 * sink.add(value); |
964 * sink.add(value); // Duplicate the incoming events. | 960 * sink.add(value); // Duplicate the incoming events. |
965 * })); | 961 * })); |
966 * | 962 * |
967 */ | 963 */ |
968 factory StreamTransformer({ | 964 factory StreamTransformer({ |
969 void handleData(S data, EventSink<T> sink), | 965 void handleData(S data, EventSink<T> sink), |
970 void handleError(AsyncError error, EventSink<T> sink), | 966 void handleError(error, EventSink<T> sink), |
971 void handleDone(EventSink<T> sink)}) { | 967 void handleDone(EventSink<T> sink)}) { |
972 return new _StreamTransformerImpl<S, T>(handleData, | 968 return new _StreamTransformerImpl<S, T>(handleData, |
973 handleError, | 969 handleError, |
974 handleDone); | 970 handleDone); |
975 } | 971 } |
976 | 972 |
977 Stream<T> bind(Stream<S> stream); | 973 Stream<T> bind(Stream<S> stream); |
978 } | 974 } |
979 | 975 |
980 | 976 |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1044 var data = event; | 1040 var data = event; |
1045 sink.add(data); | 1041 sink.add(data); |
1046 } | 1042 } |
1047 | 1043 |
1048 /** | 1044 /** |
1049 * Act on incoming error event. | 1045 * Act on incoming error event. |
1050 * | 1046 * |
1051 * The method may generate any number of events on the sink, but should | 1047 * The method may generate any number of events on the sink, but should |
1052 * not throw. | 1048 * not throw. |
1053 */ | 1049 */ |
1054 void handleError(AsyncError error, EventSink<T> sink) { | 1050 void handleError(error, EventSink<T> sink) { |
1055 sink.addError(error); | 1051 sink.addError(error); |
1056 } | 1052 } |
1057 | 1053 |
1058 /** | 1054 /** |
1059 * Act on incoming done event. | 1055 * Act on incoming done event. |
1060 * | 1056 * |
1061 * The method may generate any number of events on the sink, but should | 1057 * The method may generate any number of events on the sink, but should |
1062 * not throw. | 1058 * not throw. |
1063 */ | 1059 */ |
1064 void handleDone(EventSink<T> sink){ | 1060 void handleDone(EventSink<T> sink){ |
(...skipping 11 matching lines...) Expand all Loading... |
1076 * events on this stream. | 1072 * events on this stream. |
1077 */ | 1073 */ |
1078 class EventTransformStream<S, T> extends Stream<T> { | 1074 class EventTransformStream<S, T> extends Stream<T> { |
1079 final Stream<S> _source; | 1075 final Stream<S> _source; |
1080 final StreamEventTransformer _transformer; | 1076 final StreamEventTransformer _transformer; |
1081 EventTransformStream(Stream<S> source, | 1077 EventTransformStream(Stream<S> source, |
1082 StreamEventTransformer<S, T> transformer) | 1078 StreamEventTransformer<S, T> transformer) |
1083 : _source = source, _transformer = transformer; | 1079 : _source = source, _transformer = transformer; |
1084 | 1080 |
1085 StreamSubscription<T> listen(void onData(T data), | 1081 StreamSubscription<T> listen(void onData(T data), |
1086 { void onError(AsyncError error), | 1082 { void onError(error), |
1087 void onDone(), | 1083 void onDone(), |
1088 bool unsubscribeOnError }) { | 1084 bool unsubscribeOnError }) { |
1089 unsubscribeOnError = identical(true, unsubscribeOnError); | 1085 unsubscribeOnError = identical(true, unsubscribeOnError); |
1090 return new _EventTransformStreamSubscription(_source, _transformer, | 1086 return new _EventTransformStreamSubscription(_source, _transformer, |
1091 onData, onError, onDone, | 1087 onData, onError, onDone, |
1092 unsubscribeOnError); | 1088 unsubscribeOnError); |
1093 } | 1089 } |
1094 } | 1090 } |
1095 | 1091 |
1096 class _EventTransformStreamSubscription<S, T> | 1092 class _EventTransformStreamSubscription<S, T> |
1097 extends _BaseStreamSubscription<T> | 1093 extends _BaseStreamSubscription<T> |
1098 implements _EventOutputSink<T> { | 1094 implements _EventOutputSink<T> { |
1099 /** The transformer used to transform events. */ | 1095 /** The transformer used to transform events. */ |
1100 final StreamEventTransformer<S, T> _transformer; | 1096 final StreamEventTransformer<S, T> _transformer; |
1101 /** Whether to unsubscribe when emitting an error. */ | 1097 /** Whether to unsubscribe when emitting an error. */ |
1102 final bool _unsubscribeOnError; | 1098 final bool _unsubscribeOnError; |
1103 /** Whether this stream has sent a done event. */ | 1099 /** Whether this stream has sent a done event. */ |
1104 bool _isClosed = false; | 1100 bool _isClosed = false; |
1105 /** Source of incoming events. */ | 1101 /** Source of incoming events. */ |
1106 StreamSubscription<S> _subscription; | 1102 StreamSubscription<S> _subscription; |
1107 /** Cached EventSink wrapper for this class. */ | 1103 /** Cached EventSink wrapper for this class. */ |
1108 EventSink<T> _sink; | 1104 EventSink<T> _sink; |
1109 | 1105 |
1110 _EventTransformStreamSubscription(Stream<S> source, | 1106 _EventTransformStreamSubscription(Stream<S> source, |
1111 this._transformer, | 1107 this._transformer, |
1112 void onData(T data), | 1108 void onData(T data), |
1113 void onError(AsyncError error), | 1109 void onError(error), |
1114 void onDone(), | 1110 void onDone(), |
1115 this._unsubscribeOnError) | 1111 this._unsubscribeOnError) |
1116 : super(onData, onError, onDone) { | 1112 : super(onData, onError, onDone) { |
1117 _sink = new _EventOutputSinkWrapper<T>(this); | 1113 _sink = new _EventOutputSinkWrapper<T>(this); |
1118 _subscription = source.listen(_handleData, | 1114 _subscription = source.listen(_handleData, |
1119 onError: _handleError, | 1115 onError: _handleError, |
1120 onDone: _handleDone); | 1116 onDone: _handleDone); |
1121 } | 1117 } |
1122 | 1118 |
1123 /** Whether this subscription is still subscribed to its source. */ | 1119 /** Whether this subscription is still subscribed to its source. */ |
(...skipping 17 matching lines...) Expand all Loading... |
1141 } | 1137 } |
1142 | 1138 |
1143 void _handleData(S data) { | 1139 void _handleData(S data) { |
1144 try { | 1140 try { |
1145 _transformer.handleData(data, _sink); | 1141 _transformer.handleData(data, _sink); |
1146 } catch (e, s) { | 1142 } catch (e, s) { |
1147 _sendError(_asyncError(e, s)); | 1143 _sendError(_asyncError(e, s)); |
1148 } | 1144 } |
1149 } | 1145 } |
1150 | 1146 |
1151 void _handleError(AsyncError error) { | 1147 void _handleError(error) { |
1152 try { | 1148 try { |
1153 _transformer.handleError(error, _sink); | 1149 _transformer.handleError(error, _sink); |
1154 } catch (e, s) { | 1150 } catch (e, s) { |
1155 _sendError(_asyncError(e, s, error)); | 1151 _sendError(_asyncError(e, s)); |
1156 } | 1152 } |
1157 } | 1153 } |
1158 | 1154 |
1159 void _handleDone() { | 1155 void _handleDone() { |
1160 try { | 1156 try { |
1161 _subscription = null; | 1157 _subscription = null; |
1162 _transformer.handleDone(_sink); | 1158 _transformer.handleDone(_sink); |
1163 } catch (e, s) { | 1159 } catch (e, s) { |
1164 _sendError(_asyncError(e, s)); | 1160 _sendError(_asyncError(e, s)); |
1165 } | 1161 } |
1166 } | 1162 } |
1167 | 1163 |
1168 // EventOutputSink interface. | 1164 // EventOutputSink interface. |
1169 void _sendData(T data) { | 1165 void _sendData(T data) { |
1170 if (_isClosed) return; | 1166 if (_isClosed) return; |
1171 _onData(data); | 1167 _onData(data); |
1172 } | 1168 } |
1173 | 1169 |
1174 void _sendError(AsyncError error) { | 1170 void _sendError(error) { |
1175 if (_isClosed) return; | 1171 if (_isClosed) return; |
1176 _onError(error); | 1172 _onError(error); |
1177 if (_unsubscribeOnError) { | 1173 if (_unsubscribeOnError) { |
1178 cancel(); | 1174 cancel(); |
1179 } | 1175 } |
1180 } | 1176 } |
1181 | 1177 |
1182 void _sendDone() { | 1178 void _sendDone() { |
1183 if (_isClosed) throw new StateError("Already closed."); | 1179 if (_isClosed) throw new StateError("Already closed."); |
1184 _isClosed = true; | 1180 _isClosed = true; |
1185 if (_isSubscribed) { | 1181 if (_isSubscribed) { |
1186 _subscription.cancel(); | 1182 _subscription.cancel(); |
1187 _subscription = null; | 1183 _subscription = null; |
1188 } | 1184 } |
1189 _onDone(); | 1185 _onDone(); |
1190 } | 1186 } |
1191 } | 1187 } |
1192 | 1188 |
1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1189 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
1194 _EventOutputSink _sink; | 1190 _EventOutputSink _sink; |
1195 _EventOutputSinkWrapper(this._sink); | 1191 _EventOutputSinkWrapper(this._sink); |
1196 | 1192 |
1197 void add(T data) { _sink._sendData(data); } | 1193 void add(T data) { _sink._sendData(data); } |
1198 void addError(AsyncError error) { _sink._sendError(error); } | 1194 void addError(error) { _sink._sendError(error); } |
1199 void close() { _sink._sendDone(); } | 1195 void close() { _sink._sendDone(); } |
1200 } | 1196 } |
OLD | NEW |