Chromium Code Reviews| Index: runtime/bin/eventhandler_fuchsia.cc |
| diff --git a/runtime/bin/eventhandler_fuchsia.cc b/runtime/bin/eventhandler_fuchsia.cc |
| index 3d586ca034995a3f1ce01d1943768d7253d5d24b..9cef2445cf7e00185ae1a678873390a46f1f7a3f 100644 |
| --- a/runtime/bin/eventhandler_fuchsia.cc |
| +++ b/runtime/bin/eventhandler_fuchsia.cc |
| @@ -10,14 +10,20 @@ |
| #include "bin/eventhandler.h" |
| #include "bin/eventhandler_fuchsia.h" |
| -#include <errno.h> // NOLINT |
| -#include <fcntl.h> // NOLINT |
| -#include <pthread.h> // NOLINT |
| -#include <stdio.h> // NOLINT |
| -#include <string.h> // NOLINT |
| -#include <sys/epoll.h> // NOLINT |
| -#include <sys/stat.h> // NOLINT |
| -#include <unistd.h> // NOLINT |
| +#include <errno.h> |
| +#include <fcntl.h> |
| +#include <magenta/status.h> |
| +#include <magenta/syscalls.h> |
| +#include <magenta/syscalls/object.h> |
| +#include <magenta/syscalls/port.h> |
| +#include <mxio/private.h> |
| +#include <pthread.h> |
| +#include <stdio.h> |
| +#include <string.h> |
| +#include <sys/epoll.h> |
| +#include <sys/socket.h> |
| +#include <sys/stat.h> |
| +#include <unistd.h> |
| #include "bin/fdutils.h" |
| #include "bin/lockers.h" |
| @@ -28,169 +34,293 @@ |
| #include "platform/hashmap.h" |
| #include "platform/utils.h" |
| -// #define EVENTHANDLER_LOGGING 1 |
| -#if defined(EVENTHANDLER_LOGGING) |
| -#define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) |
| -#define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) |
| +// The EventHandler for Fuchsia uses its "ports v2" API: |
| +// https://fuchsia.googlesource.com/magenta/+/HEAD/docs/syscalls/port_create.md |
| +// This API does not have epoll()-like edge triggering (EPOLLET). Since clients |
| +// of the EventHandler expect edge-triggered notifications, we must simulate it. |
| +// When a packet from mx_port_wait() indicates that a signal is asserted for a |
| +// handle, we unsubscribe from that signal until the event that asserted the |
| +// signal can be processed. For example: |
| +// |
| +// 1. We get MX_SOCKET_WRITABLE from mx_port_wait() for a handle. |
| +// 2. We send kOutEvent to the Dart thread. |
| +// 3. We unsubscribe from further MX_SOCKET_WRITABLE signals for the handle. |
| +// 4. Some time later the Dart thread actually does a write(). |
| +// 5. After writing, the Dart thread resubscribes to write events. |
| +// |
| +// We use he same procedure for MX_SOCKET_READABLE, and read()/accept(). |
| + |
| +// define EVENTHANDLER_LOG_ERROR to get log messages only for errors. |
| +// define EVENTHANDLER_LOG_INFO to get log messages for both information and |
| +// errors. |
| +// #define EVENTHANDLER_LOG_INFO 1 |
| +#define EVENTHANDLER_LOG_ERROR 1 |
| +#if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR) |
| +#define LOG_ERR(msg, ...) \ |
| + { \ |
| + int err = errno; \ |
| + Log::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, __LINE__, \ |
| + ##__VA_ARGS__); \ |
| + errno = err; \ |
| + } |
| +#if defined(EVENTHANDLER_LOG_INFO) |
| +#define LOG_INFO(msg, ...) \ |
| + Log::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \ |
| + ##__VA_ARGS__) |
| +#else |
| +#define LOG_INFO(msg, ...) |
| +#endif // defined(EVENTHANDLER_LOG_INFO) |
| #else |
| #define LOG_ERR(msg, ...) |
| #define LOG_INFO(msg, ...) |
| -#endif // defined(EVENTHANDLER_LOGGING) |
| +#endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR) |
| namespace dart { |
| namespace bin { |
| -#if defined(EVENTHANDLER_LOGGING) |
| -static void PrintEventMask(intptr_t fd, intptr_t events) { |
| - Log::PrintErr("%d ", fd); |
| - if ((events & EPOLLIN) != 0) { |
| - Log::PrintErr("EPOLLIN "); |
| - } |
| - if ((events & EPOLLPRI) != 0) { |
| - Log::PrintErr("EPOLLPRI "); |
| - } |
| - if ((events & EPOLLOUT) != 0) { |
| - Log::PrintErr("EPOLLOUT "); |
| - } |
| - if ((events & EPOLLERR) != 0) { |
| - Log::PrintErr("EPOLLERR "); |
| - } |
| - if ((events & EPOLLHUP) != 0) { |
| - Log::PrintErr("EPOLLHUP "); |
| +intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) { |
| + MutexLocker ml(mutex_); |
| + const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes)); |
| + const int err = errno; |
| + LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes); |
| + |
| + // Resubscribe to read events. We resubscribe to events even if read() returns |
| + // an error. The error might be, e.g. EWOULDBLOCK, in which case |
| + // re-subscription is necessary. Logic in the caller decides which errors are |
| + // real, and which are ignore-and-continue. |
| + read_events_enabled_ = true; |
| + if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) { |
| + LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_); |
| } |
| - if ((events & EPOLLRDHUP) != 0) { |
| - Log::PrintErr("EPOLLRDHUP "); |
| + |
| + errno = err; |
| + return read_bytes; |
| +} |
| + |
| + |
| +intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) { |
| + MutexLocker ml(mutex_); |
| + const ssize_t written_bytes = |
| + NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes)); |
| + const int err = errno; |
| + LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes); |
| + |
| + // Resubscribe to to write events. |
|
siva
2017/06/23 15:08:27
typo 'to to'
zra
2017/06/23 17:31:38
Done.
|
| + write_events_enabled_ = true; |
| + if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLOUT, wait_key_)) { |
| + LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_); |
| } |
| - int all_events = |
| - EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP; |
| - if ((events & ~all_events) != 0) { |
| - Log::PrintErr("(and %08x) ", events & ~all_events); |
| + |
| + errno = err; |
| + return written_bytes; |
| +} |
| + |
| + |
| +intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) { |
| + MutexLocker ml(mutex_); |
| + const intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen)); |
| + const int err = errno; |
| + LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket); |
| + |
| + // Re-subscribe to read events. |
| + read_events_enabled_ = true; |
| + if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) { |
| + LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_); |
| } |
| - Log::PrintErr("\n"); |
| + errno = err; |
| + return socket; |
| +} |
| + |
| + |
| +void IOHandle::Close() { |
| + MutexLocker ml(mutex_); |
| + VOID_NO_RETRY_EXPECTED(close(fd_)); |
| } |
| -#endif |
| -intptr_t DescriptorInfo::GetPollEvents() { |
| +uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) { |
| + MutexLocker ml(mutex_); |
| // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are |
| // triggered anyway. |
| - intptr_t events = 0; |
| - if ((Mask() & (1 << kInEvent)) != 0) { |
| + uint32_t events = EPOLLRDHUP; |
| + if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) { |
| events |= EPOLLIN; |
| } |
| - if ((Mask() & (1 << kOutEvent)) != 0) { |
| + if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) { |
| events |= EPOLLOUT; |
| } |
| return events; |
| } |
| -// Unregister the file descriptor for a DescriptorInfo structure with |
| -// epoll. |
| -static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { |
| - LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); |
| - VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL)); |
| +intptr_t IOHandle::EpollEventsToMask(intptr_t events) { |
| + if ((events & EPOLLERR) != 0) { |
| + // Return error only if EPOLLIN is present. |
| + return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; |
| + } |
| + intptr_t event_mask = 0; |
| + if ((events & EPOLLIN) != 0) { |
| + event_mask |= (1 << kInEvent); |
| + } |
| + if ((events & EPOLLOUT) != 0) { |
| + event_mask |= (1 << kOutEvent); |
| + } |
| + if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { |
| + event_mask |= (1 << kCloseEvent); |
| + } |
| + return event_mask; |
| } |
| -static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { |
| - struct epoll_event event; |
| - event.events = EPOLLRDHUP | di->GetPollEvents(); |
| - if (!di->IsListeningSocket()) { |
| - event.events |= EPOLLET; |
| - } |
| - event.data.ptr = di; |
| - LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd()); |
| - int status = |
| - NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event)); |
| - LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status); |
| -#if defined(EVENTHANDLER_LOGGING) |
| - PrintEventMask(di->fd(), event.events); |
| -#endif |
| - if (status == -1) { |
| - // TODO(dart:io): Verify that the dart end is handling this correctly. |
| - |
| - // Epoll does not accept the file descriptor. It could be due to |
| - // already closed file descriptor, or unuspported devices, such |
| - // as /dev/null. In such case, mark the file descriptor as closed, |
| - // so dart will handle it accordingly. |
| - di->NotifyAllDartPorts(1 << kCloseEvent); |
| +bool IOHandle::AsyncWaitLocked(mx_handle_t port, |
| + uint32_t events, |
| + uint64_t key) { |
| + LOG_INFO("IOHandle::AsyncWait: fd = %ld\n", fd_); |
| + // The call to __mxio_fd_to_io() in the DescriptorInfo constructor may have |
| + // returned NULL. If it did, propagate the problem up to Dart. |
| + if (mxio_ == NULL) { |
| + LOG_ERR("__mxio_fd_to_io(%d) returned NULL\n", fd_); |
| + return false; |
| } |
| + |
| + mx_handle_t handle; |
| + mx_signals_t signals; |
| + __mxio_wait_begin(mxio_, events, &handle, &signals); |
| + if (handle == MX_HANDLE_INVALID) { |
| + LOG_ERR("fd = %ld __mxio_wait_begin returned an invalid handle\n", fd_); |
| + return false; |
| + } |
| + |
| + // Remember the port. Use the remembered port if the argument "port" is |
| + // MX_HANDLE_INVALID. |
| + ASSERT((port != MX_HANDLE_INVALID) || (port_ != MX_HANDLE_INVALID)); |
| + if ((port_ == MX_HANDLE_INVALID) || (port != MX_HANDLE_INVALID)) { |
| + port_ = port; |
| + } |
| + |
| + handle_ = handle; |
| + wait_key_ = key; |
| + LOG_INFO("mx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals); |
| + mx_status_t status = |
| + mx_object_wait_async(handle_, port_, key, signals, MX_WAIT_ASYNC_ONCE); |
| + if (status != MX_OK) { |
| + LOG_ERR("mx_object_wait_async failed: %s\n", mx_status_get_string(status)); |
| + return false; |
| + } |
| + |
| + return true; |
| } |
| -EventHandlerImplementation::EventHandlerImplementation() |
| - : socket_map_(&HashMap::SamePointerValue, 16) { |
| - intptr_t result; |
| - result = NO_RETRY_EXPECTED(pipe(interrupt_fds_)); |
| - if (result != 0) { |
| - FATAL("Pipe creation failed"); |
| +bool IOHandle::AsyncWait(mx_handle_t port, uint32_t events, uint64_t key) { |
| + MutexLocker ml(mutex_); |
| + return AsyncWaitLocked(port, events, key); |
| +} |
| + |
| + |
| +void IOHandle::CancelWait(mx_handle_t port, uint64_t key) { |
| + MutexLocker ml(mutex_); |
| + LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_); |
| + ASSERT(port != MX_HANDLE_INVALID); |
| + ASSERT(handle_ != MX_HANDLE_INVALID); |
| + mx_status_t status = mx_port_cancel(port, handle_, key); |
| + if ((status != MX_OK) && (status != MX_ERR_NOT_FOUND)) { |
| + LOG_ERR("mx_port_cancel failed: %s\n", mx_status_get_string(status)); |
| + } |
| +} |
| + |
| + |
| +uint32_t IOHandle::WaitEnd(mx_signals_t observed) { |
| + MutexLocker ml(mutex_); |
| + uint32_t events = 0; |
| + __mxio_wait_end(mxio_, observed, &events); |
| + return events; |
| +} |
| + |
| + |
| +intptr_t IOHandle::ToggleEvents(intptr_t event_mask) { |
| + MutexLocker ml(mutex_); |
| + if (!write_events_enabled_) { |
| + LOG_INFO("IOHandle::ToggleEvents: fd = %ld de-asserting write\n", fd_); |
| + event_mask = event_mask & ~(1 << kOutEvent); |
| } |
| - if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) { |
| - FATAL("Failed to set pipe fd non blocking\n"); |
| + if ((event_mask & (1 << kOutEvent)) != 0) { |
| + LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting write and disabling\n", |
| + fd_); |
| + write_events_enabled_ = false; |
| } |
| - if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) { |
| - FATAL("Failed to set pipe fd close on exec\n"); |
| + if (!read_events_enabled_) { |
| + LOG_INFO("IOHandle::ToggleEvents: fd=%ld de-asserting read\n", fd_); |
| + event_mask = event_mask & ~(1 << kInEvent); |
| } |
| - if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) { |
| - FATAL("Failed to set pipe fd close on exec\n"); |
| + if ((event_mask & (1 << kInEvent)) != 0) { |
| + LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting read and disabling\n", |
| + fd_); |
| + read_events_enabled_ = false; |
| } |
| - shutdown_ = false; |
| - // The initial size passed to epoll_create is ignore on newer (>= |
| - // 2.6.8) Linux versions |
| - static const int kEpollInitialSize = 64; |
| - epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); |
| - if (epoll_fd_ == -1) { |
| - FATAL1("Failed creating epoll file descriptor: %i", errno); |
| - } |
| - if (!FDUtils::SetCloseOnExec(epoll_fd_)) { |
| - FATAL("Failed to set epoll fd close on exec\n"); |
| - } |
| - // Register the interrupt_fd with the epoll instance. |
| - struct epoll_event event; |
| - event.events = EPOLLIN; |
| - event.data.ptr = NULL; |
| - LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_); |
| - int status = NO_RETRY_EXPECTED( |
| - epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event)); |
| - LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n", |
| - epoll_fd_, status); |
| - if (status == -1) { |
| - FATAL("Failed adding interrupt fd to epoll instance"); |
| + return event_mask; |
| +} |
| + |
| + |
| +void EventHandlerImplementation::AddToPort(mx_handle_t port_handle, |
| + DescriptorInfo* di) { |
| + const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask()); |
| + const uint64_t key = reinterpret_cast<uint64_t>(di); |
| + if (!di->io_handle()->AsyncWait(port_handle, events, key)) { |
| + di->NotifyAllDartPorts(1 << kCloseEvent); |
| } |
| } |
| +void EventHandlerImplementation::RemoveFromPort(mx_handle_t port_handle, |
| + DescriptorInfo* di) { |
| + const uint64_t key = reinterpret_cast<uint64_t>(di); |
| + di->io_handle()->CancelWait(port_handle, key); |
| +} |
| + |
| + |
| +EventHandlerImplementation::EventHandlerImplementation() |
| + : socket_map_(&HashMap::SamePointerValue, 16) { |
| + shutdown_ = false; |
| + // Create the port. |
| + port_handle_ = MX_HANDLE_INVALID; |
| + mx_status_t status = mx_port_create(MX_PORT_OPT_V2, &port_handle_); |
| + if (status != MX_OK) { |
| + // This is a FATAL because the VM won't work at all if we can't create this |
| + // port. |
| + FATAL1("mx_port_create failed: %s\n", mx_status_get_string(status)); |
| + } |
| + ASSERT(port_handle_ != MX_HANDLE_INVALID); |
| +} |
| + |
| + |
| static void DeleteDescriptorInfo(void* info) { |
| DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); |
| + LOG_INFO("Closed %ld\n", di->io_handle()->fd()); |
| di->Close(); |
| - LOG_INFO("Closed %d\n", di->fd()); |
| delete di; |
| } |
| EventHandlerImplementation::~EventHandlerImplementation() { |
| socket_map_.Clear(DeleteDescriptorInfo); |
| - VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); |
| - VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); |
| - VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); |
| + mx_handle_close(port_handle_); |
| + port_handle_ = MX_HANDLE_INVALID; |
| } |
| -void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, |
| - DescriptorInfo* di) { |
| - intptr_t new_mask = di->Mask(); |
| - LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask, |
| - new_mask); |
| +void EventHandlerImplementation::UpdatePort(intptr_t old_mask, |
| + DescriptorInfo* di) { |
| + const intptr_t new_mask = di->Mask(); |
| if ((old_mask != 0) && (new_mask == 0)) { |
| - RemoveFromEpollInstance(epoll_fd_, di); |
| + RemoveFromPort(port_handle_, di); |
| } else if ((old_mask == 0) && (new_mask != 0)) { |
| - AddToEpollInstance(epoll_fd_, di); |
| - } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { |
| + AddToPort(port_handle_, di); |
| + } else if ((old_mask != 0) && (new_mask != 0)) { |
| ASSERT(!di->IsListeningSocket()); |
| - RemoveFromEpollInstance(epoll_fd_, di); |
| - AddToEpollInstance(epoll_fd_, di); |
| + RemoveFromPort(port_handle_, di); |
| + AddToPort(port_handle_, di); |
| } |
| } |
| @@ -198,9 +328,11 @@ void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, |
| DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( |
| intptr_t fd, |
| bool is_listening) { |
| - ASSERT(fd >= 0); |
| - HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd), |
| - GetHashmapHashFromFd(fd), true); |
| + IOHandle* handle = reinterpret_cast<IOHandle*>(fd); |
| + ASSERT(handle->fd() >= 0); |
| + HashMap::Entry* entry = |
| + socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()), |
| + GetHashmapHashFromFd(handle->fd()), true); |
| ASSERT(entry != NULL); |
| DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); |
| if (di == NULL) { |
| @@ -218,210 +350,153 @@ DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( |
| } |
| -static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) { |
| - size_t remaining = count; |
| - char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer)); |
| - while (remaining > 0) { |
| - ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining)); |
| - if (bytes_written == 0) { |
| - return count - remaining; |
| - } else if (bytes_written == -1) { |
| - ASSERT(EAGAIN == EWOULDBLOCK); |
| - // Error code EWOULDBLOCK should only happen for non blocking |
| - // file descriptors. |
| - ASSERT(errno != EWOULDBLOCK); |
| - return -1; |
| - } else { |
| - ASSERT(bytes_written > 0); |
| - remaining -= bytes_written; |
| - buffer_pos += bytes_written; |
| - } |
| - } |
| - return count; |
| -} |
| - |
| - |
| void EventHandlerImplementation::WakeupHandler(intptr_t id, |
| Dart_Port dart_port, |
| int64_t data) { |
| - InterruptMessage msg; |
| - msg.id = id; |
| - msg.dart_port = dart_port; |
| - msg.data = data; |
| - // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
| - // is smaller than 512, we don't need a thread lock. |
| - // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
| - ASSERT(kInterruptMessageSize < PIPE_BUF); |
| - intptr_t result = |
| - WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
| - if (result != kInterruptMessageSize) { |
| - if (result == -1) { |
| - perror("Interrupt message failure:"); |
| - } |
| - FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); |
| + COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(mx_packet_user_t)); |
| + mx_port_packet_t pkt; |
| + InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user); |
| + pkt.key = kInterruptPacketKey; |
| + msg->id = id; |
| + msg->dart_port = dart_port; |
| + msg->data = data; |
| + mx_status_t status = |
| + mx_port_queue(port_handle_, reinterpret_cast<void*>(&pkt), 0); |
| + if (status != MX_OK) { |
| + // This is a FATAL because the VM won't work at all if we can't send any |
| + // messages to the EventHandler thread. |
| + FATAL1("mx_port_queue failed: %s\n", mx_status_get_string(status)); |
| } |
| } |
| -void EventHandlerImplementation::HandleInterruptFd() { |
| - const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
| - InterruptMessage msg[MAX_MESSAGES]; |
| - ssize_t bytes = NO_RETRY_EXPECTED( |
| - read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
| - LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); |
| - for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
| - if (msg[i].id == kTimerId) { |
| - LOG_INFO("HandleInterruptFd read timer update\n"); |
| - timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
| - } else if (msg[i].id == kShutdownId) { |
| - LOG_INFO("HandleInterruptFd read shutdown\n"); |
| - shutdown_ = true; |
| - } else { |
| - ASSERT((msg[i].data & COMMAND_MASK) != 0); |
| - LOG_INFO("HandleInterruptFd command\n"); |
| - Socket* socket = reinterpret_cast<Socket*>(msg[i].id); |
| - RefCntReleaseScope<Socket> rs(socket); |
| - if (socket->fd() == -1) { |
| - continue; |
| +void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
| + if (msg->id == kTimerId) { |
| + LOG_INFO("HandleInterrupt read timer update\n"); |
| + timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); |
| + return; |
| + } else if (msg->id == kShutdownId) { |
| + LOG_INFO("HandleInterrupt read shutdown\n"); |
| + shutdown_ = true; |
| + return; |
| + } |
| + ASSERT((msg->data & COMMAND_MASK) != 0); |
| + LOG_INFO("HandleInterrupt command:\n"); |
| + Socket* socket = reinterpret_cast<Socket*>(msg->id); |
| + RefCntReleaseScope<Socket> rs(socket); |
| + if (socket->fd() == -1) { |
| + return; |
| + } |
| + IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd()); |
| + const intptr_t fd = io_handle->fd(); |
| + DescriptorInfo* di = |
| + GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data)); |
| + ASSERT(io_handle == di->io_handle()); |
| + if (IS_COMMAND(msg->data, kShutdownReadCommand)) { |
| + ASSERT(!di->IsListeningSocket()); |
| + // Close the socket for reading. |
| + LOG_INFO("\tSHUT_RD: %ld\n", fd); |
| + VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD)); |
| + } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { |
| + ASSERT(!di->IsListeningSocket()); |
| + // Close the socket for writing. |
| + LOG_INFO("\tSHUT_WR: %ld\n", fd); |
| + VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR)); |
| + } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
| + // Close the socket and free system resources and move on to next |
| + // message. |
| + const intptr_t old_mask = di->Mask(); |
| + Dart_Port port = msg->dart_port; |
| + di->RemovePort(port); |
| + const intptr_t new_mask = di->Mask(); |
| + UpdatePort(old_mask, di); |
| + |
| + LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask); |
| + if (di->IsListeningSocket()) { |
| + // We only close the socket file descriptor from the operating |
| + // system if there are no other dart socket objects which |
| + // are listening on the same (address, port) combination. |
| + ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); |
| + |
| + MutexLocker locker(registry->mutex()); |
| + |
| + if (registry->CloseSafe(socket)) { |
| + ASSERT(new_mask == 0); |
| + socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| + di->Close(); |
| + delete di; |
| + socket->SetClosedFd(); |
| } |
| - DescriptorInfo* di = |
| - GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data)); |
| - if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { |
| - ASSERT(!di->IsListeningSocket()); |
| - // Close the socket for reading. |
| - LOG_INFO("\tSHUT_RD: %d\n", di->fd()); |
| - VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD)); |
| - } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) { |
| - ASSERT(!di->IsListeningSocket()); |
| - // Close the socket for writing. |
| - LOG_INFO("\tSHUT_WR: %d\n", di->fd()); |
| - VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR)); |
| - } else if (IS_COMMAND(msg[i].data, kCloseCommand)) { |
| - // Close the socket and free system resources and move on to next |
| - // message. |
| - intptr_t old_mask = di->Mask(); |
| - Dart_Port port = msg[i].dart_port; |
| - di->RemovePort(port); |
| - intptr_t new_mask = di->Mask(); |
| - UpdateEpollInstance(old_mask, di); |
| - |
| - LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask); |
| - intptr_t fd = di->fd(); |
| - if (di->IsListeningSocket()) { |
| - // We only close the socket file descriptor from the operating |
| - // system if there are no other dart socket objects which |
| - // are listening on the same (address, port) combination. |
| - ListeningSocketRegistry* registry = |
| - ListeningSocketRegistry::Instance(); |
| - |
| - MutexLocker locker(registry->mutex()); |
| - |
| - if (registry->CloseSafe(socket)) { |
| - ASSERT(new_mask == 0); |
| - socket_map_.Remove(GetHashmapKeyFromFd(fd), |
| - GetHashmapHashFromFd(fd)); |
| - di->Close(); |
| - LOG_INFO("Closed %d\n", di->fd()); |
| - delete di; |
| - socket->SetClosedFd(); |
| - } |
| - } else { |
| - ASSERT(new_mask == 0); |
| - socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| - di->Close(); |
| - LOG_INFO("Closed %d\n", di->fd()); |
| - delete di; |
| - socket->SetClosedFd(); |
| - } |
| - |
| - bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
| - if (!success) { |
| - LOG_ERR("Failed to post destroy event to port %ld", port); |
| - } |
| - } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) { |
| - int count = TOKEN_COUNT(msg[i].data); |
| - intptr_t old_mask = di->Mask(); |
| - LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask); |
| - di->ReturnTokens(msg[i].dart_port, count); |
| - UpdateEpollInstance(old_mask, di); |
| - } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) { |
| - // `events` can only have kInEvent/kOutEvent flags set. |
| - intptr_t events = msg[i].data & EVENT_MASK; |
| - ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
| - |
| - intptr_t old_mask = di->Mask(); |
| - LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask, |
| - msg[i].data & EVENT_MASK); |
| - di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK); |
| - UpdateEpollInstance(old_mask, di); |
| - } else { |
| - UNREACHABLE(); |
| + } else { |
| + ASSERT(new_mask == 0); |
| + socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| + di->Close(); |
| + delete di; |
| + socket->SetClosedFd(); |
| + } |
| + if (port != 0) { |
| + const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
| + if (!success) { |
| + LOG_ERR("Failed to post destroy event to port %ld\n", port); |
| } |
| } |
| + } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
| + const int count = TOKEN_COUNT(msg->data); |
| + const intptr_t old_mask = di->Mask(); |
| + LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask); |
| + di->ReturnTokens(msg->dart_port, count); |
| + UpdatePort(old_mask, di); |
| + } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
| + // `events` can only have kInEvent/kOutEvent flags set. |
| + const intptr_t events = msg->data & EVENT_MASK; |
| + ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
| + |
| + const intptr_t old_mask = di->Mask(); |
| + LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask, |
| + msg->data & EVENT_MASK); |
| + di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); |
| + UpdatePort(old_mask, di); |
| + } else { |
| + UNREACHABLE(); |
| } |
| - LOG_INFO("HandleInterruptFd exit\n"); |
| } |
| -intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
| - DescriptorInfo* di) { |
| -#ifdef EVENTHANDLER_LOGGING |
| - PrintEventMask(di->fd(), events); |
| -#endif |
| - if ((events & EPOLLERR) != 0) { |
| - // Return error only if EPOLLIN is present. |
| - return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; |
| - } |
| - intptr_t event_mask = 0; |
| - if ((events & EPOLLIN) != 0) { |
| - event_mask |= (1 << kInEvent); |
| - } |
| - if ((events & EPOLLOUT) != 0) { |
| - event_mask |= (1 << kOutEvent); |
| - } |
| - if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { |
| - event_mask |= (1 << kCloseEvent); |
| - } |
| - return event_mask; |
| -} |
| - |
| - |
| -void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
| - int size) { |
| - bool interrupt_seen = false; |
| - for (int i = 0; i < size; i++) { |
| - if (events[i].data.ptr == NULL) { |
| - interrupt_seen = true; |
| - } else { |
| - DescriptorInfo* di = |
| - reinterpret_cast<DescriptorInfo*>(events[i].data.ptr); |
| - const intptr_t old_mask = di->Mask(); |
| - const intptr_t event_mask = GetPollEvents(events[i].events, di); |
| - LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask); |
| - if ((event_mask & (1 << kErrorEvent)) != 0) { |
| - di->NotifyAllDartPorts(event_mask); |
| - UpdateEpollInstance(old_mask, di); |
| - } else if (event_mask != 0) { |
| - Dart_Port port = di->NextNotifyDartPort(event_mask); |
| - ASSERT(port != 0); |
| - UpdateEpollInstance(old_mask, di); |
| - LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask, |
| - port, di->fd()); |
| - bool success = DartUtils::PostInt32(port, event_mask); |
| - if (!success) { |
| - // This can happen if e.g. the isolate that owns the port has died |
| - // for some reason. |
| - LOG_ERR("Failed to post event for fd %ld to port %ld", di->fd(), |
| - port); |
| - } |
| +void EventHandlerImplementation::HandlePacket(mx_port_packet_t* pkt) { |
| + LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key); |
| + LOG_INFO("HandlePacket: Got event packet: type=%lx\n", pkt->type); |
| + LOG_INFO("HandlePacket: Got event packet: status=%ld\n", pkt->status); |
| + if (pkt->type == MX_PKT_TYPE_USER) { |
| + ASSERT(pkt->key == kInterruptPacketKey); |
| + InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user); |
| + HandleInterrupt(msg); |
| + return; |
| + } |
| + LOG_INFO("HandlePacket: Got event packet: observed = %lx\n", |
| + pkt->signal.observed); |
| + LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count); |
| + |
| + DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key); |
| + mx_signals_t observed = pkt->signal.observed; |
| + const intptr_t old_mask = di->Mask(); |
| + const uint32_t epoll_event = di->io_handle()->WaitEnd(observed); |
| + intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event); |
| + if ((event_mask & (1 << kErrorEvent)) != 0) { |
| + di->NotifyAllDartPorts(event_mask); |
| + } else if (event_mask != 0) { |
| + event_mask = di->io_handle()->ToggleEvents(event_mask); |
| + if (event_mask != 0) { |
| + Dart_Port port = di->NextNotifyDartPort(event_mask); |
| + ASSERT(port != 0); |
| + bool success = DartUtils::PostInt32(port, event_mask); |
| + if (!success) { |
| + // This can happen if e.g. the isolate that owns the port has died |
| + // for some reason. |
| + LOG_ERR("Failed to post event to port %ld\n", port); |
| } |
| } |
| } |
| - if (interrupt_seen) { |
| - // Handle after socket events, so we avoid closing a socket before we handle |
| - // the current events. |
| - HandleInterruptFd(); |
| - } |
| + UpdatePort(old_mask, di); |
| } |
| @@ -448,31 +523,28 @@ void EventHandlerImplementation::HandleTimeout() { |
| void EventHandlerImplementation::Poll(uword args) { |
| - static const intptr_t kMaxEvents = 16; |
| - struct epoll_event events[kMaxEvents]; |
| EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
| EventHandlerImplementation* handler_impl = &handler->delegate_; |
| ASSERT(handler_impl != NULL); |
| + mx_port_packet_t pkt; |
| while (!handler_impl->shutdown_) { |
| int64_t millis = handler_impl->GetTimeout(); |
| ASSERT((millis == kInfinityTimeout) || (millis >= 0)); |
| - // TODO(US-109): When the epoll implementation is properly edge-triggered, |
| - // remove this sleep, which prevents the message queue from being |
| - // overwhelmed and leading to memory exhaustion. |
| - usleep(5000); |
| - LOG_INFO("epoll_wait(millis = %ld)\n", millis); |
| - intptr_t result = NO_RETRY_EXPECTED( |
| - epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); |
| - ASSERT(EAGAIN == EWOULDBLOCK); |
| - LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); |
| - if (result < 0) { |
| - if (errno != EWOULDBLOCK) { |
| - perror("Poll failed"); |
| - } |
| + |
| + LOG_INFO("mx_port_wait(millis = %ld)\n", millis); |
| + mx_status_t status = mx_port_wait(handler_impl->port_handle_, |
| + millis == kInfinityTimeout |
| + ? MX_TIME_INFINITE |
| + : mx_deadline_after(MX_MSEC(millis)), |
| + reinterpret_cast<void*>(&pkt), 0); |
| + if (status == MX_ERR_TIMED_OUT) { |
| + handler_impl->HandleTimeout(); |
| + } else if (status != MX_OK) { |
| + FATAL1("mx_port_wait failed: %s\n", mx_status_get_string(status)); |
| } else { |
| handler_impl->HandleTimeout(); |
| - handler_impl->HandleEvents(events, result); |
| + handler_impl->HandlePacket(&pkt); |
| } |
| } |
| DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); |