Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1156)

Unified Diff: runtime/bin/socket_patch.dart

Issue 169383003: Make event-handlers edge-triggered and move socket-state to Dart. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/socket_linux.cc ('k') | sdk/lib/io/secure_socket.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/socket_patch.dart
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart
index d6ae4a78f4ca32830abdb0ff0ccd344a6ec887c9..76289c505c7cf4e47e6f93479a3ed85dffa20d4f 100644
--- a/runtime/bin/socket_patch.dart
+++ b/runtime/bin/socket_patch.dart
@@ -256,13 +256,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
Completer closeCompleter = new Completer.sync();
// Handlers and receive port for socket events from the event handler.
- int eventMask = 0;
- List eventHandlers;
+ final List eventHandlers = new List(EVENT_COUNT + 1);
RawReceivePort eventPort;
- // Indicates if native interrupts can be activated.
- bool canActivateEvents = true;
-
// The type flags for this socket.
final int typeFlags;
@@ -272,6 +268,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
// Holds the address used to connect or bind the socket.
InternetAddress address;
+ int available = 0;
+
+ bool sendReadEvents = false;
+ bool readEventIssued = false;
+
+ bool sendWriteEvents = false;
+ bool writeEventIssued = false;
+ bool writeAvailable = false;
+
static Future<List<InternetAddress>> lookup(
String host, {InternetAddressType type: InternetAddressType.ANY}) {
return _IOService.dispatch(_SOCKET_LOOKUP, [host, type._value])
@@ -426,50 +431,30 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
});
}
- _NativeSocket.datagram(this.address)
- : typeFlags = TYPE_NORMAL_SOCKET {
- eventHandlers = new List(EVENT_COUNT + 1);
- }
+ _NativeSocket.datagram(this.address) : typeFlags = TYPE_NORMAL_SOCKET;
- _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET {
- eventHandlers = new List(EVENT_COUNT + 1);
- }
+ _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET;
- _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET {
- eventHandlers = new List(EVENT_COUNT + 1);
- }
+ _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET;
- _NativeSocket.pipe() : typeFlags = TYPE_PIPE {
- eventHandlers = new List(EVENT_COUNT + 1);
- }
+ _NativeSocket.pipe() : typeFlags = TYPE_PIPE;
_NativeSocket.watch(int id) : typeFlags = TYPE_NORMAL_SOCKET {
- eventHandlers = new List(EVENT_COUNT + 1);
isClosedWrite = true;
nativeSetSocketId(id);
}
- int available() {
- if (isClosing || isClosed) return 0;
- var result = nativeAvailable();
- if (result is OSError) {
- reportError(result, "Available failed");
- return 0;
- } else {
- return result;
- }
- }
-
List<int> read(int len) {
if (len != null && len <= 0) {
throw new ArgumentError("Illegal length $len");
}
if (isClosing || isClosed) return null;
- var result = nativeRead(len == null ? -1 : len);
+ var result = nativeRead(min(available, len == null ? available : len));
if (result is OSError) {
reportError(result, "Read failed");
return null;
}
+ if (result != null) available -= result.length;
return result;
}
@@ -480,6 +465,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
reportError(result, "Receive failed");
return null;
}
+ if (result != null) {
+ if (Platform.isMacOS) {
+ // Mac includes the header size, so we need to query the actual
+ // available.
+ available = nativeAvailable();
+ } else {
+ available -= result.data.length;
+ }
+ }
return result;
}
@@ -510,6 +504,14 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
scheduleMicrotask(() => reportError(result, "Write failed"));
result = 0;
}
+ // The result may be negative, if we forced a short write for testing
+ // purpose. In such case, don't mark writeAvailable as false, as we don't
+ // know if we'll receive an event. It's better to just retry.
+ if (result >= 0 && result < bytes) {
+ writeAvailable = false;
+ }
+ // Negate the result, as stated above.
+ if (result < 0) result = -result;
return result;
}
@@ -554,17 +556,85 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
return new _InternetAddress(result[1], null, result[2]);
}
+ void issueReadEvent() {
+ if (readEventIssued) return;
+ readEventIssued = true;
+ void issue() {
+ readEventIssued = false;
+ if (isClosing) return;
+ if (!sendReadEvents) return;
+ if (available == 0) {
+ if (isClosedRead) {
+ if (isClosedWrite) close();
+ var handler = eventHandlers[CLOSED_EVENT];
+ if (handler == null) return;
+ handler();
+ }
+ return;
+ }
+ var handler = eventHandlers[READ_EVENT];
+ if (handler == null) return;
+ readEventIssued = true;
+ handler();
+ scheduleMicrotask(issue);
+ }
+ scheduleMicrotask(issue);
+ }
+
+ void issueWriteEvent({bool delayed: true}) {
+ if (writeEventIssued) return;
+ if (!writeAvailable) return;
+ void issue() {
+ writeEventIssued = false;
+ if (!writeAvailable) return;
+ if (isClosing) return;
+ if (!sendWriteEvents) return;
+ sendWriteEvents = false;
+ var handler = eventHandlers[WRITE_EVENT];
+ if (handler == null) return;
+ handler();
+ }
+ if (delayed) {
+ writeEventIssued = true;
+ scheduleMicrotask(issue);
+ } else {
+ issue();
+ }
+ }
+
// Multiplexes socket events to the socket handlers.
void multiplex(int events) {
- canActivateEvents = false;
for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) {
if (((events & (1 << i)) != 0)) {
if ((i == CLOSED_EVENT || i == READ_EVENT) && isClosedRead) continue;
+ if (isClosing && i != DESTROYED_EVENT) continue;
if (i == CLOSED_EVENT &&
typeFlags != TYPE_LISTENING_SOCKET &&
!isClosing &&
!isClosed) {
isClosedRead = true;
+ issueReadEvent();
+ continue;
+ }
+
+ if (i == WRITE_EVENT) {
+ writeAvailable = true;
+ issueWriteEvent(delayed: false);
+ continue;
+ }
+
+ if (i == READ_EVENT &&
+ typeFlags != TYPE_LISTENING_SOCKET) {
+ var avail = nativeAvailable();
+ if (avail is int) {
+ available = avail;
+ } else {
+ // Available failed. Mark socket as having data, to ensure read
+ // events, and thus reporting of this error.
+ available = 1;
+ }
+ issueReadEvent();
+ continue;
}
var handler = eventHandlers[i];
@@ -576,24 +646,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
if (handler != null) handler();
continue;
}
- assert(handler != null);
- if (i == WRITE_EVENT) {
- // If the event was disabled before we had a chance to fire the event,
- // discard it. If we register again, we'll get a new one.
- if ((eventMask & (1 << i)) == 0) continue;
- // Unregister the out handler before executing it. There is
- // no need to notify the eventhandler as handlers are
- // disabled while the event is handled.
- eventMask &= ~(1 << i);
- }
- // Don't call the in handler if there is no data available
- // after all.
- if (i == READ_EVENT &&
- typeFlags != TYPE_LISTENING_SOCKET &&
- available() == 0) {
- continue;
- }
if (i == ERROR_EVENT) {
if (!isClosing) {
reportError(nativeGetError(), "");
@@ -605,9 +658,6 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
}
}
}
- if (isClosedRead && isClosedWrite) close();
- canActivateEvents = true;
- activateHandlers();
}
void setHandlers({read, write, error, closed, destroyed}) {
@@ -619,26 +669,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
}
void setListening({read: true, write: true}) {
- eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT);
- if (read) eventMask |= (1 << READ_EVENT);
- if (write) eventMask |= (1 << WRITE_EVENT);
- activateHandlers();
- }
-
-
- void activateHandlers() {
- if (canActivateEvents && !isClosing && !isClosed) {
- if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) {
- // If we don't listen for either read or write, disconnect as we won't
- // get close and error events anyway.
- if (eventPort != null) disconnectFromEventHandler();
- } else {
- int data = eventMask;
- if (isClosedRead) data &= ~(1 << READ_EVENT);
- if (isClosedWrite) data &= ~(1 << WRITE_EVENT);
- data |= typeFlags;
- sendToEventHandler(data);
- }
+ sendReadEvents = read;
+ sendWriteEvents = write;
+ if (read) issueReadEvent();
+ if (write) issueWriteEvent();
+ if (eventPort == null) {
+ int flags = typeFlags;
+ if (!isClosedRead) flags |= 1 << READ_EVENT;
+ if (!isClosedWrite) flags |= 1 << WRITE_EVENT;
+ sendToEventHandler(flags);
}
}
@@ -673,9 +712,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
if (isClosedRead) {
close();
} else {
- bool connected = eventPort != null;
sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND);
- if (!connected) disconnectFromEventHandler();
}
isClosedWrite = true;
}
@@ -686,17 +723,15 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
if (isClosedWrite) {
close();
} else {
- bool connected = eventPort != null;
sendToEventHandler(1 << SHUTDOWN_READ_COMMAND);
- if (!connected) disconnectFromEventHandler();
}
isClosedRead = true;
}
}
void sendToEventHandler(int data) {
- connectToEventHandler();
assert(!isClosed);
+ connectToEventHandler();
_EventHandler._sendData(this, eventPort, data);
}
@@ -707,10 +742,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
}
void disconnectFromEventHandler() {
- if (eventPort != null) {
- eventPort.close();
- eventPort = null;
- }
+ assert(eventPort != null);
+ eventPort.close();
+ eventPort = null;
}
// Check whether this is an error response from a native port call.
@@ -868,8 +902,11 @@ class _RawServerSocket extends Stream<RawSocket>
onResume: _onPauseStateChange);
_socket.setHandlers(
read: zone.bindCallback(() {
- var socket = _socket.accept();
- if (socket != null) _controller.add(new _RawSocket(socket));
+ do {
+ var socket = _socket.accept();
+ if (socket == null) return;
+ _controller.add(new _RawSocket(socket));
+ } while (!_controller.isPaused);
}),
error: zone.bindUnaryCallback((e) {
_controller.addError(e);
@@ -992,7 +1029,7 @@ class _RawSocket extends Stream<RawSocketEvent>
cancelOnError: cancelOnError);
}
- int available() => _socket.available();
+ int available() => _socket.available;
List<int> read([int len]) {
if (_isMacOSTerminalInput) {
« no previous file with comments | « runtime/bin/socket_linux.cc ('k') | sdk/lib/io/secure_socket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698