Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(278)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 85993002: Add UDP support to dart:io (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed first round of comments Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class RawServerSocket { 5 patch class RawServerSocket {
6 /* patch */ static Future<RawServerSocket> bind(address, 6 /* patch */ static Future<RawServerSocket> bind(address,
7 int port, 7 int port,
8 {int backlog: 0, 8 {int backlog: 0,
9 bool v6Only: false}) { 9 bool v6Only: false}) {
10 return _RawServerSocket.bind(address, port, backlog, v6Only); 10 return _RawServerSocket.bind(address, port, backlog, v6Only);
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
118 _sockaddr_storage[_IPV4_ADDR_OFFSET] < 240; 118 _sockaddr_storage[_IPV4_ADDR_OFFSET] < 240;
119 119
120 case InternetAddressType.IP_V6: 120 case InternetAddressType.IP_V6:
121 // Checking for ff00::/8. 121 // Checking for ff00::/8.
122 return _sockaddr_storage[_IPV6_ADDR_OFFSET] == 0xFF; 122 return _sockaddr_storage[_IPV6_ADDR_OFFSET] == 0xFF;
123 } 123 }
124 } 124 }
125 125
126 Future<InternetAddress> reverse() => _NativeSocket.reverseLookup(this); 126 Future<InternetAddress> reverse() => _NativeSocket.reverseLookup(this);
127 127
128
Anders Johnsen 2013/11/29 12:36:54 intentional?
Søren Gjesse 2013/12/12 11:38:46 No, reverted.
128 _InternetAddress(InternetAddressType this.type, 129 _InternetAddress(InternetAddressType this.type,
129 String this.address, 130 String this.address,
130 String this._host, 131 String this._host,
131 List<int> this._sockaddr_storage); 132 List<int> this._sockaddr_storage);
132 133
133 factory _InternetAddress.parse(String address) { 134 factory _InternetAddress.parse(String address) {
134 var type = address.indexOf(':') == -1 135 var type = address.indexOf(':') == -1
135 ? InternetAddressType.IP_V4 136 ? InternetAddressType.IP_V4
136 : InternetAddressType.IP_V6; 137 : InternetAddressType.IP_V6;
137 var raw = _parse(type._value, address); 138 var raw = _parse(type._value, address);
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 static const int PIPE_SOCKET = 17; 240 static const int PIPE_SOCKET = 17;
240 static const int TYPE_NORMAL_SOCKET = 0; 241 static const int TYPE_NORMAL_SOCKET = 0;
241 static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET; 242 static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET;
242 static const int TYPE_PIPE = 1 << PIPE_SOCKET; 243 static const int TYPE_PIPE = 1 << PIPE_SOCKET;
243 244
244 // Native port messages. 245 // Native port messages.
245 static const HOST_NAME_LOOKUP = 0; 246 static const HOST_NAME_LOOKUP = 0;
246 static const LIST_INTERFACES = 1; 247 static const LIST_INTERFACES = 1;
247 static const REVERSE_LOOKUP = 2; 248 static const REVERSE_LOOKUP = 2;
248 249
250 // Protocol flags.
251 static const int PROTOCOL_IPV4 = 1 << 0;
252 static const int PROTOCOL_IPV6 = 1 << 1;
253
249 // Socket close state 254 // Socket close state
250 bool isClosed = false; 255 bool isClosed = false;
251 bool isClosing = false; 256 bool isClosing = false;
252 bool isClosedRead = false; 257 bool isClosedRead = false;
253 bool isClosedWrite = false; 258 bool isClosedWrite = false;
254 Completer closeCompleter = new Completer(); 259 Completer closeCompleter = new Completer();
255 260
256 // Handlers and receive port for socket events from the event handler. 261 // Handlers and receive port for socket events from the event handler.
257 int eventMask = 0; 262 int eventMask = 0;
258 List eventHandlers; 263 List eventHandlers;
259 RawReceivePort eventPort; 264 RawReceivePort eventPort;
260 265
261 // Indicates if native interrupts can be activated. 266 // Indicates if native interrupts can be activated.
262 bool canActivateEvents = true; 267 bool canActivateEvents = true;
263 268
264 // The type flags for this socket. 269 // The type flags for this socket.
265 final int typeFlags; 270 final int typeFlags;
266 271
267 // Holds the port of the socket, null if not known. 272 // Holds the port of the socket, 0 if not known.
268 int localPort; 273 int localPort = 0;
269 274
270 // Holds the address used to connect or bind the socket. 275 // Holds the address used to connect or bind the socket.
271 InternetAddress address; 276 InternetAddress address;
272 277
273 static Future<List<InternetAddress>> lookup( 278 static Future<List<InternetAddress>> lookup(
274 String host, {InternetAddressType type: InternetAddressType.ANY}) { 279 String host, {InternetAddressType type: InternetAddressType.ANY}) {
275 return _IOService.dispatch(_SOCKET_LOOKUP, [host, type._value]) 280 return _IOService.dispatch(_SOCKET_LOOKUP, [host, type._value])
276 .then((response) { 281 .then((response) {
277 if (isErrorResponse(response)) { 282 if (isErrorResponse(response)) {
278 throw createError(response, "Failed host lookup: '$host'"); 283 throw createError(response, "Failed host lookup: '$host'");
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after
392 throw new SocketException("Failed to create server socket", 397 throw new SocketException("Failed to create server socket",
393 osError: result, 398 osError: result,
394 address: address, 399 address: address,
395 port: port); 400 port: port);
396 } 401 }
397 if (port != 0) socket.localPort = port; 402 if (port != 0) socket.localPort = port;
398 return socket; 403 return socket;
399 }); 404 });
400 } 405 }
401 406
407 static Future<_NativeSocket> bindDatagram(
408 host, int port, bool reuseAddress) {
409 return new Future.value(host)
410 .then((host) {
411 if (host is _InternetAddress) return host;
412 return lookup(host)
413 .then((list) {
414 if (list.length == 0) {
415 throw createError(response, "Failed host lookup: '$host'");
416 }
417 return list[0];
418 });
419 })
420 .then((address) {
421 var socket = new _NativeSocket.datagram(address);
422 var result = socket.nativeCreateBindDatagram(
423 address._sockaddr_storage, port, reuseAddress);
424 if (result is OSError) {
425 throw new SocketException("Failed to create datagram socket",
426 osError: result,
427 address: address,
428 port: port);
429 }
430 if (port != 0) socket.localPort = port;
431 return socket;
432 });
433 }
434
435 _NativeSocket.datagram(this.address)
436 : typeFlags = TYPE_NORMAL_SOCKET {
437 eventHandlers = new List(EVENT_COUNT + 1);
438 }
439
402 _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET { 440 _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET {
403 eventHandlers = new List(EVENT_COUNT + 1); 441 eventHandlers = new List(EVENT_COUNT + 1);
404 } 442 }
405 443
406 _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET { 444 _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET {
407 eventHandlers = new List(EVENT_COUNT + 1); 445 eventHandlers = new List(EVENT_COUNT + 1);
408 } 446 }
409 447
410 _NativeSocket.pipe() : typeFlags = TYPE_PIPE { 448 _NativeSocket.pipe() : typeFlags = TYPE_PIPE {
411 eventHandlers = new List(EVENT_COUNT + 1); 449 eventHandlers = new List(EVENT_COUNT + 1);
(...skipping 22 matching lines...) Expand all
434 } 472 }
435 if (isClosing || isClosed) return null; 473 if (isClosing || isClosed) return null;
436 var result = nativeRead(len == null ? -1 : len); 474 var result = nativeRead(len == null ? -1 : len);
437 if (result is OSError) { 475 if (result is OSError) {
438 reportError(result, "Read failed"); 476 reportError(result, "Read failed");
439 return null; 477 return null;
440 } 478 }
441 return result; 479 return result;
442 } 480 }
443 481
482 Datagram receive() {
483 if (isClosing || isClosed) return null;
484 var result = nativeRecvFrom();
485 if (result is OSError) {
486 reportError(result, "Receive failed");
Anders Johnsen 2013/11/29 12:36:54 Put in scheduleMicrotask.
Søren Gjesse 2013/12/12 11:38:46 Done.
487 return null;
488 }
489 return result;
490 }
491
444 int write(List<int> buffer, int offset, int bytes) { 492 int write(List<int> buffer, int offset, int bytes) {
445 if (buffer is! List) throw new ArgumentError(); 493 if (buffer is! List) throw new ArgumentError();
446 if (offset == null) offset = 0; 494 if (offset == null) offset = 0;
447 if (bytes == null) { 495 if (bytes == null) {
448 if (offset > buffer.length) { 496 if (offset > buffer.length) {
449 throw new RangeError.value(offset); 497 throw new RangeError.value(offset);
450 } 498 }
451 bytes = buffer.length - offset; 499 bytes = buffer.length - offset;
452 } 500 }
453 if (offset < 0) throw new RangeError.value(offset); 501 if (offset < 0) throw new RangeError.value(offset);
(...skipping 10 matching lines...) Expand all
464 _ensureFastAndSerializableByteData(buffer, offset, offset + bytes); 512 _ensureFastAndSerializableByteData(buffer, offset, offset + bytes);
465 var result = 513 var result =
466 nativeWrite(bufferAndStart.buffer, bufferAndStart.start, bytes); 514 nativeWrite(bufferAndStart.buffer, bufferAndStart.start, bytes);
467 if (result is OSError) { 515 if (result is OSError) {
468 reportError(result, "Write failed"); 516 reportError(result, "Write failed");
469 result = 0; 517 result = 0;
470 } 518 }
471 return result; 519 return result;
472 } 520 }
473 521
522 int send(List<int> buffer, int offset, int bytes,
523 InternetAddress address, int port) {
524 if (isClosing || isClosed) return 0;
525 _BufferAndStart bufferAndStart =
526 _ensureFastAndSerializableByteData(
527 buffer, offset, bytes);
528 var result = nativeSendTo(
Anders Johnsen 2013/11/29 12:36:54 Use result?
Søren Gjesse 2013/12/12 11:38:46 Done.
529 bufferAndStart.buffer, bufferAndStart.start, bytes,
530 address._sockaddr_storage, port);
531 }
532
474 _NativeSocket accept() { 533 _NativeSocket accept() {
475 // Don't issue accept if we're closing. 534 // Don't issue accept if we're closing.
476 if (isClosing || isClosed) return null; 535 if (isClosing || isClosed) return null;
477 var socket = new _NativeSocket.normal(); 536 var socket = new _NativeSocket.normal();
478 if (nativeAccept(socket) != true) return null; 537 if (nativeAccept(socket) != true) return null;
479 socket.localPort = localPort; 538 socket.localPort = localPort;
480 socket.address = address; 539 socket.address = address;
481 return socket; 540 return socket;
482 } 541 }
483 542
484 int get port { 543 int get port {
485 if (localPort != null) return localPort; 544 if (localPort != 0) return localPort;
486 return localPort = nativeGetPort(); 545 return localPort = nativeGetPort();
487 } 546 }
488 547
489 int get remotePort { 548 int get remotePort {
490 return nativeGetRemotePeer()[1]; 549 return nativeGetRemotePeer()[1];
491 } 550 }
492 551
493 InternetAddress get remoteAddress { 552 InternetAddress get remoteAddress {
494 var result = nativeGetRemotePeer()[0]; 553 var result = nativeGetRemotePeer()[0];
495 var type = new InternetAddressType._from(result[0]); 554 var type = new InternetAddressType._from(result[0]);
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after
690 void reportError(error, String message) { 749 void reportError(error, String message) {
691 var e = createError(error, message, address, localPort); 750 var e = createError(error, message, address, localPort);
692 // Invoke the error handler if any. 751 // Invoke the error handler if any.
693 if (eventHandlers[ERROR_EVENT] != null) { 752 if (eventHandlers[ERROR_EVENT] != null) {
694 eventHandlers[ERROR_EVENT](e); 753 eventHandlers[ERROR_EVENT](e);
695 } 754 }
696 // For all errors we close the socket 755 // For all errors we close the socket
697 close(); 756 close();
698 } 757 }
699 758
700 bool setOption(SocketOption option, bool enabled) { 759 bool getOption(SocketOption option) {
Anders Johnsen 2013/11/29 12:36:54 I suppose the result is dynamic?
Søren Gjesse 2013/12/12 11:38:46 Done.
701 if (option is! SocketOption) throw new ArgumentError(options); 760 if (option is! SocketOption) throw new ArgumentError(options);
702 if (enabled is! bool) throw new ArgumentError(enabled); 761 var result = nativeGetOption(option._value, address.type._value);
703 return nativeSetOption(option._value, enabled); 762 if (result is OSError) throw result;
763 return result;
764 }
765
766 bool setOption(SocketOption option, value) {
767 if (option is! SocketOption) throw new ArgumentError(options);
768 var result = nativeSetOption(option._value, address.type._value, value);
769 if (result is OSError) throw result;
770 }
771
772 void join(InternetAddress addr, NetworkInterface interface) {
773 var result = nativeJoinMulticast(addr._sockaddr_storage, 0);
774 if (result is OSError) throw result;
775 }
776
777 void leave(InternetAddress addr, NetworkInterface interface) {
778 var result = nativeLeaveMulticast(addr._sockaddr_storage, 0);
779 if (result is OSError) throw result;
704 } 780 }
705 781
706 void nativeSetSocketId(int id) native "Socket_SetSocketId"; 782 void nativeSetSocketId(int id) native "Socket_SetSocketId";
707 nativeAvailable() native "Socket_Available"; 783 nativeAvailable() native "Socket_Available";
708 nativeRead(int len) native "Socket_Read"; 784 nativeRead(int len) native "Socket_Read";
785 nativeRecvFrom() native "Socket_RecvFrom";
709 nativeWrite(List<int> buffer, int offset, int bytes) 786 nativeWrite(List<int> buffer, int offset, int bytes)
710 native "Socket_WriteList"; 787 native "Socket_WriteList";
788 nativeSendTo(List<int> buffer, int offset, int bytes,
789 List<int> address, int port)
790 native "Socket_SendTo";
711 nativeCreateConnect(List<int> addr, 791 nativeCreateConnect(List<int> addr,
712 int port) native "Socket_CreateConnect"; 792 int port) native "Socket_CreateConnect";
713 nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only) 793 nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only)
714 native "ServerSocket_CreateBindListen"; 794 native "ServerSocket_CreateBindListen";
795 nativeCreateBindDatagram(List<int> addr, int port, bool reuseAddress)
796 native "Socket_CreateBindDatagram";
715 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept"; 797 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept";
716 int nativeGetPort() native "Socket_GetPort"; 798 int nativeGetPort() native "Socket_GetPort";
717 List nativeGetRemotePeer() native "Socket_GetRemotePeer"; 799 List nativeGetRemotePeer() native "Socket_GetRemotePeer";
718 OSError nativeGetError() native "Socket_GetError"; 800 OSError nativeGetError() native "Socket_GetError";
719 bool nativeSetOption(int option, bool enabled) native "Socket_SetOption"; 801 nativeGetOption(int option, int protocol) native "Socket_GetOption";
802 bool nativeSetOption(int option, int protocol, value)
803 native "Socket_SetOption";
804 bool nativeJoinMulticast(List<int> addr, int interface)
805 native "Socket_JoinMulticast";
806 bool nativeLeaveMulticast(List<int> addr, int interface)
807 native "Socket_LeaveMulticast";
720 } 808 }
721 809
722 810
723 class _RawServerSocket extends Stream<RawSocket> 811 class _RawServerSocket extends Stream<RawSocket>
724 implements RawServerSocket { 812 implements RawServerSocket {
725 final _NativeSocket _socket; 813 final _NativeSocket _socket;
726 StreamController<RawSocket> _controller; 814 StreamController<RawSocket> _controller;
727 815
728 static Future<_RawServerSocket> bind(address, 816 static Future<_RawServerSocket> bind(address,
729 int port, 817 int port,
(...skipping 542 matching lines...) Expand 10 before | Expand all | Expand 10 after
1272 if (_detachReady != null) { 1360 if (_detachReady != null) {
1273 _detachReady.complete(null); 1361 _detachReady.complete(null);
1274 } else { 1362 } else {
1275 if (_raw != null) { 1363 if (_raw != null) {
1276 _raw.shutdown(SocketDirection.SEND); 1364 _raw.shutdown(SocketDirection.SEND);
1277 _disableWriteEvent(); 1365 _disableWriteEvent();
1278 } 1366 }
1279 } 1367 }
1280 } 1368 }
1281 } 1369 }
1370
1371
1372 class _RawDatagramSocket extends Stream implements RawDatagramSocket {
1373 _NativeSocket _socket;
1374 StreamController<RawSocketEvent> _controller;
1375 bool _readEventsEnabled = true;
1376 bool _writeEventsEnabled = true;
1377
1378 _RawDatagramSocket(this._socket) {
Anders Johnsen 2013/11/29 12:36:54 Update with zone wrappers, like in RawSocket.
Søren Gjesse 2013/12/12 11:38:46 Done.
1379 _controller = new StreamController(sync: true,
1380 onListen: _onSubscriptionStateChange,
1381 onCancel: _onSubscriptionStateChange,
1382 onPause: _onPauseStateChange,
1383 onResume: _onPauseStateChange);
1384 _socket.closeFuture.then((_) => _controller.close());
1385 _socket.setHandlers(
1386 read: () => _controller.add(RawSocketEvent.READ),
1387 write: () {
1388 // The write event handler is automatically disabled by the
1389 // event handler when it fires.
1390 _writeEventsEnabled = false;
1391 _controller.add(RawSocketEvent.WRITE);
1392 },
1393 closed: () => _controller.add(RawSocketEvent.READ_CLOSED),
1394 destroyed: () => _controller.add(RawSocketEvent.CLOSED),
1395 error: (e) {
1396 _controller.addError(e);
1397 close();
1398 }
1399 );
1400 }
1401
1402 static Future<RawDatagramSocket> bind(
Anders Johnsen 2013/11/29 12:36:54 patch method?
Søren Gjesse 2013/12/12 11:38:46 Done.
1403 host, int port, bool reuseAddress) {
1404 if (port < 0 || port > 0xffff)
1405 throw new ArgumentError("Invalid port $port");
1406 return _NativeSocket.bindDatagram(host, port, reuseAddress)
1407 .then((socket) => new _RawDatagramSocket(socket));
1408 }
1409
1410 StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event),
1411 {Function onError,
1412 void onDone(),
1413 bool cancelOnError}) {
1414 return _controller.stream.listen(
1415 onData,
1416 onError: onError,
1417 onDone: onDone,
1418 cancelOnError: cancelOnError);
1419 }
1420
1421 Future close() => _socket.close().then((_) => this);
1422
1423 bool send(List<data> buffer, InternetAddress address, int port) {
1424 var result = _socket.send(buffer, 0, buffer.length, address, port);
1425 if (result is OSError) {
1426 throw result;
1427 }
1428 assert(result == buffer.length);
Anders Johnsen 2013/11/29 12:36:54 It can be 0?
Søren Gjesse 2013/12/12 11:38:46 Yes, Changed to just call the native socket as the
1429 return true;
1430 }
1431
1432 Datagram receive() {
1433 return _socket.receive();
1434 }
1435
1436 void join(InternetAddress group, [NetworkInterface interface]) {
1437 _socket.join(group, interface);
1438 }
1439
1440 void leave(InternetAddress group, [NetworkInterface interface]) {
1441 _socket.leave(group, interface);
1442 }
1443
1444 bool get readEventsEnabled => _readEventsEnabled;
1445 void set readEventsEnabled(bool value) {
1446 if (value != _readEventsEnabled) {
1447 _readEventsEnabled = value;
1448 if (!_controller.isPaused) _resume();
1449 }
1450 }
1451
1452 bool get writeEventsEnabled => _writeEventsEnabled;
1453 void set writeEventsEnabled(bool value) {
1454 if (value != _writeEventsEnabled) {
1455 _writeEventsEnabled = value;
1456 if (!_controller.isPaused) _resume();
1457 }
1458 }
1459
1460 bool get multicastLoopback =>
1461 _socket.getOption(SocketOption._IP_MULTICAST_LOOP);
1462 void set multicastLoopback(bool value) =>
1463 _socket.setOption(SocketOption._IP_MULTICAST_LOOP, value);
1464
1465 int get multicastHops =>
1466 _socket.getOption(SocketOption._IP_MULTICAST_HOPS);
1467 void set multicastHops(int value) =>
1468 _socket.setOption(SocketOption._IP_MULTICAST_HOPS, value);
1469
1470 NetworkInterface get multicastInterface =>
1471 throw "Not implemented";
1472 void set multicastInterface(NetworkInterface value) =>
1473 throw "Not implemented";
1474
1475 bool get broadcastEnabled =>
1476 _socket.getOption(SocketOption._IP_BROADCAST);
1477 void set broadcastEnabled(bool value) =>
1478 _socket.setOption(SocketOption._IP_BROADCAST, value);
1479
1480 int get port => _socket.port;
1481
1482 InternetAddress get address => _socket.address;
1483
1484 _pause() {
1485 _socket.setListening(read: false, write: false);
1486 }
1487
1488 void _resume() {
1489 _socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled);
1490 }
1491
1492 void _onPauseStateChange() {
1493 if (_controller.isPaused) {
1494 _pause();
1495 } else {
1496 _resume();
1497 }
1498 }
1499
1500 void _onSubscriptionStateChange() {
1501 if (_controller.hasListener) {
1502 _resume();
1503 } else {
1504 close();
1505 }
1506 }
1507 }
1508
1509 Datagram _makeDatagram(List<int> data,
1510 bool ipV6,
1511 String address,
1512 List<int> sockaddr_storage,
1513 int port) {
1514 var addressType =
1515 ipV6 ? InternetAddressType.IP_V6 : InternetAddressType.IP_V4;
1516 return new Datagram(
1517 data,
1518 new _InternetAddress(addressType, address, null, sockaddr_storage),
1519 port);
1520 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698