Index: runtime/bin/eventhandler_linux.cc |
diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc |
index 017c3be89e2b3b7ba17ed8b82ee5c3a39c129672..9764ac28484892ff52b602c15f273648b9152df3 100644 |
--- a/runtime/bin/eventhandler_linux.cc |
+++ b/runtime/bin/eventhandler_linux.cc |
@@ -24,30 +24,33 @@ int64_t GetCurrentTimeMilliseconds() { |
} |
-static const int kInitialPortMapSize = 128; |
+static const int kInitialPortMapSize = 16; |
static const int kPortMapGrowingFactor = 2; |
static const int kInterruptMessageSize = sizeof(InterruptMessage); |
static const int kInfinityTimeout = -1; |
static const int kTimerId = -1; |
- |
-void SocketData::FillPollEvents(struct pollfd* pollfds) { |
+intptr_t SocketData::GetPollEvents() { |
// Do not ask for POLLERR and POLLHUP explicitly as they are |
// triggered anyway. |
- if ((_mask & (1 << kInEvent)) != 0) { |
- pollfds->events |= POLLIN; |
+ intptr_t events = 0; |
+ if (!IsClosedRead()) { |
+ if ((mask_ & (1 << kInEvent)) != 0) { |
+ events |= POLLIN; |
+ } |
} |
- if ((_mask & (1 << kOutEvent)) != 0) { |
- pollfds->events |= POLLOUT; |
+ if (!IsClosedWrite()) { |
+ if ((mask_ & (1 << kOutEvent)) != 0) { |
+ events |= POLLOUT; |
+ } |
} |
- pollfds->events |= POLLRDHUP; |
+ return events; |
} |
EventHandlerImplementation::EventHandlerImplementation() { |
intptr_t result; |
- socket_map_entries_ = 0; |
socket_map_size_ = kInitialPortMapSize; |
socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, |
sizeof(SocketData))); |
@@ -89,48 +92,9 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
socket_map_size_ = new_socket_map_size; |
} |
- return socket_map_ + fd; |
-} |
- |
- |
-void EventHandlerImplementation::SetPort(intptr_t fd, |
- Dart_Port dart_port, |
- intptr_t mask) { |
- SocketData* sd = GetSocketData(fd); |
- |
- // Only change the port map entries count if SetPort changes the |
- // port map state. |
- if (dart_port == 0 && sd->port() != 0) { |
- socket_map_entries_--; |
- } else if (dart_port != 0 && sd->port() == 0) { |
- socket_map_entries_++; |
- } |
- |
- sd->set_port(dart_port); |
- sd->set_mask(mask); |
-} |
- |
- |
-void EventHandlerImplementation::RegisterFdWakeup(intptr_t id, |
- Dart_Port dart_port, |
- intptr_t data) { |
- WakeupHandler(id, dart_port, data); |
-} |
- |
- |
-void EventHandlerImplementation::CloseFd(intptr_t id) { |
- SetPort(id, 0, 0); |
- close(id); |
-} |
- |
- |
-void EventHandlerImplementation::UnregisterFdWakeup(intptr_t id) { |
- WakeupHandler(id, 0, 0); |
-} |
- |
- |
-void EventHandlerImplementation::UnregisterFd(intptr_t id) { |
- SetPort(id, 0, 0); |
+ SocketData* sd = socket_map_ + fd; |
+ sd->set_fd(fd); // For now just make sure the fd is set. |
+ return sd; |
} |
@@ -152,7 +116,13 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id, |
struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
struct pollfd* pollfds; |
- intptr_t numPollfds = 1 + socket_map_entries_; |
+ // Calculate the number of file descriptors to poll on. |
+ intptr_t numPollfds = 1; |
+ for (int i = 0; i < socket_map_size_; i++) { |
+ SocketData* sd = &socket_map_[i]; |
+ if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; |
+ } |
+ |
pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
numPollfds)); |
pollfds[0].fd = interrupt_fds_[0]; |
@@ -162,14 +132,16 @@ struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
int j = 1; |
for (int i = 0; i < socket_map_size_; i++) { |
SocketData* sd = &socket_map_[i]; |
- if (sd->port() != 0) { |
+ intptr_t events = sd->GetPollEvents(); |
+ if (sd->port() > 0 && events != 0) { |
// Fd is added to the poll set. |
- pollfds[j].fd = i; |
- sd->FillPollEvents(&pollfds[j]); |
+ pollfds[j].fd = sd->fd(); |
+ pollfds[j].events = events; |
j++; |
} |
} |
- *pollfds_size = numPollfds; |
+ ASSERT(numPollfds == j); |
+ *pollfds_size = j; |
return pollfds; |
} |
@@ -198,20 +170,53 @@ void EventHandlerImplementation::HandleInterruptFd() { |
if (msg.id == kTimerId) { |
timeout_ = msg.data; |
timeout_port_ = msg.dart_port; |
- } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
- /* |
- * A close event happened in dart, we have to explicitly unregister |
- * the fd and close the fd. |
- */ |
- CloseFd(msg.id); |
} else { |
- SetPort(msg.id, msg.dart_port, msg.data); |
+ SocketData* sd = GetSocketData(msg.id); |
+ if ((msg.data & (1 << kShutdownReadCommand)) != 0) { |
+ ASSERT(msg.data == (1 << kShutdownReadCommand)); |
+ // Close the socket for reading. |
+ sd->ShutdownRead(); |
+ } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
+ ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
+ // Close the socket for writing. |
+ sd->ShutdownWrite(); |
+ } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
+ ASSERT(msg.data == (1 << kCloseCommand)); |
+ // Close the socket and free system resources. |
+ sd->Close(); |
+ } else { |
+ // Setup events to wait for. |
+ sd->SetPortAndMask(msg.dart_port, msg.data); |
+ } |
} |
} |
} |
+#ifdef DEBUG_POLL |
+static void PrintEventMask(struct pollfd* pollfd) { |
+ printf("%d ", pollfd->fd); |
+ if ((pollfd->revents & POLLIN) != 0) printf("POLLIN "); |
+ if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI "); |
+ if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT "); |
+ if ((pollfd->revents & POLLERR) != 0) printf("POLLERR "); |
+ if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP "); |
+ if ((pollfd->revents & POLLRDHUP) != 0) printf("POLLRDHUP "); |
+ if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL "); |
+ int all_events = POLLIN | POLLPRI | POLLOUT | |
+ POLLERR | POLLHUP | POLLRDHUP | POLLNVAL; |
+ if ((pollfd->revents & ~all_events) != 0) { |
+ printf("(and %08x) ", pollfd->revents & ~all_events); |
+ } |
+ printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd)); |
+ |
+ printf("\n"); |
+} |
+#endif |
intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
+#ifdef DEBUG_POLL |
+ if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); |
+#endif |
intptr_t event_mask = 0; |
SocketData* sd = GetSocketData(pollfd->fd); |
if (sd->IsListeningSocket()) { |
@@ -224,18 +229,41 @@ intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
if (event_mask == 0) event_mask |= (1 << kInEvent); |
} |
} else { |
+ if ((pollfd->revents & POLLNVAL) != 0) { |
+ return 0; |
+ } |
+ |
// Prioritize data events over close and error events. |
if ((pollfd->revents & POLLIN) != 0) { |
if (FDUtils::AvailableBytes(pollfd->fd) != 0) { |
event_mask = (1 << kInEvent); |
- } else if (((pollfd->revents & POLLHUP) != 0) || |
- ((pollfd->revents & POLLRDHUP) != 0)) { |
+ } else if (((pollfd->revents & POLLHUP) != 0)) { |
event_mask = (1 << kCloseEvent); |
+ sd->MarkClosedRead(); |
} else if ((pollfd->revents & POLLERR) != 0) { |
event_mask = (1 << kErrorEvent); |
+ } else { |
+ // If POLLIN is set with no available data and no POLLHUP use |
+ // recv to peek for whether the other end of the socket |
+ // actually closed. |
+ char buffer; |
+ ssize_t bytesPeeked = recv(sd->fd(), &buffer, 1, MSG_PEEK); |
+ if (bytesPeeked == 0) { |
+ event_mask = (1 << kCloseEvent); |
+ sd->MarkClosedRead(); |
+ } else if (errno != EAGAIN) { |
+ fprintf(stderr, "Error recv: %s\n", strerror(errno)); |
+ } |
} |
} |
+ // On pipes POLLHUP is reported without POLLIN. |
+ if (((pollfd->revents & POLLIN) == 0) && |
+ ((pollfd->revents & POLLHUP) != 0)) { |
+ event_mask = (1 << kCloseEvent); |
+ sd->MarkClosedRead(); |
+ } |
+ |
if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent); |
} |
@@ -258,9 +286,10 @@ void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, |
intptr_t event_mask = GetPollEvents(&pollfds[i]); |
if (event_mask != 0) { |
intptr_t fd = pollfds[i].fd; |
- Dart_Port port = GetSocketData(fd)->port(); |
+ SocketData* sd = GetSocketData(fd); |
+ Dart_Port port = sd->port(); |
ASSERT(port != 0); |
- UnregisterFd(fd); |
+ sd->Unregister(); |
Dart_PostIntArray(port, 1, &event_mask); |
} |
} |
@@ -326,5 +355,5 @@ void EventHandlerImplementation::StartEventHandler() { |
void EventHandlerImplementation::SendData(intptr_t id, |
Dart_Port dart_port, |
intptr_t data) { |
- RegisterFdWakeup(id, dart_port, data); |
+ WakeupHandler(id, dart_port, data); |
} |