Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(439)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 250513002: Add support for cloning server-sockets. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Review feedback. Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/socket.cc ('k') | sdk/lib/io/socket.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class RawServerSocket { 5 patch class RawServerSocket {
6 /* patch */ static Future<RawServerSocket> bind(address, 6 /* patch */ static Future<RawServerSocket> bind(address,
7 int port, 7 int port,
8 {int backlog: 0, 8 {int backlog: 0,
9 bool v6Only: false}) { 9 bool v6Only: false}) {
10 return _RawServerSocket.bind(address, port, backlog, v6Only); 10 return _RawServerSocket.bind(address, port, backlog, v6Only);
(...skipping 372 matching lines...) Expand 10 before | Expand all | Expand 10 after
383 383
384 // Native port messages. 384 // Native port messages.
385 static const HOST_NAME_LOOKUP = 0; 385 static const HOST_NAME_LOOKUP = 0;
386 static const LIST_INTERFACES = 1; 386 static const LIST_INTERFACES = 1;
387 static const REVERSE_LOOKUP = 2; 387 static const REVERSE_LOOKUP = 2;
388 388
389 // Protocol flags. 389 // Protocol flags.
390 static const int PROTOCOL_IPV4 = 1 << 0; 390 static const int PROTOCOL_IPV4 = 1 << 0;
391 static const int PROTOCOL_IPV6 = 1 << 1; 391 static const int PROTOCOL_IPV6 = 1 << 1;
392 392
393 static const int NORMAL_TOKEN_BATCH_SIZE = 8;
394 static const int LISTENING_TOKEN_BATCH_SIZE = 2;
395
393 // Socket close state 396 // Socket close state
394 bool isClosed = false; 397 bool isClosed = false;
395 bool isClosing = false; 398 bool isClosing = false;
396 bool isClosedRead = false; 399 bool isClosedRead = false;
397 bool isClosedWrite = false; 400 bool isClosedWrite = false;
398 Completer closeCompleter = new Completer.sync(); 401 Completer closeCompleter = new Completer.sync();
399 402
400 // Handlers and receive port for socket events from the event handler. 403 // Handlers and receive port for socket events from the event handler.
401 final List eventHandlers = new List(EVENT_COUNT + 1); 404 final List eventHandlers = new List(EVENT_COUNT + 1);
402 RawReceivePort eventPort; 405 RawReceivePort eventPort;
(...skipping 302 matching lines...) Expand 10 before | Expand all | Expand 10 after
705 } 708 }
706 return result; 709 return result;
707 } 710 }
708 711
709 _NativeSocket accept() { 712 _NativeSocket accept() {
710 // Don't issue accept if we're closing. 713 // Don't issue accept if we're closing.
711 if (isClosing || isClosed) return null; 714 if (isClosing || isClosed) return null;
712 assert(available > 0); 715 assert(available > 0);
713 available--; 716 available--;
714 tokens++; 717 tokens++;
715 returnTokens(); 718 returnTokens(LISTENING_TOKEN_BATCH_SIZE);
716 var socket = new _NativeSocket.normal(); 719 var socket = new _NativeSocket.normal();
717 if (nativeAccept(socket) != true) return null; 720 if (nativeAccept(socket) != true) return null;
718 socket.localPort = localPort; 721 socket.localPort = localPort;
719 socket.address = address; 722 socket.address = address;
720 totalRead += 1; 723 totalRead += 1;
721 return socket; 724 return socket;
722 } 725 }
723 726
724 int get port { 727 int get port {
725 if (localPort != 0) return localPort; 728 if (localPort != 0) return localPort;
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after
839 } 842 }
840 } else if (!isClosed) { 843 } else if (!isClosed) {
841 // If the connection is closed right after it's accepted, there's a 844 // If the connection is closed right after it's accepted, there's a
842 // chance the close-handler is not set. 845 // chance the close-handler is not set.
843 if (handler != null) handler(); 846 if (handler != null) handler();
844 } 847 }
845 } 848 }
846 } 849 }
847 if (!isListening) { 850 if (!isListening) {
848 tokens++; 851 tokens++;
849 returnTokens(); 852 returnTokens(NORMAL_TOKEN_BATCH_SIZE);
850 } 853 }
851 } 854 }
852 855
853 void returnTokens() { 856 void returnTokens(int tokenBatchSize) {
854 if (eventPort != null && !isClosing && !isClosed) { 857 if (eventPort != null && !isClosing && !isClosed) {
855 if (tokens == 8) { 858 // Return in batches.
856 // Return in batches of 8. 859 if (tokens == tokenBatchSize) {
857 assert(tokens < (1 << FIRST_COMMAND)); 860 assert(tokens < (1 << FIRST_COMMAND));
858 sendToEventHandler((1 << RETURN_TOKEN_COMMAND) | tokens); 861 sendToEventHandler((1 << RETURN_TOKEN_COMMAND) | tokens);
859 tokens = 0; 862 tokens = 0;
860 } 863 }
861 } 864 }
862 } 865 }
863 866
864 void setHandlers({read, write, error, closed, destroyed}) { 867 void setHandlers({read, write, error, closed, destroyed}) {
865 eventHandlers[READ_EVENT] = read; 868 eventHandlers[READ_EVENT] = read;
866 eventHandlers[WRITE_EVENT] = write; 869 eventHandlers[WRITE_EVENT] = write;
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after
1061 native "Socket_SendTo"; 1064 native "Socket_SendTo";
1062 nativeCreateConnect(List<int> addr, 1065 nativeCreateConnect(List<int> addr,
1063 int port) native "Socket_CreateConnect"; 1066 int port) native "Socket_CreateConnect";
1064 nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only) 1067 nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only)
1065 native "ServerSocket_CreateBindListen"; 1068 native "ServerSocket_CreateBindListen";
1066 nativeCreateBindDatagram(List<int> addr, int port, bool reuseAddress) 1069 nativeCreateBindDatagram(List<int> addr, int port, bool reuseAddress)
1067 native "Socket_CreateBindDatagram"; 1070 native "Socket_CreateBindDatagram";
1068 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept"; 1071 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept";
1069 int nativeGetPort() native "Socket_GetPort"; 1072 int nativeGetPort() native "Socket_GetPort";
1070 List nativeGetRemotePeer() native "Socket_GetRemotePeer"; 1073 List nativeGetRemotePeer() native "Socket_GetRemotePeer";
1074 int nativeGetSocketId() native "Socket_GetSocketId";
1071 OSError nativeGetError() native "Socket_GetError"; 1075 OSError nativeGetError() native "Socket_GetError";
1072 nativeGetOption(int option, int protocol) native "Socket_GetOption"; 1076 nativeGetOption(int option, int protocol) native "Socket_GetOption";
1073 bool nativeSetOption(int option, int protocol, value) 1077 bool nativeSetOption(int option, int protocol, value)
1074 native "Socket_SetOption"; 1078 native "Socket_SetOption";
1075 bool nativeJoinMulticast( 1079 bool nativeJoinMulticast(
1076 List<int> addr, List<int> interfaceAddr, int interfaceIndex) 1080 List<int> addr, List<int> interfaceAddr, int interfaceIndex)
1077 native "Socket_JoinMulticast"; 1081 native "Socket_JoinMulticast";
1078 bool nativeLeaveMulticast( 1082 bool nativeLeaveMulticast(
1079 List<int> addr, List<int> interfaceAddr, int interfaceIndex) 1083 List<int> addr, List<int> interfaceAddr, int interfaceIndex)
1080 native "Socket_LeaveMulticast"; 1084 native "Socket_LeaveMulticast";
1081 } 1085 }
1082 1086
1083 1087
1084 class _RawServerSocket extends Stream<RawSocket> 1088 class _RawServerSocket extends Stream<RawSocket>
1085 implements RawServerSocket { 1089 implements RawServerSocket {
1086 final _NativeSocket _socket; 1090 final _NativeSocket _socket;
1087 StreamController<RawSocket> _controller; 1091 StreamController<RawSocket> _controller;
1092 ReceivePort _referencePort;
1088 1093
1089 static Future<_RawServerSocket> bind(address, 1094 static Future<_RawServerSocket> bind(address,
1090 int port, 1095 int port,
1091 int backlog, 1096 int backlog,
1092 bool v6Only) { 1097 bool v6Only) {
1093 if (port < 0 || port > 0xFFFF) 1098 if (port < 0 || port > 0xFFFF)
1094 throw new ArgumentError("Invalid port $port"); 1099 throw new ArgumentError("Invalid port $port");
1095 if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog"); 1100 if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
1096 return _NativeSocket.bind(address, port, backlog, v6Only) 1101 return _NativeSocket.bind(address, port, backlog, v6Only)
1097 .then((socket) => new _RawServerSocket(socket)); 1102 .then((socket) => new _RawServerSocket(socket));
(...skipping 20 matching lines...) Expand all
1118 var socket = _socket.accept(); 1123 var socket = _socket.accept();
1119 if (socket == null) return; 1124 if (socket == null) return;
1120 _controller.add(new _RawSocket(socket)); 1125 _controller.add(new _RawSocket(socket));
1121 if (_controller.isPaused) return; 1126 if (_controller.isPaused) return;
1122 } 1127 }
1123 }), 1128 }),
1124 error: zone.bindUnaryCallback((e) { 1129 error: zone.bindUnaryCallback((e) {
1125 _controller.addError(e); 1130 _controller.addError(e);
1126 _controller.close(); 1131 _controller.close();
1127 }), 1132 }),
1128 destroyed: _controller.close); 1133 destroyed: () {
1134 _controller.close();
1135 if (_referencePort != null) {
1136 _referencePort.close();
1137 }
1138 });
1129 return _controller.stream.listen( 1139 return _controller.stream.listen(
1130 onData, 1140 onData,
1131 onError: onError, 1141 onError: onError,
1132 onDone: onDone, 1142 onDone: onDone,
1133 cancelOnError: cancelOnError); 1143 cancelOnError: cancelOnError);
1134 } 1144 }
1135 1145
1136 int get port => _socket.port; 1146 int get port => _socket.port;
1137 1147
1138 InternetAddress get address => _socket.address; 1148 InternetAddress get address => _socket.address;
(...skipping 16 matching lines...) Expand all
1155 } 1165 }
1156 } 1166 }
1157 1167
1158 void _onPauseStateChange() { 1168 void _onPauseStateChange() {
1159 if (_controller.isPaused) { 1169 if (_controller.isPaused) {
1160 _pause(); 1170 _pause();
1161 } else { 1171 } else {
1162 _resume(); 1172 _resume();
1163 } 1173 }
1164 } 1174 }
1175
1176 RawServerSocketReference get reference {
1177 if (_referencePort == null) {
1178 _referencePort = new ReceivePort();
1179 _referencePort.listen((sendPort) {
1180 sendPort.send(
1181 [_socket.nativeGetSocketId(),
1182 _socket.address,
1183 _socket.localPort]);
1184 });
1185 }
1186 return new _RawServerSocketReference(_referencePort.sendPort);
1187 }
1165 } 1188 }
1166 1189
1167 1190
1191 class _RawServerSocketReference implements RawServerSocketReference {
1192 final SendPort _sendPort;
1193
1194 _RawServerSocketReference(this._sendPort);
1195
1196 Future<RawServerSocket> create() {
1197 var port = new ReceivePort();
1198 _sendPort.send(port.sendPort);
1199 return port.first.then((args) {
1200 port.close();
1201 var native = new _NativeSocket.listen();
1202 native.nativeSetSocketId(args[0]);
1203 native.address = args[1];
1204 native.localPort = args[2];
1205 return new _RawServerSocket(native);
1206 });
1207 }
1208
1209 int get hashCode => _sendPort.hashCode;
1210
1211 bool operator==(Object other)
1212 => other is _RawServerSocketReference && _sendPort == other._sendPort;
1213 }
1214
1215
1168 class _RawSocket extends Stream<RawSocketEvent> 1216 class _RawSocket extends Stream<RawSocketEvent>
1169 implements RawSocket { 1217 implements RawSocket {
1170 final _NativeSocket _socket; 1218 final _NativeSocket _socket;
1171 StreamController<RawSocketEvent> _controller; 1219 StreamController<RawSocketEvent> _controller;
1172 bool _readEventsEnabled = true; 1220 bool _readEventsEnabled = true;
1173 bool _writeEventsEnabled = true; 1221 bool _writeEventsEnabled = true;
1174 1222
1175 // Flag to handle Ctrl-D closing of stdio on Mac OS. 1223 // Flag to handle Ctrl-D closing of stdio on Mac OS.
1176 bool _isMacOSTerminalInput = false; 1224 bool _isMacOSTerminalInput = false;
1177 1225
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
1315 1363
1316 patch class ServerSocket { 1364 patch class ServerSocket {
1317 /* patch */ static Future<ServerSocket> bind(address, 1365 /* patch */ static Future<ServerSocket> bind(address,
1318 int port, 1366 int port,
1319 {int backlog: 0, 1367 {int backlog: 0,
1320 bool v6Only: false}) { 1368 bool v6Only: false}) {
1321 return _ServerSocket.bind(address, port, backlog, v6Only); 1369 return _ServerSocket.bind(address, port, backlog, v6Only);
1322 } 1370 }
1323 } 1371 }
1324 1372
1373
1374 class _ServerSocketReference implements ServerSocketReference {
1375 final RawServerSocketReference _rawReference;
1376
1377 _ServerSocketReference(this._rawReference);
1378
1379 Future<ServerSocket> create() {
1380 return _rawReference.create().then((raw) => new _ServerSocket(raw));
1381 }
1382 }
1383
1384
1325 class _ServerSocket extends Stream<Socket> 1385 class _ServerSocket extends Stream<Socket>
1326 implements ServerSocket { 1386 implements ServerSocket {
1327 final _socket; 1387 final _socket;
1328 1388
1329 static Future<_ServerSocket> bind(address, 1389 static Future<_ServerSocket> bind(address,
1330 int port, 1390 int port,
1331 int backlog, 1391 int backlog,
1332 bool v6Only) { 1392 bool v6Only) {
1333 return _RawServerSocket.bind(address, port, backlog, v6Only) 1393 return _RawServerSocket.bind(address, port, backlog, v6Only)
1334 .then((socket) => new _ServerSocket(socket)); 1394 .then((socket) => new _ServerSocket(socket));
(...skipping 10 matching lines...) Expand all
1345 onError: onError, 1405 onError: onError,
1346 onDone: onDone, 1406 onDone: onDone,
1347 cancelOnError: cancelOnError); 1407 cancelOnError: cancelOnError);
1348 } 1408 }
1349 1409
1350 int get port => _socket.port; 1410 int get port => _socket.port;
1351 1411
1352 InternetAddress get address => _socket.address; 1412 InternetAddress get address => _socket.address;
1353 1413
1354 Future close() => _socket.close().then((_) => this); 1414 Future close() => _socket.close().then((_) => this);
1415
1416 ServerSocketReference get reference {
1417 return new _ServerSocketReference(_socket.reference);
1418 }
1355 } 1419 }
1356 1420
1357 1421
1358 patch class Socket { 1422 patch class Socket {
1359 /* patch */ static Future<Socket> connect(host, int port) { 1423 /* patch */ static Future<Socket> connect(host, int port) {
1360 return RawSocket.connect(host, port).then( 1424 return RawSocket.connect(host, port).then(
1361 (socket) => new _Socket(socket)); 1425 (socket) => new _Socket(socket));
1362 } 1426 }
1363 } 1427 }
1364 1428
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after
1795 String address, 1859 String address,
1796 List<int> in_addr, 1860 List<int> in_addr,
1797 int port) { 1861 int port) {
1798 return new Datagram( 1862 return new Datagram(
1799 data, 1863 data,
1800 new _InternetAddress(address, null, in_addr), 1864 new _InternetAddress(address, null, in_addr),
1801 port); 1865 port);
1802 } 1866 }
1803 1867
1804 String _socketsStats() => _SocketsObservatory.toJSON(); 1868 String _socketsStats() => _SocketsObservatory.toJSON();
OLDNEW
« no previous file with comments | « runtime/bin/socket.cc ('k') | sdk/lib/io/socket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698