Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(15)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 14028017: Remove .writeStream, .consume and rewrite IOSink to correctly implement a (sane) well-defined behav… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Review comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class RawServerSocket { 5 patch class RawServerSocket {
6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1", 6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1",
7 int port = 0, 7 int port = 0,
8 int backlog = 0]) { 8 int backlog = 0]) {
9 return _RawServerSocket.bind(address, port, backlog); 9 return _RawServerSocket.bind(address, port, backlog);
10 } 10 }
(...skipping 647 matching lines...) Expand 10 before | Expand all | Expand 10 after
658 new _SecureSocket(rawSocket); 658 new _SecureSocket(rawSocket);
659 } 659 }
660 660
661 661
662 class _SocketStreamConsumer extends StreamConsumer<List<int>> { 662 class _SocketStreamConsumer extends StreamConsumer<List<int>> {
663 StreamSubscription subscription; 663 StreamSubscription subscription;
664 final _Socket socket; 664 final _Socket socket;
665 int offset; 665 int offset;
666 List<int> buffer; 666 List<int> buffer;
667 bool paused = false; 667 bool paused = false;
668 Completer streamCompleter;
668 669
669 _SocketStreamConsumer(this.socket); 670 _SocketStreamConsumer(this.socket);
670 671
671 Future<Socket> consume(Stream<List<int>> stream) { 672 Future<Socket> addStream(Stream<List<int>> stream) {
673 socket._ensureRawSocketSubscription();
674 streamCompleter = new Completer<Socket>();
672 if (socket._raw != null) { 675 if (socket._raw != null) {
673 subscription = stream.listen( 676 subscription = stream.listen(
674 (data) { 677 (data) {
675 assert(!paused);
676 assert(buffer == null);
677 buffer = data;
678 offset = 0;
679 write();
680 },
681 onError: (error) {
682 socket._consumerDone(error);
683 },
684 onDone: () {
685 socket._consumerDone();
686 },
687 unsubscribeOnError: true);
688 }
689 return socket._doneFuture;
690 }
691
692 Future<Socket> addStream(Stream<List<int>> stream) {
693 Completer completer = new Completer<Socket>();
694 if (socket._raw != null) {
695 subscription = stream.listen(
696 (data) {
697 assert(!paused); 678 assert(!paused);
698 assert(buffer == null); 679 assert(buffer == null);
699 buffer = data; 680 buffer = data;
700 offset = 0; 681 offset = 0;
701 write(); 682 write();
702 }, 683 },
703 onError: (error) { 684 onError: (error) {
704 socket._consumerDone(error); 685 socket._consumerDone();
705 completer.completeError(error.error, error.stackTrace); 686 done(error);
706 }, 687 },
707 onDone: () { 688 onDone: () {
708 completer.complete(socket); 689 done();
709 }, 690 },
710 unsubscribeOnError: true); 691 unsubscribeOnError: true);
711 } 692 }
712 return completer.future; 693 return streamCompleter.future;
713 } 694 }
714 695
715 Future<Socket> close() { 696 Future<Socket> close() {
716 socket._consumerDone(); 697 socket._consumerDone();
717 return completer.future; 698 return new Future.immediate(socket);
718 } 699 }
719 700
720 void write() { 701 void write() {
721 try { 702 try {
722 if (subscription == null) return; 703 if (subscription == null) return;
723 assert(buffer != null); 704 assert(buffer != null);
724 // Write as much as possible. 705 // Write as much as possible.
725 offset += socket._write(buffer, offset, buffer.length - offset); 706 offset += socket._write(buffer, offset, buffer.length - offset);
726 if (offset < buffer.length) { 707 if (offset < buffer.length) {
727 if (!paused) { 708 if (!paused) {
728 paused = true; 709 paused = true;
729 // TODO(ajohnsen): It would be nice to avoid this check. 710 // TODO(ajohnsen): It would be nice to avoid this check.
730 // Some info: socket._write can emit an event, if it fails to write. 711 // Some info: socket._write can emit an event, if it fails to write.
731 // If the user closes the socket in that event, stop() will be called 712 // If the user closes the socket in that event, stop() will be called
732 // before we get a change to pause. 713 // before we get a change to pause.
733 if (subscription == null) return; 714 if (subscription == null) return;
734 subscription.pause(); 715 subscription.pause();
735 } 716 }
736 socket._enableWriteEvent(); 717 socket._enableWriteEvent();
737 } else { 718 } else {
738 buffer = null; 719 buffer = null;
739 if (paused) { 720 if (paused) {
740 paused = false; 721 paused = false;
741 subscription.resume(); 722 subscription.resume();
742 } 723 }
743 } 724 }
744 } catch (e) { 725 } catch (e) {
745 stop(); 726 stop();
746 socket._consumerDone(e); 727 socket._consumerDone();
728 done(e);
747 } 729 }
748 } 730 }
749 731
732 void done([error]) {
733 if (streamCompleter != null) {
734 var tmp = streamCompleter;
735 streamCompleter = null;
736 if (error != null) {
737 tmp.completeError(error);
738 } else {
739 tmp.complete(socket);
740 }
741 }
742 }
743
750 void stop() { 744 void stop() {
751 if (subscription == null) return; 745 if (subscription == null) return;
752 subscription.cancel(); 746 subscription.cancel();
753 subscription = null; 747 subscription = null;
754 socket._disableWriteEvent(); 748 socket._disableWriteEvent();
755 } 749 }
756 } 750 }
757 751
758 752
759 class _Socket extends Stream<List<int>> implements Socket { 753 class _Socket extends Stream<List<int>> implements Socket {
760 RawSocket _raw; // Set to null when the raw socket is closed. 754 RawSocket _raw; // Set to null when the raw socket is closed.
761 bool _closed = false; // Set to true when the raw socket is closed. 755 bool _closed = false; // Set to true when the raw socket is closed.
762 StreamController _controller; 756 StreamController _controller;
763 bool _controllerClosed = false; 757 bool _controllerClosed = false;
764 _SocketStreamConsumer _consumer; 758 _SocketStreamConsumer _consumer;
765 IOSink<Socket> _sink; 759 IOSink _sink;
766 Completer _doneCompleter;
767 var _subscription; 760 var _subscription;
768 761
769 _Socket(RawSocket this._raw) { 762 _Socket(RawSocket this._raw) {
770 _controller = new StreamController<List<int>>( 763 _controller = new StreamController<List<int>>(
771 onSubscriptionStateChange: _onSubscriptionStateChange, 764 onSubscriptionStateChange: _onSubscriptionStateChange,
772 onPauseStateChange: _onPauseStateChange); 765 onPauseStateChange: _onPauseStateChange);
773 _consumer = new _SocketStreamConsumer(this); 766 _consumer = new _SocketStreamConsumer(this);
774 _sink = new IOSink(_consumer); 767 _sink = new IOSink(_consumer);
775 768
776 // Disable read events until there is a subscription. 769 // Disable read events until there is a subscription.
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
810 void write(Object obj) => _sink.write(obj); 803 void write(Object obj) => _sink.write(obj);
811 804
812 void writeln([Object obj = ""]) => _sink.writeln(obj); 805 void writeln([Object obj = ""]) => _sink.writeln(obj);
813 806
814 void writeCharCode(int charCode) => _sink.writeCharCode(charCode); 807 void writeCharCode(int charCode) => _sink.writeCharCode(charCode);
815 808
816 void writeAll(Iterable objects, [sep = ""]) => _sink.writeAll(objects, sep); 809 void writeAll(Iterable objects, [sep = ""]) => _sink.writeAll(objects, sep);
817 810
818 void add(List<int> bytes) => _sink.add(bytes); 811 void add(List<int> bytes) => _sink.add(bytes);
819 812
820 Future<Socket> consume(Stream<List<int>> stream) { 813 Future<Socket> addStream(Stream<List<int>> stream) {
821 return _sink.consume(stream); 814 return _sink.addStream(stream);
822 } 815 }
823 816
824 Future<Socket> writeStream(Stream<List<int>> stream) { 817 Future<Socket> close() => _sink.close();
825 return _sink.writeStream(stream);
826 }
827
828 close() => _sink.close();
829 818
830 Future<Socket> get done => _sink.done; 819 Future<Socket> get done => _sink.done;
831 820
832 void destroy() { 821 void destroy() {
833 // Destroy can always be called to get rid of a socket. 822 // Destroy can always be called to get rid of a socket.
834 if (_raw == null) return; 823 if (_raw == null) return;
835 _consumer.stop(); 824 _consumer.stop();
836 _closeRawSocket(); 825 _closeRawSocket();
837 _controllerClosed = true; 826 _controllerClosed = true;
838 _controller.close(); 827 _controller.close();
839 } 828 }
840 829
841 bool setOption(SocketOption option, bool enabled) { 830 bool setOption(SocketOption option, bool enabled) {
842 if (_raw == null) return false; 831 if (_raw == null) return false;
843 return _raw.setOption(option, enabled); 832 return _raw.setOption(option, enabled);
844 } 833 }
845 834
846 int get port => _raw.port; 835 int get port => _raw.port;
847 String get remoteHost => _raw.remoteHost; 836 String get remoteHost => _raw.remoteHost;
848 int get remotePort => _raw.remotePort; 837 int get remotePort => _raw.remotePort;
849 838
850 // Ensure a subscription on the raw socket. Both the stream and the 839 // Ensure a subscription on the raw socket. Both the stream and the
851 // consumer needs a subscription as they share the error and done 840 // consumer needs a subscription as they share the error and done
852 // events from the raw socket. 841 // events from the raw socket.
853 void _ensureRawSocketSubscription() { 842 void _ensureRawSocketSubscription() {
854 if (_subscription == null) { 843 if (_subscription == null && _raw != null) {
855 _subscription = _raw.listen(_onData, 844 _subscription = _raw.listen(_onData,
856 onError: _onError, 845 onError: _onError,
857 onDone: _onDone, 846 onDone: _onDone,
858 unsubscribeOnError: true); 847 unsubscribeOnError: true);
859 } 848 }
860 } 849 }
861 850
862 _closeRawSocket() { 851 _closeRawSocket() {
863 var tmp = _raw; 852 var tmp = _raw;
864 _raw = null; 853 _raw = null;
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
901 _controller.close(); 890 _controller.close();
902 break; 891 break;
903 } 892 }
904 } 893 }
905 894
906 void _onDone() { 895 void _onDone() {
907 if (!_controllerClosed) { 896 if (!_controllerClosed) {
908 _controllerClosed = true; 897 _controllerClosed = true;
909 _controller.close(); 898 _controller.close();
910 } 899 }
911 _done(); 900 _consumer.done();
912 } 901 }
913 902
914 void _onError(error) { 903 void _onError(error) {
915 if (!_controllerClosed) { 904 if (!_controllerClosed) {
916 _controllerClosed = true; 905 _controllerClosed = true;
917 _controller.addError(error); 906 _controller.addError(error);
918 _controller.close(); 907 _controller.close();
919 } 908 }
920 _done(error); 909 _consumer.done(error);
921 }
922
923 get _doneFuture {
924 if (_doneCompleter == null) {
925 _ensureRawSocketSubscription();
926 _doneCompleter = new Completer();
927 }
928 return _doneCompleter.future;
929 }
930
931 void _done([error]) {
932 if (_doneCompleter != null) {
933 var tmp = _doneCompleter;
934 _doneCompleter = null;
935 if (error != null) {
936 tmp.completeError(error);
937 } else {
938 tmp.complete(this);
939 }
940 }
941 } 910 }
942 911
943 int _write(List<int> data, int offset, int length) => 912 int _write(List<int> data, int offset, int length) =>
944 _raw.write(data, offset, length); 913 _raw.write(data, offset, length);
945 914
946 void _enableWriteEvent() { 915 void _enableWriteEvent() {
947 _raw.writeEventsEnabled = true; 916 _raw.writeEventsEnabled = true;
948 } 917 }
949 918
950 void _disableWriteEvent() { 919 void _disableWriteEvent() {
951 if (_raw != null) { 920 if (_raw != null) {
952 _raw.writeEventsEnabled = false; 921 _raw.writeEventsEnabled = false;
953 } 922 }
954 } 923 }
955 924
956 void _consumerDone([error]) { 925 void _consumerDone() {
957 _done(error);
958 if (_raw != null) { 926 if (_raw != null) {
959 _raw.shutdown(SocketDirection.SEND); 927 _raw.shutdown(SocketDirection.SEND);
960 _disableWriteEvent(); 928 _disableWriteEvent();
961 } 929 }
962 } 930 }
963 } 931 }
964 932
965 933
966 class _SecureSocket extends _Socket implements SecureSocket { 934 class _SecureSocket extends _Socket implements SecureSocket {
967 _SecureSocket(RawSecureSocket raw) : super(raw); 935 _SecureSocket(RawSecureSocket raw) : super(raw);
968 936
969 void set onBadCertificate(bool callback(X509Certificate certificate)) { 937 void set onBadCertificate(bool callback(X509Certificate certificate)) {
970 if (_raw == null) { 938 if (_raw == null) {
971 throw new StateError("onBadCertificate called on destroyed SecureSocket"); 939 throw new StateError("onBadCertificate called on destroyed SecureSocket");
972 } 940 }
973 _raw.onBadCertificate = callback; 941 _raw.onBadCertificate = callback;
974 } 942 }
975 943
976 X509Certificate get peerCertificate { 944 X509Certificate get peerCertificate {
977 if (_raw == null) { 945 if (_raw == null) {
978 throw new StateError("peerCertificate called on destroyed SecureSocket"); 946 throw new StateError("peerCertificate called on destroyed SecureSocket");
979 } 947 }
980 return _raw.peerCertificate; 948 return _raw.peerCertificate;
981 } 949 }
982 } 950 }
OLDNEW
« no previous file with comments | « pkg/scheduled_test/lib/src/scheduled_server/safe_http_server.dart ('k') | sdk/lib/async/stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698