| Index: runtime/bin/eventhandler_linux.cc
|
| diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc
|
| index 612f3cb811ecaf108d372ad31cfb98bf9c3fb27d..f87c1a4785192c81740d7c08db2b1a48e2f0fef7 100644
|
| --- a/runtime/bin/eventhandler_linux.cc
|
| +++ b/runtime/bin/eventhandler_linux.cc
|
| @@ -20,7 +20,6 @@
|
| #include "bin/dartutils.h"
|
| #include "bin/fdutils.h"
|
| #include "bin/log.h"
|
| -#include "bin/socket.h"
|
| #include "platform/hashmap.h"
|
| #include "platform/thread.h"
|
| #include "platform/utils.h"
|
| @@ -29,36 +28,63 @@
|
| namespace dart {
|
| namespace bin {
|
|
|
| +static const int kInterruptMessageSize = sizeof(InterruptMessage);
|
| static const int kTimerId = -1;
|
| +static const int kShutdownId = -2;
|
|
|
|
|
| -static void AddToEpollInstance(intptr_t epoll_fd_,
|
| - int fd, Dart_Port port,
|
| - int mask) {
|
| +intptr_t SocketData::GetPollEvents() {
|
| + // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
|
| + // triggered anyway.
|
| + intptr_t events = EPOLLET | EPOLLRDHUP;
|
| + if ((mask_ & (1 << kInEvent)) != 0) {
|
| + events |= EPOLLIN;
|
| + }
|
| + if ((mask_ & (1 << kOutEvent)) != 0) {
|
| + events |= EPOLLOUT;
|
| + }
|
| + return events;
|
| +}
|
| +
|
| +
|
| +// Unregister the file descriptor for a SocketData structure with epoll.
|
| +static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
|
| + VOID_TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| + EPOLL_CTL_DEL,
|
| + sd->fd(),
|
| + NULL));
|
| +}
|
| +
|
| +
|
| +static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
|
| struct epoll_event event;
|
| - event.events = EPOLLET | EPOLLRDHUP;
|
| - if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN;
|
| - if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT;
|
| - // Be sure we don't collide with the TIMER_BIT.
|
| - if (port == ILLEGAL_PORT) {
|
| - FATAL("Illigal port sent to event handler");
|
| - }
|
| - event.data.u64 = port;
|
| + event.events = sd->GetPollEvents();
|
| + event.data.ptr = sd;
|
| int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| EPOLL_CTL_ADD,
|
| - fd,
|
| + sd->fd(),
|
| &event));
|
| if (status == -1) {
|
| // Epoll does not accept the file descriptor. It could be due to
|
| // already closed file descriptor, or unuspported devices, such
|
| // as /dev/null. In such case, mark the file descriptor as closed,
|
| // so dart will handle it accordingly.
|
| - DartUtils::PostInt32(port, 1 << kCloseEvent);
|
| + DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
|
| }
|
| }
|
|
|
|
|
| -EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
|
| +EventHandlerImplementation::EventHandlerImplementation()
|
| + : socket_map_(&HashMap::SamePointerValue, 16) {
|
| + intptr_t result;
|
| + result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
|
| + if (result != 0) {
|
| + FATAL("Pipe creation failed");
|
| + }
|
| + FDUtils::SetNonBlocking(interrupt_fds_[0]);
|
| + FDUtils::SetCloseOnExec(interrupt_fds_[0]);
|
| + FDUtils::SetCloseOnExec(interrupt_fds_[1]);
|
| + shutdown_ = false;
|
| // The initial size passed to epoll_create is ignore on newer (>=
|
| // 2.6.8) Linux versions
|
| static const int kEpollInitialSize = 64;
|
| @@ -67,18 +93,28 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
|
| FATAL1("Failed creating epoll file descriptor: %i", errno);
|
| }
|
| FDUtils::SetCloseOnExec(epoll_fd_);
|
| + // Register the interrupt_fd with the epoll instance.
|
| + struct epoll_event event;
|
| + event.events = EPOLLIN;
|
| + event.data.ptr = NULL;
|
| + int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| + EPOLL_CTL_ADD,
|
| + interrupt_fds_[0],
|
| + &event));
|
| + if (status == -1) {
|
| + FATAL("Failed adding interrupt fd to epoll instance");
|
| + }
|
| timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC));
|
| if (timer_fd_ == -1) {
|
| FATAL1("Failed creating timerfd file descriptor: %i", errno);
|
| }
|
| // Register the timer_fd_ with the epoll instance.
|
| - struct epoll_event event;
|
| event.events = EPOLLIN;
|
| - event.data.u64 = ILLEGAL_PORT; // Use ILLEGAL_PORT to identify timer-fd.
|
| - int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| - EPOLL_CTL_ADD,
|
| - timer_fd_,
|
| - &event));
|
| + event.data.fd = timer_fd_;
|
| + status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
|
| + EPOLL_CTL_ADD,
|
| + timer_fd_,
|
| + &event));
|
| if (status == -1) {
|
| FATAL2(
|
| "Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno);
|
| @@ -87,14 +123,106 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
|
|
|
|
|
| EventHandlerImplementation::~EventHandlerImplementation() {
|
| - TEMP_FAILURE_RETRY(close(epoll_fd_));
|
| - TEMP_FAILURE_RETRY(close(timer_fd_));
|
| + VOID_TEMP_FAILURE_RETRY(close(epoll_fd_));
|
| + VOID_TEMP_FAILURE_RETRY(close(timer_fd_));
|
| + VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
|
| + VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
|
| +}
|
| +
|
| +
|
| +SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
|
| + ASSERT(fd >= 0);
|
| + HashMap::Entry* entry = socket_map_.Lookup(
|
| + GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
|
| + ASSERT(entry != NULL);
|
| + SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
|
| + if (sd == NULL) {
|
| + // If there is no data in the hash map for this file descriptor a
|
| + // new SocketData for the file descriptor is inserted.
|
| + sd = new SocketData(fd);
|
| + entry->value = sd;
|
| + }
|
| + ASSERT(fd == sd->fd());
|
| + return sd;
|
| +}
|
| +
|
| +
|
| +void EventHandlerImplementation::WakeupHandler(intptr_t id,
|
| + Dart_Port dart_port,
|
| + int64_t data) {
|
| + InterruptMessage msg;
|
| + msg.id = id;
|
| + msg.dart_port = dart_port;
|
| + msg.data = data;
|
| + // WriteToBlocking will write up to 512 bytes atomically, and since our msg
|
| + // is smaller than 512, we don't need a thread lock.
|
| + // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
|
| + ASSERT(kInterruptMessageSize < PIPE_BUF);
|
| + intptr_t result =
|
| + FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
|
| + if (result != kInterruptMessageSize) {
|
| + if (result == -1) {
|
| + perror("Interrupt message failure:");
|
| + }
|
| + FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
|
| + }
|
| +}
|
| +
|
| +
|
| +void EventHandlerImplementation::HandleInterruptFd() {
|
| + const intptr_t MAX_MESSAGES = kInterruptMessageSize;
|
| + InterruptMessage msg[MAX_MESSAGES];
|
| + ssize_t bytes = TEMP_FAILURE_RETRY(
|
| + read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
|
| + for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
|
| + if (msg[i].id == kTimerId) {
|
| + timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
|
| + struct itimerspec it;
|
| + memset(&it, 0, sizeof(it));
|
| + if (timeout_queue_.HasTimeout()) {
|
| + int64_t millis = timeout_queue_.CurrentTimeout();
|
| + it.it_value.tv_sec = millis / 1000;
|
| + it.it_value.tv_nsec = (millis % 1000) * 1000000;
|
| + }
|
| + timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
|
| + } else if (msg[i].id == kShutdownId) {
|
| + shutdown_ = true;
|
| + } else {
|
| + SocketData* sd = GetSocketData(msg[i].id);
|
| + if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
|
| + ASSERT(msg[i].data == (1 << kShutdownReadCommand));
|
| + // Close the socket for reading.
|
| + shutdown(sd->fd(), SHUT_RD);
|
| + } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
|
| + ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
|
| + // Close the socket for writing.
|
| + shutdown(sd->fd(), SHUT_WR);
|
| + } else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
|
| + ASSERT(msg[i].data == (1 << kCloseCommand));
|
| + // Close the socket and free system resources and move on to
|
| + // next message.
|
| + RemoveFromEpollInstance(epoll_fd_, sd);
|
| + intptr_t fd = sd->fd();
|
| + sd->Close();
|
| + socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
|
| + delete sd;
|
| + DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
|
| + } else if ((msg[i].data & (1 << kReturnTokenCommand)) != 0) {
|
| + if (sd->ReturnToken()) {
|
| + AddToEpollInstance(epoll_fd_, sd);
|
| + }
|
| + } else {
|
| + // Setup events to wait for.
|
| + sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
|
| + AddToEpollInstance(epoll_fd_, sd);
|
| + }
|
| + }
|
| + }
|
| }
|
|
|
| #ifdef DEBUG_POLL
|
| -static void PrintEventMask(intptr_t events) {
|
| - // TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the
|
| - // epoll-data as well.
|
| +static void PrintEventMask(intptr_t fd, intptr_t events) {
|
| + Log::Print("%d ", fd);
|
| if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN ");
|
| if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI ");
|
| if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT ");
|
| @@ -106,17 +234,19 @@ static void PrintEventMask(intptr_t events) {
|
| if ((events & ~all_events) != 0) {
|
| Log::Print("(and %08x) ", events & ~all_events);
|
| }
|
| + Log::Print("(available %d) ", FDUtils::AvailableBytes(fd));
|
|
|
| Log::Print("\n");
|
| }
|
| #endif
|
|
|
| -intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
|
| +intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
|
| + SocketData* sd) {
|
| #ifdef DEBUG_POLL
|
| - PrintEventMask(events);
|
| + PrintEventMask(sd->fd(), events);
|
| #endif
|
| if (events & EPOLLERR) {
|
| - // Return only error if EPOLLIN is present.
|
| + // Return error only if EPOLLIN is present.
|
| return (events & EPOLLIN) ? (1 << kErrorEvent) : 0;
|
| }
|
| intptr_t event_mask = 0;
|
| @@ -129,32 +259,40 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
|
|
|
| void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
|
| int size) {
|
| + bool interrupt_seen = false;
|
| for (int i = 0; i < size; i++) {
|
| - uint64_t data = events[i].data.u64;
|
| - // ILLEGAL_PORT is used to identify timer-fd.
|
| - if (data == ILLEGAL_PORT) {
|
| + if (events[i].data.ptr == NULL) {
|
| + interrupt_seen = true;
|
| + } else if (events[i].data.fd == timer_fd_) {
|
| int64_t val;
|
| VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val)));
|
| - timer_mutex_.Lock();
|
| if (timeout_queue_.HasTimeout()) {
|
| DartUtils::PostNull(timeout_queue_.CurrentPort());
|
| timeout_queue_.RemoveCurrent();
|
| }
|
| - timer_mutex_.Unlock();
|
| } else {
|
| - int32_t event_mask = GetPollEvents(events[i].events);
|
| + SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
|
| + intptr_t event_mask = GetPollEvents(events[i].events, sd);
|
| if (event_mask != 0) {
|
| - Dart_Port port = data;
|
| + if (sd->TakeToken()) {
|
| + // Took last token, remove from epoll.
|
| + RemoveFromEpollInstance(epoll_fd_, sd);
|
| + }
|
| + Dart_Port port = sd->port();
|
| ASSERT(port != 0);
|
| DartUtils::PostInt32(port, event_mask);
|
| }
|
| }
|
| }
|
| + if (interrupt_seen) {
|
| + // Handle after socket events, so we avoid closing a socket before we handle
|
| + // the current events.
|
| + HandleInterruptFd();
|
| + }
|
| }
|
|
|
|
|
| void EventHandlerImplementation::Poll(uword args) {
|
| - // Main event-handler thread loop.
|
| static const intptr_t kMaxEvents = 16;
|
| struct epoll_event events[kMaxEvents];
|
| EventHandler* handler = reinterpret_cast<EventHandler*>(args);
|
| @@ -188,51 +326,26 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
|
|
|
|
|
| void EventHandlerImplementation::Shutdown() {
|
| - shutdown_ = true;
|
| -}
|
| -
|
| -
|
| -void EventHandlerImplementation::Notify(intptr_t id,
|
| - Dart_Port dart_port,
|
| - int64_t data) {
|
| - // This method is called by isolates, that is, not in the event-handler
|
| - // thread.
|
| - if (id == kTimerId) {
|
| - // Lock this region, as multiple isolates may attempt to update
|
| - // timeout_queue_.
|
| - // TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock.
|
| - timer_mutex_.Lock();
|
| - timeout_queue_.UpdateTimeout(dart_port, data);
|
| - struct itimerspec it;
|
| - memset(&it, 0, sizeof(it));
|
| - if (timeout_queue_.HasTimeout()) {
|
| - int64_t millis = timeout_queue_.CurrentTimeout();
|
| - it.it_value.tv_sec = millis / 1000;
|
| - it.it_value.tv_nsec = (millis % 1000) * 1000000;
|
| - }
|
| - timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
|
| - timer_mutex_.Unlock();
|
| - } else {
|
| - if ((data & (1 << kShutdownReadCommand)) != 0) {
|
| - ASSERT(data == (1 << kShutdownReadCommand));
|
| - // Close the socket for reading.
|
| - shutdown(id, SHUT_RD);
|
| - } else if ((data & (1 << kShutdownWriteCommand)) != 0) {
|
| - ASSERT(data == (1 << kShutdownWriteCommand));
|
| - // Close the socket for writing.
|
| - shutdown(id, SHUT_WR);
|
| - } else if ((data & (1 << kCloseCommand)) != 0) {
|
| - ASSERT(data == (1 << kCloseCommand));
|
| - // Close the socket and free system resources and move on to
|
| - // next message.
|
| - // This will also remove the file descriptor from epoll.
|
| - Socket::Close(id);
|
| - DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent);
|
| - } else {
|
| - // Add to epoll - this is the first time we see it.
|
| - AddToEpollInstance(epoll_fd_, id, dart_port, data);
|
| - }
|
| - }
|
| + SendData(kShutdownId, 0, 0);
|
| +}
|
| +
|
| +
|
| +void EventHandlerImplementation::SendData(intptr_t id,
|
| + Dart_Port dart_port,
|
| + int64_t data) {
|
| + WakeupHandler(id, dart_port, data);
|
| +}
|
| +
|
| +
|
| +void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
|
| + // The hashmap does not support keys with value 0.
|
| + return reinterpret_cast<void*>(fd + 1);
|
| +}
|
| +
|
| +
|
| +uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
|
| + // The hashmap does not support keys with value 0.
|
| + return dart::Utils::WordHash(fd + 1);
|
| }
|
|
|
| } // namespace bin
|
|
|