Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(259)

Unified Diff: runtime/bin/eventhandler_linux.cc

Issue 198743002: Make the event-handler handle backpreasure. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Doc fix Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/eventhandler_linux.h ('k') | runtime/bin/eventhandler_macos.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/eventhandler_linux.cc
diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc
index 612f3cb811ecaf108d372ad31cfb98bf9c3fb27d..f87c1a4785192c81740d7c08db2b1a48e2f0fef7 100644
--- a/runtime/bin/eventhandler_linux.cc
+++ b/runtime/bin/eventhandler_linux.cc
@@ -20,7 +20,6 @@
#include "bin/dartutils.h"
#include "bin/fdutils.h"
#include "bin/log.h"
-#include "bin/socket.h"
#include "platform/hashmap.h"
#include "platform/thread.h"
#include "platform/utils.h"
@@ -29,36 +28,63 @@
namespace dart {
namespace bin {
+static const int kInterruptMessageSize = sizeof(InterruptMessage);
static const int kTimerId = -1;
+static const int kShutdownId = -2;
-static void AddToEpollInstance(intptr_t epoll_fd_,
- int fd, Dart_Port port,
- int mask) {
+intptr_t SocketData::GetPollEvents() {
+ // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
+ // triggered anyway.
+ intptr_t events = EPOLLET | EPOLLRDHUP;
+ if ((mask_ & (1 << kInEvent)) != 0) {
+ events |= EPOLLIN;
+ }
+ if ((mask_ & (1 << kOutEvent)) != 0) {
+ events |= EPOLLOUT;
+ }
+ return events;
+}
+
+
+// Unregister the file descriptor for a SocketData structure with epoll.
+static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
+ VOID_TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
+ EPOLL_CTL_DEL,
+ sd->fd(),
+ NULL));
+}
+
+
+static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
struct epoll_event event;
- event.events = EPOLLET | EPOLLRDHUP;
- if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN;
- if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT;
- // Be sure we don't collide with the TIMER_BIT.
- if (port == ILLEGAL_PORT) {
- FATAL("Illigal port sent to event handler");
- }
- event.data.u64 = port;
+ event.events = sd->GetPollEvents();
+ event.data.ptr = sd;
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
- fd,
+ sd->fd(),
&event));
if (status == -1) {
// Epoll does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
- DartUtils::PostInt32(port, 1 << kCloseEvent);
+ DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
}
}
-EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
+EventHandlerImplementation::EventHandlerImplementation()
+ : socket_map_(&HashMap::SamePointerValue, 16) {
+ intptr_t result;
+ result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
+ if (result != 0) {
+ FATAL("Pipe creation failed");
+ }
+ FDUtils::SetNonBlocking(interrupt_fds_[0]);
+ FDUtils::SetCloseOnExec(interrupt_fds_[0]);
+ FDUtils::SetCloseOnExec(interrupt_fds_[1]);
+ shutdown_ = false;
// The initial size passed to epoll_create is ignore on newer (>=
// 2.6.8) Linux versions
static const int kEpollInitialSize = 64;
@@ -67,18 +93,28 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
FATAL1("Failed creating epoll file descriptor: %i", errno);
}
FDUtils::SetCloseOnExec(epoll_fd_);
+ // Register the interrupt_fd with the epoll instance.
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.ptr = NULL;
+ int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
+ EPOLL_CTL_ADD,
+ interrupt_fds_[0],
+ &event));
+ if (status == -1) {
+ FATAL("Failed adding interrupt fd to epoll instance");
+ }
timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC));
if (timer_fd_ == -1) {
FATAL1("Failed creating timerfd file descriptor: %i", errno);
}
// Register the timer_fd_ with the epoll instance.
- struct epoll_event event;
event.events = EPOLLIN;
- event.data.u64 = ILLEGAL_PORT; // Use ILLEGAL_PORT to identify timer-fd.
- int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
- EPOLL_CTL_ADD,
- timer_fd_,
- &event));
+ event.data.fd = timer_fd_;
+ status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
+ EPOLL_CTL_ADD,
+ timer_fd_,
+ &event));
if (status == -1) {
FATAL2(
"Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno);
@@ -87,14 +123,106 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
EventHandlerImplementation::~EventHandlerImplementation() {
- TEMP_FAILURE_RETRY(close(epoll_fd_));
- TEMP_FAILURE_RETRY(close(timer_fd_));
+ VOID_TEMP_FAILURE_RETRY(close(epoll_fd_));
+ VOID_TEMP_FAILURE_RETRY(close(timer_fd_));
+ VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
+ VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
+}
+
+
+SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
+ ASSERT(fd >= 0);
+ HashMap::Entry* entry = socket_map_.Lookup(
+ GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
+ ASSERT(entry != NULL);
+ SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
+ if (sd == NULL) {
+ // If there is no data in the hash map for this file descriptor a
+ // new SocketData for the file descriptor is inserted.
+ sd = new SocketData(fd);
+ entry->value = sd;
+ }
+ ASSERT(fd == sd->fd());
+ return sd;
+}
+
+
+void EventHandlerImplementation::WakeupHandler(intptr_t id,
+ Dart_Port dart_port,
+ int64_t data) {
+ InterruptMessage msg;
+ msg.id = id;
+ msg.dart_port = dart_port;
+ msg.data = data;
+ // WriteToBlocking will write up to 512 bytes atomically, and since our msg
+ // is smaller than 512, we don't need a thread lock.
+ // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
+ ASSERT(kInterruptMessageSize < PIPE_BUF);
+ intptr_t result =
+ FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
+ if (result != kInterruptMessageSize) {
+ if (result == -1) {
+ perror("Interrupt message failure:");
+ }
+ FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
+ }
+}
+
+
+void EventHandlerImplementation::HandleInterruptFd() {
+ const intptr_t MAX_MESSAGES = kInterruptMessageSize;
+ InterruptMessage msg[MAX_MESSAGES];
+ ssize_t bytes = TEMP_FAILURE_RETRY(
+ read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
+ for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
+ if (msg[i].id == kTimerId) {
+ timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
+ struct itimerspec it;
+ memset(&it, 0, sizeof(it));
+ if (timeout_queue_.HasTimeout()) {
+ int64_t millis = timeout_queue_.CurrentTimeout();
+ it.it_value.tv_sec = millis / 1000;
+ it.it_value.tv_nsec = (millis % 1000) * 1000000;
+ }
+ timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
+ } else if (msg[i].id == kShutdownId) {
+ shutdown_ = true;
+ } else {
+ SocketData* sd = GetSocketData(msg[i].id);
+ if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
+ ASSERT(msg[i].data == (1 << kShutdownReadCommand));
+ // Close the socket for reading.
+ shutdown(sd->fd(), SHUT_RD);
+ } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
+ ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
+ // Close the socket for writing.
+ shutdown(sd->fd(), SHUT_WR);
+ } else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
+ ASSERT(msg[i].data == (1 << kCloseCommand));
+ // Close the socket and free system resources and move on to
+ // next message.
+ RemoveFromEpollInstance(epoll_fd_, sd);
+ intptr_t fd = sd->fd();
+ sd->Close();
+ socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
+ delete sd;
+ DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
+ } else if ((msg[i].data & (1 << kReturnTokenCommand)) != 0) {
+ if (sd->ReturnToken()) {
+ AddToEpollInstance(epoll_fd_, sd);
+ }
+ } else {
+ // Setup events to wait for.
+ sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
+ AddToEpollInstance(epoll_fd_, sd);
+ }
+ }
+ }
}
#ifdef DEBUG_POLL
-static void PrintEventMask(intptr_t events) {
- // TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the
- // epoll-data as well.
+static void PrintEventMask(intptr_t fd, intptr_t events) {
+ Log::Print("%d ", fd);
if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN ");
if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI ");
if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT ");
@@ -106,17 +234,19 @@ static void PrintEventMask(intptr_t events) {
if ((events & ~all_events) != 0) {
Log::Print("(and %08x) ", events & ~all_events);
}
+ Log::Print("(available %d) ", FDUtils::AvailableBytes(fd));
Log::Print("\n");
}
#endif
-intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
+intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
+ SocketData* sd) {
#ifdef DEBUG_POLL
- PrintEventMask(events);
+ PrintEventMask(sd->fd(), events);
#endif
if (events & EPOLLERR) {
- // Return only error if EPOLLIN is present.
+ // Return error only if EPOLLIN is present.
return (events & EPOLLIN) ? (1 << kErrorEvent) : 0;
}
intptr_t event_mask = 0;
@@ -129,32 +259,40 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
int size) {
+ bool interrupt_seen = false;
for (int i = 0; i < size; i++) {
- uint64_t data = events[i].data.u64;
- // ILLEGAL_PORT is used to identify timer-fd.
- if (data == ILLEGAL_PORT) {
+ if (events[i].data.ptr == NULL) {
+ interrupt_seen = true;
+ } else if (events[i].data.fd == timer_fd_) {
int64_t val;
VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val)));
- timer_mutex_.Lock();
if (timeout_queue_.HasTimeout()) {
DartUtils::PostNull(timeout_queue_.CurrentPort());
timeout_queue_.RemoveCurrent();
}
- timer_mutex_.Unlock();
} else {
- int32_t event_mask = GetPollEvents(events[i].events);
+ SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
+ intptr_t event_mask = GetPollEvents(events[i].events, sd);
if (event_mask != 0) {
- Dart_Port port = data;
+ if (sd->TakeToken()) {
+ // Took last token, remove from epoll.
+ RemoveFromEpollInstance(epoll_fd_, sd);
+ }
+ Dart_Port port = sd->port();
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
}
}
}
+ if (interrupt_seen) {
+ // Handle after socket events, so we avoid closing a socket before we handle
+ // the current events.
+ HandleInterruptFd();
+ }
}
void EventHandlerImplementation::Poll(uword args) {
- // Main event-handler thread loop.
static const intptr_t kMaxEvents = 16;
struct epoll_event events[kMaxEvents];
EventHandler* handler = reinterpret_cast<EventHandler*>(args);
@@ -188,51 +326,26 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
void EventHandlerImplementation::Shutdown() {
- shutdown_ = true;
-}
-
-
-void EventHandlerImplementation::Notify(intptr_t id,
- Dart_Port dart_port,
- int64_t data) {
- // This method is called by isolates, that is, not in the event-handler
- // thread.
- if (id == kTimerId) {
- // Lock this region, as multiple isolates may attempt to update
- // timeout_queue_.
- // TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock.
- timer_mutex_.Lock();
- timeout_queue_.UpdateTimeout(dart_port, data);
- struct itimerspec it;
- memset(&it, 0, sizeof(it));
- if (timeout_queue_.HasTimeout()) {
- int64_t millis = timeout_queue_.CurrentTimeout();
- it.it_value.tv_sec = millis / 1000;
- it.it_value.tv_nsec = (millis % 1000) * 1000000;
- }
- timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
- timer_mutex_.Unlock();
- } else {
- if ((data & (1 << kShutdownReadCommand)) != 0) {
- ASSERT(data == (1 << kShutdownReadCommand));
- // Close the socket for reading.
- shutdown(id, SHUT_RD);
- } else if ((data & (1 << kShutdownWriteCommand)) != 0) {
- ASSERT(data == (1 << kShutdownWriteCommand));
- // Close the socket for writing.
- shutdown(id, SHUT_WR);
- } else if ((data & (1 << kCloseCommand)) != 0) {
- ASSERT(data == (1 << kCloseCommand));
- // Close the socket and free system resources and move on to
- // next message.
- // This will also remove the file descriptor from epoll.
- Socket::Close(id);
- DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent);
- } else {
- // Add to epoll - this is the first time we see it.
- AddToEpollInstance(epoll_fd_, id, dart_port, data);
- }
- }
+ SendData(kShutdownId, 0, 0);
+}
+
+
+void EventHandlerImplementation::SendData(intptr_t id,
+ Dart_Port dart_port,
+ int64_t data) {
+ WakeupHandler(id, dart_port, data);
+}
+
+
+void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
+ // The hashmap does not support keys with value 0.
+ return reinterpret_cast<void*>(fd + 1);
+}
+
+
+uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
+ // The hashmap does not support keys with value 0.
+ return dart::Utils::WordHash(fd + 1);
}
} // namespace bin
« no previous file with comments | « runtime/bin/eventhandler_linux.h ('k') | runtime/bin/eventhandler_macos.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698