| Index: runtime/bin/eventhandler_linux.cc
|
| diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc
|
| index 6e75903ffca25e9bc45525cbd8b94dbea9a65c06..73f2947db7d90f02d2c8f8905a1dd2a86caee36a 100644
|
| --- a/runtime/bin/eventhandler_linux.cc
|
| +++ b/runtime/bin/eventhandler_linux.cc
|
| @@ -20,7 +20,7 @@
|
| #include "bin/dartutils.h"
|
| #include "bin/fdutils.h"
|
| #include "bin/log.h"
|
| -#include "bin/utils.h"
|
| +#include "bin/socket.h"
|
| #include "platform/hashmap.h"
|
| #include "platform/thread.h"
|
| #include "platform/utils.h"
|
| @@ -29,59 +29,36 @@
|
| namespace dart {
|
| namespace bin {
|
|
|
| -static const int kInterruptMessageSize = sizeof(InterruptMessage);
|
| static const int kTimerId = -1;
|
| -static const int kShutdownId = -2;
|
|
|
|
|
| -// Unregister the file descriptor for a SocketData structure with epoll.
|
| -static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
|
| - if (!sd->tracked_by_epoll()) return;
|
| - int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| - EPOLL_CTL_DEL,
|
| - sd->fd(),
|
| - NULL));
|
| - if (status == -1) {
|
| - FATAL("Failed unregistering events for file descriptor");
|
| - }
|
| - sd->set_tracked_by_epoll(false);
|
| -}
|
| -
|
| -
|
| -static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd, int mask) {
|
| - ASSERT(!sd->tracked_by_epoll());
|
| +static void AddToEpollInstance(intptr_t epoll_fd_,
|
| + int fd, Dart_Port port,
|
| + int mask) {
|
| struct epoll_event event;
|
| event.events = EPOLLET | EPOLLRDHUP;
|
| if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN;
|
| if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT;
|
| - event.data.ptr = sd;
|
| + // Be sure we don't collide with the TIMER_BIT.
|
| + if (port == ILLEGAL_PORT) {
|
| + FATAL("Illigal port sent to event handler");
|
| + }
|
| + event.data.u64 = port;
|
| int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| EPOLL_CTL_ADD,
|
| - sd->fd(),
|
| + fd,
|
| &event));
|
| if (status == -1) {
|
| // Epoll does not accept the file descriptor. It could be due to
|
| // already closed file descriptor, or unuspported devices, such
|
| // as /dev/null. In such case, mark the file descriptor as closed,
|
| // so dart will handle it accordingly.
|
| - DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
|
| - } else {
|
| - sd->set_tracked_by_epoll(true);
|
| + DartUtils::PostInt32(port, 1 << kCloseEvent);
|
| }
|
| }
|
|
|
|
|
| -EventHandlerImplementation::EventHandlerImplementation()
|
| - : socket_map_(&HashMap::SamePointerValue, 16) {
|
| - intptr_t result;
|
| - result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
|
| - if (result != 0) {
|
| - FATAL("Pipe creation failed");
|
| - }
|
| - FDUtils::SetNonBlocking(interrupt_fds_[0]);
|
| - FDUtils::SetCloseOnExec(interrupt_fds_[0]);
|
| - FDUtils::SetCloseOnExec(interrupt_fds_[1]);
|
| - shutdown_ = false;
|
| +EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
|
| // The initial size passed to epoll_create is ignore on newer (>=
|
| // 2.6.8) Linux versions
|
| static const int kEpollInitialSize = 64;
|
| @@ -90,28 +67,18 @@ EventHandlerImplementation::EventHandlerImplementation()
|
| FATAL("Failed creating epoll file descriptor");
|
| }
|
| FDUtils::SetCloseOnExec(epoll_fd_);
|
| - // Register the interrupt_fd with the epoll instance.
|
| - struct epoll_event event;
|
| - event.events = EPOLLIN;
|
| - event.data.ptr = NULL;
|
| - int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| - EPOLL_CTL_ADD,
|
| - interrupt_fds_[0],
|
| - &event));
|
| - if (status == -1) {
|
| - FATAL("Failed adding interrupt fd to epoll instance");
|
| - }
|
| timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC));
|
| if (epoll_fd_ == -1) {
|
| FATAL("Failed creating timerfd file descriptor");
|
| }
|
| // Register the timer_fd_ with the epoll instance.
|
| + struct epoll_event event;
|
| event.events = EPOLLIN;
|
| - event.data.fd = timer_fd_;
|
| - status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| - EPOLL_CTL_ADD,
|
| - timer_fd_,
|
| - &event));
|
| + event.data.u64 = ILLEGAL_PORT; // Use ILLEGAL_PORT to identify timer-fd.
|
| + int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| + EPOLL_CTL_ADD,
|
| + timer_fd_,
|
| + &event));
|
| if (status == -1) {
|
| FATAL2(
|
| "Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno);
|
| @@ -122,103 +89,12 @@ EventHandlerImplementation::EventHandlerImplementation()
|
| EventHandlerImplementation::~EventHandlerImplementation() {
|
| TEMP_FAILURE_RETRY(close(epoll_fd_));
|
| TEMP_FAILURE_RETRY(close(timer_fd_));
|
| - TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
|
| - TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
|
| -}
|
| -
|
| -
|
| -SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
|
| - bool* is_new) {
|
| - ASSERT(fd >= 0);
|
| - HashMap::Entry* entry = socket_map_.Lookup(
|
| - GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
|
| - ASSERT(entry != NULL);
|
| - SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
|
| - if (sd == NULL) {
|
| - // If there is no data in the hash map for this file descriptor a
|
| - // new SocketData for the file descriptor is inserted.
|
| - sd = new SocketData(fd);
|
| - entry->value = sd;
|
| - *is_new = true;
|
| - }
|
| - ASSERT(fd == sd->fd());
|
| - return sd;
|
| -}
|
| -
|
| -
|
| -void EventHandlerImplementation::WakeupHandler(intptr_t id,
|
| - Dart_Port dart_port,
|
| - int64_t data) {
|
| - InterruptMessage msg;
|
| - msg.id = id;
|
| - msg.dart_port = dart_port;
|
| - msg.data = data;
|
| - // WriteToBlocking will write up to 512 bytes atomically, and since our msg
|
| - // is smaller than 512, we don't need a thread lock.
|
| - // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
|
| - ASSERT(kInterruptMessageSize < PIPE_BUF);
|
| - intptr_t result =
|
| - FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
|
| - if (result != kInterruptMessageSize) {
|
| - if (result == -1) {
|
| - perror("Interrupt message failure:");
|
| - }
|
| - FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
|
| - }
|
| -}
|
| -
|
| -
|
| -void EventHandlerImplementation::HandleInterruptFd() {
|
| - const intptr_t MAX_MESSAGES = kInterruptMessageSize;
|
| - InterruptMessage msg[MAX_MESSAGES];
|
| - ssize_t bytes = TEMP_FAILURE_RETRY(
|
| - read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
|
| - for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
|
| - if (msg[i].id == kTimerId) {
|
| - timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
|
| - struct itimerspec it;
|
| - memset(&it, 0, sizeof(it));
|
| - if (timeout_queue_.HasTimeout()) {
|
| - int64_t millis = timeout_queue_.CurrentTimeout();
|
| - it.it_value.tv_sec = millis / 1000;
|
| - it.it_value.tv_nsec = (millis % 1000) * 1000000;
|
| - }
|
| - timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
|
| - } else if (msg[i].id == kShutdownId) {
|
| - shutdown_ = true;
|
| - } else {
|
| - bool is_new = false;
|
| - SocketData* sd = GetSocketData(msg[i].id, &is_new);
|
| - if (is_new) {
|
| - sd->SetPort(msg[i].dart_port);
|
| - AddToEpollInstance(epoll_fd_, sd, msg[i].data);
|
| - }
|
| - if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
|
| - ASSERT(msg[i].data == (1 << kShutdownReadCommand));
|
| - // Close the socket for reading.
|
| - sd->ShutdownRead();
|
| - } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
|
| - ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
|
| - // Close the socket for writing.
|
| - sd->ShutdownWrite();
|
| - } else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
|
| - ASSERT(msg[i].data == (1 << kCloseCommand));
|
| - // Close the socket and free system resources and move on to
|
| - // next message.
|
| - RemoveFromEpollInstance(epoll_fd_, sd);
|
| - intptr_t fd = sd->fd();
|
| - sd->Close();
|
| - socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
|
| - delete sd;
|
| - DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
|
| - }
|
| - }
|
| - }
|
| }
|
|
|
| #ifdef DEBUG_POLL
|
| -static void PrintEventMask(intptr_t fd, intptr_t events) {
|
| - Log::Print("%d ", fd);
|
| +static void PrintEventMask(intptr_t events) {
|
| + // TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the
|
| + // epoll-data as well.
|
| if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN ");
|
| if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI ");
|
| if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT ");
|
| @@ -230,16 +106,14 @@ static void PrintEventMask(intptr_t fd, intptr_t events) {
|
| if ((events & ~all_events) != 0) {
|
| Log::Print("(and %08x) ", events & ~all_events);
|
| }
|
| - Log::Print("(available %d) ", FDUtils::AvailableBytes(fd));
|
|
|
| Log::Print("\n");
|
| }
|
| #endif
|
|
|
| -intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
|
| - SocketData* sd) {
|
| +intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
|
| #ifdef DEBUG_POLL
|
| - PrintEventMask(sd->fd(), events);
|
| + PrintEventMask(events);
|
| #endif
|
| if (events & EPOLLERR) {
|
| // Return only error if EPOLLIN is present.
|
| @@ -255,11 +129,9 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
|
|
|
| void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
|
| int size) {
|
| - bool interrupt_seen = false;
|
| for (int i = 0; i < size; i++) {
|
| - if (events[i].data.ptr == NULL) {
|
| - interrupt_seen = true;
|
| - } else if (events[i].data.fd == timer_fd_) {
|
| + uint64_t data = events[i].data.u64;
|
| + if (data == ILLEGAL_PORT) {
|
| int64_t val;
|
| VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val)));
|
| if (timeout_queue_.HasTimeout()) {
|
| @@ -267,24 +139,19 @@ void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
|
| timeout_queue_.RemoveCurrent();
|
| }
|
| } else {
|
| - SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
|
| - int32_t event_mask = GetPollEvents(events[i].events, sd);
|
| + int32_t event_mask = GetPollEvents(events[i].events);
|
| if (event_mask != 0) {
|
| - Dart_Port port = sd->port();
|
| + Dart_Port port = data;
|
| ASSERT(port != 0);
|
| DartUtils::PostInt32(port, event_mask);
|
| }
|
| }
|
| }
|
| - if (interrupt_seen) {
|
| - // Handle after socket events, so we avoid closing a socket before we handle
|
| - // the current events.
|
| - HandleInterruptFd();
|
| - }
|
| }
|
|
|
|
|
| void EventHandlerImplementation::Poll(uword args) {
|
| + // Main event-handler thread loop.
|
| static const intptr_t kMaxEvents = 16;
|
| struct epoll_event events[kMaxEvents];
|
| EventHandler* handler = reinterpret_cast<EventHandler*>(args);
|
| @@ -318,26 +185,51 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
|
|
|
|
|
| void EventHandlerImplementation::Shutdown() {
|
| - SendData(kShutdownId, 0, 0);
|
| -}
|
| -
|
| -
|
| -void EventHandlerImplementation::SendData(intptr_t id,
|
| - Dart_Port dart_port,
|
| - int64_t data) {
|
| - WakeupHandler(id, dart_port, data);
|
| -}
|
| -
|
| -
|
| -void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
|
| - // The hashmap does not support keys with value 0.
|
| - return reinterpret_cast<void*>(fd + 1);
|
| -}
|
| -
|
| -
|
| -uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
|
| - // The hashmap does not support keys with value 0.
|
| - return dart::Utils::WordHash(fd + 1);
|
| + shutdown_ = true;
|
| +}
|
| +
|
| +
|
| +void EventHandlerImplementation::Notify(intptr_t id,
|
| + Dart_Port dart_port,
|
| + int64_t data) {
|
| + // This method is called by isolates, that is, not in the event-handler
|
| + // thread.
|
| + if (id == kTimerId) {
|
| + // Lock this region, as multiple isolates may attempt to update
|
| + // timeout_queue_.
|
| + // TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock.
|
| + timer_mutex_.Lock();
|
| + timeout_queue_.UpdateTimeout(dart_port, data);
|
| + struct itimerspec it;
|
| + memset(&it, 0, sizeof(it));
|
| + if (timeout_queue_.HasTimeout()) {
|
| + int64_t millis = timeout_queue_.CurrentTimeout();
|
| + it.it_value.tv_sec = millis / 1000;
|
| + it.it_value.tv_nsec = (millis % 1000) * 1000000;
|
| + }
|
| + timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
|
| + timer_mutex_.Unlock();
|
| + } else {
|
| + if ((data & (1 << kShutdownReadCommand)) != 0) {
|
| + ASSERT(data == (1 << kShutdownReadCommand));
|
| + // Close the socket for reading.
|
| + shutdown(id, SHUT_RD);
|
| + } else if ((data & (1 << kShutdownWriteCommand)) != 0) {
|
| + ASSERT(data == (1 << kShutdownWriteCommand));
|
| + // Close the socket for writing.
|
| + shutdown(id, SHUT_WR);
|
| + } else if ((data & (1 << kCloseCommand)) != 0) {
|
| + ASSERT(data == (1 << kCloseCommand));
|
| + // Close the socket and free system resources and move on to
|
| + // next message.
|
| + // This will also remove the file descriptor from epoll.
|
| + Socket::Close(id);
|
| + DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent);
|
| + } else {
|
| + // Add to epoll - this is the first time we see it.
|
| + AddToEpollInstance(epoll_fd_, id, dart_port, data);
|
| + }
|
| + }
|
| }
|
|
|
| } // namespace bin
|
|
|