Index: runtime/bin/eventhandler_fuchsia.cc |
diff --git a/runtime/bin/eventhandler_fuchsia.cc b/runtime/bin/eventhandler_fuchsia.cc |
index 2d0ebe3b58540233ee0428db9f208ac1d3ed9303..c298e220e3875aff5f4da21852b401219ce78fde 100644 |
--- a/runtime/bin/eventhandler_fuchsia.cc |
+++ b/runtime/bin/eventhandler_fuchsia.cc |
@@ -10,13 +10,25 @@ |
#include "bin/eventhandler.h" |
#include "bin/eventhandler_fuchsia.h" |
-#include <magenta/status.h> |
-#include <magenta/syscalls.h> |
- |
+#include <errno.h> // NOLINT |
+#include <fcntl.h> // NOLINT |
+#include <pthread.h> // NOLINT |
+#include <stdio.h> // NOLINT |
+#include <string.h> // NOLINT |
+#include <sys/epoll.h> // NOLINT |
+#include <sys/stat.h> // NOLINT |
+#include <unistd.h> // NOLINT |
+ |
+#include "bin/fdutils.h" |
+#include "bin/lockers.h" |
#include "bin/log.h" |
+#include "bin/socket.h" |
#include "bin/thread.h" |
#include "bin/utils.h" |
+#include "platform/hashmap.h" |
+#include "platform/utils.h" |
+// #define EVENTHANDLER_LOGGING 1 |
#if defined(EVENTHANDLER_LOGGING) |
#define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) |
#define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) |
@@ -28,115 +40,204 @@ |
namespace dart { |
namespace bin { |
-MagentaWaitManyInfo::MagentaWaitManyInfo() |
- : capacity_(kInitialCapacity), size_(0) { |
- descriptor_infos_ = static_cast<DescriptorInfo**>( |
- malloc(kInitialCapacity * sizeof(*descriptor_infos_))); |
- if (descriptor_infos_ == NULL) { |
- FATAL("Failed to allocate descriptor_infos array"); |
+#if defined(EVENTHANDLER_LOGGING) |
+static void PrintEventMask(intptr_t fd, intptr_t events) { |
+ Log::PrintErr("%d ", fd); |
+ if ((events & EPOLLIN) != 0) { |
+ Log::PrintErr("EPOLLIN "); |
+ } |
+ if ((events & EPOLLPRI) != 0) { |
+ Log::PrintErr("EPOLLPRI "); |
+ } |
+ if ((events & EPOLLOUT) != 0) { |
+ Log::PrintErr("EPOLLOUT "); |
} |
- items_ = |
- static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_))); |
- if (items_ == NULL) { |
- FATAL("Failed to allocate items array"); |
+ if ((events & EPOLLERR) != 0) { |
+ Log::PrintErr("EPOLLERR "); |
} |
+ if ((events & EPOLLHUP) != 0) { |
+ Log::PrintErr("EPOLLHUP "); |
+ } |
+ if ((events & EPOLLRDHUP) != 0) { |
+ Log::PrintErr("EPOLLRDHUP "); |
+ } |
+ int all_events = |
+ EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP; |
+ if ((events & ~all_events) != 0) { |
+ Log::PrintErr("(and %08x) ", events & ~all_events); |
+ } |
+ |
+ Log::PrintErr("\n"); |
} |
+#endif |
-MagentaWaitManyInfo::~MagentaWaitManyInfo() { |
- free(descriptor_infos_); |
- free(items_); |
+intptr_t DescriptorInfo::GetPollEvents() { |
+ // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are |
+ // triggered anyway. |
+ intptr_t events = 0; |
+ if ((Mask() & (1 << kInEvent)) != 0) { |
+ events |= EPOLLIN; |
+ } |
+ if ((Mask() & (1 << kOutEvent)) != 0) { |
+ events |= EPOLLOUT; |
+ } |
+ return events; |
} |
-void MagentaWaitManyInfo::AddHandle(mx_handle_t handle, |
- mx_signals_t signals, |
- DescriptorInfo* di) { |
-#if defined(DEBUG) |
- // Check that the handle is not already in the list. |
- for (intptr_t i = 0; i < size_; i++) { |
- if (items_[i].handle == handle) { |
- FATAL("The handle is already in the list!"); |
- } |
- } |
-#endif |
- intptr_t new_size = size_ + 1; |
- GrowArraysIfNeeded(new_size); |
- descriptor_infos_[size_] = di; |
- items_[size_].handle = handle; |
- items_[size_].waitfor = signals; |
- items_[size_].pending = 0; |
- size_ = new_size; |
- LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_); |
+// Unregister the file descriptor for a DescriptorInfo structure with |
+// epoll. |
+static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { |
+ LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); |
+ VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL)); |
} |
-void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) { |
- intptr_t idx; |
- for (idx = 1; idx < size_; idx++) { |
- if (handle == items_[idx].handle) { |
- break; |
- } |
+static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { |
+ struct epoll_event event; |
+ event.events = EPOLLRDHUP | di->GetPollEvents(); |
+ if (!di->IsListeningSocket()) { |
+ event.events |= EPOLLET; |
} |
- if (idx == size_) { |
- FATAL("Handle is not in the list!"); |
- } |
- |
- if (idx != (size_ - 1)) { |
- descriptor_infos_[idx] = descriptor_infos_[size_ - 1]; |
- items_[idx] = items_[size_ - 1]; |
+ event.data.ptr = di; |
+ LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd()); |
+ int status = |
+ NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event)); |
+ LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status); |
+#if defined(EVENTHANDLER_LOGGING) |
+ PrintEventMask(di->fd(), event.events); |
+#endif |
+ if (status == -1) { |
+ // TODO(dart:io): Verify that the dart end is handling this correctly. |
+ |
+ // Epoll does not accept the file descriptor. It could be due to |
+ // already closed file descriptor, or unuspported devices, such |
+ // as /dev/null. In such case, mark the file descriptor as closed, |
+ // so dart will handle it accordingly. |
+ di->NotifyAllDartPorts(1 << kCloseEvent); |
} |
- descriptor_infos_[size_ - 1] = NULL; |
- items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0}; |
- size_ = size_ - 1; |
- LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_); |
} |
-void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) { |
- if (desired_size < capacity_) { |
- return; |
+EventHandlerImplementation::EventHandlerImplementation() |
+ : socket_map_(&HashMap::SamePointerValue, 16) { |
+ intptr_t result; |
+ result = NO_RETRY_EXPECTED(pipe(interrupt_fds_)); |
+ if (result != 0) { |
+ FATAL("Pipe creation failed"); |
+ } |
+ if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) { |
+ FATAL("Failed to set pipe fd non blocking\n"); |
} |
- intptr_t new_capacity = desired_size + (desired_size >> 1); |
- descriptor_infos_ = static_cast<DescriptorInfo**>( |
- realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_))); |
- if (descriptor_infos_ == NULL) { |
- FATAL("Failed to grow descriptor_infos array"); |
+ if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) { |
+ FATAL("Failed to set pipe fd close on exec\n"); |
} |
- items_ = static_cast<mx_wait_item_t*>( |
- realloc(items_, new_capacity * sizeof(*items_))); |
- if (items_ == NULL) { |
- FATAL("Failed to grow items array"); |
+ if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) { |
+ FATAL("Failed to set pipe fd close on exec\n"); |
+ } |
+ shutdown_ = false; |
+ // The initial size passed to epoll_create is ignore on newer (>= |
+ // 2.6.8) Linux versions |
+ static const int kEpollInitialSize = 64; |
+ epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); |
+ if (epoll_fd_ == -1) { |
+ FATAL1("Failed creating epoll file descriptor: %i", errno); |
+ } |
+ if (!FDUtils::SetCloseOnExec(epoll_fd_)) { |
+ FATAL("Failed to set epoll fd close on exec\n"); |
+ } |
+ // Register the interrupt_fd with the epoll instance. |
+ struct epoll_event event; |
+ event.events = EPOLLIN; |
+ event.data.ptr = NULL; |
+ LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_); |
+ int status = NO_RETRY_EXPECTED( |
+ epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event)); |
+ LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n", |
+ epoll_fd_, status); |
+ if (status == -1) { |
+ FATAL("Failed adding interrupt fd to epoll instance"); |
} |
- capacity_ = new_capacity; |
- LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size, |
- capacity_); |
} |
-EventHandlerImplementation::EventHandlerImplementation() { |
- mx_status_t status = |
- mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]); |
- if (status != NO_ERROR) { |
- FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status)); |
- } |
- shutdown_ = false; |
- info_.AddHandle(interrupt_handles_[0], |
- MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL); |
- LOG_INFO("EventHandlerImplementation initialized\n"); |
+static void DeleteDescriptorInfo(void* info) { |
+ DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); |
+ di->Close(); |
+ LOG_INFO("Closed %d\n", di->fd()); |
+ delete di; |
} |
EventHandlerImplementation::~EventHandlerImplementation() { |
- mx_status_t status = mx_handle_close(interrupt_handles_[0]); |
- if (status != NO_ERROR) { |
- FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); |
+ socket_map_.Clear(DeleteDescriptorInfo); |
+ VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); |
+ VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); |
+ VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); |
+} |
+ |
+ |
+void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, |
+ DescriptorInfo* di) { |
+ intptr_t new_mask = di->Mask(); |
+ LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask, |
+ new_mask); |
+ if ((old_mask != 0) && (new_mask == 0)) { |
+ RemoveFromEpollInstance(epoll_fd_, di); |
+ } else if ((old_mask == 0) && (new_mask != 0)) { |
+ AddToEpollInstance(epoll_fd_, di); |
+ } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { |
+ ASSERT(!di->IsListeningSocket()); |
+ RemoveFromEpollInstance(epoll_fd_, di); |
+ AddToEpollInstance(epoll_fd_, di); |
} |
- status = mx_handle_close(interrupt_handles_[1]); |
- if (status != NO_ERROR) { |
- FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); |
+} |
+ |
+ |
+DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( |
+ intptr_t fd, |
+ bool is_listening) { |
+ ASSERT(fd >= 0); |
+ HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd), |
+ GetHashmapHashFromFd(fd), true); |
+ ASSERT(entry != NULL); |
+ DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); |
+ if (di == NULL) { |
+ // If there is no data in the hash map for this file descriptor a |
+ // new DescriptorInfo for the file descriptor is inserted. |
+ if (is_listening) { |
+ di = new DescriptorInfoMultiple(fd); |
+ } else { |
+ di = new DescriptorInfoSingle(fd); |
+ } |
+ entry->value = di; |
} |
- LOG_INFO("EventHandlerImplementation destroyed\n"); |
+ ASSERT(fd == di->fd()); |
+ return di; |
+} |
+ |
+ |
+static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) { |
+ size_t remaining = count; |
+ char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer)); |
+ while (remaining > 0) { |
+ ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining)); |
+ if (bytes_written == 0) { |
+ return count - remaining; |
+ } else if (bytes_written == -1) { |
+ ASSERT(EAGAIN == EWOULDBLOCK); |
+ // Error code EWOULDBLOCK should only happen for non blocking |
+ // file descriptors. |
+ ASSERT(errno != EWOULDBLOCK); |
+ return -1; |
+ } else { |
+ ASSERT(bytes_written > 0); |
+ remaining -= bytes_written; |
+ buffer_pos += bytes_written; |
+ } |
+ } |
+ return count; |
} |
@@ -147,71 +248,171 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id, |
msg.id = id; |
msg.dart_port = dart_port; |
msg.data = data; |
- |
- mx_status_t status = |
- mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0); |
- if (status != NO_ERROR) { |
- FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status)); |
+ // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
+ // is smaller than 512, we don't need a thread lock. |
+ // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
+ ASSERT(kInterruptMessageSize < PIPE_BUF); |
+ intptr_t result = |
+ WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
+ if (result != kInterruptMessageSize) { |
+ if (result == -1) { |
+ perror("Interrupt message failure:"); |
+ } |
+ FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); |
} |
- LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data); |
} |
void EventHandlerImplementation::HandleInterruptFd() { |
- LOG_INFO("HandleInterruptFd entry\n"); |
- InterruptMessage msg; |
- uint32_t bytes = kInterruptMessageSize; |
- mx_status_t status; |
- while (true) { |
- status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes, |
- NULL, 0, NULL); |
- if (status != NO_ERROR) { |
- break; |
- } |
- ASSERT(bytes == kInterruptMessageSize); |
- if (msg.id == kTimerId) { |
+ const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
+ InterruptMessage msg[MAX_MESSAGES]; |
+ ssize_t bytes = NO_RETRY_EXPECTED( |
+ read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
+ LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); |
+ for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
+ if (msg[i].id == kTimerId) { |
LOG_INFO("HandleInterruptFd read timer update\n"); |
- timeout_queue_.UpdateTimeout(msg.dart_port, msg.data); |
- } else if (msg.id == kShutdownId) { |
+ timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
+ } else if (msg[i].id == kShutdownId) { |
LOG_INFO("HandleInterruptFd read shutdown\n"); |
shutdown_ = true; |
} else { |
- // TODO(zra): Handle commands to add and remove handles from the |
- // MagentaWaitManyInfo. |
- UNIMPLEMENTED(); |
+ ASSERT((msg[i].data & COMMAND_MASK) != 0); |
+ LOG_INFO("HandleInterruptFd command\n"); |
+ DescriptorInfo* di = |
+ GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data)); |
+ if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { |
+ ASSERT(!di->IsListeningSocket()); |
+ // Close the socket for reading. |
+ LOG_INFO("\tSHUT_RD: %d\n", di->fd()); |
+ VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD)); |
+ } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) { |
+ ASSERT(!di->IsListeningSocket()); |
+ // Close the socket for writing. |
+ LOG_INFO("\tSHUT_WR: %d\n", di->fd()); |
+ VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR)); |
+ } else if (IS_COMMAND(msg[i].data, kCloseCommand)) { |
+ // Close the socket and free system resources and move on to next |
+ // message. |
+ intptr_t old_mask = di->Mask(); |
+ Dart_Port port = msg[i].dart_port; |
+ di->RemovePort(port); |
+ intptr_t new_mask = di->Mask(); |
+ UpdateEpollInstance(old_mask, di); |
+ |
+ LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask); |
+ intptr_t fd = di->fd(); |
+ if (di->IsListeningSocket()) { |
+ // We only close the socket file descriptor from the operating |
+ // system if there are no other dart socket objects which |
+ // are listening on the same (address, port) combination. |
+ ListeningSocketRegistry* registry = |
+ ListeningSocketRegistry::Instance(); |
+ |
+ MutexLocker locker(registry->mutex()); |
+ |
+ if (registry->CloseSafe(fd)) { |
+ ASSERT(new_mask == 0); |
+ socket_map_.Remove(GetHashmapKeyFromFd(fd), |
+ GetHashmapHashFromFd(fd)); |
+ di->Close(); |
+ LOG_INFO("Closed %d\n", di->fd()); |
+ delete di; |
+ } |
+ } else { |
+ ASSERT(new_mask == 0); |
+ socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
+ di->Close(); |
+ LOG_INFO("Closed %d\n", di->fd()); |
+ delete di; |
+ } |
+ |
+ DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
+ } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) { |
+ int count = TOKEN_COUNT(msg[i].data); |
+ intptr_t old_mask = di->Mask(); |
+ LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask); |
+ di->ReturnTokens(msg[i].dart_port, count); |
+ UpdateEpollInstance(old_mask, di); |
+ } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) { |
+ // `events` can only have kInEvent/kOutEvent flags set. |
+ intptr_t events = msg[i].data & EVENT_MASK; |
+ ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
+ |
+ intptr_t old_mask = di->Mask(); |
+ LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask, |
+ msg[i].data & EVENT_MASK); |
+ di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK); |
+ UpdateEpollInstance(old_mask, di); |
+ } else { |
+ UNREACHABLE(); |
+ } |
} |
} |
- // status == ERR_SHOULD_WAIT when we try to read and there are no messages |
- // available, so it is an error if we get here and status != ERR_SHOULD_WAIT. |
- if (status != ERR_SHOULD_WAIT) { |
- FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status)); |
- } |
LOG_INFO("HandleInterruptFd exit\n"); |
} |
-void EventHandlerImplementation::HandleEvents() { |
- LOG_INFO("HandleEvents entry\n"); |
- for (intptr_t i = 1; i < info_.size(); i++) { |
- const mx_wait_item_t& wait_item = info_.items()[i]; |
- if (wait_item.pending & wait_item.waitfor) { |
- // Only the control handle has no descriptor info. |
- ASSERT(info_.descriptor_infos()[i] != NULL); |
- ASSERT(wait_item.handle != interrupt_handles_[0]); |
- // TODO(zra): Handle events on other handles. At the moment we are |
- // only interrupted when there is a message on interrupt_handles_[0]. |
- UNIMPLEMENTED(); |
- } |
+intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
+ DescriptorInfo* di) { |
+#ifdef EVENTHANDLER_LOGGING |
+ PrintEventMask(di->fd(), events); |
+#endif |
+ if ((events & EPOLLERR) != 0) { |
+ // Return error only if EPOLLIN is present. |
+ return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; |
+ } |
+ intptr_t event_mask = 0; |
+ if ((events & EPOLLIN) != 0) { |
+ event_mask |= (1 << kInEvent); |
} |
+ if ((events & EPOLLOUT) != 0) { |
+ event_mask |= (1 << kOutEvent); |
+ } |
+ if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { |
+ event_mask |= (1 << kCloseEvent); |
+ } |
+ return event_mask; |
+} |
+ |
- if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) { |
- FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n"); |
+void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
+ int size) { |
+ bool interrupt_seen = false; |
+ for (int i = 0; i < size; i++) { |
+ if (events[i].data.ptr == NULL) { |
+ interrupt_seen = true; |
+ } else { |
+ DescriptorInfo* di = |
+ reinterpret_cast<DescriptorInfo*>(events[i].data.ptr); |
+ intptr_t event_mask = GetPollEvents(events[i].events, di); |
+ |
+ if ((event_mask & (1 << kErrorEvent)) != 0) { |
+ di->NotifyAllDartPorts(event_mask); |
+ } |
+ event_mask &= ~(1 << kErrorEvent); |
+ |
+ LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask); |
+ if (event_mask != 0) { |
+ intptr_t old_mask = di->Mask(); |
+ Dart_Port port = di->NextNotifyDartPort(event_mask); |
+ ASSERT(port != 0); |
+ UpdateEpollInstance(old_mask, di); |
+ LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask, |
+ port, di->fd()); |
+ bool success = DartUtils::PostInt32(port, event_mask); |
+ if (!success) { |
+ // This can happen if e.g. the isolate that owns the port has died |
+ // for some reason. |
+ FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port); |
+ } |
+ } |
+ } |
} |
- if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) { |
- LOG_INFO("HandleEvents interrupt_handles_[0] readable\n"); |
+ if (interrupt_seen) { |
+ // Handle after socket events, so we avoid closing a socket before we handle |
+ // the current events. |
HandleInterruptFd(); |
- } else { |
- LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n"); |
} |
} |
@@ -239,6 +440,8 @@ void EventHandlerImplementation::HandleTimeout() { |
void EventHandlerImplementation::Poll(uword args) { |
+ static const intptr_t kMaxEvents = 16; |
+ struct epoll_event events[kMaxEvents]; |
EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
EventHandlerImplementation* handler_impl = &handler->delegate_; |
ASSERT(handler_impl != NULL); |
@@ -246,23 +449,21 @@ void EventHandlerImplementation::Poll(uword args) { |
while (!handler_impl->shutdown_) { |
int64_t millis = handler_impl->GetTimeout(); |
ASSERT((millis == kInfinityTimeout) || (millis >= 0)); |
- mx_time_t timeout = |
- millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond; |
- const MagentaWaitManyInfo& info = handler_impl->info(); |
- LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(), |
- timeout); |
- mx_status_t status = |
- mx_handle_wait_many(info.items(), info.size(), timeout); |
- if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) { |
- FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status)); |
+ LOG_INFO("epoll_wait(millis = %ld)\n", millis); |
+ intptr_t result = NO_RETRY_EXPECTED( |
+ epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); |
+ ASSERT(EAGAIN == EWOULDBLOCK); |
+ LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); |
+ if (result < 0) { |
+ if (errno != EWOULDBLOCK) { |
+ perror("Poll failed"); |
+ } |
} else { |
- LOG_INFO("mx_handle_wait_many returned: %ld\n", status); |
handler_impl->HandleTimeout(); |
- handler_impl->HandleEvents(); |
+ handler_impl->HandleEvents(events, result); |
} |
} |
handler->NotifyShutdownDone(); |
- LOG_INFO("EventHandlerImplementation notifying about shutdown\n"); |
} |
@@ -286,6 +487,17 @@ void EventHandlerImplementation::SendData(intptr_t id, |
WakeupHandler(id, dart_port, data); |
} |
+void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
+ // The hashmap does not support keys with value 0. |
+ return reinterpret_cast<void*>(fd + 1); |
+} |
+ |
+ |
+uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
+ // The hashmap does not support keys with value 0. |
+ return dart::Utils::WordHash(fd + 1); |
+} |
+ |
} // namespace bin |
} // namespace dart |