Index: runtime/bin/eventhandler_linux.cc |
diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc |
index 6e75903ffca25e9bc45525cbd8b94dbea9a65c06..73f2947db7d90f02d2c8f8905a1dd2a86caee36a 100644 |
--- a/runtime/bin/eventhandler_linux.cc |
+++ b/runtime/bin/eventhandler_linux.cc |
@@ -20,7 +20,7 @@ |
#include "bin/dartutils.h" |
#include "bin/fdutils.h" |
#include "bin/log.h" |
-#include "bin/utils.h" |
+#include "bin/socket.h" |
#include "platform/hashmap.h" |
#include "platform/thread.h" |
#include "platform/utils.h" |
@@ -29,59 +29,36 @@ |
namespace dart { |
namespace bin { |
-static const int kInterruptMessageSize = sizeof(InterruptMessage); |
static const int kTimerId = -1; |
-static const int kShutdownId = -2; |
-// Unregister the file descriptor for a SocketData structure with epoll. |
-static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) { |
- if (!sd->tracked_by_epoll()) return; |
- int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
- EPOLL_CTL_DEL, |
- sd->fd(), |
- NULL)); |
- if (status == -1) { |
- FATAL("Failed unregistering events for file descriptor"); |
- } |
- sd->set_tracked_by_epoll(false); |
-} |
- |
- |
-static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd, int mask) { |
- ASSERT(!sd->tracked_by_epoll()); |
+static void AddToEpollInstance(intptr_t epoll_fd_, |
+ int fd, Dart_Port port, |
+ int mask) { |
struct epoll_event event; |
event.events = EPOLLET | EPOLLRDHUP; |
if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN; |
if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT; |
- event.data.ptr = sd; |
+ // Be sure we don't collide with the TIMER_BIT. |
+ if (port == ILLEGAL_PORT) { |
+ FATAL("Illigal port sent to event handler"); |
+ } |
+ event.data.u64 = port; |
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
EPOLL_CTL_ADD, |
- sd->fd(), |
+ fd, |
&event)); |
if (status == -1) { |
// Epoll does not accept the file descriptor. It could be due to |
// already closed file descriptor, or unuspported devices, such |
// as /dev/null. In such case, mark the file descriptor as closed, |
// so dart will handle it accordingly. |
- DartUtils::PostInt32(sd->port(), 1 << kCloseEvent); |
- } else { |
- sd->set_tracked_by_epoll(true); |
+ DartUtils::PostInt32(port, 1 << kCloseEvent); |
} |
} |
-EventHandlerImplementation::EventHandlerImplementation() |
- : socket_map_(&HashMap::SamePointerValue, 16) { |
- intptr_t result; |
- result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
- if (result != 0) { |
- FATAL("Pipe creation failed"); |
- } |
- FDUtils::SetNonBlocking(interrupt_fds_[0]); |
- FDUtils::SetCloseOnExec(interrupt_fds_[0]); |
- FDUtils::SetCloseOnExec(interrupt_fds_[1]); |
- shutdown_ = false; |
+EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) { |
// The initial size passed to epoll_create is ignore on newer (>= |
// 2.6.8) Linux versions |
static const int kEpollInitialSize = 64; |
@@ -90,28 +67,18 @@ EventHandlerImplementation::EventHandlerImplementation() |
FATAL("Failed creating epoll file descriptor"); |
} |
FDUtils::SetCloseOnExec(epoll_fd_); |
- // Register the interrupt_fd with the epoll instance. |
- struct epoll_event event; |
- event.events = EPOLLIN; |
- event.data.ptr = NULL; |
- int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
- EPOLL_CTL_ADD, |
- interrupt_fds_[0], |
- &event)); |
- if (status == -1) { |
- FATAL("Failed adding interrupt fd to epoll instance"); |
- } |
timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC)); |
if (epoll_fd_ == -1) { |
FATAL("Failed creating timerfd file descriptor"); |
} |
// Register the timer_fd_ with the epoll instance. |
+ struct epoll_event event; |
event.events = EPOLLIN; |
- event.data.fd = timer_fd_; |
- status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
- EPOLL_CTL_ADD, |
- timer_fd_, |
- &event)); |
+ event.data.u64 = ILLEGAL_PORT; // Use ILLEGAL_PORT to identify timer-fd. |
+ int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
+ EPOLL_CTL_ADD, |
+ timer_fd_, |
+ &event)); |
if (status == -1) { |
FATAL2( |
"Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno); |
@@ -122,103 +89,12 @@ EventHandlerImplementation::EventHandlerImplementation() |
EventHandlerImplementation::~EventHandlerImplementation() { |
TEMP_FAILURE_RETRY(close(epoll_fd_)); |
TEMP_FAILURE_RETRY(close(timer_fd_)); |
- TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
- TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
-} |
- |
- |
-SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd, |
- bool* is_new) { |
- ASSERT(fd >= 0); |
- HashMap::Entry* entry = socket_map_.Lookup( |
- GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
- ASSERT(entry != NULL); |
- SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
- if (sd == NULL) { |
- // If there is no data in the hash map for this file descriptor a |
- // new SocketData for the file descriptor is inserted. |
- sd = new SocketData(fd); |
- entry->value = sd; |
- *is_new = true; |
- } |
- ASSERT(fd == sd->fd()); |
- return sd; |
-} |
- |
- |
-void EventHandlerImplementation::WakeupHandler(intptr_t id, |
- Dart_Port dart_port, |
- int64_t data) { |
- InterruptMessage msg; |
- msg.id = id; |
- msg.dart_port = dart_port; |
- msg.data = data; |
- // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
- // is smaller than 512, we don't need a thread lock. |
- // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
- ASSERT(kInterruptMessageSize < PIPE_BUF); |
- intptr_t result = |
- FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
- if (result != kInterruptMessageSize) { |
- if (result == -1) { |
- perror("Interrupt message failure:"); |
- } |
- FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); |
- } |
-} |
- |
- |
-void EventHandlerImplementation::HandleInterruptFd() { |
- const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
- InterruptMessage msg[MAX_MESSAGES]; |
- ssize_t bytes = TEMP_FAILURE_RETRY( |
- read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
- for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
- if (msg[i].id == kTimerId) { |
- timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
- struct itimerspec it; |
- memset(&it, 0, sizeof(it)); |
- if (timeout_queue_.HasTimeout()) { |
- int64_t millis = timeout_queue_.CurrentTimeout(); |
- it.it_value.tv_sec = millis / 1000; |
- it.it_value.tv_nsec = (millis % 1000) * 1000000; |
- } |
- timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL); |
- } else if (msg[i].id == kShutdownId) { |
- shutdown_ = true; |
- } else { |
- bool is_new = false; |
- SocketData* sd = GetSocketData(msg[i].id, &is_new); |
- if (is_new) { |
- sd->SetPort(msg[i].dart_port); |
- AddToEpollInstance(epoll_fd_, sd, msg[i].data); |
- } |
- if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) { |
- ASSERT(msg[i].data == (1 << kShutdownReadCommand)); |
- // Close the socket for reading. |
- sd->ShutdownRead(); |
- } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) { |
- ASSERT(msg[i].data == (1 << kShutdownWriteCommand)); |
- // Close the socket for writing. |
- sd->ShutdownWrite(); |
- } else if ((msg[i].data & (1 << kCloseCommand)) != 0) { |
- ASSERT(msg[i].data == (1 << kCloseCommand)); |
- // Close the socket and free system resources and move on to |
- // next message. |
- RemoveFromEpollInstance(epoll_fd_, sd); |
- intptr_t fd = sd->fd(); |
- sd->Close(); |
- socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
- delete sd; |
- DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent); |
- } |
- } |
- } |
} |
#ifdef DEBUG_POLL |
-static void PrintEventMask(intptr_t fd, intptr_t events) { |
- Log::Print("%d ", fd); |
+static void PrintEventMask(intptr_t events) { |
+ // TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the |
+ // epoll-data as well. |
if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN "); |
if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI "); |
if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT "); |
@@ -230,16 +106,14 @@ static void PrintEventMask(intptr_t fd, intptr_t events) { |
if ((events & ~all_events) != 0) { |
Log::Print("(and %08x) ", events & ~all_events); |
} |
- Log::Print("(available %d) ", FDUtils::AvailableBytes(fd)); |
Log::Print("\n"); |
} |
#endif |
-intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
- SocketData* sd) { |
+intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) { |
#ifdef DEBUG_POLL |
- PrintEventMask(sd->fd(), events); |
+ PrintEventMask(events); |
#endif |
if (events & EPOLLERR) { |
// Return only error if EPOLLIN is present. |
@@ -255,11 +129,9 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
int size) { |
- bool interrupt_seen = false; |
for (int i = 0; i < size; i++) { |
- if (events[i].data.ptr == NULL) { |
- interrupt_seen = true; |
- } else if (events[i].data.fd == timer_fd_) { |
+ uint64_t data = events[i].data.u64; |
+ if (data == ILLEGAL_PORT) { |
int64_t val; |
VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val))); |
if (timeout_queue_.HasTimeout()) { |
@@ -267,24 +139,19 @@ void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
timeout_queue_.RemoveCurrent(); |
} |
} else { |
- SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr); |
- int32_t event_mask = GetPollEvents(events[i].events, sd); |
+ int32_t event_mask = GetPollEvents(events[i].events); |
if (event_mask != 0) { |
- Dart_Port port = sd->port(); |
+ Dart_Port port = data; |
ASSERT(port != 0); |
DartUtils::PostInt32(port, event_mask); |
} |
} |
} |
- if (interrupt_seen) { |
- // Handle after socket events, so we avoid closing a socket before we handle |
- // the current events. |
- HandleInterruptFd(); |
- } |
} |
void EventHandlerImplementation::Poll(uword args) { |
+ // Main event-handler thread loop. |
static const intptr_t kMaxEvents = 16; |
struct epoll_event events[kMaxEvents]; |
EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
@@ -318,26 +185,51 @@ void EventHandlerImplementation::Start(EventHandler* handler) { |
void EventHandlerImplementation::Shutdown() { |
- SendData(kShutdownId, 0, 0); |
-} |
- |
- |
-void EventHandlerImplementation::SendData(intptr_t id, |
- Dart_Port dart_port, |
- int64_t data) { |
- WakeupHandler(id, dart_port, data); |
-} |
- |
- |
-void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
- // The hashmap does not support keys with value 0. |
- return reinterpret_cast<void*>(fd + 1); |
-} |
- |
- |
-uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
- // The hashmap does not support keys with value 0. |
- return dart::Utils::WordHash(fd + 1); |
+ shutdown_ = true; |
+} |
+ |
+ |
+void EventHandlerImplementation::Notify(intptr_t id, |
+ Dart_Port dart_port, |
+ int64_t data) { |
+ // This method is called by isolates, that is, not in the event-handler |
+ // thread. |
+ if (id == kTimerId) { |
+ // Lock this region, as multiple isolates may attempt to update |
+ // timeout_queue_. |
+ // TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock. |
+ timer_mutex_.Lock(); |
+ timeout_queue_.UpdateTimeout(dart_port, data); |
+ struct itimerspec it; |
+ memset(&it, 0, sizeof(it)); |
+ if (timeout_queue_.HasTimeout()) { |
+ int64_t millis = timeout_queue_.CurrentTimeout(); |
+ it.it_value.tv_sec = millis / 1000; |
+ it.it_value.tv_nsec = (millis % 1000) * 1000000; |
+ } |
+ timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL); |
+ timer_mutex_.Unlock(); |
+ } else { |
+ if ((data & (1 << kShutdownReadCommand)) != 0) { |
+ ASSERT(data == (1 << kShutdownReadCommand)); |
+ // Close the socket for reading. |
+ shutdown(id, SHUT_RD); |
+ } else if ((data & (1 << kShutdownWriteCommand)) != 0) { |
+ ASSERT(data == (1 << kShutdownWriteCommand)); |
+ // Close the socket for writing. |
+ shutdown(id, SHUT_WR); |
+ } else if ((data & (1 << kCloseCommand)) != 0) { |
+ ASSERT(data == (1 << kCloseCommand)); |
+ // Close the socket and free system resources and move on to |
+ // next message. |
+ // This will also remove the file descriptor from epoll. |
+ Socket::Close(id); |
+ DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent); |
+ } else { |
+ // Add to epoll - this is the first time we see it. |
+ AddToEpollInstance(epoll_fd_, id, dart_port, data); |
+ } |
+ } |
} |
} // namespace bin |