Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(369)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Made tests run (mostly) Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class RawServerSocket { 5 patch class RawServerSocket {
6 /* patch */ static Future<RawServerSocket> bind(address, 6 /* patch */ static Future<RawServerSocket> bind(address,
7 int port, 7 int port,
8 {int backlog: 0, 8 {int backlog: 0,
9 bool v6Only: false}) { 9 bool v6Only: false}) {
10 return _RawServerSocket.bind(address, port, backlog, v6Only); 10 return _RawServerSocket.bind(address, port, backlog, v6Only);
(...skipping 791 matching lines...) Expand 10 before | Expand all | Expand 10 after
802 List<int> buffer; 802 List<int> buffer;
803 bool paused = false; 803 bool paused = false;
804 Completer streamCompleter; 804 Completer streamCompleter;
805 805
806 _SocketStreamConsumer(this.socket); 806 _SocketStreamConsumer(this.socket);
807 807
808 Future<Socket> addStream(Stream<List<int>> stream) { 808 Future<Socket> addStream(Stream<List<int>> stream) {
809 socket._ensureRawSocketSubscription(); 809 socket._ensureRawSocketSubscription();
810 streamCompleter = new Completer<Socket>(); 810 streamCompleter = new Completer<Socket>();
811 if (socket._raw != null) { 811 if (socket._raw != null) {
812 assert(!!!paused);
Anders Johnsen 2013/05/22 13:07:08 !!!?
Lasse Reichstein Nielsen 2013/05/24 06:02:49 !!!!! Deliberately made stupid to mark it as debug
812 subscription = stream.listen( 813 subscription = stream.listen(
813 (data) { 814 (data) {
814 assert(!paused); 815 assert(!paused);
815 assert(buffer == null); 816 assert(buffer == null);
816 buffer = data; 817 buffer = data;
817 offset = 0; 818 offset = 0;
818 write(); 819 write();
819 }, 820 },
820 onError: (error) { 821 onError: (error) {
821 socket._consumerDone(); 822 socket._consumerDone();
(...skipping 13 matching lines...) Expand all
835 } 836 }
836 837
837 void write() { 838 void write() {
838 try { 839 try {
839 if (subscription == null) return; 840 if (subscription == null) return;
840 assert(buffer != null); 841 assert(buffer != null);
841 // Write as much as possible. 842 // Write as much as possible.
842 offset += socket._write(buffer, offset, buffer.length - offset); 843 offset += socket._write(buffer, offset, buffer.length - offset);
843 if (offset < buffer.length) { 844 if (offset < buffer.length) {
844 if (!paused) { 845 if (!paused) {
845 paused = true;
846 // TODO(ajohnsen): It would be nice to avoid this check. 846 // TODO(ajohnsen): It would be nice to avoid this check.
847 // Some info: socket._write can emit an event, if it fails to write. 847 // Some info: socket._write can emit an event, if it fails to write.
848 // If the user closes the socket in that event, stop() will be called 848 // If the user closes the socket in that event, stop() will be called
849 // before we get a change to pause. 849 // before we get a change to pause.
850 if (subscription == null) return; 850 if (subscription == null) return;
Anders Johnsen 2013/05/22 13:07:08 This line (and the comment) can be removed now, du
Lasse Reichstein Nielsen 2013/05/24 06:02:49 YEY
851 paused = true;
Anders Johnsen 2013/05/22 13:07:08 Can you try without these pause?
Lasse Reichstein Nielsen 2013/05/24 06:02:49 I'll try.
851 subscription.pause(); 852 subscription.pause();
852 } 853 }
853 socket._enableWriteEvent(); 854 socket._enableWriteEvent();
854 } else { 855 } else {
855 buffer = null; 856 buffer = null;
856 if (paused) { 857 if (paused) {
857 paused = false; 858 paused = false;
858 subscription.resume(); 859 subscription.resume();
859 } 860 }
860 } 861 }
(...skipping 12 matching lines...) Expand all
873 streamCompleter.complete(socket); 874 streamCompleter.complete(socket);
874 } 875 }
875 streamCompleter = null; 876 streamCompleter = null;
876 } 877 }
877 } 878 }
878 879
879 void stop() { 880 void stop() {
880 if (subscription == null) return; 881 if (subscription == null) return;
881 subscription.cancel(); 882 subscription.cancel();
882 subscription = null; 883 subscription = null;
884 paused = false;
883 socket._disableWriteEvent(); 885 socket._disableWriteEvent();
884 } 886 }
885 } 887 }
886 888
887 889
888 class _Socket extends Stream<List<int>> implements Socket { 890 class _Socket extends Stream<List<int>> implements Socket {
889 RawSocket _raw; // Set to null when the raw socket is closed. 891 RawSocket _raw; // Set to null when the raw socket is closed.
890 bool _closed = false; // Set to true when the raw socket is closed. 892 bool _closed = false; // Set to true when the raw socket is closed.
891 StreamController _controller; 893 StreamController _controller;
892 bool _controllerClosed = false; 894 bool _controllerClosed = false;
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1094 _raw.onBadCertificate = callback; 1096 _raw.onBadCertificate = callback;
1095 } 1097 }
1096 1098
1097 X509Certificate get peerCertificate { 1099 X509Certificate get peerCertificate {
1098 if (_raw == null) { 1100 if (_raw == null) {
1099 throw new StateError("peerCertificate called on destroyed SecureSocket"); 1101 throw new StateError("peerCertificate called on destroyed SecureSocket");
1100 } 1102 }
1101 return _raw.peerCertificate; 1103 return _raw.peerCertificate;
1102 } 1104 }
1103 } 1105 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698