Index: runtime/bin/eventhandler_linux.cc |
diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc |
index 612f3cb811ecaf108d372ad31cfb98bf9c3fb27d..f87c1a4785192c81740d7c08db2b1a48e2f0fef7 100644 |
--- a/runtime/bin/eventhandler_linux.cc |
+++ b/runtime/bin/eventhandler_linux.cc |
@@ -20,7 +20,6 @@ |
#include "bin/dartutils.h" |
#include "bin/fdutils.h" |
#include "bin/log.h" |
-#include "bin/socket.h" |
#include "platform/hashmap.h" |
#include "platform/thread.h" |
#include "platform/utils.h" |
@@ -29,36 +28,63 @@ |
namespace dart { |
namespace bin { |
+static const int kInterruptMessageSize = sizeof(InterruptMessage); |
static const int kTimerId = -1; |
+static const int kShutdownId = -2; |
-static void AddToEpollInstance(intptr_t epoll_fd_, |
- int fd, Dart_Port port, |
- int mask) { |
+intptr_t SocketData::GetPollEvents() { |
+ // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are |
+ // triggered anyway. |
+ intptr_t events = EPOLLET | EPOLLRDHUP; |
+ if ((mask_ & (1 << kInEvent)) != 0) { |
+ events |= EPOLLIN; |
+ } |
+ if ((mask_ & (1 << kOutEvent)) != 0) { |
+ events |= EPOLLOUT; |
+ } |
+ return events; |
+} |
+ |
+ |
+// Unregister the file descriptor for a SocketData structure with epoll. |
+static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) { |
+ VOID_TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
+ EPOLL_CTL_DEL, |
+ sd->fd(), |
+ NULL)); |
+} |
+ |
+ |
+static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) { |
struct epoll_event event; |
- event.events = EPOLLET | EPOLLRDHUP; |
- if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN; |
- if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT; |
- // Be sure we don't collide with the TIMER_BIT. |
- if (port == ILLEGAL_PORT) { |
- FATAL("Illigal port sent to event handler"); |
- } |
- event.data.u64 = port; |
+ event.events = sd->GetPollEvents(); |
+ event.data.ptr = sd; |
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
EPOLL_CTL_ADD, |
- fd, |
+ sd->fd(), |
&event)); |
if (status == -1) { |
// Epoll does not accept the file descriptor. It could be due to |
// already closed file descriptor, or unuspported devices, such |
// as /dev/null. In such case, mark the file descriptor as closed, |
// so dart will handle it accordingly. |
- DartUtils::PostInt32(port, 1 << kCloseEvent); |
+ DartUtils::PostInt32(sd->port(), 1 << kCloseEvent); |
} |
} |
-EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) { |
+EventHandlerImplementation::EventHandlerImplementation() |
+ : socket_map_(&HashMap::SamePointerValue, 16) { |
+ intptr_t result; |
+ result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
+ if (result != 0) { |
+ FATAL("Pipe creation failed"); |
+ } |
+ FDUtils::SetNonBlocking(interrupt_fds_[0]); |
+ FDUtils::SetCloseOnExec(interrupt_fds_[0]); |
+ FDUtils::SetCloseOnExec(interrupt_fds_[1]); |
+ shutdown_ = false; |
// The initial size passed to epoll_create is ignore on newer (>= |
// 2.6.8) Linux versions |
static const int kEpollInitialSize = 64; |
@@ -67,18 +93,28 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) { |
FATAL1("Failed creating epoll file descriptor: %i", errno); |
} |
FDUtils::SetCloseOnExec(epoll_fd_); |
+ // Register the interrupt_fd with the epoll instance. |
+ struct epoll_event event; |
+ event.events = EPOLLIN; |
+ event.data.ptr = NULL; |
+ int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
+ EPOLL_CTL_ADD, |
+ interrupt_fds_[0], |
+ &event)); |
+ if (status == -1) { |
+ FATAL("Failed adding interrupt fd to epoll instance"); |
+ } |
timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC)); |
if (timer_fd_ == -1) { |
FATAL1("Failed creating timerfd file descriptor: %i", errno); |
} |
// Register the timer_fd_ with the epoll instance. |
- struct epoll_event event; |
event.events = EPOLLIN; |
- event.data.u64 = ILLEGAL_PORT; // Use ILLEGAL_PORT to identify timer-fd. |
- int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
- EPOLL_CTL_ADD, |
- timer_fd_, |
- &event)); |
+ event.data.fd = timer_fd_; |
+ status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
+ EPOLL_CTL_ADD, |
+ timer_fd_, |
+ &event)); |
if (status == -1) { |
FATAL2( |
"Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno); |
@@ -87,14 +123,106 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) { |
EventHandlerImplementation::~EventHandlerImplementation() { |
- TEMP_FAILURE_RETRY(close(epoll_fd_)); |
- TEMP_FAILURE_RETRY(close(timer_fd_)); |
+ VOID_TEMP_FAILURE_RETRY(close(epoll_fd_)); |
+ VOID_TEMP_FAILURE_RETRY(close(timer_fd_)); |
+ VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
+ VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
+} |
+ |
+ |
+SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
+ ASSERT(fd >= 0); |
+ HashMap::Entry* entry = socket_map_.Lookup( |
+ GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
+ ASSERT(entry != NULL); |
+ SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
+ if (sd == NULL) { |
+ // If there is no data in the hash map for this file descriptor a |
+ // new SocketData for the file descriptor is inserted. |
+ sd = new SocketData(fd); |
+ entry->value = sd; |
+ } |
+ ASSERT(fd == sd->fd()); |
+ return sd; |
+} |
+ |
+ |
+void EventHandlerImplementation::WakeupHandler(intptr_t id, |
+ Dart_Port dart_port, |
+ int64_t data) { |
+ InterruptMessage msg; |
+ msg.id = id; |
+ msg.dart_port = dart_port; |
+ msg.data = data; |
+ // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
+ // is smaller than 512, we don't need a thread lock. |
+ // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
+ ASSERT(kInterruptMessageSize < PIPE_BUF); |
+ intptr_t result = |
+ FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
+ if (result != kInterruptMessageSize) { |
+ if (result == -1) { |
+ perror("Interrupt message failure:"); |
+ } |
+ FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); |
+ } |
+} |
+ |
+ |
+void EventHandlerImplementation::HandleInterruptFd() { |
+ const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
+ InterruptMessage msg[MAX_MESSAGES]; |
+ ssize_t bytes = TEMP_FAILURE_RETRY( |
+ read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
+ for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
+ if (msg[i].id == kTimerId) { |
+ timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
+ struct itimerspec it; |
+ memset(&it, 0, sizeof(it)); |
+ if (timeout_queue_.HasTimeout()) { |
+ int64_t millis = timeout_queue_.CurrentTimeout(); |
+ it.it_value.tv_sec = millis / 1000; |
+ it.it_value.tv_nsec = (millis % 1000) * 1000000; |
+ } |
+ timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL); |
+ } else if (msg[i].id == kShutdownId) { |
+ shutdown_ = true; |
+ } else { |
+ SocketData* sd = GetSocketData(msg[i].id); |
+ if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) { |
+ ASSERT(msg[i].data == (1 << kShutdownReadCommand)); |
+ // Close the socket for reading. |
+ shutdown(sd->fd(), SHUT_RD); |
+ } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) { |
+ ASSERT(msg[i].data == (1 << kShutdownWriteCommand)); |
+ // Close the socket for writing. |
+ shutdown(sd->fd(), SHUT_WR); |
+ } else if ((msg[i].data & (1 << kCloseCommand)) != 0) { |
+ ASSERT(msg[i].data == (1 << kCloseCommand)); |
+ // Close the socket and free system resources and move on to |
+ // next message. |
+ RemoveFromEpollInstance(epoll_fd_, sd); |
+ intptr_t fd = sd->fd(); |
+ sd->Close(); |
+ socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
+ delete sd; |
+ DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent); |
+ } else if ((msg[i].data & (1 << kReturnTokenCommand)) != 0) { |
+ if (sd->ReturnToken()) { |
+ AddToEpollInstance(epoll_fd_, sd); |
+ } |
+ } else { |
+ // Setup events to wait for. |
+ sd->SetPortAndMask(msg[i].dart_port, msg[i].data); |
+ AddToEpollInstance(epoll_fd_, sd); |
+ } |
+ } |
+ } |
} |
#ifdef DEBUG_POLL |
-static void PrintEventMask(intptr_t events) { |
- // TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the |
- // epoll-data as well. |
+static void PrintEventMask(intptr_t fd, intptr_t events) { |
+ Log::Print("%d ", fd); |
if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN "); |
if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI "); |
if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT "); |
@@ -106,17 +234,19 @@ static void PrintEventMask(intptr_t events) { |
if ((events & ~all_events) != 0) { |
Log::Print("(and %08x) ", events & ~all_events); |
} |
+ Log::Print("(available %d) ", FDUtils::AvailableBytes(fd)); |
Log::Print("\n"); |
} |
#endif |
-intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) { |
+intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
+ SocketData* sd) { |
#ifdef DEBUG_POLL |
- PrintEventMask(events); |
+ PrintEventMask(sd->fd(), events); |
#endif |
if (events & EPOLLERR) { |
- // Return only error if EPOLLIN is present. |
+ // Return error only if EPOLLIN is present. |
return (events & EPOLLIN) ? (1 << kErrorEvent) : 0; |
} |
intptr_t event_mask = 0; |
@@ -129,32 +259,40 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) { |
void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
int size) { |
+ bool interrupt_seen = false; |
for (int i = 0; i < size; i++) { |
- uint64_t data = events[i].data.u64; |
- // ILLEGAL_PORT is used to identify timer-fd. |
- if (data == ILLEGAL_PORT) { |
+ if (events[i].data.ptr == NULL) { |
+ interrupt_seen = true; |
+ } else if (events[i].data.fd == timer_fd_) { |
int64_t val; |
VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val))); |
- timer_mutex_.Lock(); |
if (timeout_queue_.HasTimeout()) { |
DartUtils::PostNull(timeout_queue_.CurrentPort()); |
timeout_queue_.RemoveCurrent(); |
} |
- timer_mutex_.Unlock(); |
} else { |
- int32_t event_mask = GetPollEvents(events[i].events); |
+ SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr); |
+ intptr_t event_mask = GetPollEvents(events[i].events, sd); |
if (event_mask != 0) { |
- Dart_Port port = data; |
+ if (sd->TakeToken()) { |
+ // Took last token, remove from epoll. |
+ RemoveFromEpollInstance(epoll_fd_, sd); |
+ } |
+ Dart_Port port = sd->port(); |
ASSERT(port != 0); |
DartUtils::PostInt32(port, event_mask); |
} |
} |
} |
+ if (interrupt_seen) { |
+ // Handle after socket events, so we avoid closing a socket before we handle |
+ // the current events. |
+ HandleInterruptFd(); |
+ } |
} |
void EventHandlerImplementation::Poll(uword args) { |
- // Main event-handler thread loop. |
static const intptr_t kMaxEvents = 16; |
struct epoll_event events[kMaxEvents]; |
EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
@@ -188,51 +326,26 @@ void EventHandlerImplementation::Start(EventHandler* handler) { |
void EventHandlerImplementation::Shutdown() { |
- shutdown_ = true; |
-} |
- |
- |
-void EventHandlerImplementation::Notify(intptr_t id, |
- Dart_Port dart_port, |
- int64_t data) { |
- // This method is called by isolates, that is, not in the event-handler |
- // thread. |
- if (id == kTimerId) { |
- // Lock this region, as multiple isolates may attempt to update |
- // timeout_queue_. |
- // TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock. |
- timer_mutex_.Lock(); |
- timeout_queue_.UpdateTimeout(dart_port, data); |
- struct itimerspec it; |
- memset(&it, 0, sizeof(it)); |
- if (timeout_queue_.HasTimeout()) { |
- int64_t millis = timeout_queue_.CurrentTimeout(); |
- it.it_value.tv_sec = millis / 1000; |
- it.it_value.tv_nsec = (millis % 1000) * 1000000; |
- } |
- timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL); |
- timer_mutex_.Unlock(); |
- } else { |
- if ((data & (1 << kShutdownReadCommand)) != 0) { |
- ASSERT(data == (1 << kShutdownReadCommand)); |
- // Close the socket for reading. |
- shutdown(id, SHUT_RD); |
- } else if ((data & (1 << kShutdownWriteCommand)) != 0) { |
- ASSERT(data == (1 << kShutdownWriteCommand)); |
- // Close the socket for writing. |
- shutdown(id, SHUT_WR); |
- } else if ((data & (1 << kCloseCommand)) != 0) { |
- ASSERT(data == (1 << kCloseCommand)); |
- // Close the socket and free system resources and move on to |
- // next message. |
- // This will also remove the file descriptor from epoll. |
- Socket::Close(id); |
- DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent); |
- } else { |
- // Add to epoll - this is the first time we see it. |
- AddToEpollInstance(epoll_fd_, id, dart_port, data); |
- } |
- } |
+ SendData(kShutdownId, 0, 0); |
+} |
+ |
+ |
+void EventHandlerImplementation::SendData(intptr_t id, |
+ Dart_Port dart_port, |
+ int64_t data) { |
+ WakeupHandler(id, dart_port, data); |
+} |
+ |
+ |
+void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
+ // The hashmap does not support keys with value 0. |
+ return reinterpret_cast<void*>(fd + 1); |
+} |
+ |
+ |
+uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
+ // The hashmap does not support keys with value 0. |
+ return dart::Utils::WordHash(fd + 1); |
} |
} // namespace bin |