Index: runtime/bin/eventhandler_macos.cc |
diff --git a/runtime/bin/eventhandler_macos.cc b/runtime/bin/eventhandler_macos.cc |
index c090e3a4bb7348a29294525652948be8d8f00f85..a61a0d4acc6b9556c3aae5f5e1724740cae41643 100644 |
--- a/runtime/bin/eventhandler_macos.cc |
+++ b/runtime/bin/eventhandler_macos.cc |
@@ -34,105 +34,83 @@ static const int kShutdownId = -2; |
bool SocketData::HasReadEvent() { |
- return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0); |
+ return (mask_ & (1 << kInEvent)) != 0; |
} |
bool SocketData::HasWriteEvent() { |
- return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0); |
+ return (mask_ & (1 << kOutEvent)) != 0; |
} |
// Unregister the file descriptor for a SocketData structure with kqueue. |
static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
+ if (!sd->tracked_by_kqueue()) return; |
static const intptr_t kMaxChanges = 2; |
intptr_t changes = 0; |
struct kevent events[kMaxChanges]; |
- if (sd->read_tracked_by_kqueue()) { |
+ if (sd->HasReadEvent()) { |
EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
++changes; |
- sd->set_read_tracked_by_kqueue(false); |
} |
- if (sd->write_tracked_by_kqueue()) { |
- EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd); |
+ if (sd->HasWriteEvent()) { |
+ EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); |
++changes; |
- sd->set_write_tracked_by_kqueue(false); |
} |
- if (changes > 0) { |
- ASSERT(changes <= kMaxChanges); |
- int status = |
- TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
- if (status == -1) { |
- const int kBufferSize = 1024; |
- char error_message[kBufferSize]; |
- strerror_r(errno, error_message, kBufferSize); |
- FATAL1("Failed deleting events from kqueue: %s\n", error_message); |
- } |
+ ASSERT(changes > 0); |
+ ASSERT(changes <= kMaxChanges); |
+ int status = |
+ TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
+ if (status == -1) { |
+ const int kBufferSize = 1024; |
+ char error_message[kBufferSize]; |
+ strerror_r(errno, error_message, kBufferSize); |
+ FATAL1("Failed deleting events from kqueue: %s\n", error_message); |
} |
+ sd->set_tracked_by_kqueue(false); |
} |
// Update the kqueue registration for SocketData structure to reflect |
// the events currently of interest. |
-static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
+static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
static const intptr_t kMaxChanges = 2; |
intptr_t changes = 0; |
struct kevent events[kMaxChanges]; |
- // Only report events once and wait for them to be re-enabled after the |
- // event has been handled by the Dart code. This is done by using EV_ONESHOT. |
- if (sd->port() != 0) { |
- // Register or unregister READ filter if needed. |
- if (sd->HasReadEvent()) { |
- if (!sd->read_tracked_by_kqueue()) { |
- EV_SET(events + changes, |
- sd->fd(), |
- EVFILT_READ, |
- EV_ADD | EV_ONESHOT, |
- 0, |
- 0, |
- sd); |
- ++changes; |
- sd->set_read_tracked_by_kqueue(true); |
- } |
- } else if (sd->read_tracked_by_kqueue()) { |
- EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
- ++changes; |
- sd->set_read_tracked_by_kqueue(false); |
- } |
- // Register or unregister WRITE filter if needed. |
- if (sd->HasWriteEvent()) { |
- if (!sd->write_tracked_by_kqueue()) { |
- EV_SET(events + changes, |
- sd->fd(), |
- EVFILT_WRITE, |
- EV_ADD | EV_ONESHOT, |
- 0, |
- 0, |
- sd); |
- ++changes; |
- sd->set_write_tracked_by_kqueue(true); |
- } |
- } else if (sd->write_tracked_by_kqueue()) { |
- EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); |
- ++changes; |
- sd->set_write_tracked_by_kqueue(false); |
- } |
+ // Register or unregister READ filter if needed. |
+ if (sd->HasReadEvent()) { |
+ EV_SET(events + changes, |
+ sd->fd(), |
+ EVFILT_READ, |
+ EV_ADD | EV_CLEAR, |
+ 0, |
+ 0, |
+ sd); |
+ ++changes; |
} |
- if (changes > 0) { |
- ASSERT(changes <= kMaxChanges); |
- int status = |
- TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
- if (status == -1) { |
- // kQueue does not accept the file descriptor. It could be due to |
- // already closed file descriptor, or unuspported devices, such |
- // as /dev/null. In such case, mark the file descriptor as closed, |
- // so dart will handle it accordingly. |
- sd->set_write_tracked_by_kqueue(false); |
- sd->set_read_tracked_by_kqueue(false); |
- sd->ShutdownRead(); |
- sd->ShutdownWrite(); |
- DartUtils::PostInt32(sd->port(), 1 << kCloseEvent); |
- } |
+ // Register or unregister WRITE filter if needed. |
+ if (sd->HasWriteEvent()) { |
+ EV_SET(events + changes, |
+ sd->fd(), |
+ EVFILT_WRITE, |
+ EV_ADD | EV_CLEAR, |
+ 0, |
+ 0, |
+ sd); |
+ ++changes; |
+ } |
+ ASSERT(changes > 0); |
+ ASSERT(changes <= kMaxChanges); |
+ int status = |
+ TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
+ if (status == -1) { |
+ // kQueue does not accept the file descriptor. It could be due to |
+ // already closed file descriptor, or unuspported devices, such |
+ // as /dev/null. In such case, mark the file descriptor as closed, |
+ // so dart will handle it accordingly. |
+ DartUtils::PostInt32(sd->port(), 1 << kCloseEvent); |
+ } else { |
+ sd->set_tracked_by_kqueue(true); |
} |
} |
@@ -174,7 +152,8 @@ EventHandlerImplementation::~EventHandlerImplementation() { |
} |
-SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
+SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd, |
+ bool* is_new) { |
ASSERT(fd >= 0); |
HashMap::Entry* entry = socket_map_.Lookup( |
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
@@ -185,6 +164,7 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
// new SocketData for the file descriptor is inserted. |
sd = new SocketData(fd); |
entry->value = sd; |
+ *is_new = true; |
} |
ASSERT(fd == sd->fd()); |
return sd; |
@@ -223,17 +203,19 @@ void EventHandlerImplementation::HandleInterruptFd() { |
} else if (msg[i].id == kShutdownId) { |
shutdown_ = true; |
} else { |
- SocketData* sd = GetSocketData(msg[i].id); |
+ bool is_new = false; |
+ SocketData* sd = GetSocketData(msg[i].id, &is_new); |
+ if (is_new) { |
+ sd->SetPortAndMask(msg[i].dart_port, msg[i].data); |
+ } |
if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) { |
ASSERT(msg[i].data == (1 << kShutdownReadCommand)); |
// Close the socket for reading. |
sd->ShutdownRead(); |
- UpdateKqueue(kqueue_fd_, sd); |
} else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) { |
ASSERT(msg[i].data == (1 << kShutdownWriteCommand)); |
// Close the socket for writing. |
sd->ShutdownWrite(); |
- UpdateKqueue(kqueue_fd_, sd); |
} else if ((msg[i].data & (1 << kCloseCommand)) != 0) { |
ASSERT(msg[i].data == (1 << kCloseCommand)); |
// Close the socket and free system resources. |
@@ -244,14 +226,8 @@ void EventHandlerImplementation::HandleInterruptFd() { |
delete sd; |
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent); |
} else { |
- if ((msg[i].data & (1 << kInEvent)) != 0 && sd->IsClosedRead()) { |
- DartUtils::PostInt32(msg[i].dart_port, 1 << kCloseEvent); |
- } else { |
- // Setup events to wait for. |
- ASSERT((msg[i].data > 0) && (msg[i].data < kIntptrMax)); |
- sd->SetPortAndMask(msg[i].dart_port, |
- static_cast<intptr_t>(msg[i].data)); |
- UpdateKqueue(kqueue_fd_, sd); |
+ if (is_new) { |
+ AddToKqueue(kqueue_fd_, sd); |
} |
} |
} |
@@ -261,12 +237,17 @@ void EventHandlerImplementation::HandleInterruptFd() { |
#ifdef DEBUG_KQUEUE |
static void PrintEventMask(intptr_t fd, struct kevent* event) { |
Log::Print("%d ", static_cast<int>(fd)); |
+ Log::Print("filter=0x%x:", event->filter); |
if (event->filter == EVFILT_READ) Log::Print("EVFILT_READ "); |
if (event->filter == EVFILT_WRITE) Log::Print("EVFILT_WRITE "); |
Log::Print("flags: %x: ", event->flags); |
if ((event->flags & EV_EOF) != 0) Log::Print("EV_EOF "); |
if ((event->flags & EV_ERROR) != 0) Log::Print("EV_ERROR "); |
+ if ((event->flags & EV_CLEAR) != 0) Log::Print("EV_CLEAR "); |
+ if ((event->flags & EV_ADD) != 0) Log::Print("EV_ADD "); |
+ if ((event->flags & EV_DELETE) != 0) Log::Print("EV_DELETE "); |
Log::Print("- fflags: %d ", event->fflags); |
+ Log::Print("- data: %ld ", event->data); |
Log::Print("(available %d) ", |
static_cast<int>(FDUtils::AvailableBytes(fd))); |
Log::Print("\n"); |
@@ -298,30 +279,20 @@ intptr_t EventHandlerImplementation::GetEvents(struct kevent* event, |
} else { |
// Prioritize data events over close and error events. |
if (event->filter == EVFILT_READ) { |
- if (FDUtils::AvailableBytes(sd->fd()) != 0) { |
- event_mask = (1 << kInEvent); |
- } else if ((event->flags & EV_EOF) != 0) { |
+ event_mask = (1 << kInEvent); |
+ if ((event->flags & EV_EOF) != 0) { |
if (event->fflags != 0) { |
- event_mask |= (1 << kErrorEvent); |
+ event_mask = (1 << kErrorEvent); |
} else { |
event_mask |= (1 << kCloseEvent); |
} |
- sd->MarkClosedRead(); |
} |
} else if (event->filter == EVFILT_WRITE) { |
+ event_mask |= (1 << kOutEvent); |
if ((event->flags & EV_EOF) != 0) { |
if (event->fflags != 0) { |
- event_mask |= (1 << kErrorEvent); |
- } else { |
- event_mask |= (1 << kCloseEvent); |
+ event_mask = (1 << kErrorEvent); |
} |
- // If the receiver closed for reading, close for writing, |
- // update the registration with kqueue, and do not report a |
- // write event. |
- sd->MarkClosedWrite(); |
- UpdateKqueue(kqueue_fd_, sd); |
- } else { |
- event_mask |= (1 << kOutEvent); |
} |
} else { |
UNREACHABLE(); |
@@ -347,13 +318,8 @@ void EventHandlerImplementation::HandleEvents(struct kevent* events, |
interrupt_seen = true; |
} else { |
SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata); |
- sd->set_write_tracked_by_kqueue(false); |
- sd->set_read_tracked_by_kqueue(false); |
intptr_t event_mask = GetEvents(events + i, sd); |
- if (event_mask == 0) { |
- // Event not handled, re-add to kqueue. |
- UpdateKqueue(kqueue_fd_, sd); |
- } else { |
+ if (event_mask != 0) { |
Dart_Port port = sd->port(); |
ASSERT(port != 0); |
DartUtils::PostInt32(port, event_mask); |