Index: runtime/bin/socket_patch.dart |
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart |
index d6ae4a78f4ca32830abdb0ff0ccd344a6ec887c9..76289c505c7cf4e47e6f93479a3ed85dffa20d4f 100644 |
--- a/runtime/bin/socket_patch.dart |
+++ b/runtime/bin/socket_patch.dart |
@@ -256,13 +256,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
Completer closeCompleter = new Completer.sync(); |
// Handlers and receive port for socket events from the event handler. |
- int eventMask = 0; |
- List eventHandlers; |
+ final List eventHandlers = new List(EVENT_COUNT + 1); |
RawReceivePort eventPort; |
- // Indicates if native interrupts can be activated. |
- bool canActivateEvents = true; |
- |
// The type flags for this socket. |
final int typeFlags; |
@@ -272,6 +268,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
// Holds the address used to connect or bind the socket. |
InternetAddress address; |
+ int available = 0; |
+ |
+ bool sendReadEvents = false; |
+ bool readEventIssued = false; |
+ |
+ bool sendWriteEvents = false; |
+ bool writeEventIssued = false; |
+ bool writeAvailable = false; |
+ |
static Future<List<InternetAddress>> lookup( |
String host, {InternetAddressType type: InternetAddressType.ANY}) { |
return _IOService.dispatch(_SOCKET_LOOKUP, [host, type._value]) |
@@ -426,50 +431,30 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
}); |
} |
- _NativeSocket.datagram(this.address) |
- : typeFlags = TYPE_NORMAL_SOCKET { |
- eventHandlers = new List(EVENT_COUNT + 1); |
- } |
+ _NativeSocket.datagram(this.address) : typeFlags = TYPE_NORMAL_SOCKET; |
- _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET { |
- eventHandlers = new List(EVENT_COUNT + 1); |
- } |
+ _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET; |
- _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET { |
- eventHandlers = new List(EVENT_COUNT + 1); |
- } |
+ _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET; |
- _NativeSocket.pipe() : typeFlags = TYPE_PIPE { |
- eventHandlers = new List(EVENT_COUNT + 1); |
- } |
+ _NativeSocket.pipe() : typeFlags = TYPE_PIPE; |
_NativeSocket.watch(int id) : typeFlags = TYPE_NORMAL_SOCKET { |
- eventHandlers = new List(EVENT_COUNT + 1); |
isClosedWrite = true; |
nativeSetSocketId(id); |
} |
- int available() { |
- if (isClosing || isClosed) return 0; |
- var result = nativeAvailable(); |
- if (result is OSError) { |
- reportError(result, "Available failed"); |
- return 0; |
- } else { |
- return result; |
- } |
- } |
- |
List<int> read(int len) { |
if (len != null && len <= 0) { |
throw new ArgumentError("Illegal length $len"); |
} |
if (isClosing || isClosed) return null; |
- var result = nativeRead(len == null ? -1 : len); |
+ var result = nativeRead(min(available, len == null ? available : len)); |
if (result is OSError) { |
reportError(result, "Read failed"); |
return null; |
} |
+ if (result != null) available -= result.length; |
return result; |
} |
@@ -480,6 +465,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
reportError(result, "Receive failed"); |
return null; |
} |
+ if (result != null) { |
+ if (Platform.isMacOS) { |
+ // Mac includes the header size, so we need to query the actual |
+ // available. |
+ available = nativeAvailable(); |
+ } else { |
+ available -= result.data.length; |
+ } |
+ } |
return result; |
} |
@@ -510,6 +504,14 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
scheduleMicrotask(() => reportError(result, "Write failed")); |
result = 0; |
} |
+ // The result may be negative, if we forced a short write for testing |
+ // purpose. In such case, don't mark writeAvailable as false, as we don't |
+ // know if we'll receive an event. It's better to just retry. |
+ if (result >= 0 && result < bytes) { |
+ writeAvailable = false; |
+ } |
+ // Negate the result, as stated above. |
+ if (result < 0) result = -result; |
return result; |
} |
@@ -554,17 +556,85 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
return new _InternetAddress(result[1], null, result[2]); |
} |
+ void issueReadEvent() { |
+ if (readEventIssued) return; |
+ readEventIssued = true; |
+ void issue() { |
+ readEventIssued = false; |
+ if (isClosing) return; |
+ if (!sendReadEvents) return; |
+ if (available == 0) { |
+ if (isClosedRead) { |
+ if (isClosedWrite) close(); |
+ var handler = eventHandlers[CLOSED_EVENT]; |
+ if (handler == null) return; |
+ handler(); |
+ } |
+ return; |
+ } |
+ var handler = eventHandlers[READ_EVENT]; |
+ if (handler == null) return; |
+ readEventIssued = true; |
+ handler(); |
+ scheduleMicrotask(issue); |
+ } |
+ scheduleMicrotask(issue); |
+ } |
+ |
+ void issueWriteEvent({bool delayed: true}) { |
+ if (writeEventIssued) return; |
+ if (!writeAvailable) return; |
+ void issue() { |
+ writeEventIssued = false; |
+ if (!writeAvailable) return; |
+ if (isClosing) return; |
+ if (!sendWriteEvents) return; |
+ sendWriteEvents = false; |
+ var handler = eventHandlers[WRITE_EVENT]; |
+ if (handler == null) return; |
+ handler(); |
+ } |
+ if (delayed) { |
+ writeEventIssued = true; |
+ scheduleMicrotask(issue); |
+ } else { |
+ issue(); |
+ } |
+ } |
+ |
// Multiplexes socket events to the socket handlers. |
void multiplex(int events) { |
- canActivateEvents = false; |
for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) { |
if (((events & (1 << i)) != 0)) { |
if ((i == CLOSED_EVENT || i == READ_EVENT) && isClosedRead) continue; |
+ if (isClosing && i != DESTROYED_EVENT) continue; |
if (i == CLOSED_EVENT && |
typeFlags != TYPE_LISTENING_SOCKET && |
!isClosing && |
!isClosed) { |
isClosedRead = true; |
+ issueReadEvent(); |
+ continue; |
+ } |
+ |
+ if (i == WRITE_EVENT) { |
+ writeAvailable = true; |
+ issueWriteEvent(delayed: false); |
+ continue; |
+ } |
+ |
+ if (i == READ_EVENT && |
+ typeFlags != TYPE_LISTENING_SOCKET) { |
+ var avail = nativeAvailable(); |
+ if (avail is int) { |
+ available = avail; |
+ } else { |
+ // Available failed. Mark socket as having data, to ensure read |
+ // events, and thus reporting of this error. |
+ available = 1; |
+ } |
+ issueReadEvent(); |
+ continue; |
} |
var handler = eventHandlers[i]; |
@@ -576,24 +646,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
if (handler != null) handler(); |
continue; |
} |
- assert(handler != null); |
- if (i == WRITE_EVENT) { |
- // If the event was disabled before we had a chance to fire the event, |
- // discard it. If we register again, we'll get a new one. |
- if ((eventMask & (1 << i)) == 0) continue; |
- // Unregister the out handler before executing it. There is |
- // no need to notify the eventhandler as handlers are |
- // disabled while the event is handled. |
- eventMask &= ~(1 << i); |
- } |
- // Don't call the in handler if there is no data available |
- // after all. |
- if (i == READ_EVENT && |
- typeFlags != TYPE_LISTENING_SOCKET && |
- available() == 0) { |
- continue; |
- } |
if (i == ERROR_EVENT) { |
if (!isClosing) { |
reportError(nativeGetError(), ""); |
@@ -605,9 +658,6 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
} |
} |
} |
- if (isClosedRead && isClosedWrite) close(); |
- canActivateEvents = true; |
- activateHandlers(); |
} |
void setHandlers({read, write, error, closed, destroyed}) { |
@@ -619,26 +669,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
} |
void setListening({read: true, write: true}) { |
- eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT); |
- if (read) eventMask |= (1 << READ_EVENT); |
- if (write) eventMask |= (1 << WRITE_EVENT); |
- activateHandlers(); |
- } |
- |
- |
- void activateHandlers() { |
- if (canActivateEvents && !isClosing && !isClosed) { |
- if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) { |
- // If we don't listen for either read or write, disconnect as we won't |
- // get close and error events anyway. |
- if (eventPort != null) disconnectFromEventHandler(); |
- } else { |
- int data = eventMask; |
- if (isClosedRead) data &= ~(1 << READ_EVENT); |
- if (isClosedWrite) data &= ~(1 << WRITE_EVENT); |
- data |= typeFlags; |
- sendToEventHandler(data); |
- } |
+ sendReadEvents = read; |
+ sendWriteEvents = write; |
+ if (read) issueReadEvent(); |
+ if (write) issueWriteEvent(); |
+ if (eventPort == null) { |
+ int flags = typeFlags; |
+ if (!isClosedRead) flags |= 1 << READ_EVENT; |
+ if (!isClosedWrite) flags |= 1 << WRITE_EVENT; |
+ sendToEventHandler(flags); |
} |
} |
@@ -673,9 +712,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
if (isClosedRead) { |
close(); |
} else { |
- bool connected = eventPort != null; |
sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND); |
- if (!connected) disconnectFromEventHandler(); |
} |
isClosedWrite = true; |
} |
@@ -686,17 +723,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
if (isClosedWrite) { |
close(); |
} else { |
- bool connected = eventPort != null; |
sendToEventHandler(1 << SHUTDOWN_READ_COMMAND); |
- if (!connected) disconnectFromEventHandler(); |
} |
isClosedRead = true; |
} |
} |
void sendToEventHandler(int data) { |
- connectToEventHandler(); |
assert(!isClosed); |
+ connectToEventHandler(); |
_EventHandler._sendData(this, eventPort, data); |
} |
@@ -707,10 +742,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
} |
void disconnectFromEventHandler() { |
- if (eventPort != null) { |
- eventPort.close(); |
- eventPort = null; |
- } |
+ assert(eventPort != null); |
+ eventPort.close(); |
+ eventPort = null; |
} |
// Check whether this is an error response from a native port call. |
@@ -868,8 +902,11 @@ class _RawServerSocket extends Stream<RawSocket> |
onResume: _onPauseStateChange); |
_socket.setHandlers( |
read: zone.bindCallback(() { |
- var socket = _socket.accept(); |
- if (socket != null) _controller.add(new _RawSocket(socket)); |
+ do { |
+ var socket = _socket.accept(); |
+ if (socket == null) return; |
+ _controller.add(new _RawSocket(socket)); |
+ } while (!_controller.isPaused); |
}), |
error: zone.bindUnaryCallback((e) { |
_controller.addError(e); |
@@ -992,7 +1029,7 @@ class _RawSocket extends Stream<RawSocketEvent> |
cancelOnError: cancelOnError); |
} |
- int available() => _socket.available(); |
+ int available() => _socket.available; |
List<int> read([int len]) { |
if (_isMacOSTerminalInput) { |