| Index: runtime/bin/eventhandler_fuchsia.cc
|
| diff --git a/runtime/bin/eventhandler_fuchsia.cc b/runtime/bin/eventhandler_fuchsia.cc
|
| index 2d0ebe3b58540233ee0428db9f208ac1d3ed9303..c298e220e3875aff5f4da21852b401219ce78fde 100644
|
| --- a/runtime/bin/eventhandler_fuchsia.cc
|
| +++ b/runtime/bin/eventhandler_fuchsia.cc
|
| @@ -10,13 +10,25 @@
|
| #include "bin/eventhandler.h"
|
| #include "bin/eventhandler_fuchsia.h"
|
|
|
| -#include <magenta/status.h>
|
| -#include <magenta/syscalls.h>
|
| -
|
| +#include <errno.h> // NOLINT
|
| +#include <fcntl.h> // NOLINT
|
| +#include <pthread.h> // NOLINT
|
| +#include <stdio.h> // NOLINT
|
| +#include <string.h> // NOLINT
|
| +#include <sys/epoll.h> // NOLINT
|
| +#include <sys/stat.h> // NOLINT
|
| +#include <unistd.h> // NOLINT
|
| +
|
| +#include "bin/fdutils.h"
|
| +#include "bin/lockers.h"
|
| #include "bin/log.h"
|
| +#include "bin/socket.h"
|
| #include "bin/thread.h"
|
| #include "bin/utils.h"
|
| +#include "platform/hashmap.h"
|
| +#include "platform/utils.h"
|
|
|
| +// #define EVENTHANDLER_LOGGING 1
|
| #if defined(EVENTHANDLER_LOGGING)
|
| #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__)
|
| #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__)
|
| @@ -28,115 +40,204 @@
|
| namespace dart {
|
| namespace bin {
|
|
|
| -MagentaWaitManyInfo::MagentaWaitManyInfo()
|
| - : capacity_(kInitialCapacity), size_(0) {
|
| - descriptor_infos_ = static_cast<DescriptorInfo**>(
|
| - malloc(kInitialCapacity * sizeof(*descriptor_infos_)));
|
| - if (descriptor_infos_ == NULL) {
|
| - FATAL("Failed to allocate descriptor_infos array");
|
| +#if defined(EVENTHANDLER_LOGGING)
|
| +static void PrintEventMask(intptr_t fd, intptr_t events) {
|
| + Log::PrintErr("%d ", fd);
|
| + if ((events & EPOLLIN) != 0) {
|
| + Log::PrintErr("EPOLLIN ");
|
| + }
|
| + if ((events & EPOLLPRI) != 0) {
|
| + Log::PrintErr("EPOLLPRI ");
|
| + }
|
| + if ((events & EPOLLOUT) != 0) {
|
| + Log::PrintErr("EPOLLOUT ");
|
| }
|
| - items_ =
|
| - static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_)));
|
| - if (items_ == NULL) {
|
| - FATAL("Failed to allocate items array");
|
| + if ((events & EPOLLERR) != 0) {
|
| + Log::PrintErr("EPOLLERR ");
|
| }
|
| + if ((events & EPOLLHUP) != 0) {
|
| + Log::PrintErr("EPOLLHUP ");
|
| + }
|
| + if ((events & EPOLLRDHUP) != 0) {
|
| + Log::PrintErr("EPOLLRDHUP ");
|
| + }
|
| + int all_events =
|
| + EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
|
| + if ((events & ~all_events) != 0) {
|
| + Log::PrintErr("(and %08x) ", events & ~all_events);
|
| + }
|
| +
|
| + Log::PrintErr("\n");
|
| }
|
| +#endif
|
|
|
|
|
| -MagentaWaitManyInfo::~MagentaWaitManyInfo() {
|
| - free(descriptor_infos_);
|
| - free(items_);
|
| +intptr_t DescriptorInfo::GetPollEvents() {
|
| + // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
|
| + // triggered anyway.
|
| + intptr_t events = 0;
|
| + if ((Mask() & (1 << kInEvent)) != 0) {
|
| + events |= EPOLLIN;
|
| + }
|
| + if ((Mask() & (1 << kOutEvent)) != 0) {
|
| + events |= EPOLLOUT;
|
| + }
|
| + return events;
|
| }
|
|
|
|
|
| -void MagentaWaitManyInfo::AddHandle(mx_handle_t handle,
|
| - mx_signals_t signals,
|
| - DescriptorInfo* di) {
|
| -#if defined(DEBUG)
|
| - // Check that the handle is not already in the list.
|
| - for (intptr_t i = 0; i < size_; i++) {
|
| - if (items_[i].handle == handle) {
|
| - FATAL("The handle is already in the list!");
|
| - }
|
| - }
|
| -#endif
|
| - intptr_t new_size = size_ + 1;
|
| - GrowArraysIfNeeded(new_size);
|
| - descriptor_infos_[size_] = di;
|
| - items_[size_].handle = handle;
|
| - items_[size_].waitfor = signals;
|
| - items_[size_].pending = 0;
|
| - size_ = new_size;
|
| - LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_);
|
| +// Unregister the file descriptor for a DescriptorInfo structure with
|
| +// epoll.
|
| +static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
|
| + LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd());
|
| + VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL));
|
| }
|
|
|
|
|
| -void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) {
|
| - intptr_t idx;
|
| - for (idx = 1; idx < size_; idx++) {
|
| - if (handle == items_[idx].handle) {
|
| - break;
|
| - }
|
| +static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
|
| + struct epoll_event event;
|
| + event.events = EPOLLRDHUP | di->GetPollEvents();
|
| + if (!di->IsListeningSocket()) {
|
| + event.events |= EPOLLET;
|
| }
|
| - if (idx == size_) {
|
| - FATAL("Handle is not in the list!");
|
| - }
|
| -
|
| - if (idx != (size_ - 1)) {
|
| - descriptor_infos_[idx] = descriptor_infos_[size_ - 1];
|
| - items_[idx] = items_[size_ - 1];
|
| + event.data.ptr = di;
|
| + LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd());
|
| + int status =
|
| + NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
|
| + LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status);
|
| +#if defined(EVENTHANDLER_LOGGING)
|
| + PrintEventMask(di->fd(), event.events);
|
| +#endif
|
| + if (status == -1) {
|
| + // TODO(dart:io): Verify that the dart end is handling this correctly.
|
| +
|
| + // Epoll does not accept the file descriptor. It could be due to
|
| + // already closed file descriptor, or unuspported devices, such
|
| + // as /dev/null. In such case, mark the file descriptor as closed,
|
| + // so dart will handle it accordingly.
|
| + di->NotifyAllDartPorts(1 << kCloseEvent);
|
| }
|
| - descriptor_infos_[size_ - 1] = NULL;
|
| - items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0};
|
| - size_ = size_ - 1;
|
| - LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_);
|
| }
|
|
|
|
|
| -void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) {
|
| - if (desired_size < capacity_) {
|
| - return;
|
| +EventHandlerImplementation::EventHandlerImplementation()
|
| + : socket_map_(&HashMap::SamePointerValue, 16) {
|
| + intptr_t result;
|
| + result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
|
| + if (result != 0) {
|
| + FATAL("Pipe creation failed");
|
| + }
|
| + if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
|
| + FATAL("Failed to set pipe fd non blocking\n");
|
| }
|
| - intptr_t new_capacity = desired_size + (desired_size >> 1);
|
| - descriptor_infos_ = static_cast<DescriptorInfo**>(
|
| - realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_)));
|
| - if (descriptor_infos_ == NULL) {
|
| - FATAL("Failed to grow descriptor_infos array");
|
| + if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
|
| + FATAL("Failed to set pipe fd close on exec\n");
|
| }
|
| - items_ = static_cast<mx_wait_item_t*>(
|
| - realloc(items_, new_capacity * sizeof(*items_)));
|
| - if (items_ == NULL) {
|
| - FATAL("Failed to grow items array");
|
| + if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
|
| + FATAL("Failed to set pipe fd close on exec\n");
|
| + }
|
| + shutdown_ = false;
|
| + // The initial size passed to epoll_create is ignore on newer (>=
|
| + // 2.6.8) Linux versions
|
| + static const int kEpollInitialSize = 64;
|
| + epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
|
| + if (epoll_fd_ == -1) {
|
| + FATAL1("Failed creating epoll file descriptor: %i", errno);
|
| + }
|
| + if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
|
| + FATAL("Failed to set epoll fd close on exec\n");
|
| + }
|
| + // Register the interrupt_fd with the epoll instance.
|
| + struct epoll_event event;
|
| + event.events = EPOLLIN;
|
| + event.data.ptr = NULL;
|
| + LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_);
|
| + int status = NO_RETRY_EXPECTED(
|
| + epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
|
| + LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
|
| + epoll_fd_, status);
|
| + if (status == -1) {
|
| + FATAL("Failed adding interrupt fd to epoll instance");
|
| }
|
| - capacity_ = new_capacity;
|
| - LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size,
|
| - capacity_);
|
| }
|
|
|
|
|
| -EventHandlerImplementation::EventHandlerImplementation() {
|
| - mx_status_t status =
|
| - mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]);
|
| - if (status != NO_ERROR) {
|
| - FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status));
|
| - }
|
| - shutdown_ = false;
|
| - info_.AddHandle(interrupt_handles_[0],
|
| - MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL);
|
| - LOG_INFO("EventHandlerImplementation initialized\n");
|
| +static void DeleteDescriptorInfo(void* info) {
|
| + DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
|
| + di->Close();
|
| + LOG_INFO("Closed %d\n", di->fd());
|
| + delete di;
|
| }
|
|
|
|
|
| EventHandlerImplementation::~EventHandlerImplementation() {
|
| - mx_status_t status = mx_handle_close(interrupt_handles_[0]);
|
| - if (status != NO_ERROR) {
|
| - FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
|
| + socket_map_.Clear(DeleteDescriptorInfo);
|
| + VOID_NO_RETRY_EXPECTED(close(epoll_fd_));
|
| + VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0]));
|
| + VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1]));
|
| +}
|
| +
|
| +
|
| +void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
|
| + DescriptorInfo* di) {
|
| + intptr_t new_mask = di->Mask();
|
| + LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask,
|
| + new_mask);
|
| + if ((old_mask != 0) && (new_mask == 0)) {
|
| + RemoveFromEpollInstance(epoll_fd_, di);
|
| + } else if ((old_mask == 0) && (new_mask != 0)) {
|
| + AddToEpollInstance(epoll_fd_, di);
|
| + } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
|
| + ASSERT(!di->IsListeningSocket());
|
| + RemoveFromEpollInstance(epoll_fd_, di);
|
| + AddToEpollInstance(epoll_fd_, di);
|
| }
|
| - status = mx_handle_close(interrupt_handles_[1]);
|
| - if (status != NO_ERROR) {
|
| - FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
|
| +}
|
| +
|
| +
|
| +DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
|
| + intptr_t fd,
|
| + bool is_listening) {
|
| + ASSERT(fd >= 0);
|
| + HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd),
|
| + GetHashmapHashFromFd(fd), true);
|
| + ASSERT(entry != NULL);
|
| + DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
|
| + if (di == NULL) {
|
| + // If there is no data in the hash map for this file descriptor a
|
| + // new DescriptorInfo for the file descriptor is inserted.
|
| + if (is_listening) {
|
| + di = new DescriptorInfoMultiple(fd);
|
| + } else {
|
| + di = new DescriptorInfoSingle(fd);
|
| + }
|
| + entry->value = di;
|
| }
|
| - LOG_INFO("EventHandlerImplementation destroyed\n");
|
| + ASSERT(fd == di->fd());
|
| + return di;
|
| +}
|
| +
|
| +
|
| +static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
|
| + size_t remaining = count;
|
| + char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
|
| + while (remaining > 0) {
|
| + ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
|
| + if (bytes_written == 0) {
|
| + return count - remaining;
|
| + } else if (bytes_written == -1) {
|
| + ASSERT(EAGAIN == EWOULDBLOCK);
|
| + // Error code EWOULDBLOCK should only happen for non blocking
|
| + // file descriptors.
|
| + ASSERT(errno != EWOULDBLOCK);
|
| + return -1;
|
| + } else {
|
| + ASSERT(bytes_written > 0);
|
| + remaining -= bytes_written;
|
| + buffer_pos += bytes_written;
|
| + }
|
| + }
|
| + return count;
|
| }
|
|
|
|
|
| @@ -147,71 +248,171 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id,
|
| msg.id = id;
|
| msg.dart_port = dart_port;
|
| msg.data = data;
|
| -
|
| - mx_status_t status =
|
| - mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0);
|
| - if (status != NO_ERROR) {
|
| - FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status));
|
| + // WriteToBlocking will write up to 512 bytes atomically, and since our msg
|
| + // is smaller than 512, we don't need a thread lock.
|
| + // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
|
| + ASSERT(kInterruptMessageSize < PIPE_BUF);
|
| + intptr_t result =
|
| + WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
|
| + if (result != kInterruptMessageSize) {
|
| + if (result == -1) {
|
| + perror("Interrupt message failure:");
|
| + }
|
| + FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
|
| }
|
| - LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data);
|
| }
|
|
|
|
|
| void EventHandlerImplementation::HandleInterruptFd() {
|
| - LOG_INFO("HandleInterruptFd entry\n");
|
| - InterruptMessage msg;
|
| - uint32_t bytes = kInterruptMessageSize;
|
| - mx_status_t status;
|
| - while (true) {
|
| - status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes,
|
| - NULL, 0, NULL);
|
| - if (status != NO_ERROR) {
|
| - break;
|
| - }
|
| - ASSERT(bytes == kInterruptMessageSize);
|
| - if (msg.id == kTimerId) {
|
| + const intptr_t MAX_MESSAGES = kInterruptMessageSize;
|
| + InterruptMessage msg[MAX_MESSAGES];
|
| + ssize_t bytes = NO_RETRY_EXPECTED(
|
| + read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
|
| + LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes);
|
| + for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
|
| + if (msg[i].id == kTimerId) {
|
| LOG_INFO("HandleInterruptFd read timer update\n");
|
| - timeout_queue_.UpdateTimeout(msg.dart_port, msg.data);
|
| - } else if (msg.id == kShutdownId) {
|
| + timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
|
| + } else if (msg[i].id == kShutdownId) {
|
| LOG_INFO("HandleInterruptFd read shutdown\n");
|
| shutdown_ = true;
|
| } else {
|
| - // TODO(zra): Handle commands to add and remove handles from the
|
| - // MagentaWaitManyInfo.
|
| - UNIMPLEMENTED();
|
| + ASSERT((msg[i].data & COMMAND_MASK) != 0);
|
| + LOG_INFO("HandleInterruptFd command\n");
|
| + DescriptorInfo* di =
|
| + GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
|
| + if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
|
| + ASSERT(!di->IsListeningSocket());
|
| + // Close the socket for reading.
|
| + LOG_INFO("\tSHUT_RD: %d\n", di->fd());
|
| + VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
|
| + } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
|
| + ASSERT(!di->IsListeningSocket());
|
| + // Close the socket for writing.
|
| + LOG_INFO("\tSHUT_WR: %d\n", di->fd());
|
| + VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
|
| + } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
|
| + // Close the socket and free system resources and move on to next
|
| + // message.
|
| + intptr_t old_mask = di->Mask();
|
| + Dart_Port port = msg[i].dart_port;
|
| + di->RemovePort(port);
|
| + intptr_t new_mask = di->Mask();
|
| + UpdateEpollInstance(old_mask, di);
|
| +
|
| + LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
|
| + intptr_t fd = di->fd();
|
| + if (di->IsListeningSocket()) {
|
| + // We only close the socket file descriptor from the operating
|
| + // system if there are no other dart socket objects which
|
| + // are listening on the same (address, port) combination.
|
| + ListeningSocketRegistry* registry =
|
| + ListeningSocketRegistry::Instance();
|
| +
|
| + MutexLocker locker(registry->mutex());
|
| +
|
| + if (registry->CloseSafe(fd)) {
|
| + ASSERT(new_mask == 0);
|
| + socket_map_.Remove(GetHashmapKeyFromFd(fd),
|
| + GetHashmapHashFromFd(fd));
|
| + di->Close();
|
| + LOG_INFO("Closed %d\n", di->fd());
|
| + delete di;
|
| + }
|
| + } else {
|
| + ASSERT(new_mask == 0);
|
| + socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
|
| + di->Close();
|
| + LOG_INFO("Closed %d\n", di->fd());
|
| + delete di;
|
| + }
|
| +
|
| + DartUtils::PostInt32(port, 1 << kDestroyedEvent);
|
| + } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
|
| + int count = TOKEN_COUNT(msg[i].data);
|
| + intptr_t old_mask = di->Mask();
|
| + LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
|
| + di->ReturnTokens(msg[i].dart_port, count);
|
| + UpdateEpollInstance(old_mask, di);
|
| + } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
|
| + // `events` can only have kInEvent/kOutEvent flags set.
|
| + intptr_t events = msg[i].data & EVENT_MASK;
|
| + ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
|
| +
|
| + intptr_t old_mask = di->Mask();
|
| + LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask,
|
| + msg[i].data & EVENT_MASK);
|
| + di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
|
| + UpdateEpollInstance(old_mask, di);
|
| + } else {
|
| + UNREACHABLE();
|
| + }
|
| }
|
| }
|
| - // status == ERR_SHOULD_WAIT when we try to read and there are no messages
|
| - // available, so it is an error if we get here and status != ERR_SHOULD_WAIT.
|
| - if (status != ERR_SHOULD_WAIT) {
|
| - FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status));
|
| - }
|
| LOG_INFO("HandleInterruptFd exit\n");
|
| }
|
|
|
|
|
| -void EventHandlerImplementation::HandleEvents() {
|
| - LOG_INFO("HandleEvents entry\n");
|
| - for (intptr_t i = 1; i < info_.size(); i++) {
|
| - const mx_wait_item_t& wait_item = info_.items()[i];
|
| - if (wait_item.pending & wait_item.waitfor) {
|
| - // Only the control handle has no descriptor info.
|
| - ASSERT(info_.descriptor_infos()[i] != NULL);
|
| - ASSERT(wait_item.handle != interrupt_handles_[0]);
|
| - // TODO(zra): Handle events on other handles. At the moment we are
|
| - // only interrupted when there is a message on interrupt_handles_[0].
|
| - UNIMPLEMENTED();
|
| - }
|
| +intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
|
| + DescriptorInfo* di) {
|
| +#ifdef EVENTHANDLER_LOGGING
|
| + PrintEventMask(di->fd(), events);
|
| +#endif
|
| + if ((events & EPOLLERR) != 0) {
|
| + // Return error only if EPOLLIN is present.
|
| + return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
|
| + }
|
| + intptr_t event_mask = 0;
|
| + if ((events & EPOLLIN) != 0) {
|
| + event_mask |= (1 << kInEvent);
|
| }
|
| + if ((events & EPOLLOUT) != 0) {
|
| + event_mask |= (1 << kOutEvent);
|
| + }
|
| + if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
|
| + event_mask |= (1 << kCloseEvent);
|
| + }
|
| + return event_mask;
|
| +}
|
| +
|
|
|
| - if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) {
|
| - FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n");
|
| +void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
|
| + int size) {
|
| + bool interrupt_seen = false;
|
| + for (int i = 0; i < size; i++) {
|
| + if (events[i].data.ptr == NULL) {
|
| + interrupt_seen = true;
|
| + } else {
|
| + DescriptorInfo* di =
|
| + reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
|
| + intptr_t event_mask = GetPollEvents(events[i].events, di);
|
| +
|
| + if ((event_mask & (1 << kErrorEvent)) != 0) {
|
| + di->NotifyAllDartPorts(event_mask);
|
| + }
|
| + event_mask &= ~(1 << kErrorEvent);
|
| +
|
| + LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
|
| + if (event_mask != 0) {
|
| + intptr_t old_mask = di->Mask();
|
| + Dart_Port port = di->NextNotifyDartPort(event_mask);
|
| + ASSERT(port != 0);
|
| + UpdateEpollInstance(old_mask, di);
|
| + LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask,
|
| + port, di->fd());
|
| + bool success = DartUtils::PostInt32(port, event_mask);
|
| + if (!success) {
|
| + // This can happen if e.g. the isolate that owns the port has died
|
| + // for some reason.
|
| + FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port);
|
| + }
|
| + }
|
| + }
|
| }
|
| - if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) {
|
| - LOG_INFO("HandleEvents interrupt_handles_[0] readable\n");
|
| + if (interrupt_seen) {
|
| + // Handle after socket events, so we avoid closing a socket before we handle
|
| + // the current events.
|
| HandleInterruptFd();
|
| - } else {
|
| - LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n");
|
| }
|
| }
|
|
|
| @@ -239,6 +440,8 @@ void EventHandlerImplementation::HandleTimeout() {
|
|
|
|
|
| void EventHandlerImplementation::Poll(uword args) {
|
| + static const intptr_t kMaxEvents = 16;
|
| + struct epoll_event events[kMaxEvents];
|
| EventHandler* handler = reinterpret_cast<EventHandler*>(args);
|
| EventHandlerImplementation* handler_impl = &handler->delegate_;
|
| ASSERT(handler_impl != NULL);
|
| @@ -246,23 +449,21 @@ void EventHandlerImplementation::Poll(uword args) {
|
| while (!handler_impl->shutdown_) {
|
| int64_t millis = handler_impl->GetTimeout();
|
| ASSERT((millis == kInfinityTimeout) || (millis >= 0));
|
| - mx_time_t timeout =
|
| - millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond;
|
| - const MagentaWaitManyInfo& info = handler_impl->info();
|
| - LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(),
|
| - timeout);
|
| - mx_status_t status =
|
| - mx_handle_wait_many(info.items(), info.size(), timeout);
|
| - if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) {
|
| - FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status));
|
| + LOG_INFO("epoll_wait(millis = %ld)\n", millis);
|
| + intptr_t result = NO_RETRY_EXPECTED(
|
| + epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis));
|
| + ASSERT(EAGAIN == EWOULDBLOCK);
|
| + LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result);
|
| + if (result < 0) {
|
| + if (errno != EWOULDBLOCK) {
|
| + perror("Poll failed");
|
| + }
|
| } else {
|
| - LOG_INFO("mx_handle_wait_many returned: %ld\n", status);
|
| handler_impl->HandleTimeout();
|
| - handler_impl->HandleEvents();
|
| + handler_impl->HandleEvents(events, result);
|
| }
|
| }
|
| handler->NotifyShutdownDone();
|
| - LOG_INFO("EventHandlerImplementation notifying about shutdown\n");
|
| }
|
|
|
|
|
| @@ -286,6 +487,17 @@ void EventHandlerImplementation::SendData(intptr_t id,
|
| WakeupHandler(id, dart_port, data);
|
| }
|
|
|
| +void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
|
| + // The hashmap does not support keys with value 0.
|
| + return reinterpret_cast<void*>(fd + 1);
|
| +}
|
| +
|
| +
|
| +uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
|
| + // The hashmap does not support keys with value 0.
|
| + return dart::Utils::WordHash(fd + 1);
|
| +}
|
| +
|
| } // namespace bin
|
| } // namespace dart
|
|
|
|
|