Chromium Code Reviews| Index: runtime/bin/socket_patch.dart |
| diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart |
| index d6ae4a78f4ca32830abdb0ff0ccd344a6ec887c9..39158f66a23626afec387e6264ecf72bf221abe5 100644 |
| --- a/runtime/bin/socket_patch.dart |
| +++ b/runtime/bin/socket_patch.dart |
| @@ -256,13 +256,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| Completer closeCompleter = new Completer.sync(); |
| // Handlers and receive port for socket events from the event handler. |
| - int eventMask = 0; |
| - List eventHandlers; |
| + final List eventHandlers = new List(EVENT_COUNT + 1); |
| RawReceivePort eventPort; |
| - // Indicates if native interrupts can be activated. |
| - bool canActivateEvents = true; |
| - |
| // The type flags for this socket. |
| final int typeFlags; |
| @@ -272,6 +268,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| // Holds the address used to connect or bind the socket. |
| InternetAddress address; |
| + int available = 0; |
| + |
| + bool sendReadEvents = false; |
| + bool readEventIssued = false; |
| + |
| + bool sendWriteEvents = false; |
| + bool writeEventIssued = false; |
| + bool writeAvailable = false; |
| + |
| static Future<List<InternetAddress>> lookup( |
| String host, {InternetAddressType type: InternetAddressType.ANY}) { |
| return _IOService.dispatch(_SOCKET_LOOKUP, [host, type._value]) |
| @@ -426,50 +431,30 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| }); |
| } |
| - _NativeSocket.datagram(this.address) |
| - : typeFlags = TYPE_NORMAL_SOCKET { |
| - eventHandlers = new List(EVENT_COUNT + 1); |
| - } |
| + _NativeSocket.datagram(this.address) : typeFlags = TYPE_NORMAL_SOCKET; |
| - _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET { |
| - eventHandlers = new List(EVENT_COUNT + 1); |
| - } |
| + _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET; |
| - _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET { |
| - eventHandlers = new List(EVENT_COUNT + 1); |
| - } |
| + _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET; |
| - _NativeSocket.pipe() : typeFlags = TYPE_PIPE { |
| - eventHandlers = new List(EVENT_COUNT + 1); |
| - } |
| + _NativeSocket.pipe() : typeFlags = TYPE_PIPE; |
| _NativeSocket.watch(int id) : typeFlags = TYPE_NORMAL_SOCKET { |
| - eventHandlers = new List(EVENT_COUNT + 1); |
| isClosedWrite = true; |
| nativeSetSocketId(id); |
| } |
| - int available() { |
| - if (isClosing || isClosed) return 0; |
| - var result = nativeAvailable(); |
| - if (result is OSError) { |
| - reportError(result, "Available failed"); |
| - return 0; |
| - } else { |
| - return result; |
| - } |
| - } |
| - |
| List<int> read(int len) { |
| if (len != null && len <= 0) { |
| throw new ArgumentError("Illegal length $len"); |
| } |
| if (isClosing || isClosed) return null; |
| - var result = nativeRead(len == null ? -1 : len); |
| + var result = nativeRead(min(available, len == null ? available : len)); |
| if (result is OSError) { |
| reportError(result, "Read failed"); |
| return null; |
| } |
| + if (result != null) available -= result.length; |
| return result; |
| } |
| @@ -480,6 +465,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| reportError(result, "Receive failed"); |
| return null; |
| } |
| + if (result != null) { |
| + if (Platform.isMacOS) { |
| + // Mac includes the header size, so we need to query the actual |
| + // available. |
| + available = nativeAvailable(); |
| + } else { |
| + available -= result.data.length; |
| + } |
| + } |
| return result; |
| } |
| @@ -510,6 +504,10 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| scheduleMicrotask(() => reportError(result, "Write failed")); |
| result = 0; |
| } |
| + if (result >= 0 && result < bytes) { |
| + writeAvailable = false; |
| + } |
| + if (result < 0) result = -result; |
|
Søren Gjesse
2014/02/18 14:28:00
If the negative result is needed please explain it
Anders Johnsen
2014/02/19 09:43:03
Done.
|
| return result; |
| } |
| @@ -554,17 +552,85 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| return new _InternetAddress(result[1], null, result[2]); |
| } |
| + void issueReadEvent() { |
| + if (readEventIssued) return; |
| + readEventIssued = true; |
| + void issue() { |
| + readEventIssued = false; |
| + if (isClosing) return; |
| + if (!sendReadEvents) return; |
| + if (available == 0) { |
| + if (isClosedRead) { |
| + if (isClosedWrite) close(); |
| + var handler = eventHandlers[CLOSED_EVENT]; |
| + if (handler == null) return; |
| + handler(); |
| + } |
| + return; |
| + } |
| + var handler = eventHandlers[READ_EVENT]; |
| + if (handler == null) return; |
| + readEventIssued = true; |
| + handler(); |
| + scheduleMicrotask(issue); |
| + } |
| + scheduleMicrotask(issue); |
| + } |
| + |
| + void issueWriteEvent({bool delayed: true}) { |
| + if (writeEventIssued) return; |
| + if (!writeAvailable) return; |
| + void issue() { |
| + writeEventIssued = false; |
| + if (!writeAvailable) return; |
| + if (isClosing) return; |
| + if (!sendWriteEvents) return; |
| + sendWriteEvents = false; |
| + var handler = eventHandlers[WRITE_EVENT]; |
| + if (handler == null) return; |
| + handler(); |
| + } |
| + if (delayed) { |
| + writeEventIssued = true; |
| + scheduleMicrotask(issue); |
| + } else { |
| + issue(); |
| + } |
| + } |
| + |
| // Multiplexes socket events to the socket handlers. |
| void multiplex(int events) { |
| - canActivateEvents = false; |
| for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) { |
| if (((events & (1 << i)) != 0)) { |
| if ((i == CLOSED_EVENT || i == READ_EVENT) && isClosedRead) continue; |
| + if (isClosing && i != DESTROYED_EVENT) continue; |
| if (i == CLOSED_EVENT && |
| typeFlags != TYPE_LISTENING_SOCKET && |
| !isClosing && |
| !isClosed) { |
| isClosedRead = true; |
| + issueReadEvent(); |
| + continue; |
| + } |
| + |
| + if (i == WRITE_EVENT) { |
| + writeAvailable = true; |
| + issueWriteEvent(delayed: false); |
| + continue; |
| + } |
| + |
| + if (i == READ_EVENT && |
| + typeFlags != TYPE_LISTENING_SOCKET) { |
| + var avail = nativeAvailable(); |
| + if (avail is int) { |
| + available = avail; |
| + } else { |
| + // Available failed. Mark socket as having data, to ensure read |
| + // events, and thus reporting of this error. |
| + available = 1; |
| + } |
| + issueReadEvent(); |
|
Søren Gjesse
2014/02/18 14:28:00
Why is there no "delayed: false" for this as well?
Anders Johnsen
2014/02/19 09:43:03
It's always delayed. If it wasn't I was having tro
|
| + continue; |
| } |
| var handler = eventHandlers[i]; |
| @@ -576,24 +642,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| if (handler != null) handler(); |
| continue; |
| } |
| - assert(handler != null); |
| - if (i == WRITE_EVENT) { |
| - // If the event was disabled before we had a chance to fire the event, |
| - // discard it. If we register again, we'll get a new one. |
| - if ((eventMask & (1 << i)) == 0) continue; |
| - // Unregister the out handler before executing it. There is |
| - // no need to notify the eventhandler as handlers are |
| - // disabled while the event is handled. |
| - eventMask &= ~(1 << i); |
| - } |
| - // Don't call the in handler if there is no data available |
| - // after all. |
| - if (i == READ_EVENT && |
| - typeFlags != TYPE_LISTENING_SOCKET && |
| - available() == 0) { |
| - continue; |
| - } |
| if (i == ERROR_EVENT) { |
| if (!isClosing) { |
| reportError(nativeGetError(), ""); |
| @@ -605,9 +654,6 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| } |
| } |
| } |
| - if (isClosedRead && isClosedWrite) close(); |
| - canActivateEvents = true; |
| - activateHandlers(); |
| } |
| void setHandlers({read, write, error, closed, destroyed}) { |
| @@ -619,26 +665,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| } |
| void setListening({read: true, write: true}) { |
| - eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT); |
| - if (read) eventMask |= (1 << READ_EVENT); |
| - if (write) eventMask |= (1 << WRITE_EVENT); |
| - activateHandlers(); |
| - } |
| - |
| - |
| - void activateHandlers() { |
| - if (canActivateEvents && !isClosing && !isClosed) { |
| - if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) { |
| - // If we don't listen for either read or write, disconnect as we won't |
| - // get close and error events anyway. |
| - if (eventPort != null) disconnectFromEventHandler(); |
| - } else { |
| - int data = eventMask; |
| - if (isClosedRead) data &= ~(1 << READ_EVENT); |
| - if (isClosedWrite) data &= ~(1 << WRITE_EVENT); |
| - data |= typeFlags; |
| - sendToEventHandler(data); |
| - } |
| + sendReadEvents = read; |
| + sendWriteEvents = write; |
| + if (read) issueReadEvent(); |
| + if (write) issueWriteEvent(); |
| + if (eventPort == null) { |
| + int flags = typeFlags; |
| + if (!isClosedRead) flags |= 1 << READ_EVENT; |
| + if (!isClosedWrite) flags |= 1 << WRITE_EVENT; |
| + sendToEventHandler(flags); |
| } |
| } |
| @@ -673,9 +708,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| if (isClosedRead) { |
| close(); |
| } else { |
| - bool connected = eventPort != null; |
| sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND); |
| - if (!connected) disconnectFromEventHandler(); |
| } |
| isClosedWrite = true; |
| } |
| @@ -686,17 +719,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| if (isClosedWrite) { |
| close(); |
| } else { |
| - bool connected = eventPort != null; |
| sendToEventHandler(1 << SHUTDOWN_READ_COMMAND); |
| - if (!connected) disconnectFromEventHandler(); |
| } |
| isClosedRead = true; |
| } |
| } |
| void sendToEventHandler(int data) { |
| - connectToEventHandler(); |
| assert(!isClosed); |
| + connectToEventHandler(); |
| _EventHandler._sendData(this, eventPort, data); |
| } |
| @@ -707,10 +738,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 { |
| } |
| void disconnectFromEventHandler() { |
| - if (eventPort != null) { |
| - eventPort.close(); |
| - eventPort = null; |
| - } |
| + assert(eventPort != null); |
| + eventPort.close(); |
| + eventPort = null; |
| } |
| // Check whether this is an error response from a native port call. |
| @@ -868,8 +898,11 @@ class _RawServerSocket extends Stream<RawSocket> |
| onResume: _onPauseStateChange); |
| _socket.setHandlers( |
| read: zone.bindCallback(() { |
| - var socket = _socket.accept(); |
| - if (socket != null) _controller.add(new _RawSocket(socket)); |
| + do { |
| + var socket = _socket.accept(); |
| + if (socket == null) return; |
| + _controller.add(new _RawSocket(socket)); |
| + } while (!_controller.isPaused); |
| }), |
| error: zone.bindUnaryCallback((e) { |
| _controller.addError(e); |
| @@ -992,7 +1025,7 @@ class _RawSocket extends Stream<RawSocketEvent> |
| cancelOnError: cancelOnError); |
| } |
| - int available() => _socket.available(); |
| + int available() => _socket.available; |
| List<int> read([int len]) { |
| if (_isMacOSTerminalInput) { |