Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(187)

Unified Diff: runtime/bin/socket_patch.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/socket_macos.cc ('k') | runtime/bin/socket_win.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/socket_patch.dart
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart
index bc91a3272cf623e0f2f1e875e9720e5860e99fac..6382fc70dbd0d7a75f21d105752bd15b80383170 100644
--- a/runtime/bin/socket_patch.dart
+++ b/runtime/bin/socket_patch.dart
@@ -1,20 +1,25 @@
-// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-patch class ServerSocket {
- /* patch */ factory ServerSocket(String bindAddress, int port, int backlog) {
- return new _ServerSocket(bindAddress, port, backlog);
+patch class RawServerSocket {
+ /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1",
+ int port = 0,
+ int backlog = 0]) {
+ return _RawServerSocket.bind(address, port, backlog);
}
}
-patch class Socket {
- /* patch */ factory Socket(String host, int port) => new _Socket(host, port);
+patch class RawSocket {
+ /* patch */ static Future<RawSocket> connect(String host, int port) {
+ return _RawSocket.connect(host, port);
+ }
}
-class _SocketBase extends NativeFieldWrapperClass1 {
+// The _NativeSocket class encapsulates an OS socket.
+class _NativeSocket extends NativeFieldWrapperClass1 {
// Bit flags used when communicating between the eventhandler and
// dart code. The EVENT flags are used to indicate events of
// interest when sending a message from dart code to the
@@ -24,584 +29,894 @@ class _SocketBase extends NativeFieldWrapperClass1 {
// eventhandler. COMMAND flags are never received from the
// eventhandler. Additional flags are used to communicate other
// information.
- static const int _IN_EVENT = 0;
- static const int _OUT_EVENT = 1;
- static const int _ERROR_EVENT = 2;
- static const int _CLOSE_EVENT = 3;
-
- static const int _CLOSE_COMMAND = 8;
- static const int _SHUTDOWN_READ_COMMAND = 9;
- static const int _SHUTDOWN_WRITE_COMMAND = 10;
-
- // Flag send to the eventhandler providing additional information on
- // the type of the file descriptor.
- static const int _LISTENING_SOCKET = 16;
- static const int _PIPE = 17;
-
- static const int _FIRST_EVENT = _IN_EVENT;
- static const int _LAST_EVENT = _CLOSE_EVENT;
-
- static const int _FIRST_COMMAND = _CLOSE_COMMAND;
- static const int _LAST_COMMAND = _SHUTDOWN_WRITE_COMMAND;
-
- _SocketBase () {
- _handlerMap = new List.fixedLength(_LAST_EVENT + 1);
- _handlerMask = 0;
- _canActivateHandlers = true;
- _closed = true;
+ static const int READ_EVENT = 0;
+ static const int WRITE_EVENT = 1;
+ static const int ERROR_EVENT = 2;
+ static const int CLOSED_EVENT = 3;
+ static const int FIRST_EVENT = READ_EVENT;
+ static const int LAST_EVENT = CLOSED_EVENT;
+ static const int EVENT_COUNT = LAST_EVENT - FIRST_EVENT + 1;
+
+ static const int CLOSE_COMMAND = 8;
+ static const int SHUTDOWN_READ_COMMAND = 9;
+ static const int SHUTDOWN_WRITE_COMMAND = 10;
+ static const int FIRST_COMMAND = CLOSE_COMMAND;
+ static const int LAST_COMMAND = SHUTDOWN_WRITE_COMMAND;
+
+ // Type flag send to the eventhandler providing additional
+ // information on the type of the file descriptor.
+ static const int LISTENING_SOCKET = 16;
+ static const int PIPE_SOCKET = 17;
+ static const int TYPE_NORMAL_SOCKET = 0;
+ static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET;
+ static const int TYPE_PIPE = 1 << PIPE_SOCKET;
+
+ // Native port messages.
+ static const HOST_NAME_LOOKUP = 0;
+
+ // Socket close state
+ bool isClosed = false;
+ bool isClosedRead = false;
+ bool isClosedWrite = false;
+ Completer closeCompleter = new Completer();
+
+ // Handlers and receive port for socket events from the event handler.
+ int eventMask = 0;
+ List eventHandlers;
+ ReceivePort eventPort;
+
+ // Indicates if native interrupts can be activated.
+ bool canActivateEvents = true;
+
+ // The type flags for this socket.
+ final int typeFlags;
+
+ // Holds the port of the socket, null if not known.
+ int localPort;
+
+ // Native port for socket services.
+ static SendPort socketService;
+
+ static Future<_NativeSocket> connect(String host, int port) {
+ var completer = new Completer();
+ ensureSocketService();
+ socketService.call([HOST_NAME_LOOKUP, host]).then((response) {
+ if (isErrorResponse(response)) {
+ completer.completeError(
+ createError(response, "Failed host name lookup"));
+ } else {
+ var socket = new _NativeSocket.normal();
+ var result = socket.nativeCreateConnect(response, port);
+ if (result is OSError) {
+ completer.completeError(createError(result, "Connection failed"));
+ } else {
+ // Setup handlers for receiving the first write event which
+ // indicate that the socket is fully connected.
+ socket.setHandlers(
+ write: () {
+ socket.setListening(read: false, write: false);
+ completer.complete(socket);
+ },
+ error: (e) {
+ socket.close();
+ completer.completeError(createError(e, "Connection failed"));
+ }
+ );
+ socket.setListening(read: false, write: true);
+ }
+ }
+ });
+ return completer.future;
+ }
+
+ static Future<_NativeSocket> bind(String address,
+ int port,
+ int backlog) {
+ var socket = new _NativeSocket.listen();
+ var result = socket.nativeCreateBindListen(address, port, backlog);
+ if (result is OSError) {
+ return new Future.immediateError(
+ new SocketIOException("Failed to create server socket", result));
+ }
+ if (port != 0) socket.localPort = port;
+ return new Future.immediate(socket);
+ }
+
+ _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET {
+ eventHandlers = new List.fixedLength(EVENT_COUNT + 1);
+ _EventHandler._start();
+ }
+
+ _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET {
+ eventHandlers = new List.fixedLength(EVENT_COUNT + 1);
+ _EventHandler._start();
+ }
+
+ _NativeSocket.pipe() : typeFlags = TYPE_PIPE {
+ eventHandlers = new List.fixedLength(EVENT_COUNT + 1);
_EventHandler._start();
- _hashCode = _nextHashCode;
- _nextHashCode = (_nextHashCode + 1) & 0xFFFFFFF;
+ }
+
+ int available() {
+ if (isClosed) return 0;
+ var result = nativeAvailable();
+ if (result is OSError) {
+ reportError(result, "Available failed");
+ return 0;
+ } else {
+ return result;
+ }
+ }
+
+ List<int> read(int len) {
+ if (len != null && len <= 0) {
+ throw new ArgumentError("Illegal length $len");
+ }
+ var result = nativeRead(len == null ? -1 : len);
+ if (result is OSError) {
+ reportError(result, "Read failed");
+ return null;
+ }
+ return result;
+ }
+
+ int write(List<int> buffer, int offset, int bytes) {
+ if (buffer is! List) throw new ArgumentError();
+ if (offset == null) offset = 0;
+ if (bytes == null) bytes = buffer.length;
+ if (offset < 0) throw new RangeError.value(offset);
+ if (bytes < 0) throw new RangeError.value(bytes);
+ if ((offset + bytes) > buffer.length) {
+ throw new RangeError.value(offset + bytes);
+ }
+ if (offset is! int || bytes is! int) {
+ throw new ArgumentError("Invalid arguments to write on Socket");
+ }
+ if (isClosed) return 0;
+ if (bytes == 0) return 0;
+ _BufferAndOffset bufferAndOffset =
+ _ensureFastAndSerializableBuffer(buffer, offset, bytes);
+ var result =
+ nativeWrite(bufferAndOffset.buffer, bufferAndOffset.offset, bytes);
+ if (result is OSError) {
+ reportError(result, "Write failed");
+ result = 0;
+ }
+ return result;
+ }
+
+ _NativeSocket accept() {
+ var socket = new _NativeSocket.normal();
+ if (nativeAccept(socket) != true) return null;
+ return socket;
+ }
+
+ int get port {
+ if (localPort != null) return localPort;
+ return localPort = nativeGetPort();
+ }
+
+ int get remotePort {
+ return nativeGetRemotePeer()[1];
+ }
+
+ String get remoteHost {
+ return nativeGetRemotePeer()[0];
}
// Multiplexes socket events to the socket handlers.
- void _multiplex(int event_mask) {
- _canActivateHandlers = false;
- for (int i = _FIRST_EVENT; i <= _LAST_EVENT; i++) {
- if (((event_mask & (1 << i)) != 0)) {
- if ((i == _CLOSE_EVENT) && this is _Socket && !_closed) {
- _closedRead = true;
- if (_closedWrite) _close();
+ void multiplex(int events) {
+ canActivateEvents = false;
+ for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) {
+ if (((events & (1 << i)) != 0)) {
+ if (i == CLOSED_EVENT &&
+ typeFlags != TYPE_LISTENING_SOCKET &&
+ !isClosed) {
+ isClosedRead = true;
}
- var eventHandler = _handlerMap[i];
- if (eventHandler != null || i == _ERROR_EVENT) {
+ var handler = eventHandlers[i];
+ assert(handler != null);
+ if (i == WRITE_EVENT) {
+ // If the event was disabled before we had a chance to fire the event,
+ // discard it. If we register again, we'll get a new one.
+ if ((eventMask & (1 << i)) == 0) continue;
// Unregister the out handler before executing it. There is
// no need to notify the eventhandler as handlers are
// disabled while the event is handled.
- if (i == _OUT_EVENT) _setHandler(i, null, notifyEventhandler: false);
-
- // Don't call the in handler if there is no data available
- // after all.
- if ((i == _IN_EVENT) && (this is _Socket) && (available() == 0)) {
- continue;
- }
- if (i == _ERROR_EVENT) {
- _reportError(_getError(), "");
- close();
- } else {
- eventHandler();
- }
+ eventMask &= ~(1 << i);
+ }
+
+ // Don't call the in handler if there is no data available
+ // after all.
+ if (i == READ_EVENT &&
+ typeFlags != TYPE_LISTENING_SOCKET &&
+ available() == 0) {
+ continue;
+ }
+ if (i == ERROR_EVENT) {
+ reportError(nativeGetError(), "");
+ } else if (!isClosed) {
+ handler();
}
}
}
- _canActivateHandlers = true;
- _activateHandlers();
+ if (isClosedRead && isClosedWrite) close();
+ canActivateEvents = true;
+ activateHandlers();
}
- void _setHandler(int event,
- Function callback,
- {bool notifyEventhandler: true}) {
- if (callback == null) {
- _handlerMask &= ~(1 << event);
- } else {
- _handlerMask |= (1 << event);
- }
- _handlerMap[event] = callback;
- // If the socket is only for writing then close the receive port
- // when not waiting for any events.
- if (this is _Socket &&
- _closedRead &&
- _handlerMask == 0 &&
- _handler != null) {
- _handler.close();
- _handler = null;
- } else {
- if (notifyEventhandler) _activateHandlers();
- }
+ void setHandlers({read: null, write: null, error: null, closed: null}) {
+ eventHandlers[READ_EVENT] = read;
+ eventHandlers[WRITE_EVENT] = write;
+ eventHandlers[ERROR_EVENT] = error;
+ eventHandlers[CLOSED_EVENT] = closed;
}
- OSError _getError() native "Socket_GetError";
-
- int _getPort() native "Socket_GetPort";
-
- void set onError(void callback(e)) {
- _setHandler(_ERROR_EVENT, callback);
+ void setListening({read: true, write: true}) {
+ eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT);
+ if (read) eventMask |= (1 << READ_EVENT);
+ if (write) eventMask |= (1 << WRITE_EVENT);
+ activateHandlers();
}
- void _activateHandlers() {
- if (_canActivateHandlers && !_closed) {
- if (_handlerMask == 0) {
- if (_handler != null) {
- _handler.close();
- _handler = null;
- }
- return;
- }
- int data = _handlerMask;
- if (_isListenSocket()) {
- data |= (1 << _LISTENING_SOCKET);
+ Future get closeFuture => closeCompleter.future;
+
+ void activateHandlers() {
+ if (canActivateEvents && !isClosed) {
+ // If we don't listen for either read or write, disconnect as we won't
+ // get close and error events anyway.
+ if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) {
+ if (eventPort != null) disconnectFromEventHandler();
} else {
- if (_closedRead) { data &= ~(1 << _IN_EVENT); }
- if (_closedWrite) { data &= ~(1 << _OUT_EVENT); }
- if (_isPipe()) data |= (1 << _PIPE);
+ int data = eventMask;
+ data |= typeFlags;
+ if (isClosedRead) data &= ~(1 << READ_EVENT);
+ if (isClosedWrite) data &= ~(1 << WRITE_EVENT);
+ sendToEventHandler(data);
}
- _sendToEventHandler(data);
}
}
- int get port {
- if (_port == null) {
- _port = _getPort();
+ void close() {
+ if (!isClosed) {
+ sendToEventHandler(1 << CLOSE_COMMAND);
+ isClosed = true;
+ closeCompleter.complete(this);
}
- return _port;
+ // Outside the if support closing sockets created but never
+ // assigned any actual socket.
+ disconnectFromEventHandler();
}
- void close([bool halfClose = false]) {
- if (!_closed) {
- if (halfClose) {
- _closeWrite();
- } else {
- _close();
+ void shutdown(SocketDirection direction) {
+ if (!isClosed) {
+ switch (direction) {
+ case SocketDirection.RECEIVE:
+ shutdownRead();
+ break;
+ case SocketDirection.SEND:
+ shutdownWrite();
+ break;
+ case SocketDirection.BOTH:
+ close();
+ break;
+ default:
+ throw new ArgumentError(direction);
}
- } else if (_handler != null) {
- // This is to support closing sockets created but never assigned
- // any actual socket.
- _handler.close();
- _handler = null;
}
}
- void _closeWrite() {
- if (!_closed) {
- if (_closedRead) {
- _close();
+ void shutdownWrite() {
+ if (!isClosed) {
+ if (isClosedRead) {
+ close();
} else {
- _sendToEventHandler(1 << _SHUTDOWN_WRITE_COMMAND);
+ sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND);
}
- _closedWrite = true;
+ isClosedWrite = true;
}
}
- void _closeRead() {
- if (!_closed) {
- if (_closedWrite) {
- _close();
+ void shutdownRead() {
+ if (!isClosed) {
+ if (isClosedWrite) {
+ close();
} else {
- _sendToEventHandler(1 << _SHUTDOWN_READ_COMMAND);
+ sendToEventHandler(1 << SHUTDOWN_READ_COMMAND);
}
- _closedRead = true;
+ isClosedRead = true;
}
}
- void _close() {
- if (!_closed) {
- _sendToEventHandler(1 << _CLOSE_COMMAND);
- _handler.close();
- _handler = null;
- _closed = true;
+ void sendToEventHandler(int data) {
+ connectToEventHandler();
+ assert(!isClosed);
+ _EventHandler._sendData(this, eventPort, data);
+ }
+
+ void connectToEventHandler() {
+ if (eventPort == null) {
+ eventPort = new ReceivePort();
+ eventPort.receive ((var message, _) => multiplex(message));
}
}
- void _sendToEventHandler(int data) {
- if (_handler == null) {
- _handler = new ReceivePort();
- _handler.receive((var message, ignored) { _multiplex(message); });
+ void disconnectFromEventHandler() {
+ if (eventPort != null) {
+ eventPort.close();
+ eventPort = null;
}
- assert(!_closed);
- _EventHandler._sendData(this, _handler, data);
}
- bool _reportError(error, String message) {
- void doReportError(Exception e) {
- // Invoke the socket error callback if any.
- bool reported = false;
- if (_handlerMap[_ERROR_EVENT] != null) {
- _handlerMap[_ERROR_EVENT](e);
- reported = true;
- }
- // Propagate the error to any additional listeners.
- reported = reported || _propagateError(e);
- if (!reported) throw e;
+ static void ensureSocketService() {
+ if (socketService == null) {
+ socketService = _NativeSocket.newServicePort();
}
+ }
- // For all errors we close the socket, call the error handler and
- // disable further calls of the error handler.
- close();
+ // Check whether this is an error response from a native port call.
+ static bool isErrorResponse(response) {
+ return response is List && response[0] != _SUCCESS_RESPONSE;
+ }
+
+ // Create the appropriate error/exception from different returned
+ // error objects.
+ static createError(error, String message) {
if (error is OSError) {
- doReportError(new SocketIOException(message, error));
+ return new SocketIOException(message, error);
} else if (error is List) {
- assert(_isErrorResponse(error));
+ assert(isErrorResponse(error));
switch (error[0]) {
case _ILLEGAL_ARGUMENT_RESPONSE:
- doReportError(new ArgumentError());
- break;
+ return new ArgumentError();
case _OSERROR_RESPONSE:
- doReportError(new SocketIOException(
- message, new OSError(error[2], error[1])));
- break;
+ return new SocketIOException(
+ message, new OSError(error[2], error[1]));
default:
- doReportError(new Exception("Unknown error"));
- break;
+ return new Exception("Unknown error");
}
} else {
- doReportError(new SocketIOException(message));
+ return new SocketIOException(message);
}
}
- int get hashCode => _hashCode;
+ void reportError(error, String message) {
+ var e = createError(error, message);
+ // Invoke the error handler if any.
+ if (eventHandlers[ERROR_EVENT] != null) {
+ eventHandlers[ERROR_EVENT](e);
+ }
+ // For all errors we close the socket
+ close();
+ }
- bool _propagateError(Exception e) => false;
+ nativeAvailable() native "Socket_Available";
+ nativeRead(int len) native "Socket_Read";
+ nativeWrite(List<int> buffer, int offset, int bytes)
+ native "Socket_WriteList";
+ bool nativeCreateConnect(String host, int port) native "Socket_CreateConnect";
+ nativeCreateBindListen(String address, int port, int backlog)
+ native "ServerSocket_CreateBindListen";
+ nativeAccept(_NativeSocket socket) native "ServerSocket_Accept";
+ int nativeGetPort() native "Socket_GetPort";
+ List nativeGetRemotePeer() native "Socket_GetRemotePeer";
+ OSError nativeGetError() native "Socket_GetError";
+
+ static SendPort newServicePort() native "Socket_NewServicePort";
+}
- bool _isListenSocket();
- bool _isPipe();
- // Is this socket closed.
- bool _closed;
+class _RawServerSocket extends Stream<RawSocket>
+ implements RawServerSocket {
+ final _NativeSocket _socket;
+ StreamController<RawSocket> _controller;
+
+ static Future<_RawServerSocket> bind(String address,
+ int port,
+ int backlog) {
+ if (port < 0 || port > 0xFFFF)
+ throw new ArgumentError("Invalid port $port");
+ if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
+ return _NativeSocket.bind(address, port, backlog)
+ .then((socket) => new _RawServerSocket(socket));
+ }
+
+ _RawServerSocket(this._socket) {
+ _controller = new StreamController(
+ onSubscriptionStateChange: _onSubscriptionStateChange,
+ onPauseStateChange: _onPauseStateChange);
+ _socket.closeFuture.then((_) => _controller.close());
+ _socket.setHandlers(
+ read: () {
+ var socket = _socket.accept();
+ if (socket != null) _controller.add(new _RawSocket(socket));
+ },
+ error: (e) {
+ _controller.signalError(new AsyncError(e));
+ _controller.close();
+ }
+ );
+ }
- // Dedicated ReceivePort for socket events.
- ReceivePort _handler;
+ StreamSubscription<RawSocket> listen(void onData(RawSocket event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _controller.stream.listen(
+ onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
+ }
- // Poll event to handler map.
- List _handlerMap;
+ int get port => _socket.port;
- // Indicates for which poll events the socket registered handlers.
- int _handlerMask;
+ void close() => _socket.close();
- // Indicates if native interrupts can be activated.
- bool _canActivateHandlers;
+ void _pause() {
+ _socket.setListening(read: false, write: false);
+ }
- // Holds the port of the socket, null if not known.
- int _port;
+ void _resume() {
+ _socket.setListening(read: true, write: false);
+ }
- // Hash code for the socket. Currently this is just a counter.
- int _hashCode;
- static int _nextHashCode = 0;
- bool _closedRead = false;
- bool _closedWrite = false;
+ void _onSubscriptionStateChange() {
+ if (_controller.hasSubscribers) {
+ _resume();
+ } else {
+ close();
+ }
+ }
+ void _onPauseStateChange() {
+ if (_controller.isPaused) {
+ _pause();
+ } else {
+ _resume();
+ }
+ }
}
-class _ServerSocket extends _SocketBase implements ServerSocket {
- // Constructor for server socket. First a socket object is allocated
- // in which the native socket is stored. After that _createBind
- // is called which creates a file descriptor and binds the given address
- // and port to the socket. Null is returned if file descriptor creation or
- // bind failed.
- factory _ServerSocket(String bindAddress, int port, int backlog) {
- _ServerSocket socket = new _ServerSocket._internal();
- var result = socket._createBindListen(bindAddress, port, backlog);
- if (result is OSError) {
- socket.close();
- throw new SocketIOException("Failed to create server socket", result);
+class _RawSocket extends Stream<RawSocketEvent>
+ implements RawSocket {
+ final _NativeSocket _socket;
+ StreamController<RawSocketEvent> _controller;
+ bool _readEventsEnabled = true;
+ bool _writeEventsEnabled = true;
+
+ static Future<RawSocket> connect(String host, int port) {
+ return _NativeSocket.connect(host, port)
+ .then((socket) => new _RawSocket(socket));
+ }
+
+ _RawSocket(this._socket) {
+ _controller = new StreamController(
+ onSubscriptionStateChange: _onSubscriptionStateChange,
+ onPauseStateChange: _onPauseStateChange);
+ _socket.closeFuture.then((_) => _controller.close());
+ _socket.setHandlers(
+ read: () => _controller.add(RawSocketEvent.READ),
+ write: () {
+ // The write event handler is automatically disabled by the
+ // event handler when it fires.
+ _writeEventsEnabled = false;
+ _controller.add(RawSocketEvent.WRITE);
+ },
+ closed: () => _controller.add(RawSocketEvent.READ_CLOSED),
+ error: (e) {
+ _controller.signalError(new AsyncError(e));
+ close();
+ }
+ );
+ }
+
+ factory _RawSocket._writePipe(int fd) {
+ var native = new _NativeSocket.pipe();
+ native.isClosedRead = true;
+ if (fd != null) _getStdioHandle(native, fd);
+ return new _RawSocket(native);
+ }
+
+ factory _RawSocket._readPipe(int fd) {
+ var native = new _NativeSocket.pipe();
+ native.isClosedWrite = true;
+ if (fd != null) _getStdioHandle(native, fd);
+ return new _RawSocket(native);
+ }
+
+ StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _controller.stream.listen(
+ onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
+ }
+
+ int available() => _socket.available();
+
+ List<int> read([int len]) => _socket.read(len);
+
+ int write(List<int> buffer, [int offset, int count]) =>
+ _socket.write(buffer, offset, count);
+
+ void close() => _socket.close();
+
+ void shutdown(SocketDirection direction) => _socket.shutdown(direction);
+
+ int get port => _socket.port;
+
+ int get remotePort => _socket.remotePort;
+
+ String get remoteHost => _socket.remoteHost;
+
+ bool get readEventsEnabled => _readEventsEnabled;
+ void set readEventsEnabled(bool value) {
+ if (value != _readEventsEnabled) {
+ _readEventsEnabled = value;
+ if (!_controller.isPaused) _resume();
}
- socket._closed = false;
- assert(result);
- if (port != 0) {
- socket._port = port;
+ }
+
+ bool get writeEventsEnabled => _writeEventsEnabled;
+ void set writeEventsEnabled(bool value) {
+ if (value != _writeEventsEnabled) {
+ _writeEventsEnabled = value;
+ if (!_controller.isPaused) _resume();
}
- return socket;
}
- _ServerSocket._internal();
+ _pause() {
+ _socket.setListening(read: false, write: false);
+ }
- _accept(Socket socket) native "ServerSocket_Accept";
+ void _resume() {
+ _socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled);
+ }
- _createBindListen(String bindAddress, int port, int backlog)
- native "ServerSocket_CreateBindListen";
+ void _onPauseStateChange() {
+ if (_controller.isPaused) {
+ _pause();
+ } else {
+ _resume();
+ }
+ }
- void set onConnection(void callback(Socket connection)) {
- _clientConnectionHandler = callback;
- _setHandler(_SocketBase._IN_EVENT,
- _clientConnectionHandler != null ? _connectionHandler : null);
- }
-
- void _connectionHandler() {
- if (!_closed) {
- _Socket socket = new _Socket._internal();
- var result = _accept(socket);
- if (result is OSError) {
- _reportError(result, "Accept failed");
- } else if (result) {
- socket._closed = false;
- _clientConnectionHandler(socket);
- } else {
- // Temporary failure accepting the connection. Ignoring
- // temporary failures lets us retry when we wake up with data
- // on the listening socket again.
- }
+ void _onSubscriptionStateChange() {
+ if (_controller.hasSubscribers) {
+ _resume();
+ } else {
+ close();
}
}
+}
- bool _isListenSocket() => true;
- bool _isPipe() => false;
- var _clientConnectionHandler;
+patch class ServerSocket {
+ /* patch */ static Future<ServerSocket> bind([String address = "127.0.0.1",
+ int port = 0,
+ int backlog = 0]) {
+ return _ServerSocket.bind(address, port, backlog);
+ }
}
+class _ServerSocket extends Stream<Socket>
+ implements ServerSocket {
+ final _socket;
-class _Socket extends _SocketBase implements Socket {
- static const HOST_NAME_LOOKUP = 0;
-
- // Constructs a new socket. During the construction an asynchronous
- // host name lookup is initiated. The returned socket is not yet
- // connected but ready for registration of callbacks.
- factory _Socket(String host, int port) {
- Socket socket = new _Socket._internal();
- _ensureSocketService();
- List request = new List.fixedLength(2);
- request[0] = HOST_NAME_LOOKUP;
- request[1] = host;
- _socketService.call(request).then((response) {
- if (socket._isErrorResponse(response)) {
- socket._reportError(response, "Failed host name lookup");
- } else{
- var result = socket._createConnect(response, port);
- if (result is OSError) {
- socket.close();
- socket._reportError(result, "Connection failed");
- } else {
- socket._closed = false;
- socket._activateHandlers();
- }
- }
- });
- return socket;
+ static Future<_ServerSocket> bind(String address,
+ int port,
+ int backlog) {
+ return _RawServerSocket.bind(address, port, backlog)
+ .then((socket) => new _ServerSocket(socket));
}
- _Socket._internal();
- _Socket._internalReadOnly() : _pipe = true { super._closedWrite = true; }
- _Socket._internalWriteOnly() : _pipe = true { super._closedRead = true; }
+ _ServerSocket(this._socket);
- int available() {
- if (!_closed) {
- var result = _available();
- if (result is OSError) {
- _reportError(result, "Available failed");
- return 0;
- } else {
- return result;
- }
- }
- throw new
- SocketIOException("Error: available failed - invalid socket handle");
+ StreamSubscription<Socket> listen(void onData(Socket event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _socket.map((rawSocket) => new _Socket(rawSocket)).listen(
+ onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- _available() native "Socket_Available";
+ int get port => _socket.port;
- List<int> read([int len]) {
- if (len != null && len <= 0) {
- throw new SocketIOException("Illegal length $len");
- }
- var result = _read(len == null ? -1 : len);
- if (result is OSError) {
- _reportError(result, "Read failed");
- return null;
- }
- return result;
+ void close() => _socket.close();
+}
+
+
+patch class Socket {
+ /* patch */ static Future<Socket> connect(String host, int port) {
+ return RawSocket.connect(host, port).then(
+ (socket) => new _Socket(socket));
}
+}
- _read(int len) native "Socket_Read";
- int readList(List<int> buffer, int offset, int bytes) {
- if (!_closed) {
- if (bytes == 0) {
- return 0;
- }
- if (offset < 0) {
- throw new RangeError.value(offset);
- }
- if (bytes < 0) {
- throw new RangeError.value(bytes);
- }
- if ((offset + bytes) > buffer.length) {
- throw new RangeError.value(offset + bytes);
- }
- var result = _readList(buffer, offset, bytes);
- if (result is OSError) {
- _reportError(result, "Read failed");
- return -1;
+patch class SecureSocket {
+ /* patch */ factory SecureSocket._(RawSecureSocket rawSocket) =>
+ new _SecureSocket(rawSocket);
+}
+
+
+class _SocketStreamConsumer extends StreamConsumer<List<int>, Socket> {
+ StreamSubscription subscription;
+ final _Socket socket;
+ int offset;
+ List<int> buffer;
+ bool paused = false;
+
+ _SocketStreamConsumer(this.socket);
+
+ Future<Socket> consume(Stream<List<int>> stream) {
+ subscription = stream.listen(
+ (data) {
+ assert(!paused);
+ assert(buffer == null);
+ buffer = data;
+ offset = 0;
+ write();
+ },
+ onDone: () {
+ socket._consumerDone();
+ });
+ return socket._doneFuture;
+ }
+
+ void write() {
+ try {
+ if (subscription == null) return;
+ assert(buffer != null);
+ // Write as much as possible.
+ offset += socket._write(buffer, offset, buffer.length - offset);
+ if (offset < buffer.length) {
+ if (!paused) {
+ paused = true;
+ // TODO(ajohnsen): It would be nice to avoid this check.
+ // Some info: socket._write can emit an event, if it fails to write.
+ // If the user closes the socket in that event, stop() will be called
+ // before we get a change to pause.
+ if (subscription == null) return;
+ subscription.pause();
+ }
+ socket._enableWriteEvent();
+ } else {
+ buffer = null;
+ if (paused) {
+ paused = false;
+ subscription.resume();
+ }
}
- return result;
+ } catch (e) {
+ socket._consumerDone(e);
}
- throw new
- SocketIOException("Error: readList failed - invalid socket handle");
}
- _readList(List<int> buffer, int offset, int bytes) native "Socket_ReadList";
+ void stop() {
+ if (subscription == null) return;
+ subscription.cancel();
+ subscription = null;
+ socket._disableWriteEvent();
+ }
+}
- int writeList(List<int> buffer, int offset, int bytes) {
- if (buffer is! List || offset is! int || bytes is! int) {
- throw new ArgumentError(
- "Invalid arguments to writeList on Socket");
- }
- if (!_closed) {
- if (bytes == 0) {
- return 0;
- }
- if (offset < 0) {
- throw new RangeError.value(offset);
- }
- if (bytes < 0) {
- throw new RangeError.value(bytes);
- }
- if ((offset + bytes) > buffer.length) {
- throw new RangeError.value(offset + bytes);
- }
- _BufferAndOffset bufferAndOffset =
- _ensureFastAndSerializableBuffer(buffer, offset, bytes);
- var result =
- _writeList(bufferAndOffset.buffer, bufferAndOffset.offset, bytes);
- if (result is OSError) {
- _reportError(result, "Write failed");
- // If writing fails we return 0 as the number of bytes and
- // report the error on the error handler.
- result = 0;
- }
- return result;
- }
- throw new SocketIOException("writeList failed - invalid socket handle");
+
+class _Socket extends Stream<List<int>> implements Socket {
+ RawSocket _raw; // Set to null when the raw socket is closed.
+ bool _closed = false; // Set to true when the raw socket is closed.
+ StreamController _controller;
+ bool _controllerClosed = false;
+ _SocketStreamConsumer _consumer;
+ IOSink<Socket> _sink;
+ Completer _doneCompleter;
+ var _subscription;
+
+ _Socket(RawSocket this._raw) {
+ _controller = new StreamController<List<int>>(
+ onSubscriptionStateChange: _onSubscriptionStateChange,
+ onPauseStateChange: _onPauseStateChange);
+ _consumer = new _SocketStreamConsumer(this);
+ _sink = new IOSink(_consumer);
+
+ // Disable read events until there is a subscription.
+ _raw.readEventsEnabled = false;
+
+ // Disable write events until the consumer needs it for pending writes.
+ _raw.writeEventsEnabled = false;
}
- _writeList(List<int> buffer, int offset, int bytes) native "Socket_WriteList";
+ factory _Socket._writePipe([int fd]) {
+ return new _Socket(new _RawSocket._writePipe(fd));
+ }
- bool _isErrorResponse(response) {
- return response is List && response[0] != _SUCCESS_RESPONSE;
+ factory _Socket._readPipe([int fd]) {
+ return new _Socket(new _RawSocket._readPipe(fd));
}
- bool _createConnect(String host, int port) native "Socket_CreateConnect";
+ _NativeSocket get _nativeSocket => _raw._socket;
- void set onWrite(void callback()) {
- if (_outputStream != null) throw new StreamException(
- "Cannot set write handler when output stream is used");
- _clientWriteHandler = callback;
- _updateOutHandler();
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _controller.stream.listen(
+ onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- void set onConnect(void callback()) {
- if (_seenFirstOutEvent) {
- throw new StreamException(
- "Cannot set connect handler when already connected");
- }
- _clientConnectHandler = callback;
- _updateOutHandler();
+ Future<Socket> consume(Stream<List<int>> stream) {
+ return _sink.consume(stream);
}
- void set onData(void callback()) {
- if (_inputStream != null) throw new StreamException(
- "Cannot set data handler when input stream is used");
- _onData = callback;
+ Future<Socket> addStream(Stream<List<int>> stream) {
+ return _sink.addStream(stream);
}
- void set onClosed(void callback()) {
- if (_inputStream != null) throw new StreamException(
- "Cannot set close handler when input stream is used");
- _onClosed = callback;
+ void add(List<int> data) {
+ return _sink.add(data);
}
- bool _isListenSocket() => false;
+ void addString(String string, [Encoding encoding = Encoding.UTF_8]) {
+ return _sink.addString(string, encoding);
+ }
- bool _isPipe() => _pipe;
+ close() => _sink.close();
- InputStream get inputStream {
- if (_inputStream == null) {
- if (_handlerMap[_SocketBase._IN_EVENT] != null ||
- _handlerMap[_SocketBase._CLOSE_EVENT] != null) {
- throw new StreamException(
- "Cannot get input stream when socket handlers are used");
- }
- _inputStream = new _SocketInputStream(this);
- }
- return _inputStream;
+ Future<Socket> get done => _sink.done;
+
+ void destroy() {
+ // Destroy can always be called to get rid of a socket.
+ if (_raw == null) return;
+ _closeRawSocket();
+ _consumer.stop();
+ _controllerClosed = true;
+ _controller.close();
}
- OutputStream get outputStream {
- if (_outputStream == null) {
- if (_clientWriteHandler != null) {
- throw new StreamException(
- "Cannot get output stream when socket handlers are used");
- }
- _outputStream = new _SocketOutputStream(this);
+ int get port => _raw.port;
+ String get remoteHost => _raw.remoteHost;
+ int get remotePort => _raw.remotePort;
+
+ // Ensure a subscription on the raw socket. Both the stream and the
+ // consumer needs a subscription as they share the error and done
+ // events from the raw socket.
+ void _ensureRawSocketSubscription() {
+ if (_subscription == null) {
+ _subscription = _raw.listen(_onData,
+ onError: _onError,
+ onDone: _onDone,
+ unsubscribeOnError: true);
}
- return _outputStream;
}
- void set _onWrite(void callback()) {
- _setHandler(_SocketBase._OUT_EVENT, callback);
+ _closeRawSocket() {
+ var tmp = _raw;
+ _raw = null;
+ _closed = true;
+ tmp.close();
}
- void set _onData(void callback()) {
- _setHandler(_SocketBase._IN_EVENT, callback);
+ void _onSubscriptionStateChange() {
+ if (_controller.hasSubscribers) {
+ _ensureRawSocketSubscription();
+ // Enable read events for providing data to subscription.
+ if (_raw != null) {
+ _raw.readEventsEnabled = true;
+ }
+ } else {
+ _controllerClosed = true;
+ if (_raw != null) {
+ _raw.shutdown(SocketDirection.RECEIVE);
+ }
+ }
}
- void set _onClosed(void callback()) {
- _setHandler(_SocketBase._CLOSE_EVENT, callback);
+ void _onPauseStateChange() {
+ if (_raw != null) {
+ _raw.readEventsEnabled = !_controller.isPaused;
+ }
}
- bool _propagateError(Exception e) {
- bool reported = false;
- if (_inputStream != null) {
- reported = reported || _inputStream._onSocketError(e);
+ void _onData(event) {
+ switch (event) {
+ case RawSocketEvent.READ:
+ _controller.add(_raw.read());
+ break;
+ case RawSocketEvent.WRITE:
+ _consumer.write();
+ break;
+ case RawSocketEvent.READ_CLOSED:
+ _controllerClosed = true;
+ _controller.close();
+ break;
}
- if (_outputStream != null) {
- reported = reported || _outputStream._onSocketError(e);
- }
- return reported;
}
- void _updateOutHandler() {
- void firstWriteHandler() {
- assert(!_seenFirstOutEvent);
- _seenFirstOutEvent = true;
-
- // From now on the write handler is only the client write
- // handler (connect handler cannot be called again). Change this
- // before calling any handlers as handlers can change the
- // handlers.
- if (_clientWriteHandler == null) _onWrite = _clientWriteHandler;
+ void _onDone() {
+ if (!_controllerClosed) {
+ _controllerClosed = true;
+ _controller.close();
+ }
+ _done();
+ }
- // First out event is socket connected event.
- if (_clientConnectHandler != null) _clientConnectHandler();
- _clientConnectHandler = null;
+ void _onError(error) {
+ if (!_controllerClosed) {
+ _controllerClosed = true;
+ _controller.signalError(error);
+ _controller.close();
+ }
+ _done(error);
+ }
- // Always (even for the first out event) call the write handler.
- if (_clientWriteHandler != null) _clientWriteHandler();
+ get _doneFuture {
+ if (_doneCompleter == null) {
+ _ensureRawSocketSubscription();
+ _doneCompleter = new Completer();
}
+ return _doneCompleter.future;
+ }
- if (_clientConnectHandler == null && _clientWriteHandler == null) {
- _onWrite = null;
- } else {
- if (_seenFirstOutEvent) {
- _onWrite = _clientWriteHandler;
+ void _done([error]) {
+ if (_doneCompleter != null) {
+ var tmp = _doneCompleter;
+ _doneCompleter = null;
+ if (error != null) {
+ tmp.completeError(error);
} else {
- _onWrite = firstWriteHandler;
+ tmp.complete(this);
}
}
}
- int get remotePort {
- if (_remotePort == null) {
- remoteHost;
+ int _write(List<int> data, int offset, int length) =>
+ _raw.write(data, offset, length);
+
+ void _enableWriteEvent() {
+ _raw.writeEventsEnabled = true;
+ }
+
+ void _disableWriteEvent() {
+ if (_raw != null) {
+ _raw.writeEventsEnabled = false;
}
- return _remotePort;
}
- String get remoteHost {
- if (_remoteHost == null) {
- List peer = _getRemotePeer();
- _remoteHost = peer[0];
- _remotePort = peer[1];
+ void _consumerDone([error]) {
+ if (_raw != null) {
+ _raw.shutdown(SocketDirection.SEND);
+ _disableWriteEvent();
}
- return _remoteHost;
+ _done(error);
}
+}
- List _getRemotePeer() native "Socket_GetRemotePeer";
- static SendPort _newServicePort() native "Socket_NewServicePort";
+class _SecureSocket extends _Socket implements SecureSocket {
+ _SecureSocket(RawSecureSocket raw) : super(raw);
- static void _ensureSocketService() {
- if (_socketService == null) {
- _socketService = _Socket._newServicePort();
+ void set onBadCertificate(bool callback(X509Certificate certificate)) {
+ if (_raw == null) {
+ throw new StateError("onBadCertificate called on destroyed SecureSocket");
}
+ _raw.onBadCertificate = callback;
}
- bool _seenFirstOutEvent = false;
- bool _pipe = false;
- Function _clientConnectHandler;
- Function _clientWriteHandler;
- _SocketInputStream _inputStream;
- _SocketOutputStream _outputStream;
- String _remoteHost;
- int _remotePort;
- static SendPort _socketService;
+ X509Certificate get peerCertificate {
+ if (_raw == null) {
+ throw new StateError("peerCertificate called on destroyed SecureSocket");
+ }
+ return _raw.peerCertificate;
+ }
}
« no previous file with comments | « runtime/bin/socket_macos.cc ('k') | runtime/bin/socket_win.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698