| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 163 return new _SingleStreamMultiplexer<T>(this); | 163 return new _SingleStreamMultiplexer<T>(this); |
| 164 } | 164 } |
| 165 | 165 |
| 166 /** | 166 /** |
| 167 * Adds a subscription to this stream. | 167 * Adds a subscription to this stream. |
| 168 * | 168 * |
| 169 * On each data event from this stream, the subscriber's [onData] handler | 169 * On each data event from this stream, the subscriber's [onData] handler |
| 170 * is called. If [onData] is null, nothing happens. | 170 * is called. If [onData] is null, nothing happens. |
| 171 * | 171 * |
| 172 * On errors from this stream, the [onError] handler is given a | 172 * On errors from this stream, the [onError] handler is given a |
| 173 * [AsyncError] object describing the error. | 173 * object describing the error. |
| 174 * | 174 * |
| 175 * If this stream closes, the [onDone] handler is called. | 175 * If this stream closes, the [onDone] handler is called. |
| 176 * | 176 * |
| 177 * If [unsubscribeOnError] is true, the subscription is ended when | 177 * If [unsubscribeOnError] is true, the subscription is ended when |
| 178 * the first error is reported. The default is false. | 178 * the first error is reported. The default is false. |
| 179 */ | 179 */ |
| 180 StreamSubscription<T> listen(void onData(T event), | 180 StreamSubscription<T> listen(void onData(T event), |
| 181 { void onError(AsyncError error), | 181 { void onError(error), |
| 182 void onDone(), | 182 void onDone(), |
| 183 bool unsubscribeOnError}); | 183 bool unsubscribeOnError}); |
| 184 | 184 |
| 185 /** | 185 /** |
| 186 * Creates a new stream from this stream that discards some data events. | 186 * Creates a new stream from this stream that discards some data events. |
| 187 * | 187 * |
| 188 * The new stream sends the same error and done events as this stream, | 188 * The new stream sends the same error and done events as this stream, |
| 189 * but it only sends the data events that satisfy the [test]. | 189 * but it only sends the data events that satisfy the [test]. |
| 190 */ | 190 */ |
| 191 Stream<T> where(bool test(T event)) { | 191 Stream<T> where(bool test(T event)) { |
| (...skipping 18 matching lines...) Expand all Loading... |
| 210 * true. If [test] is omitted, every error is considered matching. | 210 * true. If [test] is omitted, every error is considered matching. |
| 211 * | 211 * |
| 212 * If the error is intercepted, the [handle] function can decide what to do | 212 * If the error is intercepted, the [handle] function can decide what to do |
| 213 * with it. It can throw if it wants to raise a new (or the same) error, | 213 * with it. It can throw if it wants to raise a new (or the same) error, |
| 214 * or simply return to make the stream forget the error. | 214 * or simply return to make the stream forget the error. |
| 215 * | 215 * |
| 216 * If you need to transform an error into a data event, use the more generic | 216 * If you need to transform an error into a data event, use the more generic |
| 217 * [Stream.transformEvent] to handle the event by writing a data event to | 217 * [Stream.transformEvent] to handle the event by writing a data event to |
| 218 * the output sink | 218 * the output sink |
| 219 */ | 219 */ |
| 220 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { | 220 Stream<T> handleError(void handle( error), { bool test(error) }) { |
| 221 return new _HandleErrorStream<T>(this, handle, test); | 221 return new _HandleErrorStream<T>(this, handle, test); |
| 222 } | 222 } |
| 223 | 223 |
| 224 /** | 224 /** |
| 225 * Creates a new stream from this stream that converts each element | 225 * Creates a new stream from this stream that converts each element |
| 226 * into zero or more events. | 226 * into zero or more events. |
| 227 * | 227 * |
| 228 * Each incoming event is converted to an [Iterable] of new events, | 228 * Each incoming event is converted to an [Iterable] of new events, |
| 229 * and each of these new events are then sent by the returned stream | 229 * and each of these new events are then sent by the returned stream |
| 230 * in order. | 230 * in order. |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 266 (T newValue) { value = newValue; }, | 266 (T newValue) { value = newValue; }, |
| 267 _cancelAndError(subscription, result)); | 267 _cancelAndError(subscription, result)); |
| 268 } else { | 268 } else { |
| 269 value = element; | 269 value = element; |
| 270 seenFirst = true; | 270 seenFirst = true; |
| 271 } | 271 } |
| 272 }, | 272 }, |
| 273 onError: result._setError, | 273 onError: result._setError, |
| 274 onDone: () { | 274 onDone: () { |
| 275 if (!seenFirst) { | 275 if (!seenFirst) { |
| 276 result._setError(new AsyncError(new StateError("No elements"))); | 276 result._setError(new StateError("No elements")); |
| 277 } else { | 277 } else { |
| 278 result._setValue(value); | 278 result._setValue(value); |
| 279 } | 279 } |
| 280 }, | 280 }, |
| 281 unsubscribeOnError: true | 281 unsubscribeOnError: true |
| 282 ); | 282 ); |
| 283 return result; | 283 return result; |
| 284 } | 284 } |
| 285 | 285 |
| 286 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 286 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
| 287 Future fold(var initialValue, combine(var previous, T element)) { | 287 Future fold(var initialValue, combine(var previous, T element)) { |
| 288 _FutureImpl result = new _FutureImpl(); | 288 _FutureImpl result = new _FutureImpl(); |
| 289 var value = initialValue; | 289 var value = initialValue; |
| 290 StreamSubscription subscription; | 290 StreamSubscription subscription; |
| 291 subscription = this.listen( | 291 subscription = this.listen( |
| 292 // TODO(ahe): Restore type when feature is implemented in dart2js | 292 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 293 // checked mode. http://dartbug.com/7733 | 293 // checked mode. http://dartbug.com/7733 |
| 294 (/*T*/ element) { | 294 (/*T*/ element) { |
| 295 _runUserCode( | 295 _runUserCode( |
| 296 () => combine(value, element), | 296 () => combine(value, element), |
| 297 (newValue) { value = newValue; }, | 297 (newValue) { value = newValue; }, |
| 298 _cancelAndError(subscription, result) | 298 _cancelAndError(subscription, result) |
| 299 ); | 299 ); |
| 300 }, | 300 }, |
| 301 onError: (AsyncError e) { | 301 onError: (e) { |
| 302 result._setError(e); | 302 result._setError(e); |
| 303 }, | 303 }, |
| 304 onDone: () { | 304 onDone: () { |
| 305 result._setValue(value); | 305 result._setValue(value); |
| 306 }, | 306 }, |
| 307 unsubscribeOnError: true); | 307 unsubscribeOnError: true); |
| 308 return result; | 308 return result; |
| 309 } | 309 } |
| 310 | 310 |
| 311 /** | 311 /** |
| (...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 541 subscription = this.listen( | 541 subscription = this.listen( |
| 542 // TODO(ahe): Restore type when feature is implemented in dart2js | 542 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 543 // checked mode. http://dartbug.com/7733 | 543 // checked mode. http://dartbug.com/7733 |
| 544 (/*T*/ value) { | 544 (/*T*/ value) { |
| 545 subscription.cancel(); | 545 subscription.cancel(); |
| 546 future._setValue(value); | 546 future._setValue(value); |
| 547 return; | 547 return; |
| 548 }, | 548 }, |
| 549 onError: future._setError, | 549 onError: future._setError, |
| 550 onDone: () { | 550 onDone: () { |
| 551 future._setError(new AsyncError(new StateError("No elements"))); | 551 future._setError(new StateError("No elements")); |
| 552 }, | 552 }, |
| 553 unsubscribeOnError: true); | 553 unsubscribeOnError: true); |
| 554 return future; | 554 return future; |
| 555 } | 555 } |
| 556 | 556 |
| 557 /** | 557 /** |
| 558 * Returns the last element. | 558 * Returns the last element. |
| 559 * | 559 * |
| 560 * If [this] is empty throws a [StateError]. | 560 * If [this] is empty throws a [StateError]. |
| 561 */ | 561 */ |
| 562 Future<T> get last { | 562 Future<T> get last { |
| 563 _FutureImpl<T> future = new _FutureImpl<T>(); | 563 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 564 T result = null; | 564 T result = null; |
| 565 bool foundResult = false; | 565 bool foundResult = false; |
| 566 StreamSubscription subscription; | 566 StreamSubscription subscription; |
| 567 subscription = this.listen( | 567 subscription = this.listen( |
| 568 // TODO(ahe): Restore type when feature is implemented in dart2js | 568 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 569 // checked mode. http://dartbug.com/7733 | 569 // checked mode. http://dartbug.com/7733 |
| 570 (/*T*/ value) { | 570 (/*T*/ value) { |
| 571 foundResult = true; | 571 foundResult = true; |
| 572 result = value; | 572 result = value; |
| 573 }, | 573 }, |
| 574 onError: future._setError, | 574 onError: future._setError, |
| 575 onDone: () { | 575 onDone: () { |
| 576 if (foundResult) { | 576 if (foundResult) { |
| 577 future._setValue(result); | 577 future._setValue(result); |
| 578 return; | 578 return; |
| 579 } | 579 } |
| 580 future._setError(new AsyncError(new StateError("No elements"))); | 580 future._setError(new StateError("No elements")); |
| 581 }, | 581 }, |
| 582 unsubscribeOnError: true); | 582 unsubscribeOnError: true); |
| 583 return future; | 583 return future; |
| 584 } | 584 } |
| 585 | 585 |
| 586 /** | 586 /** |
| 587 * Returns the single element. | 587 * Returns the single element. |
| 588 * | 588 * |
| 589 * If [this] is empty or has more than one element throws a [StateError]. | 589 * If [this] is empty or has more than one element throws a [StateError]. |
| 590 */ | 590 */ |
| 591 Future<T> get single { | 591 Future<T> get single { |
| 592 _FutureImpl<T> future = new _FutureImpl<T>(); | 592 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 593 T result = null; | 593 T result = null; |
| 594 bool foundResult = false; | 594 bool foundResult = false; |
| 595 StreamSubscription subscription; | 595 StreamSubscription subscription; |
| 596 subscription = this.listen( | 596 subscription = this.listen( |
| 597 // TODO(ahe): Restore type when feature is implemented in dart2js | 597 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 598 // checked mode. http://dartbug.com/7733 | 598 // checked mode. http://dartbug.com/7733 |
| 599 (/*T*/ value) { | 599 (/*T*/ value) { |
| 600 if (foundResult) { | 600 if (foundResult) { |
| 601 subscription.cancel(); | 601 subscription.cancel(); |
| 602 // This is the second element we get. | 602 // This is the second element we get. |
| 603 Error error = new StateError("More than one element"); | 603 Error error = new StateError("More than one element"); |
| 604 future._setError(new AsyncError(error)); | 604 future._setError(error); |
| 605 return; | 605 return; |
| 606 } | 606 } |
| 607 foundResult = true; | 607 foundResult = true; |
| 608 result = value; | 608 result = value; |
| 609 }, | 609 }, |
| 610 onError: future._setError, | 610 onError: future._setError, |
| 611 onDone: () { | 611 onDone: () { |
| 612 if (foundResult) { | 612 if (foundResult) { |
| 613 future._setValue(result); | 613 future._setValue(result); |
| 614 return; | 614 return; |
| 615 } | 615 } |
| 616 future._setError(new AsyncError(new StateError("No elements"))); | 616 future._setError(new StateError("No elements")); |
| 617 }, | 617 }, |
| 618 unsubscribeOnError: true); | 618 unsubscribeOnError: true); |
| 619 return future; | 619 return future; |
| 620 } | 620 } |
| 621 | 621 |
| 622 /** | 622 /** |
| 623 * Finds the first element of this stream matching [test]. | 623 * Finds the first element of this stream matching [test]. |
| 624 * | 624 * |
| 625 * Returns a future that is filled with the first element of this stream | 625 * Returns a future that is filled with the first element of this stream |
| 626 * that [test] returns true for. | 626 * that [test] returns true for. |
| (...skipping 23 matching lines...) Expand all Loading... |
| 650 }, | 650 }, |
| 651 _cancelAndError(subscription, future) | 651 _cancelAndError(subscription, future) |
| 652 ); | 652 ); |
| 653 }, | 653 }, |
| 654 onError: future._setError, | 654 onError: future._setError, |
| 655 onDone: () { | 655 onDone: () { |
| 656 if (defaultValue != null) { | 656 if (defaultValue != null) { |
| 657 _runUserCode(defaultValue, future._setValue, future._setError); | 657 _runUserCode(defaultValue, future._setValue, future._setError); |
| 658 return; | 658 return; |
| 659 } | 659 } |
| 660 future._setError( | 660 future._setError(new StateError("firstMatch ended without match")); |
| 661 new AsyncError(new StateError("firstMatch ended without match"))); | |
| 662 }, | 661 }, |
| 663 unsubscribeOnError: true); | 662 unsubscribeOnError: true); |
| 664 return future; | 663 return future; |
| 665 } | 664 } |
| 666 | 665 |
| 667 /** | 666 /** |
| 668 * Finds the last element in this stream matching [test]. | 667 * Finds the last element in this stream matching [test]. |
| 669 * | 668 * |
| 670 * As [firstWhere], except that the last matching element is found. | 669 * As [firstWhere], except that the last matching element is found. |
| 671 * That means that the result cannot be provided before this stream | 670 * That means that the result cannot be provided before this stream |
| (...skipping 22 matching lines...) Expand all Loading... |
| 694 onError: future._setError, | 693 onError: future._setError, |
| 695 onDone: () { | 694 onDone: () { |
| 696 if (foundResult) { | 695 if (foundResult) { |
| 697 future._setValue(result); | 696 future._setValue(result); |
| 698 return; | 697 return; |
| 699 } | 698 } |
| 700 if (defaultValue != null) { | 699 if (defaultValue != null) { |
| 701 _runUserCode(defaultValue, future._setValue, future._setError); | 700 _runUserCode(defaultValue, future._setValue, future._setError); |
| 702 return; | 701 return; |
| 703 } | 702 } |
| 704 future._setError( | 703 future._setError(new StateError("lastMatch ended without match")); |
| 705 new AsyncError(new StateError("lastMatch ended without match"))); | |
| 706 }, | 704 }, |
| 707 unsubscribeOnError: true); | 705 unsubscribeOnError: true); |
| 708 return future; | 706 return future; |
| 709 } | 707 } |
| 710 | 708 |
| 711 /** | 709 /** |
| 712 * Finds the single element in this stream matching [test]. | 710 * Finds the single element in this stream matching [test]. |
| 713 * | 711 * |
| 714 * Like [lastMatch], except that it is an error if more than one | 712 * Like [lastMatch], except that it is an error if more than one |
| 715 * matching element occurs in the stream. | 713 * matching element occurs in the stream. |
| 716 */ | 714 */ |
| 717 Future<T> singleWhere(bool test(T value)) { | 715 Future<T> singleWhere(bool test(T value)) { |
| 718 _FutureImpl<T> future = new _FutureImpl<T>(); | 716 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 719 T result = null; | 717 T result = null; |
| 720 bool foundResult = false; | 718 bool foundResult = false; |
| 721 StreamSubscription subscription; | 719 StreamSubscription subscription; |
| 722 subscription = this.listen( | 720 subscription = this.listen( |
| 723 // TODO(ahe): Restore type when feature is implemented in dart2js | 721 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 724 // checked mode. http://dartbug.com/7733 | 722 // checked mode. http://dartbug.com/7733 |
| 725 (/*T*/ value) { | 723 (/*T*/ value) { |
| 726 _runUserCode( | 724 _runUserCode( |
| 727 () => true == test(value), | 725 () => true == test(value), |
| 728 (bool isMatch) { | 726 (bool isMatch) { |
| 729 if (isMatch) { | 727 if (isMatch) { |
| 730 if (foundResult) { | 728 if (foundResult) { |
| 731 subscription.cancel(); | 729 subscription.cancel(); |
| 732 future._setError(new AsyncError( | 730 future._setError( |
| 733 new StateError('Multiple matches for "single"'))); | 731 new StateError('Multiple matches for "single"')); |
| 734 return; | 732 return; |
| 735 } | 733 } |
| 736 foundResult = true; | 734 foundResult = true; |
| 737 result = value; | 735 result = value; |
| 738 } | 736 } |
| 739 }, | 737 }, |
| 740 _cancelAndError(subscription, future) | 738 _cancelAndError(subscription, future) |
| 741 ); | 739 ); |
| 742 }, | 740 }, |
| 743 onError: future._setError, | 741 onError: future._setError, |
| 744 onDone: () { | 742 onDone: () { |
| 745 if (foundResult) { | 743 if (foundResult) { |
| 746 future._setValue(result); | 744 future._setValue(result); |
| 747 return; | 745 return; |
| 748 } | 746 } |
| 749 future._setError( | 747 future._setError(new StateError("single ended without match")); |
| 750 new AsyncError(new StateError("single ended without match"))); | |
| 751 }, | 748 }, |
| 752 unsubscribeOnError: true); | 749 unsubscribeOnError: true); |
| 753 return future; | 750 return future; |
| 754 } | 751 } |
| 755 | 752 |
| 756 /** | 753 /** |
| 757 * Returns the value of the [index]th data event of this stream. | 754 * Returns the value of the [index]th data event of this stream. |
| 758 * | 755 * |
| 759 * If an error event occurs, the future will end with this error. | 756 * If an error event occurs, the future will end with this error. |
| 760 * | 757 * |
| (...skipping 10 matching lines...) Expand all Loading... |
| 771 (/*T*/ value) { | 768 (/*T*/ value) { |
| 772 if (index == 0) { | 769 if (index == 0) { |
| 773 subscription.cancel(); | 770 subscription.cancel(); |
| 774 future._setValue(value); | 771 future._setValue(value); |
| 775 return; | 772 return; |
| 776 } | 773 } |
| 777 index -= 1; | 774 index -= 1; |
| 778 }, | 775 }, |
| 779 onError: future._setError, | 776 onError: future._setError, |
| 780 onDone: () { | 777 onDone: () { |
| 781 future._setError(new AsyncError( | 778 future._setError(new StateError("Not enough elements for elementAt")); |
| 782 new StateError("Not enough elements for elementAt"))); | |
| 783 }, | 779 }, |
| 784 unsubscribeOnError: true); | 780 unsubscribeOnError: true); |
| 785 return future; | 781 return future; |
| 786 } | 782 } |
| 787 } | 783 } |
| 788 | 784 |
| 789 /** | 785 /** |
| 790 * A control object for the subscription on a [Stream]. | 786 * A control object for the subscription on a [Stream]. |
| 791 * | 787 * |
| 792 * When you subscribe on a [Stream] using [Stream.listen], | 788 * When you subscribe on a [Stream] using [Stream.listen], |
| 793 * a [StreamSubscription] object is returned. This object | 789 * a [StreamSubscription] object is returned. This object |
| 794 * is used to later unsubscribe again, or to temporarily pause | 790 * is used to later unsubscribe again, or to temporarily pause |
| 795 * the stream's events. | 791 * the stream's events. |
| 796 */ | 792 */ |
| 797 abstract class StreamSubscription<T> { | 793 abstract class StreamSubscription<T> { |
| 798 /** | 794 /** |
| 799 * Cancels this subscription. It will no longer receive events. | 795 * Cancels this subscription. It will no longer receive events. |
| 800 * | 796 * |
| 801 * If an event is currently firing, this unsubscription will only | 797 * If an event is currently firing, this unsubscription will only |
| 802 * take effect after all subscribers have received the current event. | 798 * take effect after all subscribers have received the current event. |
| 803 */ | 799 */ |
| 804 void cancel(); | 800 void cancel(); |
| 805 | 801 |
| 806 /** Set or override the data event handler of this subscription. */ | 802 /** Set or override the data event handler of this subscription. */ |
| 807 void onData(void handleData(T data)); | 803 void onData(void handleData(T data)); |
| 808 | 804 |
| 809 /** Set or override the error event handler of this subscription. */ | 805 /** Set or override the error event handler of this subscription. */ |
| 810 void onError(void handleError(AsyncError error)); | 806 void onError(void handleError(error)); |
| 811 | 807 |
| 812 /** Set or override the done event handler of this subscription. */ | 808 /** Set or override the done event handler of this subscription. */ |
| 813 void onDone(void handleDone()); | 809 void onDone(void handleDone()); |
| 814 | 810 |
| 815 /** | 811 /** |
| 816 * Request that the stream pauses events until further notice. | 812 * Request that the stream pauses events until further notice. |
| 817 * | 813 * |
| 818 * If [resumeSignal] is provided, the stream will undo the pause | 814 * If [resumeSignal] is provided, the stream will undo the pause |
| 819 * when the future completes. If the future completes with an error, | 815 * when the future completes. If the future completes with an error, |
| 820 * it will not be handled! | 816 * it will not be handled! |
| (...skipping 26 matching lines...) Expand all Loading... |
| 847 } | 843 } |
| 848 | 844 |
| 849 | 845 |
| 850 /** | 846 /** |
| 851 * An interface that abstracts creation or handling of [Stream] events. | 847 * An interface that abstracts creation or handling of [Stream] events. |
| 852 */ | 848 */ |
| 853 abstract class EventSink<T> { | 849 abstract class EventSink<T> { |
| 854 /** Create a data event */ | 850 /** Create a data event */ |
| 855 void add(T event); | 851 void add(T event); |
| 856 /** Create an async error. */ | 852 /** Create an async error. */ |
| 857 void addError(AsyncError errorEvent); | 853 void addError(errorEvent); |
| 858 /** Request a stream to close. */ | 854 /** Request a stream to close. */ |
| 859 void close(); | 855 void close(); |
| 860 } | 856 } |
| 861 | 857 |
| 862 | 858 |
| 863 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 859 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 864 class StreamView<T> extends Stream<T> { | 860 class StreamView<T> extends Stream<T> { |
| 865 Stream<T> _stream; | 861 Stream<T> _stream; |
| 866 | 862 |
| 867 StreamView(this._stream); | 863 StreamView(this._stream); |
| 868 | 864 |
| 869 bool get isBroadcast => _stream.isBroadcast; | 865 bool get isBroadcast => _stream.isBroadcast; |
| 870 | 866 |
| 871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); | 867 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); |
| 872 | 868 |
| 873 StreamSubscription<T> listen(void onData(T value), | 869 StreamSubscription<T> listen(void onData(T value), |
| 874 { void onError(AsyncError error), | 870 { void onError(error), |
| 875 void onDone(), | 871 void onDone(), |
| 876 bool unsubscribeOnError }) { | 872 bool unsubscribeOnError }) { |
| 877 return _stream.listen(onData, onError: onError, onDone: onDone, | 873 return _stream.listen(onData, onError: onError, onDone: onDone, |
| 878 unsubscribeOnError: unsubscribeOnError); | 874 unsubscribeOnError: unsubscribeOnError); |
| 879 } | 875 } |
| 880 } | 876 } |
| 881 | 877 |
| 882 /** | 878 /** |
| 883 * [EventSink] wrapper that only exposes the [EventSink] interface. | 879 * [EventSink] wrapper that only exposes the [EventSink] interface. |
| 884 */ | 880 */ |
| 885 class _EventSinkView<T> extends EventSink<T> { | 881 class _EventSinkView<T> extends EventSink<T> { |
| 886 final EventSink<T> _sink; | 882 final EventSink<T> _sink; |
| 887 | 883 |
| 888 _EventSinkView(this._sink); | 884 _EventSinkView(this._sink); |
| 889 | 885 |
| 890 void add(T value) { _sink.add(value); } | 886 void add(T value) { _sink.add(value); } |
| 891 void addError(AsyncError error) { _sink.addError(error); } | 887 void addError(error) { _sink.addError(error); } |
| 892 void close() { _sink.close(); } | 888 void close() { _sink.close(); } |
| 893 } | 889 } |
| 894 | 890 |
| 895 | 891 |
| 896 /** | 892 /** |
| 897 * The target of a [Stream.pipe] call. | 893 * The target of a [Stream.pipe] call. |
| 898 * | 894 * |
| 899 * The [Stream.pipe] call will pass itself to this object, and then return | 895 * The [Stream.pipe] call will pass itself to this object, and then return |
| 900 * the resulting [Future]. The pipe should complete the future when it's | 896 * the resulting [Future]. The pipe should complete the future when it's |
| 901 * done. | 897 * done. |
| (...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 960 * | 956 * |
| 961 * stringStream.transform(new StreamTransformer<String, String>( | 957 * stringStream.transform(new StreamTransformer<String, String>( |
| 962 * handleData: (Strung value, EventSink<String> sink) { | 958 * handleData: (Strung value, EventSink<String> sink) { |
| 963 * sink.add(value); | 959 * sink.add(value); |
| 964 * sink.add(value); // Duplicate the incoming events. | 960 * sink.add(value); // Duplicate the incoming events. |
| 965 * })); | 961 * })); |
| 966 * | 962 * |
| 967 */ | 963 */ |
| 968 factory StreamTransformer({ | 964 factory StreamTransformer({ |
| 969 void handleData(S data, EventSink<T> sink), | 965 void handleData(S data, EventSink<T> sink), |
| 970 void handleError(AsyncError error, EventSink<T> sink), | 966 void handleError(error, EventSink<T> sink), |
| 971 void handleDone(EventSink<T> sink)}) { | 967 void handleDone(EventSink<T> sink)}) { |
| 972 return new _StreamTransformerImpl<S, T>(handleData, | 968 return new _StreamTransformerImpl<S, T>(handleData, |
| 973 handleError, | 969 handleError, |
| 974 handleDone); | 970 handleDone); |
| 975 } | 971 } |
| 976 | 972 |
| 977 Stream<T> bind(Stream<S> stream); | 973 Stream<T> bind(Stream<S> stream); |
| 978 } | 974 } |
| 979 | 975 |
| 980 | 976 |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1044 var data = event; | 1040 var data = event; |
| 1045 sink.add(data); | 1041 sink.add(data); |
| 1046 } | 1042 } |
| 1047 | 1043 |
| 1048 /** | 1044 /** |
| 1049 * Act on incoming error event. | 1045 * Act on incoming error event. |
| 1050 * | 1046 * |
| 1051 * The method may generate any number of events on the sink, but should | 1047 * The method may generate any number of events on the sink, but should |
| 1052 * not throw. | 1048 * not throw. |
| 1053 */ | 1049 */ |
| 1054 void handleError(AsyncError error, EventSink<T> sink) { | 1050 void handleError(error, EventSink<T> sink) { |
| 1055 sink.addError(error); | 1051 sink.addError(error); |
| 1056 } | 1052 } |
| 1057 | 1053 |
| 1058 /** | 1054 /** |
| 1059 * Act on incoming done event. | 1055 * Act on incoming done event. |
| 1060 * | 1056 * |
| 1061 * The method may generate any number of events on the sink, but should | 1057 * The method may generate any number of events on the sink, but should |
| 1062 * not throw. | 1058 * not throw. |
| 1063 */ | 1059 */ |
| 1064 void handleDone(EventSink<T> sink){ | 1060 void handleDone(EventSink<T> sink){ |
| (...skipping 11 matching lines...) Expand all Loading... |
| 1076 * events on this stream. | 1072 * events on this stream. |
| 1077 */ | 1073 */ |
| 1078 class EventTransformStream<S, T> extends Stream<T> { | 1074 class EventTransformStream<S, T> extends Stream<T> { |
| 1079 final Stream<S> _source; | 1075 final Stream<S> _source; |
| 1080 final StreamEventTransformer _transformer; | 1076 final StreamEventTransformer _transformer; |
| 1081 EventTransformStream(Stream<S> source, | 1077 EventTransformStream(Stream<S> source, |
| 1082 StreamEventTransformer<S, T> transformer) | 1078 StreamEventTransformer<S, T> transformer) |
| 1083 : _source = source, _transformer = transformer; | 1079 : _source = source, _transformer = transformer; |
| 1084 | 1080 |
| 1085 StreamSubscription<T> listen(void onData(T data), | 1081 StreamSubscription<T> listen(void onData(T data), |
| 1086 { void onError(AsyncError error), | 1082 { void onError(error), |
| 1087 void onDone(), | 1083 void onDone(), |
| 1088 bool unsubscribeOnError }) { | 1084 bool unsubscribeOnError }) { |
| 1089 unsubscribeOnError = identical(true, unsubscribeOnError); | 1085 unsubscribeOnError = identical(true, unsubscribeOnError); |
| 1090 return new _EventTransformStreamSubscription(_source, _transformer, | 1086 return new _EventTransformStreamSubscription(_source, _transformer, |
| 1091 onData, onError, onDone, | 1087 onData, onError, onDone, |
| 1092 unsubscribeOnError); | 1088 unsubscribeOnError); |
| 1093 } | 1089 } |
| 1094 } | 1090 } |
| 1095 | 1091 |
| 1096 class _EventTransformStreamSubscription<S, T> | 1092 class _EventTransformStreamSubscription<S, T> |
| 1097 extends _BaseStreamSubscription<T> | 1093 extends _BaseStreamSubscription<T> |
| 1098 implements _EventOutputSink<T> { | 1094 implements _EventOutputSink<T> { |
| 1099 /** The transformer used to transform events. */ | 1095 /** The transformer used to transform events. */ |
| 1100 final StreamEventTransformer<S, T> _transformer; | 1096 final StreamEventTransformer<S, T> _transformer; |
| 1101 /** Whether to unsubscribe when emitting an error. */ | 1097 /** Whether to unsubscribe when emitting an error. */ |
| 1102 final bool _unsubscribeOnError; | 1098 final bool _unsubscribeOnError; |
| 1103 /** Whether this stream has sent a done event. */ | 1099 /** Whether this stream has sent a done event. */ |
| 1104 bool _isClosed = false; | 1100 bool _isClosed = false; |
| 1105 /** Source of incoming events. */ | 1101 /** Source of incoming events. */ |
| 1106 StreamSubscription<S> _subscription; | 1102 StreamSubscription<S> _subscription; |
| 1107 /** Cached EventSink wrapper for this class. */ | 1103 /** Cached EventSink wrapper for this class. */ |
| 1108 EventSink<T> _sink; | 1104 EventSink<T> _sink; |
| 1109 | 1105 |
| 1110 _EventTransformStreamSubscription(Stream<S> source, | 1106 _EventTransformStreamSubscription(Stream<S> source, |
| 1111 this._transformer, | 1107 this._transformer, |
| 1112 void onData(T data), | 1108 void onData(T data), |
| 1113 void onError(AsyncError error), | 1109 void onError(error), |
| 1114 void onDone(), | 1110 void onDone(), |
| 1115 this._unsubscribeOnError) | 1111 this._unsubscribeOnError) |
| 1116 : super(onData, onError, onDone) { | 1112 : super(onData, onError, onDone) { |
| 1117 _sink = new _EventOutputSinkWrapper<T>(this); | 1113 _sink = new _EventOutputSinkWrapper<T>(this); |
| 1118 _subscription = source.listen(_handleData, | 1114 _subscription = source.listen(_handleData, |
| 1119 onError: _handleError, | 1115 onError: _handleError, |
| 1120 onDone: _handleDone); | 1116 onDone: _handleDone); |
| 1121 } | 1117 } |
| 1122 | 1118 |
| 1123 /** Whether this subscription is still subscribed to its source. */ | 1119 /** Whether this subscription is still subscribed to its source. */ |
| (...skipping 17 matching lines...) Expand all Loading... |
| 1141 } | 1137 } |
| 1142 | 1138 |
| 1143 void _handleData(S data) { | 1139 void _handleData(S data) { |
| 1144 try { | 1140 try { |
| 1145 _transformer.handleData(data, _sink); | 1141 _transformer.handleData(data, _sink); |
| 1146 } catch (e, s) { | 1142 } catch (e, s) { |
| 1147 _sendError(_asyncError(e, s)); | 1143 _sendError(_asyncError(e, s)); |
| 1148 } | 1144 } |
| 1149 } | 1145 } |
| 1150 | 1146 |
| 1151 void _handleError(AsyncError error) { | 1147 void _handleError(error) { |
| 1152 try { | 1148 try { |
| 1153 _transformer.handleError(error, _sink); | 1149 _transformer.handleError(error, _sink); |
| 1154 } catch (e, s) { | 1150 } catch (e, s) { |
| 1155 _sendError(_asyncError(e, s, error)); | 1151 _sendError(_asyncError(e, s)); |
| 1156 } | 1152 } |
| 1157 } | 1153 } |
| 1158 | 1154 |
| 1159 void _handleDone() { | 1155 void _handleDone() { |
| 1160 try { | 1156 try { |
| 1161 _subscription = null; | 1157 _subscription = null; |
| 1162 _transformer.handleDone(_sink); | 1158 _transformer.handleDone(_sink); |
| 1163 } catch (e, s) { | 1159 } catch (e, s) { |
| 1164 _sendError(_asyncError(e, s)); | 1160 _sendError(_asyncError(e, s)); |
| 1165 } | 1161 } |
| 1166 } | 1162 } |
| 1167 | 1163 |
| 1168 // EventOutputSink interface. | 1164 // EventOutputSink interface. |
| 1169 void _sendData(T data) { | 1165 void _sendData(T data) { |
| 1170 if (_isClosed) return; | 1166 if (_isClosed) return; |
| 1171 _onData(data); | 1167 _onData(data); |
| 1172 } | 1168 } |
| 1173 | 1169 |
| 1174 void _sendError(AsyncError error) { | 1170 void _sendError(error) { |
| 1175 if (_isClosed) return; | 1171 if (_isClosed) return; |
| 1176 _onError(error); | 1172 _onError(error); |
| 1177 if (_unsubscribeOnError) { | 1173 if (_unsubscribeOnError) { |
| 1178 cancel(); | 1174 cancel(); |
| 1179 } | 1175 } |
| 1180 } | 1176 } |
| 1181 | 1177 |
| 1182 void _sendDone() { | 1178 void _sendDone() { |
| 1183 if (_isClosed) throw new StateError("Already closed."); | 1179 if (_isClosed) throw new StateError("Already closed."); |
| 1184 _isClosed = true; | 1180 _isClosed = true; |
| 1185 if (_isSubscribed) { | 1181 if (_isSubscribed) { |
| 1186 _subscription.cancel(); | 1182 _subscription.cancel(); |
| 1187 _subscription = null; | 1183 _subscription = null; |
| 1188 } | 1184 } |
| 1189 _onDone(); | 1185 _onDone(); |
| 1190 } | 1186 } |
| 1191 } | 1187 } |
| 1192 | 1188 |
| 1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1189 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
| 1194 _EventOutputSink _sink; | 1190 _EventOutputSink _sink; |
| 1195 _EventOutputSinkWrapper(this._sink); | 1191 _EventOutputSinkWrapper(this._sink); |
| 1196 | 1192 |
| 1197 void add(T data) { _sink._sendData(data); } | 1193 void add(T data) { _sink._sendData(data); } |
| 1198 void addError(AsyncError error) { _sink._sendError(error); } | 1194 void addError(error) { _sink._sendError(error); } |
| 1199 void close() { _sink._sendDone(); } | 1195 void close() { _sink._sendDone(); } |
| 1200 } | 1196 } |
| OLD | NEW |