Index: runtime/bin/socket_patch.dart |
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart |
index bc91a3272cf623e0f2f1e875e9720e5860e99fac..6382fc70dbd0d7a75f21d105752bd15b80383170 100644 |
--- a/runtime/bin/socket_patch.dart |
+++ b/runtime/bin/socket_patch.dart |
@@ -1,20 +1,25 @@ |
-// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-patch class ServerSocket { |
- /* patch */ factory ServerSocket(String bindAddress, int port, int backlog) { |
- return new _ServerSocket(bindAddress, port, backlog); |
+patch class RawServerSocket { |
+ /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1", |
+ int port = 0, |
+ int backlog = 0]) { |
+ return _RawServerSocket.bind(address, port, backlog); |
} |
} |
-patch class Socket { |
- /* patch */ factory Socket(String host, int port) => new _Socket(host, port); |
+patch class RawSocket { |
+ /* patch */ static Future<RawSocket> connect(String host, int port) { |
+ return _RawSocket.connect(host, port); |
+ } |
} |
-class _SocketBase extends NativeFieldWrapperClass1 { |
+// The _NativeSocket class encapsulates an OS socket. |
+class _NativeSocket extends NativeFieldWrapperClass1 { |
// Bit flags used when communicating between the eventhandler and |
// dart code. The EVENT flags are used to indicate events of |
// interest when sending a message from dart code to the |
@@ -24,584 +29,894 @@ class _SocketBase extends NativeFieldWrapperClass1 { |
// eventhandler. COMMAND flags are never received from the |
// eventhandler. Additional flags are used to communicate other |
// information. |
- static const int _IN_EVENT = 0; |
- static const int _OUT_EVENT = 1; |
- static const int _ERROR_EVENT = 2; |
- static const int _CLOSE_EVENT = 3; |
- |
- static const int _CLOSE_COMMAND = 8; |
- static const int _SHUTDOWN_READ_COMMAND = 9; |
- static const int _SHUTDOWN_WRITE_COMMAND = 10; |
- |
- // Flag send to the eventhandler providing additional information on |
- // the type of the file descriptor. |
- static const int _LISTENING_SOCKET = 16; |
- static const int _PIPE = 17; |
- |
- static const int _FIRST_EVENT = _IN_EVENT; |
- static const int _LAST_EVENT = _CLOSE_EVENT; |
- |
- static const int _FIRST_COMMAND = _CLOSE_COMMAND; |
- static const int _LAST_COMMAND = _SHUTDOWN_WRITE_COMMAND; |
- |
- _SocketBase () { |
- _handlerMap = new List.fixedLength(_LAST_EVENT + 1); |
- _handlerMask = 0; |
- _canActivateHandlers = true; |
- _closed = true; |
+ static const int READ_EVENT = 0; |
+ static const int WRITE_EVENT = 1; |
+ static const int ERROR_EVENT = 2; |
+ static const int CLOSED_EVENT = 3; |
+ static const int FIRST_EVENT = READ_EVENT; |
+ static const int LAST_EVENT = CLOSED_EVENT; |
+ static const int EVENT_COUNT = LAST_EVENT - FIRST_EVENT + 1; |
+ |
+ static const int CLOSE_COMMAND = 8; |
+ static const int SHUTDOWN_READ_COMMAND = 9; |
+ static const int SHUTDOWN_WRITE_COMMAND = 10; |
+ static const int FIRST_COMMAND = CLOSE_COMMAND; |
+ static const int LAST_COMMAND = SHUTDOWN_WRITE_COMMAND; |
+ |
+ // Type flag send to the eventhandler providing additional |
+ // information on the type of the file descriptor. |
+ static const int LISTENING_SOCKET = 16; |
+ static const int PIPE_SOCKET = 17; |
+ static const int TYPE_NORMAL_SOCKET = 0; |
+ static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET; |
+ static const int TYPE_PIPE = 1 << PIPE_SOCKET; |
+ |
+ // Native port messages. |
+ static const HOST_NAME_LOOKUP = 0; |
+ |
+ // Socket close state |
+ bool isClosed = false; |
+ bool isClosedRead = false; |
+ bool isClosedWrite = false; |
+ Completer closeCompleter = new Completer(); |
+ |
+ // Handlers and receive port for socket events from the event handler. |
+ int eventMask = 0; |
+ List eventHandlers; |
+ ReceivePort eventPort; |
+ |
+ // Indicates if native interrupts can be activated. |
+ bool canActivateEvents = true; |
+ |
+ // The type flags for this socket. |
+ final int typeFlags; |
+ |
+ // Holds the port of the socket, null if not known. |
+ int localPort; |
+ |
+ // Native port for socket services. |
+ static SendPort socketService; |
+ |
+ static Future<_NativeSocket> connect(String host, int port) { |
+ var completer = new Completer(); |
+ ensureSocketService(); |
+ socketService.call([HOST_NAME_LOOKUP, host]).then((response) { |
+ if (isErrorResponse(response)) { |
+ completer.completeError( |
+ createError(response, "Failed host name lookup")); |
+ } else { |
+ var socket = new _NativeSocket.normal(); |
+ var result = socket.nativeCreateConnect(response, port); |
+ if (result is OSError) { |
+ completer.completeError(createError(result, "Connection failed")); |
+ } else { |
+ // Setup handlers for receiving the first write event which |
+ // indicate that the socket is fully connected. |
+ socket.setHandlers( |
+ write: () { |
+ socket.setListening(read: false, write: false); |
+ completer.complete(socket); |
+ }, |
+ error: (e) { |
+ socket.close(); |
+ completer.completeError(createError(e, "Connection failed")); |
+ } |
+ ); |
+ socket.setListening(read: false, write: true); |
+ } |
+ } |
+ }); |
+ return completer.future; |
+ } |
+ |
+ static Future<_NativeSocket> bind(String address, |
+ int port, |
+ int backlog) { |
+ var socket = new _NativeSocket.listen(); |
+ var result = socket.nativeCreateBindListen(address, port, backlog); |
+ if (result is OSError) { |
+ return new Future.immediateError( |
+ new SocketIOException("Failed to create server socket", result)); |
+ } |
+ if (port != 0) socket.localPort = port; |
+ return new Future.immediate(socket); |
+ } |
+ |
+ _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET { |
+ eventHandlers = new List.fixedLength(EVENT_COUNT + 1); |
+ _EventHandler._start(); |
+ } |
+ |
+ _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET { |
+ eventHandlers = new List.fixedLength(EVENT_COUNT + 1); |
+ _EventHandler._start(); |
+ } |
+ |
+ _NativeSocket.pipe() : typeFlags = TYPE_PIPE { |
+ eventHandlers = new List.fixedLength(EVENT_COUNT + 1); |
_EventHandler._start(); |
- _hashCode = _nextHashCode; |
- _nextHashCode = (_nextHashCode + 1) & 0xFFFFFFF; |
+ } |
+ |
+ int available() { |
+ if (isClosed) return 0; |
+ var result = nativeAvailable(); |
+ if (result is OSError) { |
+ reportError(result, "Available failed"); |
+ return 0; |
+ } else { |
+ return result; |
+ } |
+ } |
+ |
+ List<int> read(int len) { |
+ if (len != null && len <= 0) { |
+ throw new ArgumentError("Illegal length $len"); |
+ } |
+ var result = nativeRead(len == null ? -1 : len); |
+ if (result is OSError) { |
+ reportError(result, "Read failed"); |
+ return null; |
+ } |
+ return result; |
+ } |
+ |
+ int write(List<int> buffer, int offset, int bytes) { |
+ if (buffer is! List) throw new ArgumentError(); |
+ if (offset == null) offset = 0; |
+ if (bytes == null) bytes = buffer.length; |
+ if (offset < 0) throw new RangeError.value(offset); |
+ if (bytes < 0) throw new RangeError.value(bytes); |
+ if ((offset + bytes) > buffer.length) { |
+ throw new RangeError.value(offset + bytes); |
+ } |
+ if (offset is! int || bytes is! int) { |
+ throw new ArgumentError("Invalid arguments to write on Socket"); |
+ } |
+ if (isClosed) return 0; |
+ if (bytes == 0) return 0; |
+ _BufferAndOffset bufferAndOffset = |
+ _ensureFastAndSerializableBuffer(buffer, offset, bytes); |
+ var result = |
+ nativeWrite(bufferAndOffset.buffer, bufferAndOffset.offset, bytes); |
+ if (result is OSError) { |
+ reportError(result, "Write failed"); |
+ result = 0; |
+ } |
+ return result; |
+ } |
+ |
+ _NativeSocket accept() { |
+ var socket = new _NativeSocket.normal(); |
+ if (nativeAccept(socket) != true) return null; |
+ return socket; |
+ } |
+ |
+ int get port { |
+ if (localPort != null) return localPort; |
+ return localPort = nativeGetPort(); |
+ } |
+ |
+ int get remotePort { |
+ return nativeGetRemotePeer()[1]; |
+ } |
+ |
+ String get remoteHost { |
+ return nativeGetRemotePeer()[0]; |
} |
// Multiplexes socket events to the socket handlers. |
- void _multiplex(int event_mask) { |
- _canActivateHandlers = false; |
- for (int i = _FIRST_EVENT; i <= _LAST_EVENT; i++) { |
- if (((event_mask & (1 << i)) != 0)) { |
- if ((i == _CLOSE_EVENT) && this is _Socket && !_closed) { |
- _closedRead = true; |
- if (_closedWrite) _close(); |
+ void multiplex(int events) { |
+ canActivateEvents = false; |
+ for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) { |
+ if (((events & (1 << i)) != 0)) { |
+ if (i == CLOSED_EVENT && |
+ typeFlags != TYPE_LISTENING_SOCKET && |
+ !isClosed) { |
+ isClosedRead = true; |
} |
- var eventHandler = _handlerMap[i]; |
- if (eventHandler != null || i == _ERROR_EVENT) { |
+ var handler = eventHandlers[i]; |
+ assert(handler != null); |
+ if (i == WRITE_EVENT) { |
+ // If the event was disabled before we had a chance to fire the event, |
+ // discard it. If we register again, we'll get a new one. |
+ if ((eventMask & (1 << i)) == 0) continue; |
// Unregister the out handler before executing it. There is |
// no need to notify the eventhandler as handlers are |
// disabled while the event is handled. |
- if (i == _OUT_EVENT) _setHandler(i, null, notifyEventhandler: false); |
- |
- // Don't call the in handler if there is no data available |
- // after all. |
- if ((i == _IN_EVENT) && (this is _Socket) && (available() == 0)) { |
- continue; |
- } |
- if (i == _ERROR_EVENT) { |
- _reportError(_getError(), ""); |
- close(); |
- } else { |
- eventHandler(); |
- } |
+ eventMask &= ~(1 << i); |
+ } |
+ |
+ // Don't call the in handler if there is no data available |
+ // after all. |
+ if (i == READ_EVENT && |
+ typeFlags != TYPE_LISTENING_SOCKET && |
+ available() == 0) { |
+ continue; |
+ } |
+ if (i == ERROR_EVENT) { |
+ reportError(nativeGetError(), ""); |
+ } else if (!isClosed) { |
+ handler(); |
} |
} |
} |
- _canActivateHandlers = true; |
- _activateHandlers(); |
+ if (isClosedRead && isClosedWrite) close(); |
+ canActivateEvents = true; |
+ activateHandlers(); |
} |
- void _setHandler(int event, |
- Function callback, |
- {bool notifyEventhandler: true}) { |
- if (callback == null) { |
- _handlerMask &= ~(1 << event); |
- } else { |
- _handlerMask |= (1 << event); |
- } |
- _handlerMap[event] = callback; |
- // If the socket is only for writing then close the receive port |
- // when not waiting for any events. |
- if (this is _Socket && |
- _closedRead && |
- _handlerMask == 0 && |
- _handler != null) { |
- _handler.close(); |
- _handler = null; |
- } else { |
- if (notifyEventhandler) _activateHandlers(); |
- } |
+ void setHandlers({read: null, write: null, error: null, closed: null}) { |
+ eventHandlers[READ_EVENT] = read; |
+ eventHandlers[WRITE_EVENT] = write; |
+ eventHandlers[ERROR_EVENT] = error; |
+ eventHandlers[CLOSED_EVENT] = closed; |
} |
- OSError _getError() native "Socket_GetError"; |
- |
- int _getPort() native "Socket_GetPort"; |
- |
- void set onError(void callback(e)) { |
- _setHandler(_ERROR_EVENT, callback); |
+ void setListening({read: true, write: true}) { |
+ eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT); |
+ if (read) eventMask |= (1 << READ_EVENT); |
+ if (write) eventMask |= (1 << WRITE_EVENT); |
+ activateHandlers(); |
} |
- void _activateHandlers() { |
- if (_canActivateHandlers && !_closed) { |
- if (_handlerMask == 0) { |
- if (_handler != null) { |
- _handler.close(); |
- _handler = null; |
- } |
- return; |
- } |
- int data = _handlerMask; |
- if (_isListenSocket()) { |
- data |= (1 << _LISTENING_SOCKET); |
+ Future get closeFuture => closeCompleter.future; |
+ |
+ void activateHandlers() { |
+ if (canActivateEvents && !isClosed) { |
+ // If we don't listen for either read or write, disconnect as we won't |
+ // get close and error events anyway. |
+ if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) { |
+ if (eventPort != null) disconnectFromEventHandler(); |
} else { |
- if (_closedRead) { data &= ~(1 << _IN_EVENT); } |
- if (_closedWrite) { data &= ~(1 << _OUT_EVENT); } |
- if (_isPipe()) data |= (1 << _PIPE); |
+ int data = eventMask; |
+ data |= typeFlags; |
+ if (isClosedRead) data &= ~(1 << READ_EVENT); |
+ if (isClosedWrite) data &= ~(1 << WRITE_EVENT); |
+ sendToEventHandler(data); |
} |
- _sendToEventHandler(data); |
} |
} |
- int get port { |
- if (_port == null) { |
- _port = _getPort(); |
+ void close() { |
+ if (!isClosed) { |
+ sendToEventHandler(1 << CLOSE_COMMAND); |
+ isClosed = true; |
+ closeCompleter.complete(this); |
} |
- return _port; |
+ // Outside the if support closing sockets created but never |
+ // assigned any actual socket. |
+ disconnectFromEventHandler(); |
} |
- void close([bool halfClose = false]) { |
- if (!_closed) { |
- if (halfClose) { |
- _closeWrite(); |
- } else { |
- _close(); |
+ void shutdown(SocketDirection direction) { |
+ if (!isClosed) { |
+ switch (direction) { |
+ case SocketDirection.RECEIVE: |
+ shutdownRead(); |
+ break; |
+ case SocketDirection.SEND: |
+ shutdownWrite(); |
+ break; |
+ case SocketDirection.BOTH: |
+ close(); |
+ break; |
+ default: |
+ throw new ArgumentError(direction); |
} |
- } else if (_handler != null) { |
- // This is to support closing sockets created but never assigned |
- // any actual socket. |
- _handler.close(); |
- _handler = null; |
} |
} |
- void _closeWrite() { |
- if (!_closed) { |
- if (_closedRead) { |
- _close(); |
+ void shutdownWrite() { |
+ if (!isClosed) { |
+ if (isClosedRead) { |
+ close(); |
} else { |
- _sendToEventHandler(1 << _SHUTDOWN_WRITE_COMMAND); |
+ sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND); |
} |
- _closedWrite = true; |
+ isClosedWrite = true; |
} |
} |
- void _closeRead() { |
- if (!_closed) { |
- if (_closedWrite) { |
- _close(); |
+ void shutdownRead() { |
+ if (!isClosed) { |
+ if (isClosedWrite) { |
+ close(); |
} else { |
- _sendToEventHandler(1 << _SHUTDOWN_READ_COMMAND); |
+ sendToEventHandler(1 << SHUTDOWN_READ_COMMAND); |
} |
- _closedRead = true; |
+ isClosedRead = true; |
} |
} |
- void _close() { |
- if (!_closed) { |
- _sendToEventHandler(1 << _CLOSE_COMMAND); |
- _handler.close(); |
- _handler = null; |
- _closed = true; |
+ void sendToEventHandler(int data) { |
+ connectToEventHandler(); |
+ assert(!isClosed); |
+ _EventHandler._sendData(this, eventPort, data); |
+ } |
+ |
+ void connectToEventHandler() { |
+ if (eventPort == null) { |
+ eventPort = new ReceivePort(); |
+ eventPort.receive ((var message, _) => multiplex(message)); |
} |
} |
- void _sendToEventHandler(int data) { |
- if (_handler == null) { |
- _handler = new ReceivePort(); |
- _handler.receive((var message, ignored) { _multiplex(message); }); |
+ void disconnectFromEventHandler() { |
+ if (eventPort != null) { |
+ eventPort.close(); |
+ eventPort = null; |
} |
- assert(!_closed); |
- _EventHandler._sendData(this, _handler, data); |
} |
- bool _reportError(error, String message) { |
- void doReportError(Exception e) { |
- // Invoke the socket error callback if any. |
- bool reported = false; |
- if (_handlerMap[_ERROR_EVENT] != null) { |
- _handlerMap[_ERROR_EVENT](e); |
- reported = true; |
- } |
- // Propagate the error to any additional listeners. |
- reported = reported || _propagateError(e); |
- if (!reported) throw e; |
+ static void ensureSocketService() { |
+ if (socketService == null) { |
+ socketService = _NativeSocket.newServicePort(); |
} |
+ } |
- // For all errors we close the socket, call the error handler and |
- // disable further calls of the error handler. |
- close(); |
+ // Check whether this is an error response from a native port call. |
+ static bool isErrorResponse(response) { |
+ return response is List && response[0] != _SUCCESS_RESPONSE; |
+ } |
+ |
+ // Create the appropriate error/exception from different returned |
+ // error objects. |
+ static createError(error, String message) { |
if (error is OSError) { |
- doReportError(new SocketIOException(message, error)); |
+ return new SocketIOException(message, error); |
} else if (error is List) { |
- assert(_isErrorResponse(error)); |
+ assert(isErrorResponse(error)); |
switch (error[0]) { |
case _ILLEGAL_ARGUMENT_RESPONSE: |
- doReportError(new ArgumentError()); |
- break; |
+ return new ArgumentError(); |
case _OSERROR_RESPONSE: |
- doReportError(new SocketIOException( |
- message, new OSError(error[2], error[1]))); |
- break; |
+ return new SocketIOException( |
+ message, new OSError(error[2], error[1])); |
default: |
- doReportError(new Exception("Unknown error")); |
- break; |
+ return new Exception("Unknown error"); |
} |
} else { |
- doReportError(new SocketIOException(message)); |
+ return new SocketIOException(message); |
} |
} |
- int get hashCode => _hashCode; |
+ void reportError(error, String message) { |
+ var e = createError(error, message); |
+ // Invoke the error handler if any. |
+ if (eventHandlers[ERROR_EVENT] != null) { |
+ eventHandlers[ERROR_EVENT](e); |
+ } |
+ // For all errors we close the socket |
+ close(); |
+ } |
- bool _propagateError(Exception e) => false; |
+ nativeAvailable() native "Socket_Available"; |
+ nativeRead(int len) native "Socket_Read"; |
+ nativeWrite(List<int> buffer, int offset, int bytes) |
+ native "Socket_WriteList"; |
+ bool nativeCreateConnect(String host, int port) native "Socket_CreateConnect"; |
+ nativeCreateBindListen(String address, int port, int backlog) |
+ native "ServerSocket_CreateBindListen"; |
+ nativeAccept(_NativeSocket socket) native "ServerSocket_Accept"; |
+ int nativeGetPort() native "Socket_GetPort"; |
+ List nativeGetRemotePeer() native "Socket_GetRemotePeer"; |
+ OSError nativeGetError() native "Socket_GetError"; |
+ |
+ static SendPort newServicePort() native "Socket_NewServicePort"; |
+} |
- bool _isListenSocket(); |
- bool _isPipe(); |
- // Is this socket closed. |
- bool _closed; |
+class _RawServerSocket extends Stream<RawSocket> |
+ implements RawServerSocket { |
+ final _NativeSocket _socket; |
+ StreamController<RawSocket> _controller; |
+ |
+ static Future<_RawServerSocket> bind(String address, |
+ int port, |
+ int backlog) { |
+ if (port < 0 || port > 0xFFFF) |
+ throw new ArgumentError("Invalid port $port"); |
+ if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog"); |
+ return _NativeSocket.bind(address, port, backlog) |
+ .then((socket) => new _RawServerSocket(socket)); |
+ } |
+ |
+ _RawServerSocket(this._socket) { |
+ _controller = new StreamController( |
+ onSubscriptionStateChange: _onSubscriptionStateChange, |
+ onPauseStateChange: _onPauseStateChange); |
+ _socket.closeFuture.then((_) => _controller.close()); |
+ _socket.setHandlers( |
+ read: () { |
+ var socket = _socket.accept(); |
+ if (socket != null) _controller.add(new _RawSocket(socket)); |
+ }, |
+ error: (e) { |
+ _controller.signalError(new AsyncError(e)); |
+ _controller.close(); |
+ } |
+ ); |
+ } |
- // Dedicated ReceivePort for socket events. |
- ReceivePort _handler; |
+ StreamSubscription<RawSocket> listen(void onData(RawSocket event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _controller.stream.listen( |
+ onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
+ } |
- // Poll event to handler map. |
- List _handlerMap; |
+ int get port => _socket.port; |
- // Indicates for which poll events the socket registered handlers. |
- int _handlerMask; |
+ void close() => _socket.close(); |
- // Indicates if native interrupts can be activated. |
- bool _canActivateHandlers; |
+ void _pause() { |
+ _socket.setListening(read: false, write: false); |
+ } |
- // Holds the port of the socket, null if not known. |
- int _port; |
+ void _resume() { |
+ _socket.setListening(read: true, write: false); |
+ } |
- // Hash code for the socket. Currently this is just a counter. |
- int _hashCode; |
- static int _nextHashCode = 0; |
- bool _closedRead = false; |
- bool _closedWrite = false; |
+ void _onSubscriptionStateChange() { |
+ if (_controller.hasSubscribers) { |
+ _resume(); |
+ } else { |
+ close(); |
+ } |
+ } |
+ void _onPauseStateChange() { |
+ if (_controller.isPaused) { |
+ _pause(); |
+ } else { |
+ _resume(); |
+ } |
+ } |
} |
-class _ServerSocket extends _SocketBase implements ServerSocket { |
- // Constructor for server socket. First a socket object is allocated |
- // in which the native socket is stored. After that _createBind |
- // is called which creates a file descriptor and binds the given address |
- // and port to the socket. Null is returned if file descriptor creation or |
- // bind failed. |
- factory _ServerSocket(String bindAddress, int port, int backlog) { |
- _ServerSocket socket = new _ServerSocket._internal(); |
- var result = socket._createBindListen(bindAddress, port, backlog); |
- if (result is OSError) { |
- socket.close(); |
- throw new SocketIOException("Failed to create server socket", result); |
+class _RawSocket extends Stream<RawSocketEvent> |
+ implements RawSocket { |
+ final _NativeSocket _socket; |
+ StreamController<RawSocketEvent> _controller; |
+ bool _readEventsEnabled = true; |
+ bool _writeEventsEnabled = true; |
+ |
+ static Future<RawSocket> connect(String host, int port) { |
+ return _NativeSocket.connect(host, port) |
+ .then((socket) => new _RawSocket(socket)); |
+ } |
+ |
+ _RawSocket(this._socket) { |
+ _controller = new StreamController( |
+ onSubscriptionStateChange: _onSubscriptionStateChange, |
+ onPauseStateChange: _onPauseStateChange); |
+ _socket.closeFuture.then((_) => _controller.close()); |
+ _socket.setHandlers( |
+ read: () => _controller.add(RawSocketEvent.READ), |
+ write: () { |
+ // The write event handler is automatically disabled by the |
+ // event handler when it fires. |
+ _writeEventsEnabled = false; |
+ _controller.add(RawSocketEvent.WRITE); |
+ }, |
+ closed: () => _controller.add(RawSocketEvent.READ_CLOSED), |
+ error: (e) { |
+ _controller.signalError(new AsyncError(e)); |
+ close(); |
+ } |
+ ); |
+ } |
+ |
+ factory _RawSocket._writePipe(int fd) { |
+ var native = new _NativeSocket.pipe(); |
+ native.isClosedRead = true; |
+ if (fd != null) _getStdioHandle(native, fd); |
+ return new _RawSocket(native); |
+ } |
+ |
+ factory _RawSocket._readPipe(int fd) { |
+ var native = new _NativeSocket.pipe(); |
+ native.isClosedWrite = true; |
+ if (fd != null) _getStdioHandle(native, fd); |
+ return new _RawSocket(native); |
+ } |
+ |
+ StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _controller.stream.listen( |
+ onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
+ } |
+ |
+ int available() => _socket.available(); |
+ |
+ List<int> read([int len]) => _socket.read(len); |
+ |
+ int write(List<int> buffer, [int offset, int count]) => |
+ _socket.write(buffer, offset, count); |
+ |
+ void close() => _socket.close(); |
+ |
+ void shutdown(SocketDirection direction) => _socket.shutdown(direction); |
+ |
+ int get port => _socket.port; |
+ |
+ int get remotePort => _socket.remotePort; |
+ |
+ String get remoteHost => _socket.remoteHost; |
+ |
+ bool get readEventsEnabled => _readEventsEnabled; |
+ void set readEventsEnabled(bool value) { |
+ if (value != _readEventsEnabled) { |
+ _readEventsEnabled = value; |
+ if (!_controller.isPaused) _resume(); |
} |
- socket._closed = false; |
- assert(result); |
- if (port != 0) { |
- socket._port = port; |
+ } |
+ |
+ bool get writeEventsEnabled => _writeEventsEnabled; |
+ void set writeEventsEnabled(bool value) { |
+ if (value != _writeEventsEnabled) { |
+ _writeEventsEnabled = value; |
+ if (!_controller.isPaused) _resume(); |
} |
- return socket; |
} |
- _ServerSocket._internal(); |
+ _pause() { |
+ _socket.setListening(read: false, write: false); |
+ } |
- _accept(Socket socket) native "ServerSocket_Accept"; |
+ void _resume() { |
+ _socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled); |
+ } |
- _createBindListen(String bindAddress, int port, int backlog) |
- native "ServerSocket_CreateBindListen"; |
+ void _onPauseStateChange() { |
+ if (_controller.isPaused) { |
+ _pause(); |
+ } else { |
+ _resume(); |
+ } |
+ } |
- void set onConnection(void callback(Socket connection)) { |
- _clientConnectionHandler = callback; |
- _setHandler(_SocketBase._IN_EVENT, |
- _clientConnectionHandler != null ? _connectionHandler : null); |
- } |
- |
- void _connectionHandler() { |
- if (!_closed) { |
- _Socket socket = new _Socket._internal(); |
- var result = _accept(socket); |
- if (result is OSError) { |
- _reportError(result, "Accept failed"); |
- } else if (result) { |
- socket._closed = false; |
- _clientConnectionHandler(socket); |
- } else { |
- // Temporary failure accepting the connection. Ignoring |
- // temporary failures lets us retry when we wake up with data |
- // on the listening socket again. |
- } |
+ void _onSubscriptionStateChange() { |
+ if (_controller.hasSubscribers) { |
+ _resume(); |
+ } else { |
+ close(); |
} |
} |
+} |
- bool _isListenSocket() => true; |
- bool _isPipe() => false; |
- var _clientConnectionHandler; |
+patch class ServerSocket { |
+ /* patch */ static Future<ServerSocket> bind([String address = "127.0.0.1", |
+ int port = 0, |
+ int backlog = 0]) { |
+ return _ServerSocket.bind(address, port, backlog); |
+ } |
} |
+class _ServerSocket extends Stream<Socket> |
+ implements ServerSocket { |
+ final _socket; |
-class _Socket extends _SocketBase implements Socket { |
- static const HOST_NAME_LOOKUP = 0; |
- |
- // Constructs a new socket. During the construction an asynchronous |
- // host name lookup is initiated. The returned socket is not yet |
- // connected but ready for registration of callbacks. |
- factory _Socket(String host, int port) { |
- Socket socket = new _Socket._internal(); |
- _ensureSocketService(); |
- List request = new List.fixedLength(2); |
- request[0] = HOST_NAME_LOOKUP; |
- request[1] = host; |
- _socketService.call(request).then((response) { |
- if (socket._isErrorResponse(response)) { |
- socket._reportError(response, "Failed host name lookup"); |
- } else{ |
- var result = socket._createConnect(response, port); |
- if (result is OSError) { |
- socket.close(); |
- socket._reportError(result, "Connection failed"); |
- } else { |
- socket._closed = false; |
- socket._activateHandlers(); |
- } |
- } |
- }); |
- return socket; |
+ static Future<_ServerSocket> bind(String address, |
+ int port, |
+ int backlog) { |
+ return _RawServerSocket.bind(address, port, backlog) |
+ .then((socket) => new _ServerSocket(socket)); |
} |
- _Socket._internal(); |
- _Socket._internalReadOnly() : _pipe = true { super._closedWrite = true; } |
- _Socket._internalWriteOnly() : _pipe = true { super._closedRead = true; } |
+ _ServerSocket(this._socket); |
- int available() { |
- if (!_closed) { |
- var result = _available(); |
- if (result is OSError) { |
- _reportError(result, "Available failed"); |
- return 0; |
- } else { |
- return result; |
- } |
- } |
- throw new |
- SocketIOException("Error: available failed - invalid socket handle"); |
+ StreamSubscription<Socket> listen(void onData(Socket event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _socket.map((rawSocket) => new _Socket(rawSocket)).listen( |
+ onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- _available() native "Socket_Available"; |
+ int get port => _socket.port; |
- List<int> read([int len]) { |
- if (len != null && len <= 0) { |
- throw new SocketIOException("Illegal length $len"); |
- } |
- var result = _read(len == null ? -1 : len); |
- if (result is OSError) { |
- _reportError(result, "Read failed"); |
- return null; |
- } |
- return result; |
+ void close() => _socket.close(); |
+} |
+ |
+ |
+patch class Socket { |
+ /* patch */ static Future<Socket> connect(String host, int port) { |
+ return RawSocket.connect(host, port).then( |
+ (socket) => new _Socket(socket)); |
} |
+} |
- _read(int len) native "Socket_Read"; |
- int readList(List<int> buffer, int offset, int bytes) { |
- if (!_closed) { |
- if (bytes == 0) { |
- return 0; |
- } |
- if (offset < 0) { |
- throw new RangeError.value(offset); |
- } |
- if (bytes < 0) { |
- throw new RangeError.value(bytes); |
- } |
- if ((offset + bytes) > buffer.length) { |
- throw new RangeError.value(offset + bytes); |
- } |
- var result = _readList(buffer, offset, bytes); |
- if (result is OSError) { |
- _reportError(result, "Read failed"); |
- return -1; |
+patch class SecureSocket { |
+ /* patch */ factory SecureSocket._(RawSecureSocket rawSocket) => |
+ new _SecureSocket(rawSocket); |
+} |
+ |
+ |
+class _SocketStreamConsumer extends StreamConsumer<List<int>, Socket> { |
+ StreamSubscription subscription; |
+ final _Socket socket; |
+ int offset; |
+ List<int> buffer; |
+ bool paused = false; |
+ |
+ _SocketStreamConsumer(this.socket); |
+ |
+ Future<Socket> consume(Stream<List<int>> stream) { |
+ subscription = stream.listen( |
+ (data) { |
+ assert(!paused); |
+ assert(buffer == null); |
+ buffer = data; |
+ offset = 0; |
+ write(); |
+ }, |
+ onDone: () { |
+ socket._consumerDone(); |
+ }); |
+ return socket._doneFuture; |
+ } |
+ |
+ void write() { |
+ try { |
+ if (subscription == null) return; |
+ assert(buffer != null); |
+ // Write as much as possible. |
+ offset += socket._write(buffer, offset, buffer.length - offset); |
+ if (offset < buffer.length) { |
+ if (!paused) { |
+ paused = true; |
+ // TODO(ajohnsen): It would be nice to avoid this check. |
+ // Some info: socket._write can emit an event, if it fails to write. |
+ // If the user closes the socket in that event, stop() will be called |
+ // before we get a change to pause. |
+ if (subscription == null) return; |
+ subscription.pause(); |
+ } |
+ socket._enableWriteEvent(); |
+ } else { |
+ buffer = null; |
+ if (paused) { |
+ paused = false; |
+ subscription.resume(); |
+ } |
} |
- return result; |
+ } catch (e) { |
+ socket._consumerDone(e); |
} |
- throw new |
- SocketIOException("Error: readList failed - invalid socket handle"); |
} |
- _readList(List<int> buffer, int offset, int bytes) native "Socket_ReadList"; |
+ void stop() { |
+ if (subscription == null) return; |
+ subscription.cancel(); |
+ subscription = null; |
+ socket._disableWriteEvent(); |
+ } |
+} |
- int writeList(List<int> buffer, int offset, int bytes) { |
- if (buffer is! List || offset is! int || bytes is! int) { |
- throw new ArgumentError( |
- "Invalid arguments to writeList on Socket"); |
- } |
- if (!_closed) { |
- if (bytes == 0) { |
- return 0; |
- } |
- if (offset < 0) { |
- throw new RangeError.value(offset); |
- } |
- if (bytes < 0) { |
- throw new RangeError.value(bytes); |
- } |
- if ((offset + bytes) > buffer.length) { |
- throw new RangeError.value(offset + bytes); |
- } |
- _BufferAndOffset bufferAndOffset = |
- _ensureFastAndSerializableBuffer(buffer, offset, bytes); |
- var result = |
- _writeList(bufferAndOffset.buffer, bufferAndOffset.offset, bytes); |
- if (result is OSError) { |
- _reportError(result, "Write failed"); |
- // If writing fails we return 0 as the number of bytes and |
- // report the error on the error handler. |
- result = 0; |
- } |
- return result; |
- } |
- throw new SocketIOException("writeList failed - invalid socket handle"); |
+ |
+class _Socket extends Stream<List<int>> implements Socket { |
+ RawSocket _raw; // Set to null when the raw socket is closed. |
+ bool _closed = false; // Set to true when the raw socket is closed. |
+ StreamController _controller; |
+ bool _controllerClosed = false; |
+ _SocketStreamConsumer _consumer; |
+ IOSink<Socket> _sink; |
+ Completer _doneCompleter; |
+ var _subscription; |
+ |
+ _Socket(RawSocket this._raw) { |
+ _controller = new StreamController<List<int>>( |
+ onSubscriptionStateChange: _onSubscriptionStateChange, |
+ onPauseStateChange: _onPauseStateChange); |
+ _consumer = new _SocketStreamConsumer(this); |
+ _sink = new IOSink(_consumer); |
+ |
+ // Disable read events until there is a subscription. |
+ _raw.readEventsEnabled = false; |
+ |
+ // Disable write events until the consumer needs it for pending writes. |
+ _raw.writeEventsEnabled = false; |
} |
- _writeList(List<int> buffer, int offset, int bytes) native "Socket_WriteList"; |
+ factory _Socket._writePipe([int fd]) { |
+ return new _Socket(new _RawSocket._writePipe(fd)); |
+ } |
- bool _isErrorResponse(response) { |
- return response is List && response[0] != _SUCCESS_RESPONSE; |
+ factory _Socket._readPipe([int fd]) { |
+ return new _Socket(new _RawSocket._readPipe(fd)); |
} |
- bool _createConnect(String host, int port) native "Socket_CreateConnect"; |
+ _NativeSocket get _nativeSocket => _raw._socket; |
- void set onWrite(void callback()) { |
- if (_outputStream != null) throw new StreamException( |
- "Cannot set write handler when output stream is used"); |
- _clientWriteHandler = callback; |
- _updateOutHandler(); |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _controller.stream.listen( |
+ onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- void set onConnect(void callback()) { |
- if (_seenFirstOutEvent) { |
- throw new StreamException( |
- "Cannot set connect handler when already connected"); |
- } |
- _clientConnectHandler = callback; |
- _updateOutHandler(); |
+ Future<Socket> consume(Stream<List<int>> stream) { |
+ return _sink.consume(stream); |
} |
- void set onData(void callback()) { |
- if (_inputStream != null) throw new StreamException( |
- "Cannot set data handler when input stream is used"); |
- _onData = callback; |
+ Future<Socket> addStream(Stream<List<int>> stream) { |
+ return _sink.addStream(stream); |
} |
- void set onClosed(void callback()) { |
- if (_inputStream != null) throw new StreamException( |
- "Cannot set close handler when input stream is used"); |
- _onClosed = callback; |
+ void add(List<int> data) { |
+ return _sink.add(data); |
} |
- bool _isListenSocket() => false; |
+ void addString(String string, [Encoding encoding = Encoding.UTF_8]) { |
+ return _sink.addString(string, encoding); |
+ } |
- bool _isPipe() => _pipe; |
+ close() => _sink.close(); |
- InputStream get inputStream { |
- if (_inputStream == null) { |
- if (_handlerMap[_SocketBase._IN_EVENT] != null || |
- _handlerMap[_SocketBase._CLOSE_EVENT] != null) { |
- throw new StreamException( |
- "Cannot get input stream when socket handlers are used"); |
- } |
- _inputStream = new _SocketInputStream(this); |
- } |
- return _inputStream; |
+ Future<Socket> get done => _sink.done; |
+ |
+ void destroy() { |
+ // Destroy can always be called to get rid of a socket. |
+ if (_raw == null) return; |
+ _closeRawSocket(); |
+ _consumer.stop(); |
+ _controllerClosed = true; |
+ _controller.close(); |
} |
- OutputStream get outputStream { |
- if (_outputStream == null) { |
- if (_clientWriteHandler != null) { |
- throw new StreamException( |
- "Cannot get output stream when socket handlers are used"); |
- } |
- _outputStream = new _SocketOutputStream(this); |
+ int get port => _raw.port; |
+ String get remoteHost => _raw.remoteHost; |
+ int get remotePort => _raw.remotePort; |
+ |
+ // Ensure a subscription on the raw socket. Both the stream and the |
+ // consumer needs a subscription as they share the error and done |
+ // events from the raw socket. |
+ void _ensureRawSocketSubscription() { |
+ if (_subscription == null) { |
+ _subscription = _raw.listen(_onData, |
+ onError: _onError, |
+ onDone: _onDone, |
+ unsubscribeOnError: true); |
} |
- return _outputStream; |
} |
- void set _onWrite(void callback()) { |
- _setHandler(_SocketBase._OUT_EVENT, callback); |
+ _closeRawSocket() { |
+ var tmp = _raw; |
+ _raw = null; |
+ _closed = true; |
+ tmp.close(); |
} |
- void set _onData(void callback()) { |
- _setHandler(_SocketBase._IN_EVENT, callback); |
+ void _onSubscriptionStateChange() { |
+ if (_controller.hasSubscribers) { |
+ _ensureRawSocketSubscription(); |
+ // Enable read events for providing data to subscription. |
+ if (_raw != null) { |
+ _raw.readEventsEnabled = true; |
+ } |
+ } else { |
+ _controllerClosed = true; |
+ if (_raw != null) { |
+ _raw.shutdown(SocketDirection.RECEIVE); |
+ } |
+ } |
} |
- void set _onClosed(void callback()) { |
- _setHandler(_SocketBase._CLOSE_EVENT, callback); |
+ void _onPauseStateChange() { |
+ if (_raw != null) { |
+ _raw.readEventsEnabled = !_controller.isPaused; |
+ } |
} |
- bool _propagateError(Exception e) { |
- bool reported = false; |
- if (_inputStream != null) { |
- reported = reported || _inputStream._onSocketError(e); |
+ void _onData(event) { |
+ switch (event) { |
+ case RawSocketEvent.READ: |
+ _controller.add(_raw.read()); |
+ break; |
+ case RawSocketEvent.WRITE: |
+ _consumer.write(); |
+ break; |
+ case RawSocketEvent.READ_CLOSED: |
+ _controllerClosed = true; |
+ _controller.close(); |
+ break; |
} |
- if (_outputStream != null) { |
- reported = reported || _outputStream._onSocketError(e); |
- } |
- return reported; |
} |
- void _updateOutHandler() { |
- void firstWriteHandler() { |
- assert(!_seenFirstOutEvent); |
- _seenFirstOutEvent = true; |
- |
- // From now on the write handler is only the client write |
- // handler (connect handler cannot be called again). Change this |
- // before calling any handlers as handlers can change the |
- // handlers. |
- if (_clientWriteHandler == null) _onWrite = _clientWriteHandler; |
+ void _onDone() { |
+ if (!_controllerClosed) { |
+ _controllerClosed = true; |
+ _controller.close(); |
+ } |
+ _done(); |
+ } |
- // First out event is socket connected event. |
- if (_clientConnectHandler != null) _clientConnectHandler(); |
- _clientConnectHandler = null; |
+ void _onError(error) { |
+ if (!_controllerClosed) { |
+ _controllerClosed = true; |
+ _controller.signalError(error); |
+ _controller.close(); |
+ } |
+ _done(error); |
+ } |
- // Always (even for the first out event) call the write handler. |
- if (_clientWriteHandler != null) _clientWriteHandler(); |
+ get _doneFuture { |
+ if (_doneCompleter == null) { |
+ _ensureRawSocketSubscription(); |
+ _doneCompleter = new Completer(); |
} |
+ return _doneCompleter.future; |
+ } |
- if (_clientConnectHandler == null && _clientWriteHandler == null) { |
- _onWrite = null; |
- } else { |
- if (_seenFirstOutEvent) { |
- _onWrite = _clientWriteHandler; |
+ void _done([error]) { |
+ if (_doneCompleter != null) { |
+ var tmp = _doneCompleter; |
+ _doneCompleter = null; |
+ if (error != null) { |
+ tmp.completeError(error); |
} else { |
- _onWrite = firstWriteHandler; |
+ tmp.complete(this); |
} |
} |
} |
- int get remotePort { |
- if (_remotePort == null) { |
- remoteHost; |
+ int _write(List<int> data, int offset, int length) => |
+ _raw.write(data, offset, length); |
+ |
+ void _enableWriteEvent() { |
+ _raw.writeEventsEnabled = true; |
+ } |
+ |
+ void _disableWriteEvent() { |
+ if (_raw != null) { |
+ _raw.writeEventsEnabled = false; |
} |
- return _remotePort; |
} |
- String get remoteHost { |
- if (_remoteHost == null) { |
- List peer = _getRemotePeer(); |
- _remoteHost = peer[0]; |
- _remotePort = peer[1]; |
+ void _consumerDone([error]) { |
+ if (_raw != null) { |
+ _raw.shutdown(SocketDirection.SEND); |
+ _disableWriteEvent(); |
} |
- return _remoteHost; |
+ _done(error); |
} |
+} |
- List _getRemotePeer() native "Socket_GetRemotePeer"; |
- static SendPort _newServicePort() native "Socket_NewServicePort"; |
+class _SecureSocket extends _Socket implements SecureSocket { |
+ _SecureSocket(RawSecureSocket raw) : super(raw); |
- static void _ensureSocketService() { |
- if (_socketService == null) { |
- _socketService = _Socket._newServicePort(); |
+ void set onBadCertificate(bool callback(X509Certificate certificate)) { |
+ if (_raw == null) { |
+ throw new StateError("onBadCertificate called on destroyed SecureSocket"); |
} |
+ _raw.onBadCertificate = callback; |
} |
- bool _seenFirstOutEvent = false; |
- bool _pipe = false; |
- Function _clientConnectHandler; |
- Function _clientWriteHandler; |
- _SocketInputStream _inputStream; |
- _SocketOutputStream _outputStream; |
- String _remoteHost; |
- int _remotePort; |
- static SendPort _socketService; |
+ X509Certificate get peerCertificate { |
+ if (_raw == null) { |
+ throw new StateError("peerCertificate called on destroyed SecureSocket"); |
+ } |
+ return _raw.peerCertificate; |
+ } |
} |