Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(212)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 250513002: Add support for cloning server-sockets. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rename to reference. Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class RawServerSocket { 5 patch class RawServerSocket {
6 /* patch */ static Future<RawServerSocket> bind(address, 6 /* patch */ static Future<RawServerSocket> bind(address,
7 int port, 7 int port,
8 {int backlog: 0, 8 {int backlog: 0,
9 bool v6Only: false}) { 9 bool v6Only: false}) {
10 return _RawServerSocket.bind(address, port, backlog, v6Only); 10 return _RawServerSocket.bind(address, port, backlog, v6Only);
(...skipping 372 matching lines...) Expand 10 before | Expand all | Expand 10 after
383 383
384 // Native port messages. 384 // Native port messages.
385 static const HOST_NAME_LOOKUP = 0; 385 static const HOST_NAME_LOOKUP = 0;
386 static const LIST_INTERFACES = 1; 386 static const LIST_INTERFACES = 1;
387 static const REVERSE_LOOKUP = 2; 387 static const REVERSE_LOOKUP = 2;
388 388
389 // Protocol flags. 389 // Protocol flags.
390 static const int PROTOCOL_IPV4 = 1 << 0; 390 static const int PROTOCOL_IPV4 = 1 << 0;
391 static const int PROTOCOL_IPV6 = 1 << 1; 391 static const int PROTOCOL_IPV6 = 1 << 1;
392 392
393 static const int NORMAL_TOKEN_BATCH_SIZE = 8;
394 static const int LISTENING_TOKEN_BATCH_SIZE = 2;
395
393 // Socket close state 396 // Socket close state
394 bool isClosed = false; 397 bool isClosed = false;
395 bool isClosing = false; 398 bool isClosing = false;
396 bool isClosedRead = false; 399 bool isClosedRead = false;
397 bool isClosedWrite = false; 400 bool isClosedWrite = false;
398 Completer closeCompleter = new Completer.sync(); 401 Completer closeCompleter = new Completer.sync();
399 402
400 // Handlers and receive port for socket events from the event handler. 403 // Handlers and receive port for socket events from the event handler.
401 final List eventHandlers = new List(EVENT_COUNT + 1); 404 final List eventHandlers = new List(EVENT_COUNT + 1);
402 RawReceivePort eventPort; 405 RawReceivePort eventPort;
(...skipping 302 matching lines...) Expand 10 before | Expand all | Expand 10 after
705 } 708 }
706 return result; 709 return result;
707 } 710 }
708 711
709 _NativeSocket accept() { 712 _NativeSocket accept() {
710 // Don't issue accept if we're closing. 713 // Don't issue accept if we're closing.
711 if (isClosing || isClosed) return null; 714 if (isClosing || isClosed) return null;
712 assert(available > 0); 715 assert(available > 0);
713 available--; 716 available--;
714 tokens++; 717 tokens++;
715 returnTokens(); 718 returnTokens(LISTENING_TOKEN_BATCH_SIZE);
716 var socket = new _NativeSocket.normal(); 719 var socket = new _NativeSocket.normal();
717 if (nativeAccept(socket) != true) return null; 720 if (nativeAccept(socket) != true) return null;
718 socket.localPort = localPort; 721 socket.localPort = localPort;
719 socket.address = address; 722 socket.address = address;
720 totalRead += 1; 723 totalRead += 1;
721 return socket; 724 return socket;
722 } 725 }
723 726
724 int get port { 727 int get port {
725 if (localPort != 0) return localPort; 728 if (localPort != 0) return localPort;
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
838 } 841 }
839 } else if (!isClosed) { 842 } else if (!isClosed) {
840 // If the connection is closed right after it's accepted, there's a 843 // If the connection is closed right after it's accepted, there's a
841 // chance the close-handler is not set. 844 // chance the close-handler is not set.
842 if (handler != null) handler(); 845 if (handler != null) handler();
843 } 846 }
844 } 847 }
845 } 848 }
846 if (!isListening) { 849 if (!isListening) {
847 tokens++; 850 tokens++;
848 returnTokens(); 851 returnTokens(NORMAL_TOKEN_BATCH_SIZE);
849 } 852 }
850 } 853 }
851 854
852 void returnTokens() { 855 void returnTokens(int tokenBatchSize) {
853 if (eventPort != null && !isClosing && !isClosed) { 856 if (eventPort != null && !isClosing && !isClosed) {
854 if (tokens == 8) { 857 // Return in batches.
855 // Return in batches of 8. 858 if (tokens == tokenBatchSize) {
856 assert(tokens < (1 << FIRST_COMMAND)); 859 assert(tokens < (1 << FIRST_COMMAND));
857 sendToEventHandler((1 << RETURN_TOKEN_COMMAND) | tokens); 860 sendToEventHandler((1 << RETURN_TOKEN_COMMAND) | tokens);
858 tokens = 0; 861 tokens = 0;
859 } 862 }
860 } 863 }
861 } 864 }
862 865
863 void setHandlers({read, write, error, closed, destroyed}) { 866 void setHandlers({read, write, error, closed, destroyed}) {
864 eventHandlers[READ_EVENT] = read; 867 eventHandlers[READ_EVENT] = read;
865 eventHandlers[WRITE_EVENT] = write; 868 eventHandlers[WRITE_EVENT] = write;
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after
1059 native "Socket_SendTo"; 1062 native "Socket_SendTo";
1060 nativeCreateConnect(List<int> addr, 1063 nativeCreateConnect(List<int> addr,
1061 int port) native "Socket_CreateConnect"; 1064 int port) native "Socket_CreateConnect";
1062 nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only) 1065 nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only)
1063 native "ServerSocket_CreateBindListen"; 1066 native "ServerSocket_CreateBindListen";
1064 nativeCreateBindDatagram(List<int> addr, int port, bool reuseAddress) 1067 nativeCreateBindDatagram(List<int> addr, int port, bool reuseAddress)
1065 native "Socket_CreateBindDatagram"; 1068 native "Socket_CreateBindDatagram";
1066 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept"; 1069 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept";
1067 int nativeGetPort() native "Socket_GetPort"; 1070 int nativeGetPort() native "Socket_GetPort";
1068 List nativeGetRemotePeer() native "Socket_GetRemotePeer"; 1071 List nativeGetRemotePeer() native "Socket_GetRemotePeer";
1072 int nativeGetSocketId() native "Socket_GetSocketId";
1069 OSError nativeGetError() native "Socket_GetError"; 1073 OSError nativeGetError() native "Socket_GetError";
1070 nativeGetOption(int option, int protocol) native "Socket_GetOption"; 1074 nativeGetOption(int option, int protocol) native "Socket_GetOption";
1071 bool nativeSetOption(int option, int protocol, value) 1075 bool nativeSetOption(int option, int protocol, value)
1072 native "Socket_SetOption"; 1076 native "Socket_SetOption";
1073 bool nativeJoinMulticast( 1077 bool nativeJoinMulticast(
1074 List<int> addr, List<int> interfaceAddr, int interfaceIndex) 1078 List<int> addr, List<int> interfaceAddr, int interfaceIndex)
1075 native "Socket_JoinMulticast"; 1079 native "Socket_JoinMulticast";
1076 bool nativeLeaveMulticast( 1080 bool nativeLeaveMulticast(
1077 List<int> addr, List<int> interfaceAddr, int interfaceIndex) 1081 List<int> addr, List<int> interfaceAddr, int interfaceIndex)
1078 native "Socket_LeaveMulticast"; 1082 native "Socket_LeaveMulticast";
1079 } 1083 }
1080 1084
1081 1085
1082 class _RawServerSocket extends Stream<RawSocket> 1086 class _RawServerSocket extends Stream<RawSocket>
1083 implements RawServerSocket { 1087 implements RawServerSocket {
1084 final _NativeSocket _socket; 1088 final _NativeSocket _socket;
1085 StreamController<RawSocket> _controller; 1089 StreamController<RawSocket> _controller;
1090 ReceivePort _port;
Søren Gjesse 2014/05/06 08:47:39 Call this variable something less generic, e.g. _r
Anders Johnsen 2014/05/06 12:34:32 Done.
1086 1091
1087 static Future<_RawServerSocket> bind(address, 1092 static Future<_RawServerSocket> bind(address,
1088 int port, 1093 int port,
1089 int backlog, 1094 int backlog,
1090 bool v6Only) { 1095 bool v6Only) {
1091 if (port < 0 || port > 0xFFFF) 1096 if (port < 0 || port > 0xFFFF)
1092 throw new ArgumentError("Invalid port $port"); 1097 throw new ArgumentError("Invalid port $port");
1093 if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog"); 1098 if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
1094 return _NativeSocket.bind(address, port, backlog, v6Only) 1099 return _NativeSocket.bind(address, port, backlog, v6Only)
1095 .then((socket) => new _RawServerSocket(socket)); 1100 .then((socket) => new _RawServerSocket(socket));
(...skipping 20 matching lines...) Expand all
1116 var socket = _socket.accept(); 1121 var socket = _socket.accept();
1117 if (socket == null) return; 1122 if (socket == null) return;
1118 _controller.add(new _RawSocket(socket)); 1123 _controller.add(new _RawSocket(socket));
1119 if (_controller.isPaused) return; 1124 if (_controller.isPaused) return;
1120 } 1125 }
1121 }), 1126 }),
1122 error: zone.bindUnaryCallback((e) { 1127 error: zone.bindUnaryCallback((e) {
1123 _controller.addError(e); 1128 _controller.addError(e);
1124 _controller.close(); 1129 _controller.close();
1125 }), 1130 }),
1126 destroyed: _controller.close); 1131 destroyed: () {
1132 _controller.close();
1133 if (_port != null) {
1134 _port.close();
1135 }
1136 });
1127 return _controller.stream.listen( 1137 return _controller.stream.listen(
1128 onData, 1138 onData,
1129 onError: onError, 1139 onError: onError,
1130 onDone: onDone, 1140 onDone: onDone,
1131 cancelOnError: cancelOnError); 1141 cancelOnError: cancelOnError);
1132 } 1142 }
1133 1143
1134 int get port => _socket.port; 1144 int get port => _socket.port;
1135 1145
1136 InternetAddress get address => _socket.address; 1146 InternetAddress get address => _socket.address;
(...skipping 16 matching lines...) Expand all
1153 } 1163 }
1154 } 1164 }
1155 1165
1156 void _onPauseStateChange() { 1166 void _onPauseStateChange() {
1157 if (_controller.isPaused) { 1167 if (_controller.isPaused) {
1158 _pause(); 1168 _pause();
1159 } else { 1169 } else {
1160 _resume(); 1170 _resume();
1161 } 1171 }
1162 } 1172 }
1173
1174 RawServerSocketReference get reference {
1175 if (_port == null) {
1176 _port = new ReceivePort();
1177 _port.listen((sendPort) {
1178 sendPort.send(
1179 [_socket.nativeGetSocketId(),
1180 _socket.address,
1181 _socket.localPort]);
1182 });
1183 }
1184 return new _RawServerSocketReference(_port.sendPort);
1185 }
1163 } 1186 }
1164 1187
1165 1188
1189 class _RawServerSocketReference implements RawServerSocketReference {
1190 final SendPort _sendPort;
1191
1192 _RawServerSocketReference(this._sendPort);
1193
1194 Future<RawServerSocket> create() {
Lasse Reichstein Nielsen 2014/05/06 09:04:20 I would still prefer to just return a RawServerSoc
Anders Johnsen 2014/05/06 12:34:32 The problem is that it may return an invalid objec
1195 var port = new ReceivePort();
Lasse Reichstein Nielsen 2014/05/06 09:04:20 Consider using a RawReceivePort. It requires you t
Anders Johnsen 2014/05/06 12:34:32 Done.
1196 _sendPort.send(port.sendPort);
1197 return port.first.then((args) {
1198 port.close();
1199 var native = new _NativeSocket.listen();
1200 native.nativeSetSocketId(args[0]);
1201 native.address = args[1];
1202 native.localPort = args[2];
1203 return new _RawServerSocket(native);
1204 });
1205 }
Lasse Reichstein Nielsen 2014/05/06 09:04:20 Consider adding: int get hashCode => _sendPort.
Anders Johnsen 2014/05/06 12:34:32 Done.
1206 }
1207
1208
1166 class _RawSocket extends Stream<RawSocketEvent> 1209 class _RawSocket extends Stream<RawSocketEvent>
1167 implements RawSocket { 1210 implements RawSocket {
1168 final _NativeSocket _socket; 1211 final _NativeSocket _socket;
1169 StreamController<RawSocketEvent> _controller; 1212 StreamController<RawSocketEvent> _controller;
1170 bool _readEventsEnabled = true; 1213 bool _readEventsEnabled = true;
1171 bool _writeEventsEnabled = true; 1214 bool _writeEventsEnabled = true;
1172 1215
1173 // Flag to handle Ctrl-D closing of stdio on Mac OS. 1216 // Flag to handle Ctrl-D closing of stdio on Mac OS.
1174 bool _isMacOSTerminalInput = false; 1217 bool _isMacOSTerminalInput = false;
1175 1218
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
1313 1356
1314 patch class ServerSocket { 1357 patch class ServerSocket {
1315 /* patch */ static Future<ServerSocket> bind(address, 1358 /* patch */ static Future<ServerSocket> bind(address,
1316 int port, 1359 int port,
1317 {int backlog: 0, 1360 {int backlog: 0,
1318 bool v6Only: false}) { 1361 bool v6Only: false}) {
1319 return _ServerSocket.bind(address, port, backlog, v6Only); 1362 return _ServerSocket.bind(address, port, backlog, v6Only);
1320 } 1363 }
1321 } 1364 }
1322 1365
1366
1367 class _ServerSocketReference implements ServerSocketReference {
1368 final RawServerSocketReference _rawReference;
1369
1370 _ServerSocketReference(this._rawReference);
1371
1372 Future<ServerSocket> create() {
1373 return _rawReference.create().then((raw) => new _ServerSocket(raw));
1374 }
1375 }
1376
1377
1323 class _ServerSocket extends Stream<Socket> 1378 class _ServerSocket extends Stream<Socket>
1324 implements ServerSocket { 1379 implements ServerSocket {
1325 final _socket; 1380 final _socket;
1326 1381
1327 static Future<_ServerSocket> bind(address, 1382 static Future<_ServerSocket> bind(address,
1328 int port, 1383 int port,
1329 int backlog, 1384 int backlog,
1330 bool v6Only) { 1385 bool v6Only) {
1331 return _RawServerSocket.bind(address, port, backlog, v6Only) 1386 return _RawServerSocket.bind(address, port, backlog, v6Only)
1332 .then((socket) => new _ServerSocket(socket)); 1387 .then((socket) => new _ServerSocket(socket));
(...skipping 10 matching lines...) Expand all
1343 onError: onError, 1398 onError: onError,
1344 onDone: onDone, 1399 onDone: onDone,
1345 cancelOnError: cancelOnError); 1400 cancelOnError: cancelOnError);
1346 } 1401 }
1347 1402
1348 int get port => _socket.port; 1403 int get port => _socket.port;
1349 1404
1350 InternetAddress get address => _socket.address; 1405 InternetAddress get address => _socket.address;
1351 1406
1352 Future close() => _socket.close().then((_) => this); 1407 Future close() => _socket.close().then((_) => this);
1408
1409 ServerSocketReference get reference {
1410 return new _ServerSocketReference(_socket.reference);
1411 }
1353 } 1412 }
1354 1413
1355 1414
1356 patch class Socket { 1415 patch class Socket {
1357 /* patch */ static Future<Socket> connect(host, int port) { 1416 /* patch */ static Future<Socket> connect(host, int port) {
1358 return RawSocket.connect(host, port).then( 1417 return RawSocket.connect(host, port).then(
1359 (socket) => new _Socket(socket)); 1418 (socket) => new _Socket(socket));
1360 } 1419 }
1361 } 1420 }
1362 1421
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after
1793 String address, 1852 String address,
1794 List<int> in_addr, 1853 List<int> in_addr,
1795 int port) { 1854 int port) {
1796 return new Datagram( 1855 return new Datagram(
1797 data, 1856 data,
1798 new _InternetAddress(address, null, in_addr), 1857 new _InternetAddress(address, null, in_addr),
1799 port); 1858 port);
1800 } 1859 }
1801 1860
1802 String _socketsStats() => _SocketsObservatory.toJSON(); 1861 String _socketsStats() => _SocketsObservatory.toJSON();
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698