Index: runtime/bin/socket_patch.dart |
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart |
index d313a0d410364d17aac109f01864bc7248d5dbcc..6e0ed13294768617e4734f504cfd106d03294eb7 100644 |
--- a/runtime/bin/socket_patch.dart |
+++ b/runtime/bin/socket_patch.dart |
@@ -247,6 +247,10 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
static const LIST_INTERFACES = 1; |
static const REVERSE_LOOKUP = 2; |
+ // Protocol flags. |
+ static const int PROTOCOL_IPV4 = 1 << 0; |
+ static const int PROTOCOL_IPV6 = 1 << 1; |
+ |
// Socket close state |
bool isClosed = false; |
bool isClosing = false; |
@@ -265,8 +269,8 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
// The type flags for this socket. |
final int typeFlags; |
- // Holds the port of the socket, null if not known. |
- int localPort; |
+ // Holds the port of the socket, 0 if not known. |
+ int localPort = 0; |
// Holds the address used to connect or bind the socket. |
InternetAddress address; |
@@ -398,6 +402,39 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
}); |
} |
+ static Future<_NativeSocket> bindDatagram( |
+ host, int port, bool reuseAddress) { |
+ return new Future.value(host) |
+ .then((host) { |
+ if (host is _InternetAddress) return host; |
+ return lookup(host) |
+ .then((list) { |
+ if (list.length == 0) { |
+ throw createError(response, "Failed host lookup: '$host'"); |
+ } |
+ return list[0]; |
+ }); |
+ }) |
+ .then((address) { |
+ var socket = new _NativeSocket.datagram(address); |
+ var result = socket.nativeCreateBindDatagram( |
+ address._sockaddr_storage, port, reuseAddress); |
+ if (result is OSError) { |
+ throw new SocketException("Failed to create datagram socket", |
+ osError: result, |
+ address: address, |
+ port: port); |
+ } |
+ if (port != 0) socket.localPort = port; |
+ return socket; |
+ }); |
+ } |
+ |
+ _NativeSocket.datagram(this.address) |
+ : typeFlags = TYPE_NORMAL_SOCKET { |
+ eventHandlers = new List(EVENT_COUNT + 1); |
+ } |
+ |
_NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET { |
eventHandlers = new List(EVENT_COUNT + 1); |
} |
@@ -440,6 +477,16 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
return result; |
} |
+ Datagram receive() { |
+ if (isClosing || isClosed) return null; |
+ var result = nativeRecvFrom(); |
+ if (result is OSError) { |
+ reportError(result, "Receive failed"); |
+ return null; |
+ } |
+ return result; |
+ } |
+ |
int write(List<int> buffer, int offset, int bytes) { |
if (buffer is! List) throw new ArgumentError(); |
if (offset == null) offset = 0; |
@@ -470,6 +517,22 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
return result; |
} |
+ int send(List<int> buffer, int offset, int bytes, |
+ InternetAddress address, int port) { |
+ if (isClosing || isClosed) return 0; |
+ _BufferAndStart bufferAndStart = |
+ _ensureFastAndSerializableByteData( |
+ buffer, offset, bytes); |
+ var result = nativeSendTo( |
+ bufferAndStart.buffer, bufferAndStart.start, bytes, |
+ address._sockaddr_storage, port); |
+ if (result is OSError) { |
+ scheduleMicrotask(() => reportError(result, "Send failed")); |
+ result = 0; |
+ } |
+ return result; |
+ } |
+ |
_NativeSocket accept() { |
// Don't issue accept if we're closing. |
if (isClosing || isClosed) return null; |
@@ -481,7 +544,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
} |
int get port { |
- if (localPort != null) return localPort; |
+ if (localPort != 0) return localPort; |
return localPort = nativeGetPort(); |
} |
@@ -697,26 +760,91 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
close(); |
} |
- bool setOption(SocketOption option, bool enabled) { |
+ getOption(SocketOption option) { |
if (option is! SocketOption) throw new ArgumentError(options); |
- if (enabled is! bool) throw new ArgumentError(enabled); |
- return nativeSetOption(option._value, enabled); |
+ var result = nativeGetOption(option._value, address.type._value); |
+ if (result is OSError) throw result; |
+ return result; |
+ } |
+ |
+ bool setOption(SocketOption option, value) { |
+ if (option is! SocketOption) throw new ArgumentError(options); |
+ var result = nativeSetOption(option._value, address.type._value, value); |
+ if (result is OSError) throw result; |
+ } |
+ |
+ InternetAddress multicastAddress( |
+ InternetAddress addr, NetworkInterface interface) { |
+ // On Mac OS using the interface index for joining IPv4 multicast groups |
+ // is not supported. Here the IP address of the interface is needed. |
+ if (Platform.isMacOS && addr.type == InternetAddressType.IP_V4) { |
+ if (interface != null) { |
+ for (int i = 0; i < interface.addresses.length; i++) { |
+ if (addr.type == InternetAddressType.IP_V4) { |
+ return interface.addresses[i]; |
+ } |
+ } |
+ // No IPv4 address found on the interface. |
+ throw new SocketException( |
+ "The network interface does not have an address " |
+ "of the same family as the multicast address"); |
+ } else { |
+ // Default to the ANY address if on iterface is specified. |
+ return InternetAddress.ANY_IP_V4; |
+ } |
+ } else { |
+ return null; |
+ } |
+ } |
+ |
+ void joinMulticast(InternetAddress addr, NetworkInterface interface) { |
+ var interfaceAddr = multicastAddress(addr, interface); |
+ var interfaceIndex = interface == null ? 0 : interface.index; |
+ var result = nativeJoinMulticast( |
+ addr._sockaddr_storage, |
+ interfaceAddr == null ? null : interfaceAddr._sockaddr_storage, |
+ interfaceIndex); |
+ if (result is OSError) throw result; |
+ } |
+ |
+ void leaveMulticast(InternetAddress addr, NetworkInterface interface) { |
+ var interfaceAddr = multicastAddress(addr, interface); |
+ var interfaceIndex = interface == null ? 0 : interface.index; |
+ var result = nativeLeaveMulticast( |
+ addr._sockaddr_storage, |
+ interfaceAddr == null ? null : interfaceAddr._sockaddr_storage, |
+ interfaceIndex); |
+ if (result is OSError) throw result; |
} |
void nativeSetSocketId(int id) native "Socket_SetSocketId"; |
nativeAvailable() native "Socket_Available"; |
nativeRead(int len) native "Socket_Read"; |
+ nativeRecvFrom() native "Socket_RecvFrom"; |
nativeWrite(List<int> buffer, int offset, int bytes) |
native "Socket_WriteList"; |
+ nativeSendTo(List<int> buffer, int offset, int bytes, |
+ List<int> address, int port) |
+ native "Socket_SendTo"; |
nativeCreateConnect(List<int> addr, |
int port) native "Socket_CreateConnect"; |
nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only) |
native "ServerSocket_CreateBindListen"; |
+ nativeCreateBindDatagram(List<int> addr, int port, bool reuseAddress) |
+ native "Socket_CreateBindDatagram"; |
nativeAccept(_NativeSocket socket) native "ServerSocket_Accept"; |
int nativeGetPort() native "Socket_GetPort"; |
List nativeGetRemotePeer() native "Socket_GetRemotePeer"; |
OSError nativeGetError() native "Socket_GetError"; |
- bool nativeSetOption(int option, bool enabled) native "Socket_SetOption"; |
+ nativeGetOption(int option, int protocol) native "Socket_GetOption"; |
+ bool nativeSetOption(int option, int protocol, value) |
+ native "Socket_SetOption"; |
+ bool nativeJoinMulticast( |
+ List<int> addr, List<int> interfaceAddr, int interfaceIndex) |
+ native "Socket_JoinMulticast"; |
+ bool nativeLeaveMulticast( |
+ List<int> addr, List<int> interfaceAddr, int interfaceIndex) |
+ native "Socket_LeaveMulticast"; |
} |
@@ -1281,3 +1409,156 @@ class _Socket extends Stream<List<int>> implements Socket { |
} |
} |
} |
+ |
+ |
+patch class RawDatagramSocket { |
+ /* patch */ static Future<RawDatagramSocket> bind( |
+ host, int port, {bool reuseAddress: true}) { |
+ return _RawDatagramSocket.bind(host, port, reuseAddress); |
+ } |
+} |
+ |
+class _RawDatagramSocket extends Stream implements RawDatagramSocket { |
+ _NativeSocket _socket; |
+ StreamController<RawSocketEvent> _controller; |
+ bool _readEventsEnabled = true; |
+ bool _writeEventsEnabled = true; |
+ |
+ _RawDatagramSocket(this._socket) { |
+ var zone = Zone.current; |
+ _controller = new StreamController(sync: true, |
+ onListen: _onSubscriptionStateChange, |
+ onCancel: _onSubscriptionStateChange, |
+ onPause: _onPauseStateChange, |
+ onResume: _onPauseStateChange); |
+ _socket.closeFuture.then((_) => _controller.close()); |
+ _socket.setHandlers( |
+ read: () => _controller.add(RawSocketEvent.READ), |
+ write: () { |
+ // The write event handler is automatically disabled by the |
+ // event handler when it fires. |
+ _writeEventsEnabled = false; |
+ _controller.add(RawSocketEvent.WRITE); |
+ }, |
+ closed: () => _controller.add(RawSocketEvent.READ_CLOSED), |
+ destroyed: () => _controller.add(RawSocketEvent.CLOSED), |
+ error: zone.bindUnaryCallback((e) { |
+ _controller.addError(e); |
+ close(); |
+ }) |
+ ); |
+ } |
+ |
+ static Future<RawDatagramSocket> bind( |
+ host, int port, bool reuseAddress) { |
+ if (port < 0 || port > 0xffff) |
+ throw new ArgumentError("Invalid port $port"); |
+ return _NativeSocket.bindDatagram(host, port, reuseAddress) |
+ .then((socket) => new _RawDatagramSocket(socket)); |
+ } |
+ |
+ StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ return _controller.stream.listen( |
+ onData, |
+ onError: onError, |
+ onDone: onDone, |
+ cancelOnError: cancelOnError); |
+ } |
+ |
+ Future close() => _socket.close().then((_) => this); |
+ |
+ int send(List<data> buffer, InternetAddress address, int port) => |
+ _socket.send(buffer, 0, buffer.length, address, port); |
+ |
+ Datagram receive() { |
+ return _socket.receive(); |
+ } |
+ |
+ void joinMulticast(InternetAddress group, [NetworkInterface interface]) { |
+ _socket.joinMulticast(group, interface); |
+ } |
+ |
+ void leaveMulticast(InternetAddress group, [NetworkInterface interface]) { |
+ _socket.leaveMulticast(group, interface); |
+ } |
+ |
+ bool get readEventsEnabled => _readEventsEnabled; |
+ void set readEventsEnabled(bool value) { |
+ if (value != _readEventsEnabled) { |
+ _readEventsEnabled = value; |
+ if (!_controller.isPaused) _resume(); |
+ } |
+ } |
+ |
+ bool get writeEventsEnabled => _writeEventsEnabled; |
+ void set writeEventsEnabled(bool value) { |
+ if (value != _writeEventsEnabled) { |
+ _writeEventsEnabled = value; |
+ if (!_controller.isPaused) _resume(); |
+ } |
+ } |
+ |
+ bool get multicastLoopback => |
+ _socket.getOption(SocketOption._IP_MULTICAST_LOOP); |
+ void set multicastLoopback(bool value) => |
+ _socket.setOption(SocketOption._IP_MULTICAST_LOOP, value); |
+ |
+ int get multicastHops => |
+ _socket.getOption(SocketOption._IP_MULTICAST_HOPS); |
+ void set multicastHops(int value) => |
+ _socket.setOption(SocketOption._IP_MULTICAST_HOPS, value); |
+ |
+ NetworkInterface get multicastInterface => |
+ throw "Not implemented"; |
+ void set multicastInterface(NetworkInterface value) => |
+ throw "Not implemented"; |
+ |
+ bool get broadcastEnabled => |
+ _socket.getOption(SocketOption._IP_BROADCAST); |
+ void set broadcastEnabled(bool value) => |
+ _socket.setOption(SocketOption._IP_BROADCAST, value); |
+ |
+ int get port => _socket.port; |
+ |
+ InternetAddress get address => _socket.address; |
+ |
+ _pause() { |
+ _socket.setListening(read: false, write: false); |
+ } |
+ |
+ void _resume() { |
+ _socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled); |
+ } |
+ |
+ void _onPauseStateChange() { |
+ if (_controller.isPaused) { |
+ _pause(); |
+ } else { |
+ _resume(); |
+ } |
+ } |
+ |
+ void _onSubscriptionStateChange() { |
+ if (_controller.hasListener) { |
+ _resume(); |
+ } else { |
+ close(); |
+ } |
+ } |
+} |
+ |
+Datagram _makeDatagram(List<int> data, |
+ bool ipV6, |
+ String address, |
+ List<int> sockaddr_storage, |
+ int port) { |
+ var addressType = |
+ ipV6 ? InternetAddressType.IP_V6 : InternetAddressType.IP_V4; |
+ return new Datagram( |
+ data, |
+ new _InternetAddress(addressType, address, null, sockaddr_storage), |
+ port); |
+} |