| Index: runtime/bin/socket_patch.dart
|
| diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart
|
| index d6ae4a78f4ca32830abdb0ff0ccd344a6ec887c9..76289c505c7cf4e47e6f93479a3ed85dffa20d4f 100644
|
| --- a/runtime/bin/socket_patch.dart
|
| +++ b/runtime/bin/socket_patch.dart
|
| @@ -256,13 +256,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| Completer closeCompleter = new Completer.sync();
|
|
|
| // Handlers and receive port for socket events from the event handler.
|
| - int eventMask = 0;
|
| - List eventHandlers;
|
| + final List eventHandlers = new List(EVENT_COUNT + 1);
|
| RawReceivePort eventPort;
|
|
|
| - // Indicates if native interrupts can be activated.
|
| - bool canActivateEvents = true;
|
| -
|
| // The type flags for this socket.
|
| final int typeFlags;
|
|
|
| @@ -272,6 +268,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| // Holds the address used to connect or bind the socket.
|
| InternetAddress address;
|
|
|
| + int available = 0;
|
| +
|
| + bool sendReadEvents = false;
|
| + bool readEventIssued = false;
|
| +
|
| + bool sendWriteEvents = false;
|
| + bool writeEventIssued = false;
|
| + bool writeAvailable = false;
|
| +
|
| static Future<List<InternetAddress>> lookup(
|
| String host, {InternetAddressType type: InternetAddressType.ANY}) {
|
| return _IOService.dispatch(_SOCKET_LOOKUP, [host, type._value])
|
| @@ -426,50 +431,30 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| });
|
| }
|
|
|
| - _NativeSocket.datagram(this.address)
|
| - : typeFlags = TYPE_NORMAL_SOCKET {
|
| - eventHandlers = new List(EVENT_COUNT + 1);
|
| - }
|
| + _NativeSocket.datagram(this.address) : typeFlags = TYPE_NORMAL_SOCKET;
|
|
|
| - _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET {
|
| - eventHandlers = new List(EVENT_COUNT + 1);
|
| - }
|
| + _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET;
|
|
|
| - _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET {
|
| - eventHandlers = new List(EVENT_COUNT + 1);
|
| - }
|
| + _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET;
|
|
|
| - _NativeSocket.pipe() : typeFlags = TYPE_PIPE {
|
| - eventHandlers = new List(EVENT_COUNT + 1);
|
| - }
|
| + _NativeSocket.pipe() : typeFlags = TYPE_PIPE;
|
|
|
| _NativeSocket.watch(int id) : typeFlags = TYPE_NORMAL_SOCKET {
|
| - eventHandlers = new List(EVENT_COUNT + 1);
|
| isClosedWrite = true;
|
| nativeSetSocketId(id);
|
| }
|
|
|
| - int available() {
|
| - if (isClosing || isClosed) return 0;
|
| - var result = nativeAvailable();
|
| - if (result is OSError) {
|
| - reportError(result, "Available failed");
|
| - return 0;
|
| - } else {
|
| - return result;
|
| - }
|
| - }
|
| -
|
| List<int> read(int len) {
|
| if (len != null && len <= 0) {
|
| throw new ArgumentError("Illegal length $len");
|
| }
|
| if (isClosing || isClosed) return null;
|
| - var result = nativeRead(len == null ? -1 : len);
|
| + var result = nativeRead(min(available, len == null ? available : len));
|
| if (result is OSError) {
|
| reportError(result, "Read failed");
|
| return null;
|
| }
|
| + if (result != null) available -= result.length;
|
| return result;
|
| }
|
|
|
| @@ -480,6 +465,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| reportError(result, "Receive failed");
|
| return null;
|
| }
|
| + if (result != null) {
|
| + if (Platform.isMacOS) {
|
| + // Mac includes the header size, so we need to query the actual
|
| + // available.
|
| + available = nativeAvailable();
|
| + } else {
|
| + available -= result.data.length;
|
| + }
|
| + }
|
| return result;
|
| }
|
|
|
| @@ -510,6 +504,14 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| scheduleMicrotask(() => reportError(result, "Write failed"));
|
| result = 0;
|
| }
|
| + // The result may be negative, if we forced a short write for testing
|
| + // purpose. In such case, don't mark writeAvailable as false, as we don't
|
| + // know if we'll receive an event. It's better to just retry.
|
| + if (result >= 0 && result < bytes) {
|
| + writeAvailable = false;
|
| + }
|
| + // Negate the result, as stated above.
|
| + if (result < 0) result = -result;
|
| return result;
|
| }
|
|
|
| @@ -554,17 +556,85 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| return new _InternetAddress(result[1], null, result[2]);
|
| }
|
|
|
| + void issueReadEvent() {
|
| + if (readEventIssued) return;
|
| + readEventIssued = true;
|
| + void issue() {
|
| + readEventIssued = false;
|
| + if (isClosing) return;
|
| + if (!sendReadEvents) return;
|
| + if (available == 0) {
|
| + if (isClosedRead) {
|
| + if (isClosedWrite) close();
|
| + var handler = eventHandlers[CLOSED_EVENT];
|
| + if (handler == null) return;
|
| + handler();
|
| + }
|
| + return;
|
| + }
|
| + var handler = eventHandlers[READ_EVENT];
|
| + if (handler == null) return;
|
| + readEventIssued = true;
|
| + handler();
|
| + scheduleMicrotask(issue);
|
| + }
|
| + scheduleMicrotask(issue);
|
| + }
|
| +
|
| + void issueWriteEvent({bool delayed: true}) {
|
| + if (writeEventIssued) return;
|
| + if (!writeAvailable) return;
|
| + void issue() {
|
| + writeEventIssued = false;
|
| + if (!writeAvailable) return;
|
| + if (isClosing) return;
|
| + if (!sendWriteEvents) return;
|
| + sendWriteEvents = false;
|
| + var handler = eventHandlers[WRITE_EVENT];
|
| + if (handler == null) return;
|
| + handler();
|
| + }
|
| + if (delayed) {
|
| + writeEventIssued = true;
|
| + scheduleMicrotask(issue);
|
| + } else {
|
| + issue();
|
| + }
|
| + }
|
| +
|
| // Multiplexes socket events to the socket handlers.
|
| void multiplex(int events) {
|
| - canActivateEvents = false;
|
| for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) {
|
| if (((events & (1 << i)) != 0)) {
|
| if ((i == CLOSED_EVENT || i == READ_EVENT) && isClosedRead) continue;
|
| + if (isClosing && i != DESTROYED_EVENT) continue;
|
| if (i == CLOSED_EVENT &&
|
| typeFlags != TYPE_LISTENING_SOCKET &&
|
| !isClosing &&
|
| !isClosed) {
|
| isClosedRead = true;
|
| + issueReadEvent();
|
| + continue;
|
| + }
|
| +
|
| + if (i == WRITE_EVENT) {
|
| + writeAvailable = true;
|
| + issueWriteEvent(delayed: false);
|
| + continue;
|
| + }
|
| +
|
| + if (i == READ_EVENT &&
|
| + typeFlags != TYPE_LISTENING_SOCKET) {
|
| + var avail = nativeAvailable();
|
| + if (avail is int) {
|
| + available = avail;
|
| + } else {
|
| + // Available failed. Mark socket as having data, to ensure read
|
| + // events, and thus reporting of this error.
|
| + available = 1;
|
| + }
|
| + issueReadEvent();
|
| + continue;
|
| }
|
|
|
| var handler = eventHandlers[i];
|
| @@ -576,24 +646,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| if (handler != null) handler();
|
| continue;
|
| }
|
| - assert(handler != null);
|
| - if (i == WRITE_EVENT) {
|
| - // If the event was disabled before we had a chance to fire the event,
|
| - // discard it. If we register again, we'll get a new one.
|
| - if ((eventMask & (1 << i)) == 0) continue;
|
| - // Unregister the out handler before executing it. There is
|
| - // no need to notify the eventhandler as handlers are
|
| - // disabled while the event is handled.
|
| - eventMask &= ~(1 << i);
|
| - }
|
|
|
| - // Don't call the in handler if there is no data available
|
| - // after all.
|
| - if (i == READ_EVENT &&
|
| - typeFlags != TYPE_LISTENING_SOCKET &&
|
| - available() == 0) {
|
| - continue;
|
| - }
|
| if (i == ERROR_EVENT) {
|
| if (!isClosing) {
|
| reportError(nativeGetError(), "");
|
| @@ -605,9 +658,6 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| }
|
| }
|
| }
|
| - if (isClosedRead && isClosedWrite) close();
|
| - canActivateEvents = true;
|
| - activateHandlers();
|
| }
|
|
|
| void setHandlers({read, write, error, closed, destroyed}) {
|
| @@ -619,26 +669,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| }
|
|
|
| void setListening({read: true, write: true}) {
|
| - eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT);
|
| - if (read) eventMask |= (1 << READ_EVENT);
|
| - if (write) eventMask |= (1 << WRITE_EVENT);
|
| - activateHandlers();
|
| - }
|
| -
|
| -
|
| - void activateHandlers() {
|
| - if (canActivateEvents && !isClosing && !isClosed) {
|
| - if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) {
|
| - // If we don't listen for either read or write, disconnect as we won't
|
| - // get close and error events anyway.
|
| - if (eventPort != null) disconnectFromEventHandler();
|
| - } else {
|
| - int data = eventMask;
|
| - if (isClosedRead) data &= ~(1 << READ_EVENT);
|
| - if (isClosedWrite) data &= ~(1 << WRITE_EVENT);
|
| - data |= typeFlags;
|
| - sendToEventHandler(data);
|
| - }
|
| + sendReadEvents = read;
|
| + sendWriteEvents = write;
|
| + if (read) issueReadEvent();
|
| + if (write) issueWriteEvent();
|
| + if (eventPort == null) {
|
| + int flags = typeFlags;
|
| + if (!isClosedRead) flags |= 1 << READ_EVENT;
|
| + if (!isClosedWrite) flags |= 1 << WRITE_EVENT;
|
| + sendToEventHandler(flags);
|
| }
|
| }
|
|
|
| @@ -673,9 +712,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| if (isClosedRead) {
|
| close();
|
| } else {
|
| - bool connected = eventPort != null;
|
| sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND);
|
| - if (!connected) disconnectFromEventHandler();
|
| }
|
| isClosedWrite = true;
|
| }
|
| @@ -686,17 +723,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| if (isClosedWrite) {
|
| close();
|
| } else {
|
| - bool connected = eventPort != null;
|
| sendToEventHandler(1 << SHUTDOWN_READ_COMMAND);
|
| - if (!connected) disconnectFromEventHandler();
|
| }
|
| isClosedRead = true;
|
| }
|
| }
|
|
|
| void sendToEventHandler(int data) {
|
| - connectToEventHandler();
|
| assert(!isClosed);
|
| + connectToEventHandler();
|
| _EventHandler._sendData(this, eventPort, data);
|
| }
|
|
|
| @@ -707,10 +742,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
|
| }
|
|
|
| void disconnectFromEventHandler() {
|
| - if (eventPort != null) {
|
| - eventPort.close();
|
| - eventPort = null;
|
| - }
|
| + assert(eventPort != null);
|
| + eventPort.close();
|
| + eventPort = null;
|
| }
|
|
|
| // Check whether this is an error response from a native port call.
|
| @@ -868,8 +902,11 @@ class _RawServerSocket extends Stream<RawSocket>
|
| onResume: _onPauseStateChange);
|
| _socket.setHandlers(
|
| read: zone.bindCallback(() {
|
| - var socket = _socket.accept();
|
| - if (socket != null) _controller.add(new _RawSocket(socket));
|
| + do {
|
| + var socket = _socket.accept();
|
| + if (socket == null) return;
|
| + _controller.add(new _RawSocket(socket));
|
| + } while (!_controller.isPaused);
|
| }),
|
| error: zone.bindUnaryCallback((e) {
|
| _controller.addError(e);
|
| @@ -992,7 +1029,7 @@ class _RawSocket extends Stream<RawSocketEvent>
|
| cancelOnError: cancelOnError);
|
| }
|
|
|
| - int available() => _socket.available();
|
| + int available() => _socket.available;
|
|
|
| List<int> read([int len]) {
|
| if (_isMacOSTerminalInput) {
|
|
|