| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 patch class RawServerSocket { | 5 patch class RawServerSocket { |
| 6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1", | 6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1", |
| 7 int port = 0, | 7 int port = 0, |
| 8 int backlog = 0]) { | 8 int backlog = 0]) { |
| 9 return _RawServerSocket.bind(address, port, backlog); | 9 return _RawServerSocket.bind(address, port, backlog); |
| 10 } | 10 } |
| (...skipping 647 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 658 new _SecureSocket(rawSocket); | 658 new _SecureSocket(rawSocket); |
| 659 } | 659 } |
| 660 | 660 |
| 661 | 661 |
| 662 class _SocketStreamConsumer extends StreamConsumer<List<int>> { | 662 class _SocketStreamConsumer extends StreamConsumer<List<int>> { |
| 663 StreamSubscription subscription; | 663 StreamSubscription subscription; |
| 664 final _Socket socket; | 664 final _Socket socket; |
| 665 int offset; | 665 int offset; |
| 666 List<int> buffer; | 666 List<int> buffer; |
| 667 bool paused = false; | 667 bool paused = false; |
| 668 Completer streamCompleter; |
| 668 | 669 |
| 669 _SocketStreamConsumer(this.socket); | 670 _SocketStreamConsumer(this.socket); |
| 670 | 671 |
| 671 Future<Socket> consume(Stream<List<int>> stream) { | 672 Future<Socket> addStream(Stream<List<int>> stream) { |
| 673 socket._ensureRawSocketSubscription(); |
| 674 streamCompleter = new Completer<Socket>(); |
| 672 if (socket._raw != null) { | 675 if (socket._raw != null) { |
| 673 subscription = stream.listen( | 676 subscription = stream.listen( |
| 674 (data) { | 677 (data) { |
| 675 assert(!paused); | |
| 676 assert(buffer == null); | |
| 677 buffer = data; | |
| 678 offset = 0; | |
| 679 write(); | |
| 680 }, | |
| 681 onError: (error) { | |
| 682 socket._consumerDone(error); | |
| 683 }, | |
| 684 onDone: () { | |
| 685 socket._consumerDone(); | |
| 686 }, | |
| 687 unsubscribeOnError: true); | |
| 688 } | |
| 689 return socket._doneFuture; | |
| 690 } | |
| 691 | |
| 692 Future<Socket> addStream(Stream<List<int>> stream) { | |
| 693 Completer completer = new Completer<Socket>(); | |
| 694 if (socket._raw != null) { | |
| 695 subscription = stream.listen( | |
| 696 (data) { | |
| 697 assert(!paused); | 678 assert(!paused); |
| 698 assert(buffer == null); | 679 assert(buffer == null); |
| 699 buffer = data; | 680 buffer = data; |
| 700 offset = 0; | 681 offset = 0; |
| 701 write(); | 682 write(); |
| 702 }, | 683 }, |
| 703 onError: (error) { | 684 onError: (error) { |
| 704 socket._consumerDone(error); | 685 socket._consumerDone(); |
| 705 completer.completeError(error.error, error.stackTrace); | 686 done(error); |
| 706 }, | 687 }, |
| 707 onDone: () { | 688 onDone: () { |
| 708 completer.complete(socket); | 689 done(); |
| 709 }, | 690 }, |
| 710 unsubscribeOnError: true); | 691 unsubscribeOnError: true); |
| 711 } | 692 } |
| 712 return completer.future; | 693 return streamCompleter.future; |
| 713 } | 694 } |
| 714 | 695 |
| 715 Future<Socket> close() { | 696 Future<Socket> close() { |
| 716 socket._consumerDone(); | 697 socket._consumerDone(); |
| 717 return completer.future; | 698 return new Future.immediate(socket); |
| 718 } | 699 } |
| 719 | 700 |
| 720 void write() { | 701 void write() { |
| 721 try { | 702 try { |
| 722 if (subscription == null) return; | 703 if (subscription == null) return; |
| 723 assert(buffer != null); | 704 assert(buffer != null); |
| 724 // Write as much as possible. | 705 // Write as much as possible. |
| 725 offset += socket._write(buffer, offset, buffer.length - offset); | 706 offset += socket._write(buffer, offset, buffer.length - offset); |
| 726 if (offset < buffer.length) { | 707 if (offset < buffer.length) { |
| 727 if (!paused) { | 708 if (!paused) { |
| 728 paused = true; | 709 paused = true; |
| 729 // TODO(ajohnsen): It would be nice to avoid this check. | 710 // TODO(ajohnsen): It would be nice to avoid this check. |
| 730 // Some info: socket._write can emit an event, if it fails to write. | 711 // Some info: socket._write can emit an event, if it fails to write. |
| 731 // If the user closes the socket in that event, stop() will be called | 712 // If the user closes the socket in that event, stop() will be called |
| 732 // before we get a change to pause. | 713 // before we get a change to pause. |
| 733 if (subscription == null) return; | 714 if (subscription == null) return; |
| 734 subscription.pause(); | 715 subscription.pause(); |
| 735 } | 716 } |
| 736 socket._enableWriteEvent(); | 717 socket._enableWriteEvent(); |
| 737 } else { | 718 } else { |
| 738 buffer = null; | 719 buffer = null; |
| 739 if (paused) { | 720 if (paused) { |
| 740 paused = false; | 721 paused = false; |
| 741 subscription.resume(); | 722 subscription.resume(); |
| 742 } | 723 } |
| 743 } | 724 } |
| 744 } catch (e) { | 725 } catch (e) { |
| 745 stop(); | 726 stop(); |
| 746 socket._consumerDone(e); | 727 socket._consumerDone(); |
| 728 done(e); |
| 747 } | 729 } |
| 748 } | 730 } |
| 749 | 731 |
| 732 void done([error]) { |
| 733 if (streamCompleter != null) { |
| 734 var tmp = streamCompleter; |
| 735 streamCompleter = null; |
| 736 if (error != null) { |
| 737 tmp.completeError(error); |
| 738 } else { |
| 739 tmp.complete(socket); |
| 740 } |
| 741 } |
| 742 } |
| 743 |
| 750 void stop() { | 744 void stop() { |
| 751 if (subscription == null) return; | 745 if (subscription == null) return; |
| 752 subscription.cancel(); | 746 subscription.cancel(); |
| 753 subscription = null; | 747 subscription = null; |
| 754 socket._disableWriteEvent(); | 748 socket._disableWriteEvent(); |
| 755 } | 749 } |
| 756 } | 750 } |
| 757 | 751 |
| 758 | 752 |
| 759 class _Socket extends Stream<List<int>> implements Socket { | 753 class _Socket extends Stream<List<int>> implements Socket { |
| 760 RawSocket _raw; // Set to null when the raw socket is closed. | 754 RawSocket _raw; // Set to null when the raw socket is closed. |
| 761 bool _closed = false; // Set to true when the raw socket is closed. | 755 bool _closed = false; // Set to true when the raw socket is closed. |
| 762 StreamController _controller; | 756 StreamController _controller; |
| 763 bool _controllerClosed = false; | 757 bool _controllerClosed = false; |
| 764 _SocketStreamConsumer _consumer; | 758 _SocketStreamConsumer _consumer; |
| 765 IOSink<Socket> _sink; | 759 IOSink _sink; |
| 766 Completer _doneCompleter; | |
| 767 var _subscription; | 760 var _subscription; |
| 768 | 761 |
| 769 _Socket(RawSocket this._raw) { | 762 _Socket(RawSocket this._raw) { |
| 770 _controller = new StreamController<List<int>>( | 763 _controller = new StreamController<List<int>>( |
| 771 onSubscriptionStateChange: _onSubscriptionStateChange, | 764 onSubscriptionStateChange: _onSubscriptionStateChange, |
| 772 onPauseStateChange: _onPauseStateChange); | 765 onPauseStateChange: _onPauseStateChange); |
| 773 _consumer = new _SocketStreamConsumer(this); | 766 _consumer = new _SocketStreamConsumer(this); |
| 774 _sink = new IOSink(_consumer); | 767 _sink = new IOSink(_consumer); |
| 775 | 768 |
| 776 // Disable read events until there is a subscription. | 769 // Disable read events until there is a subscription. |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 810 void write(Object obj) => _sink.write(obj); | 803 void write(Object obj) => _sink.write(obj); |
| 811 | 804 |
| 812 void writeln([Object obj = ""]) => _sink.writeln(obj); | 805 void writeln([Object obj = ""]) => _sink.writeln(obj); |
| 813 | 806 |
| 814 void writeCharCode(int charCode) => _sink.writeCharCode(charCode); | 807 void writeCharCode(int charCode) => _sink.writeCharCode(charCode); |
| 815 | 808 |
| 816 void writeAll(Iterable objects, [sep = ""]) => _sink.writeAll(objects, sep); | 809 void writeAll(Iterable objects, [sep = ""]) => _sink.writeAll(objects, sep); |
| 817 | 810 |
| 818 void add(List<int> bytes) => _sink.add(bytes); | 811 void add(List<int> bytes) => _sink.add(bytes); |
| 819 | 812 |
| 820 Future<Socket> consume(Stream<List<int>> stream) { | 813 Future<Socket> addStream(Stream<List<int>> stream) { |
| 821 return _sink.consume(stream); | 814 return _sink.addStream(stream); |
| 822 } | 815 } |
| 823 | 816 |
| 824 Future<Socket> writeStream(Stream<List<int>> stream) { | 817 Future<Socket> close() => _sink.close(); |
| 825 return _sink.writeStream(stream); | |
| 826 } | |
| 827 | |
| 828 close() => _sink.close(); | |
| 829 | 818 |
| 830 Future<Socket> get done => _sink.done; | 819 Future<Socket> get done => _sink.done; |
| 831 | 820 |
| 832 void destroy() { | 821 void destroy() { |
| 833 // Destroy can always be called to get rid of a socket. | 822 // Destroy can always be called to get rid of a socket. |
| 834 if (_raw == null) return; | 823 if (_raw == null) return; |
| 835 _consumer.stop(); | 824 _consumer.stop(); |
| 836 _closeRawSocket(); | 825 _closeRawSocket(); |
| 837 _controllerClosed = true; | 826 _controllerClosed = true; |
| 838 _controller.close(); | 827 _controller.close(); |
| 839 } | 828 } |
| 840 | 829 |
| 841 bool setOption(SocketOption option, bool enabled) { | 830 bool setOption(SocketOption option, bool enabled) { |
| 842 if (_raw == null) return false; | 831 if (_raw == null) return false; |
| 843 return _raw.setOption(option, enabled); | 832 return _raw.setOption(option, enabled); |
| 844 } | 833 } |
| 845 | 834 |
| 846 int get port => _raw.port; | 835 int get port => _raw.port; |
| 847 String get remoteHost => _raw.remoteHost; | 836 String get remoteHost => _raw.remoteHost; |
| 848 int get remotePort => _raw.remotePort; | 837 int get remotePort => _raw.remotePort; |
| 849 | 838 |
| 850 // Ensure a subscription on the raw socket. Both the stream and the | 839 // Ensure a subscription on the raw socket. Both the stream and the |
| 851 // consumer needs a subscription as they share the error and done | 840 // consumer needs a subscription as they share the error and done |
| 852 // events from the raw socket. | 841 // events from the raw socket. |
| 853 void _ensureRawSocketSubscription() { | 842 void _ensureRawSocketSubscription() { |
| 854 if (_subscription == null) { | 843 if (_subscription == null && _raw != null) { |
| 855 _subscription = _raw.listen(_onData, | 844 _subscription = _raw.listen(_onData, |
| 856 onError: _onError, | 845 onError: _onError, |
| 857 onDone: _onDone, | 846 onDone: _onDone, |
| 858 unsubscribeOnError: true); | 847 unsubscribeOnError: true); |
| 859 } | 848 } |
| 860 } | 849 } |
| 861 | 850 |
| 862 _closeRawSocket() { | 851 _closeRawSocket() { |
| 863 var tmp = _raw; | 852 var tmp = _raw; |
| 864 _raw = null; | 853 _raw = null; |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 901 _controller.close(); | 890 _controller.close(); |
| 902 break; | 891 break; |
| 903 } | 892 } |
| 904 } | 893 } |
| 905 | 894 |
| 906 void _onDone() { | 895 void _onDone() { |
| 907 if (!_controllerClosed) { | 896 if (!_controllerClosed) { |
| 908 _controllerClosed = true; | 897 _controllerClosed = true; |
| 909 _controller.close(); | 898 _controller.close(); |
| 910 } | 899 } |
| 911 _done(); | 900 _consumer.done(); |
| 912 } | 901 } |
| 913 | 902 |
| 914 void _onError(error) { | 903 void _onError(error) { |
| 915 if (!_controllerClosed) { | 904 if (!_controllerClosed) { |
| 916 _controllerClosed = true; | 905 _controllerClosed = true; |
| 917 _controller.addError(error); | 906 _controller.addError(error); |
| 918 _controller.close(); | 907 _controller.close(); |
| 919 } | 908 } |
| 920 _done(error); | 909 _consumer.done(error); |
| 921 } | |
| 922 | |
| 923 get _doneFuture { | |
| 924 if (_doneCompleter == null) { | |
| 925 _ensureRawSocketSubscription(); | |
| 926 _doneCompleter = new Completer(); | |
| 927 } | |
| 928 return _doneCompleter.future; | |
| 929 } | |
| 930 | |
| 931 void _done([error]) { | |
| 932 if (_doneCompleter != null) { | |
| 933 var tmp = _doneCompleter; | |
| 934 _doneCompleter = null; | |
| 935 if (error != null) { | |
| 936 tmp.completeError(error); | |
| 937 } else { | |
| 938 tmp.complete(this); | |
| 939 } | |
| 940 } | |
| 941 } | 910 } |
| 942 | 911 |
| 943 int _write(List<int> data, int offset, int length) => | 912 int _write(List<int> data, int offset, int length) => |
| 944 _raw.write(data, offset, length); | 913 _raw.write(data, offset, length); |
| 945 | 914 |
| 946 void _enableWriteEvent() { | 915 void _enableWriteEvent() { |
| 947 _raw.writeEventsEnabled = true; | 916 _raw.writeEventsEnabled = true; |
| 948 } | 917 } |
| 949 | 918 |
| 950 void _disableWriteEvent() { | 919 void _disableWriteEvent() { |
| 951 if (_raw != null) { | 920 if (_raw != null) { |
| 952 _raw.writeEventsEnabled = false; | 921 _raw.writeEventsEnabled = false; |
| 953 } | 922 } |
| 954 } | 923 } |
| 955 | 924 |
| 956 void _consumerDone([error]) { | 925 void _consumerDone() { |
| 957 _done(error); | |
| 958 if (_raw != null) { | 926 if (_raw != null) { |
| 959 _raw.shutdown(SocketDirection.SEND); | 927 _raw.shutdown(SocketDirection.SEND); |
| 960 _disableWriteEvent(); | 928 _disableWriteEvent(); |
| 961 } | 929 } |
| 962 } | 930 } |
| 963 } | 931 } |
| 964 | 932 |
| 965 | 933 |
| 966 class _SecureSocket extends _Socket implements SecureSocket { | 934 class _SecureSocket extends _Socket implements SecureSocket { |
| 967 _SecureSocket(RawSecureSocket raw) : super(raw); | 935 _SecureSocket(RawSecureSocket raw) : super(raw); |
| 968 | 936 |
| 969 void set onBadCertificate(bool callback(X509Certificate certificate)) { | 937 void set onBadCertificate(bool callback(X509Certificate certificate)) { |
| 970 if (_raw == null) { | 938 if (_raw == null) { |
| 971 throw new StateError("onBadCertificate called on destroyed SecureSocket"); | 939 throw new StateError("onBadCertificate called on destroyed SecureSocket"); |
| 972 } | 940 } |
| 973 _raw.onBadCertificate = callback; | 941 _raw.onBadCertificate = callback; |
| 974 } | 942 } |
| 975 | 943 |
| 976 X509Certificate get peerCertificate { | 944 X509Certificate get peerCertificate { |
| 977 if (_raw == null) { | 945 if (_raw == null) { |
| 978 throw new StateError("peerCertificate called on destroyed SecureSocket"); | 946 throw new StateError("peerCertificate called on destroyed SecureSocket"); |
| 979 } | 947 } |
| 980 return _raw.peerCertificate; | 948 return _raw.peerCertificate; |
| 981 } | 949 } |
| 982 } | 950 } |
| OLD | NEW |