OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 patch class RawServerSocket { | 5 patch class RawServerSocket { |
6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1", | 6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1", |
7 int port = 0, | 7 int port = 0, |
8 int backlog = 0]) { | 8 int backlog = 0]) { |
9 return _RawServerSocket.bind(address, port, backlog); | 9 return _RawServerSocket.bind(address, port, backlog); |
10 } | 10 } |
(...skipping 647 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
658 new _SecureSocket(rawSocket); | 658 new _SecureSocket(rawSocket); |
659 } | 659 } |
660 | 660 |
661 | 661 |
662 class _SocketStreamConsumer extends StreamConsumer<List<int>> { | 662 class _SocketStreamConsumer extends StreamConsumer<List<int>> { |
663 StreamSubscription subscription; | 663 StreamSubscription subscription; |
664 final _Socket socket; | 664 final _Socket socket; |
665 int offset; | 665 int offset; |
666 List<int> buffer; | 666 List<int> buffer; |
667 bool paused = false; | 667 bool paused = false; |
| 668 Completer streamCompleter; |
668 | 669 |
669 _SocketStreamConsumer(this.socket); | 670 _SocketStreamConsumer(this.socket); |
670 | 671 |
671 Future<Socket> consume(Stream<List<int>> stream) { | 672 Future<Socket> addStream(Stream<List<int>> stream) { |
| 673 socket._ensureRawSocketSubscription(); |
| 674 streamCompleter = new Completer<Socket>(); |
672 if (socket._raw != null) { | 675 if (socket._raw != null) { |
673 subscription = stream.listen( | 676 subscription = stream.listen( |
674 (data) { | 677 (data) { |
675 assert(!paused); | |
676 assert(buffer == null); | |
677 buffer = data; | |
678 offset = 0; | |
679 write(); | |
680 }, | |
681 onError: (error) { | |
682 socket._consumerDone(error); | |
683 }, | |
684 onDone: () { | |
685 socket._consumerDone(); | |
686 }, | |
687 unsubscribeOnError: true); | |
688 } | |
689 return socket._doneFuture; | |
690 } | |
691 | |
692 Future<Socket> addStream(Stream<List<int>> stream) { | |
693 Completer completer = new Completer<Socket>(); | |
694 if (socket._raw != null) { | |
695 subscription = stream.listen( | |
696 (data) { | |
697 assert(!paused); | 678 assert(!paused); |
698 assert(buffer == null); | 679 assert(buffer == null); |
699 buffer = data; | 680 buffer = data; |
700 offset = 0; | 681 offset = 0; |
701 write(); | 682 write(); |
702 }, | 683 }, |
703 onError: (error) { | 684 onError: (error) { |
704 socket._consumerDone(error); | 685 socket._consumerDone(); |
705 completer.completeError(error.error, error.stackTrace); | 686 done(error); |
706 }, | 687 }, |
707 onDone: () { | 688 onDone: () { |
708 completer.complete(socket); | 689 done(); |
709 }, | 690 }, |
710 unsubscribeOnError: true); | 691 unsubscribeOnError: true); |
711 } | 692 } |
712 return completer.future; | 693 return streamCompleter.future; |
713 } | 694 } |
714 | 695 |
715 Future<Socket> close() { | 696 Future<Socket> close() { |
716 socket._consumerDone(); | 697 socket._consumerDone(); |
717 return completer.future; | 698 return new Future.immediate(socket); |
718 } | 699 } |
719 | 700 |
720 void write() { | 701 void write() { |
721 try { | 702 try { |
722 if (subscription == null) return; | 703 if (subscription == null) return; |
723 assert(buffer != null); | 704 assert(buffer != null); |
724 // Write as much as possible. | 705 // Write as much as possible. |
725 offset += socket._write(buffer, offset, buffer.length - offset); | 706 offset += socket._write(buffer, offset, buffer.length - offset); |
726 if (offset < buffer.length) { | 707 if (offset < buffer.length) { |
727 if (!paused) { | 708 if (!paused) { |
728 paused = true; | 709 paused = true; |
729 // TODO(ajohnsen): It would be nice to avoid this check. | 710 // TODO(ajohnsen): It would be nice to avoid this check. |
730 // Some info: socket._write can emit an event, if it fails to write. | 711 // Some info: socket._write can emit an event, if it fails to write. |
731 // If the user closes the socket in that event, stop() will be called | 712 // If the user closes the socket in that event, stop() will be called |
732 // before we get a change to pause. | 713 // before we get a change to pause. |
733 if (subscription == null) return; | 714 if (subscription == null) return; |
734 subscription.pause(); | 715 subscription.pause(); |
735 } | 716 } |
736 socket._enableWriteEvent(); | 717 socket._enableWriteEvent(); |
737 } else { | 718 } else { |
738 buffer = null; | 719 buffer = null; |
739 if (paused) { | 720 if (paused) { |
740 paused = false; | 721 paused = false; |
741 subscription.resume(); | 722 subscription.resume(); |
742 } | 723 } |
743 } | 724 } |
744 } catch (e) { | 725 } catch (e) { |
745 stop(); | 726 stop(); |
746 socket._consumerDone(e); | 727 socket._consumerDone(); |
| 728 done(e); |
747 } | 729 } |
748 } | 730 } |
749 | 731 |
| 732 void done([error]) { |
| 733 if (streamCompleter != null) { |
| 734 var tmp = streamCompleter; |
| 735 streamCompleter = null; |
| 736 if (error != null) { |
| 737 tmp.completeError(error); |
| 738 } else { |
| 739 tmp.complete(socket); |
| 740 } |
| 741 } |
| 742 } |
| 743 |
750 void stop() { | 744 void stop() { |
751 if (subscription == null) return; | 745 if (subscription == null) return; |
752 subscription.cancel(); | 746 subscription.cancel(); |
753 subscription = null; | 747 subscription = null; |
754 socket._disableWriteEvent(); | 748 socket._disableWriteEvent(); |
755 } | 749 } |
756 } | 750 } |
757 | 751 |
758 | 752 |
759 class _Socket extends Stream<List<int>> implements Socket { | 753 class _Socket extends Stream<List<int>> implements Socket { |
760 RawSocket _raw; // Set to null when the raw socket is closed. | 754 RawSocket _raw; // Set to null when the raw socket is closed. |
761 bool _closed = false; // Set to true when the raw socket is closed. | 755 bool _closed = false; // Set to true when the raw socket is closed. |
762 StreamController _controller; | 756 StreamController _controller; |
763 bool _controllerClosed = false; | 757 bool _controllerClosed = false; |
764 _SocketStreamConsumer _consumer; | 758 _SocketStreamConsumer _consumer; |
765 IOSink<Socket> _sink; | 759 IOSink _sink; |
766 Completer _doneCompleter; | |
767 var _subscription; | 760 var _subscription; |
768 | 761 |
769 _Socket(RawSocket this._raw) { | 762 _Socket(RawSocket this._raw) { |
770 _controller = new StreamController<List<int>>( | 763 _controller = new StreamController<List<int>>( |
771 onSubscriptionStateChange: _onSubscriptionStateChange, | 764 onSubscriptionStateChange: _onSubscriptionStateChange, |
772 onPauseStateChange: _onPauseStateChange); | 765 onPauseStateChange: _onPauseStateChange); |
773 _consumer = new _SocketStreamConsumer(this); | 766 _consumer = new _SocketStreamConsumer(this); |
774 _sink = new IOSink(_consumer); | 767 _sink = new IOSink(_consumer); |
775 | 768 |
776 // Disable read events until there is a subscription. | 769 // Disable read events until there is a subscription. |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
810 void write(Object obj) => _sink.write(obj); | 803 void write(Object obj) => _sink.write(obj); |
811 | 804 |
812 void writeln([Object obj = ""]) => _sink.writeln(obj); | 805 void writeln([Object obj = ""]) => _sink.writeln(obj); |
813 | 806 |
814 void writeCharCode(int charCode) => _sink.writeCharCode(charCode); | 807 void writeCharCode(int charCode) => _sink.writeCharCode(charCode); |
815 | 808 |
816 void writeAll(Iterable objects, [sep = ""]) => _sink.writeAll(objects, sep); | 809 void writeAll(Iterable objects, [sep = ""]) => _sink.writeAll(objects, sep); |
817 | 810 |
818 void add(List<int> bytes) => _sink.add(bytes); | 811 void add(List<int> bytes) => _sink.add(bytes); |
819 | 812 |
820 Future<Socket> consume(Stream<List<int>> stream) { | 813 Future<Socket> addStream(Stream<List<int>> stream) { |
821 return _sink.consume(stream); | 814 return _sink.addStream(stream); |
822 } | 815 } |
823 | 816 |
824 Future<Socket> writeStream(Stream<List<int>> stream) { | 817 Future<Socket> close() => _sink.close(); |
825 return _sink.writeStream(stream); | |
826 } | |
827 | |
828 close() => _sink.close(); | |
829 | 818 |
830 Future<Socket> get done => _sink.done; | 819 Future<Socket> get done => _sink.done; |
831 | 820 |
832 void destroy() { | 821 void destroy() { |
833 // Destroy can always be called to get rid of a socket. | 822 // Destroy can always be called to get rid of a socket. |
834 if (_raw == null) return; | 823 if (_raw == null) return; |
835 _consumer.stop(); | 824 _consumer.stop(); |
836 _closeRawSocket(); | 825 _closeRawSocket(); |
837 _controllerClosed = true; | 826 _controllerClosed = true; |
838 _controller.close(); | 827 _controller.close(); |
839 } | 828 } |
840 | 829 |
841 bool setOption(SocketOption option, bool enabled) { | 830 bool setOption(SocketOption option, bool enabled) { |
842 if (_raw == null) return false; | 831 if (_raw == null) return false; |
843 return _raw.setOption(option, enabled); | 832 return _raw.setOption(option, enabled); |
844 } | 833 } |
845 | 834 |
846 int get port => _raw.port; | 835 int get port => _raw.port; |
847 String get remoteHost => _raw.remoteHost; | 836 String get remoteHost => _raw.remoteHost; |
848 int get remotePort => _raw.remotePort; | 837 int get remotePort => _raw.remotePort; |
849 | 838 |
850 // Ensure a subscription on the raw socket. Both the stream and the | 839 // Ensure a subscription on the raw socket. Both the stream and the |
851 // consumer needs a subscription as they share the error and done | 840 // consumer needs a subscription as they share the error and done |
852 // events from the raw socket. | 841 // events from the raw socket. |
853 void _ensureRawSocketSubscription() { | 842 void _ensureRawSocketSubscription() { |
854 if (_subscription == null) { | 843 if (_subscription == null && _raw != null) { |
855 _subscription = _raw.listen(_onData, | 844 _subscription = _raw.listen(_onData, |
856 onError: _onError, | 845 onError: _onError, |
857 onDone: _onDone, | 846 onDone: _onDone, |
858 unsubscribeOnError: true); | 847 unsubscribeOnError: true); |
859 } | 848 } |
860 } | 849 } |
861 | 850 |
862 _closeRawSocket() { | 851 _closeRawSocket() { |
863 var tmp = _raw; | 852 var tmp = _raw; |
864 _raw = null; | 853 _raw = null; |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
901 _controller.close(); | 890 _controller.close(); |
902 break; | 891 break; |
903 } | 892 } |
904 } | 893 } |
905 | 894 |
906 void _onDone() { | 895 void _onDone() { |
907 if (!_controllerClosed) { | 896 if (!_controllerClosed) { |
908 _controllerClosed = true; | 897 _controllerClosed = true; |
909 _controller.close(); | 898 _controller.close(); |
910 } | 899 } |
911 _done(); | 900 _consumer.done(); |
912 } | 901 } |
913 | 902 |
914 void _onError(error) { | 903 void _onError(error) { |
915 if (!_controllerClosed) { | 904 if (!_controllerClosed) { |
916 _controllerClosed = true; | 905 _controllerClosed = true; |
917 _controller.addError(error); | 906 _controller.addError(error); |
918 _controller.close(); | 907 _controller.close(); |
919 } | 908 } |
920 _done(error); | 909 _consumer.done(error); |
921 } | |
922 | |
923 get _doneFuture { | |
924 if (_doneCompleter == null) { | |
925 _ensureRawSocketSubscription(); | |
926 _doneCompleter = new Completer(); | |
927 } | |
928 return _doneCompleter.future; | |
929 } | |
930 | |
931 void _done([error]) { | |
932 if (_doneCompleter != null) { | |
933 var tmp = _doneCompleter; | |
934 _doneCompleter = null; | |
935 if (error != null) { | |
936 tmp.completeError(error); | |
937 } else { | |
938 tmp.complete(this); | |
939 } | |
940 } | |
941 } | 910 } |
942 | 911 |
943 int _write(List<int> data, int offset, int length) => | 912 int _write(List<int> data, int offset, int length) => |
944 _raw.write(data, offset, length); | 913 _raw.write(data, offset, length); |
945 | 914 |
946 void _enableWriteEvent() { | 915 void _enableWriteEvent() { |
947 _raw.writeEventsEnabled = true; | 916 _raw.writeEventsEnabled = true; |
948 } | 917 } |
949 | 918 |
950 void _disableWriteEvent() { | 919 void _disableWriteEvent() { |
951 if (_raw != null) { | 920 if (_raw != null) { |
952 _raw.writeEventsEnabled = false; | 921 _raw.writeEventsEnabled = false; |
953 } | 922 } |
954 } | 923 } |
955 | 924 |
956 void _consumerDone([error]) { | 925 void _consumerDone() { |
957 _done(error); | |
958 if (_raw != null) { | 926 if (_raw != null) { |
959 _raw.shutdown(SocketDirection.SEND); | 927 _raw.shutdown(SocketDirection.SEND); |
960 _disableWriteEvent(); | 928 _disableWriteEvent(); |
961 } | 929 } |
962 } | 930 } |
963 } | 931 } |
964 | 932 |
965 | 933 |
966 class _SecureSocket extends _Socket implements SecureSocket { | 934 class _SecureSocket extends _Socket implements SecureSocket { |
967 _SecureSocket(RawSecureSocket raw) : super(raw); | 935 _SecureSocket(RawSecureSocket raw) : super(raw); |
968 | 936 |
969 void set onBadCertificate(bool callback(X509Certificate certificate)) { | 937 void set onBadCertificate(bool callback(X509Certificate certificate)) { |
970 if (_raw == null) { | 938 if (_raw == null) { |
971 throw new StateError("onBadCertificate called on destroyed SecureSocket"); | 939 throw new StateError("onBadCertificate called on destroyed SecureSocket"); |
972 } | 940 } |
973 _raw.onBadCertificate = callback; | 941 _raw.onBadCertificate = callback; |
974 } | 942 } |
975 | 943 |
976 X509Certificate get peerCertificate { | 944 X509Certificate get peerCertificate { |
977 if (_raw == null) { | 945 if (_raw == null) { |
978 throw new StateError("peerCertificate called on destroyed SecureSocket"); | 946 throw new StateError("peerCertificate called on destroyed SecureSocket"); |
979 } | 947 } |
980 return _raw.peerCertificate; | 948 return _raw.peerCertificate; |
981 } | 949 } |
982 } | 950 } |
OLD | NEW |