Chromium Code Reviews| Index: runtime/bin/eventhandler_linux.cc |
| diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc |
| index 6e75903ffca25e9bc45525cbd8b94dbea9a65c06..7b676234224424937f67b97ac5082b5cf4890038 100644 |
| --- a/runtime/bin/eventhandler_linux.cc |
| +++ b/runtime/bin/eventhandler_linux.cc |
| @@ -20,7 +20,7 @@ |
| #include "bin/dartutils.h" |
| #include "bin/fdutils.h" |
| #include "bin/log.h" |
| -#include "bin/utils.h" |
| +#include "bin/socket.h" |
| #include "platform/hashmap.h" |
| #include "platform/thread.h" |
| #include "platform/utils.h" |
| @@ -29,59 +29,39 @@ |
| namespace dart { |
| namespace bin { |
| -static const int kInterruptMessageSize = sizeof(InterruptMessage); |
| static const int kTimerId = -1; |
| -static const int kShutdownId = -2; |
| +// MSB is used to mark the timer fd. |
| +static const uint64_t TIMER_BIT = 0x8000000000000000; |
| -// Unregister the file descriptor for a SocketData structure with epoll. |
| -static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) { |
| - if (!sd->tracked_by_epoll()) return; |
| - int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
| - EPOLL_CTL_DEL, |
| - sd->fd(), |
| - NULL)); |
| - if (status == -1) { |
| - FATAL("Failed unregistering events for file descriptor"); |
| - } |
| - sd->set_tracked_by_epoll(false); |
| -} |
| - |
| -static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd, int mask) { |
| - ASSERT(!sd->tracked_by_epoll()); |
| +static void AddToEpollInstance(intptr_t epoll_fd_, int fd, Dart_Port port, |
|
Søren Gjesse
2014/02/20 08:15:54
Move all arguments to the secone line for better r
Anders Johnsen
2014/02/20 08:56:21
Done.
|
| + int mask) { |
| struct epoll_event event; |
| event.events = EPOLLET | EPOLLRDHUP; |
| if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN; |
| if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT; |
| - event.data.ptr = sd; |
| + // Be sure we don't collide with the TIMER_BIT. |
| + if (port & TIMER_BIT) { |
| + FATAL("Port not in expected range"); |
| + } |
| + event.data.u64 = port; |
| int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
| EPOLL_CTL_ADD, |
| - sd->fd(), |
| + fd, |
| &event)); |
| if (status == -1) { |
| // Epoll does not accept the file descriptor. It could be due to |
| // already closed file descriptor, or unuspported devices, such |
| // as /dev/null. In such case, mark the file descriptor as closed, |
| // so dart will handle it accordingly. |
| - DartUtils::PostInt32(sd->port(), 1 << kCloseEvent); |
| - } else { |
| - sd->set_tracked_by_epoll(true); |
| + DartUtils::PostInt32(port, 1 << kCloseEvent); |
| } |
| } |
| EventHandlerImplementation::EventHandlerImplementation() |
| - : socket_map_(&HashMap::SamePointerValue, 16) { |
| - intptr_t result; |
| - result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
| - if (result != 0) { |
| - FATAL("Pipe creation failed"); |
| - } |
| - FDUtils::SetNonBlocking(interrupt_fds_[0]); |
| - FDUtils::SetCloseOnExec(interrupt_fds_[0]); |
| - FDUtils::SetCloseOnExec(interrupt_fds_[1]); |
| - shutdown_ = false; |
| + : shutdown_(false) { |
|
Søren Gjesse
2014/02/20 08:15:54
Does this fit on the previous line?
Anders Johnsen
2014/02/20 08:56:21
Done.
|
| // The initial size passed to epoll_create is ignore on newer (>= |
| // 2.6.8) Linux versions |
| static const int kEpollInitialSize = 64; |
| @@ -90,28 +70,18 @@ EventHandlerImplementation::EventHandlerImplementation() |
| FATAL("Failed creating epoll file descriptor"); |
| } |
| FDUtils::SetCloseOnExec(epoll_fd_); |
| - // Register the interrupt_fd with the epoll instance. |
| - struct epoll_event event; |
| - event.events = EPOLLIN; |
| - event.data.ptr = NULL; |
| - int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
| - EPOLL_CTL_ADD, |
| - interrupt_fds_[0], |
| - &event)); |
| - if (status == -1) { |
| - FATAL("Failed adding interrupt fd to epoll instance"); |
| - } |
| timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC)); |
| if (epoll_fd_ == -1) { |
| FATAL("Failed creating timerfd file descriptor"); |
| } |
| // Register the timer_fd_ with the epoll instance. |
| + struct epoll_event event; |
| event.events = EPOLLIN; |
| - event.data.fd = timer_fd_; |
| - status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
| - EPOLL_CTL_ADD, |
| - timer_fd_, |
| - &event)); |
| + event.data.u64 = timer_fd_ | TIMER_BIT; |
|
Søren Gjesse
2014/02/20 08:15:54
Can't we just use ILLEGAL_PORT (value 0) here, and
Anders Johnsen
2014/02/20 08:56:21
Done.
|
| + int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, |
| + EPOLL_CTL_ADD, |
| + timer_fd_, |
| + &event)); |
| if (status == -1) { |
| FATAL2( |
| "Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno); |
| @@ -122,103 +92,12 @@ EventHandlerImplementation::EventHandlerImplementation() |
| EventHandlerImplementation::~EventHandlerImplementation() { |
| TEMP_FAILURE_RETRY(close(epoll_fd_)); |
| TEMP_FAILURE_RETRY(close(timer_fd_)); |
| - TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
| - TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
| -} |
| - |
| - |
| -SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd, |
| - bool* is_new) { |
| - ASSERT(fd >= 0); |
| - HashMap::Entry* entry = socket_map_.Lookup( |
| - GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
| - ASSERT(entry != NULL); |
| - SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
| - if (sd == NULL) { |
| - // If there is no data in the hash map for this file descriptor a |
| - // new SocketData for the file descriptor is inserted. |
| - sd = new SocketData(fd); |
| - entry->value = sd; |
| - *is_new = true; |
| - } |
| - ASSERT(fd == sd->fd()); |
| - return sd; |
| -} |
| - |
| - |
| -void EventHandlerImplementation::WakeupHandler(intptr_t id, |
| - Dart_Port dart_port, |
| - int64_t data) { |
| - InterruptMessage msg; |
| - msg.id = id; |
| - msg.dart_port = dart_port; |
| - msg.data = data; |
| - // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
| - // is smaller than 512, we don't need a thread lock. |
| - // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
| - ASSERT(kInterruptMessageSize < PIPE_BUF); |
| - intptr_t result = |
| - FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
| - if (result != kInterruptMessageSize) { |
| - if (result == -1) { |
| - perror("Interrupt message failure:"); |
| - } |
| - FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); |
| - } |
| -} |
| - |
| - |
| -void EventHandlerImplementation::HandleInterruptFd() { |
| - const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
| - InterruptMessage msg[MAX_MESSAGES]; |
| - ssize_t bytes = TEMP_FAILURE_RETRY( |
| - read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
| - for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
| - if (msg[i].id == kTimerId) { |
| - timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
| - struct itimerspec it; |
| - memset(&it, 0, sizeof(it)); |
| - if (timeout_queue_.HasTimeout()) { |
| - int64_t millis = timeout_queue_.CurrentTimeout(); |
| - it.it_value.tv_sec = millis / 1000; |
| - it.it_value.tv_nsec = (millis % 1000) * 1000000; |
| - } |
| - timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL); |
| - } else if (msg[i].id == kShutdownId) { |
| - shutdown_ = true; |
| - } else { |
| - bool is_new = false; |
| - SocketData* sd = GetSocketData(msg[i].id, &is_new); |
| - if (is_new) { |
| - sd->SetPort(msg[i].dart_port); |
| - AddToEpollInstance(epoll_fd_, sd, msg[i].data); |
| - } |
| - if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) { |
| - ASSERT(msg[i].data == (1 << kShutdownReadCommand)); |
| - // Close the socket for reading. |
| - sd->ShutdownRead(); |
| - } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) { |
| - ASSERT(msg[i].data == (1 << kShutdownWriteCommand)); |
| - // Close the socket for writing. |
| - sd->ShutdownWrite(); |
| - } else if ((msg[i].data & (1 << kCloseCommand)) != 0) { |
| - ASSERT(msg[i].data == (1 << kCloseCommand)); |
| - // Close the socket and free system resources and move on to |
| - // next message. |
| - RemoveFromEpollInstance(epoll_fd_, sd); |
| - intptr_t fd = sd->fd(); |
| - sd->Close(); |
| - socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| - delete sd; |
| - DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent); |
| - } |
| - } |
| - } |
| } |
| #ifdef DEBUG_POLL |
| -static void PrintEventMask(intptr_t fd, intptr_t events) { |
| - Log::Print("%d ", fd); |
| +static void PrintEventMask(intptr_t events) { |
| + // TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the |
| + // epoll-data as well. |
| if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN "); |
| if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI "); |
| if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT "); |
| @@ -230,16 +109,14 @@ static void PrintEventMask(intptr_t fd, intptr_t events) { |
| if ((events & ~all_events) != 0) { |
| Log::Print("(and %08x) ", events & ~all_events); |
| } |
| - Log::Print("(available %d) ", FDUtils::AvailableBytes(fd)); |
| Log::Print("\n"); |
| } |
| #endif |
| -intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
| - SocketData* sd) { |
| +intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) { |
| #ifdef DEBUG_POLL |
| - PrintEventMask(sd->fd(), events); |
| + PrintEventMask(events); |
| #endif |
| if (events & EPOLLERR) { |
| // Return only error if EPOLLIN is present. |
| @@ -255,11 +132,9 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
| void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
| int size) { |
| - bool interrupt_seen = false; |
| for (int i = 0; i < size; i++) { |
| - if (events[i].data.ptr == NULL) { |
| - interrupt_seen = true; |
| - } else if (events[i].data.fd == timer_fd_) { |
| + uint64_t data = events[i].data.u64; |
| + if (data & TIMER_BIT) { |
| int64_t val; |
| VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val))); |
| if (timeout_queue_.HasTimeout()) { |
| @@ -267,20 +142,14 @@ void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
| timeout_queue_.RemoveCurrent(); |
| } |
| } else { |
| - SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr); |
| - int32_t event_mask = GetPollEvents(events[i].events, sd); |
| + int32_t event_mask = GetPollEvents(events[i].events); |
| if (event_mask != 0) { |
| - Dart_Port port = sd->port(); |
| + Dart_Port port = data; |
| ASSERT(port != 0); |
| DartUtils::PostInt32(port, event_mask); |
| } |
| } |
| } |
| - if (interrupt_seen) { |
| - // Handle after socket events, so we avoid closing a socket before we handle |
| - // the current events. |
| - HandleInterruptFd(); |
| - } |
| } |
| @@ -318,26 +187,49 @@ void EventHandlerImplementation::Start(EventHandler* handler) { |
| void EventHandlerImplementation::Shutdown() { |
| - SendData(kShutdownId, 0, 0); |
| + shutdown_ = true; |
| } |
| void EventHandlerImplementation::SendData(intptr_t id, |
|
Søren Gjesse
2014/02/20 08:15:54
This method is no longer sending data, but just ma
Anders Johnsen
2014/02/20 08:56:21
Done.
|
| Dart_Port dart_port, |
| int64_t data) { |
| - WakeupHandler(id, dart_port, data); |
| -} |
| - |
| - |
| -void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
| - // The hashmap does not support keys with value 0. |
| - return reinterpret_cast<void*>(fd + 1); |
| -} |
| - |
| - |
| -uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
| - // The hashmap does not support keys with value 0. |
| - return dart::Utils::WordHash(fd + 1); |
| + if (id == kTimerId) { |
| + // Lock this region, as multiple isolates may attempt to update |
| + // timeout_queue_. |
| + // TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock. |
| + timer_mutex_.Lock(); |
| + timeout_queue_.UpdateTimeout(dart_port, data); |
| + struct itimerspec it; |
| + memset(&it, 0, sizeof(it)); |
| + if (timeout_queue_.HasTimeout()) { |
| + int64_t millis = timeout_queue_.CurrentTimeout(); |
| + it.it_value.tv_sec = millis / 1000; |
| + it.it_value.tv_nsec = (millis % 1000) * 1000000; |
| + } |
| + timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL); |
| + timer_mutex_.Unlock(); |
| + } else { |
| + if ((data & (1 << kShutdownReadCommand)) != 0) { |
| + ASSERT(data == (1 << kShutdownReadCommand)); |
| + // Close the socket for reading. |
| + shutdown(id, SHUT_RD); |
| + } else if ((data & (1 << kShutdownWriteCommand)) != 0) { |
| + ASSERT(data == (1 << kShutdownWriteCommand)); |
| + // Close the socket for writing. |
| + shutdown(id, SHUT_WR); |
| + } else if ((data & (1 << kCloseCommand)) != 0) { |
| + ASSERT(data == (1 << kCloseCommand)); |
| + // Close the socket and free system resources and move on to |
| + // next message. |
| + // This will also remove the file descriptor from epoll. |
| + Socket::Close(id); |
| + DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent); |
| + } else { |
| + // Add to epoll - this is the first time we see it. |
| + AddToEpollInstance(epoll_fd_, id, dart_port, data); |
| + } |
| + } |
| } |
| } // namespace bin |