| Index: runtime/bin/eventhandler_macos.cc
|
| diff --git a/runtime/bin/eventhandler_macos.cc b/runtime/bin/eventhandler_macos.cc
|
| index c090e3a4bb7348a29294525652948be8d8f00f85..a61a0d4acc6b9556c3aae5f5e1724740cae41643 100644
|
| --- a/runtime/bin/eventhandler_macos.cc
|
| +++ b/runtime/bin/eventhandler_macos.cc
|
| @@ -34,105 +34,83 @@ static const int kShutdownId = -2;
|
|
|
|
|
| bool SocketData::HasReadEvent() {
|
| - return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0);
|
| + return (mask_ & (1 << kInEvent)) != 0;
|
| }
|
|
|
|
|
| bool SocketData::HasWriteEvent() {
|
| - return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0);
|
| + return (mask_ & (1 << kOutEvent)) != 0;
|
| }
|
|
|
|
|
| // Unregister the file descriptor for a SocketData structure with kqueue.
|
| static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
|
| + if (!sd->tracked_by_kqueue()) return;
|
| static const intptr_t kMaxChanges = 2;
|
| intptr_t changes = 0;
|
| struct kevent events[kMaxChanges];
|
| - if (sd->read_tracked_by_kqueue()) {
|
| + if (sd->HasReadEvent()) {
|
| EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
|
| ++changes;
|
| - sd->set_read_tracked_by_kqueue(false);
|
| }
|
| - if (sd->write_tracked_by_kqueue()) {
|
| - EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd);
|
| + if (sd->HasWriteEvent()) {
|
| + EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
|
| ++changes;
|
| - sd->set_write_tracked_by_kqueue(false);
|
| }
|
| - if (changes > 0) {
|
| - ASSERT(changes <= kMaxChanges);
|
| - int status =
|
| - TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
|
| - if (status == -1) {
|
| - const int kBufferSize = 1024;
|
| - char error_message[kBufferSize];
|
| - strerror_r(errno, error_message, kBufferSize);
|
| - FATAL1("Failed deleting events from kqueue: %s\n", error_message);
|
| - }
|
| + ASSERT(changes > 0);
|
| + ASSERT(changes <= kMaxChanges);
|
| + int status =
|
| + TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
|
| + if (status == -1) {
|
| + const int kBufferSize = 1024;
|
| + char error_message[kBufferSize];
|
| + strerror_r(errno, error_message, kBufferSize);
|
| + FATAL1("Failed deleting events from kqueue: %s\n", error_message);
|
| }
|
| + sd->set_tracked_by_kqueue(false);
|
| }
|
|
|
|
|
| // Update the kqueue registration for SocketData structure to reflect
|
| // the events currently of interest.
|
| -static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) {
|
| +static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) {
|
| static const intptr_t kMaxChanges = 2;
|
| intptr_t changes = 0;
|
| struct kevent events[kMaxChanges];
|
| - // Only report events once and wait for them to be re-enabled after the
|
| - // event has been handled by the Dart code. This is done by using EV_ONESHOT.
|
| - if (sd->port() != 0) {
|
| - // Register or unregister READ filter if needed.
|
| - if (sd->HasReadEvent()) {
|
| - if (!sd->read_tracked_by_kqueue()) {
|
| - EV_SET(events + changes,
|
| - sd->fd(),
|
| - EVFILT_READ,
|
| - EV_ADD | EV_ONESHOT,
|
| - 0,
|
| - 0,
|
| - sd);
|
| - ++changes;
|
| - sd->set_read_tracked_by_kqueue(true);
|
| - }
|
| - } else if (sd->read_tracked_by_kqueue()) {
|
| - EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
|
| - ++changes;
|
| - sd->set_read_tracked_by_kqueue(false);
|
| - }
|
| - // Register or unregister WRITE filter if needed.
|
| - if (sd->HasWriteEvent()) {
|
| - if (!sd->write_tracked_by_kqueue()) {
|
| - EV_SET(events + changes,
|
| - sd->fd(),
|
| - EVFILT_WRITE,
|
| - EV_ADD | EV_ONESHOT,
|
| - 0,
|
| - 0,
|
| - sd);
|
| - ++changes;
|
| - sd->set_write_tracked_by_kqueue(true);
|
| - }
|
| - } else if (sd->write_tracked_by_kqueue()) {
|
| - EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
|
| - ++changes;
|
| - sd->set_write_tracked_by_kqueue(false);
|
| - }
|
| + // Register or unregister READ filter if needed.
|
| + if (sd->HasReadEvent()) {
|
| + EV_SET(events + changes,
|
| + sd->fd(),
|
| + EVFILT_READ,
|
| + EV_ADD | EV_CLEAR,
|
| + 0,
|
| + 0,
|
| + sd);
|
| + ++changes;
|
| }
|
| - if (changes > 0) {
|
| - ASSERT(changes <= kMaxChanges);
|
| - int status =
|
| - TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
|
| - if (status == -1) {
|
| - // kQueue does not accept the file descriptor. It could be due to
|
| - // already closed file descriptor, or unuspported devices, such
|
| - // as /dev/null. In such case, mark the file descriptor as closed,
|
| - // so dart will handle it accordingly.
|
| - sd->set_write_tracked_by_kqueue(false);
|
| - sd->set_read_tracked_by_kqueue(false);
|
| - sd->ShutdownRead();
|
| - sd->ShutdownWrite();
|
| - DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
|
| - }
|
| + // Register or unregister WRITE filter if needed.
|
| + if (sd->HasWriteEvent()) {
|
| + EV_SET(events + changes,
|
| + sd->fd(),
|
| + EVFILT_WRITE,
|
| + EV_ADD | EV_CLEAR,
|
| + 0,
|
| + 0,
|
| + sd);
|
| + ++changes;
|
| + }
|
| + ASSERT(changes > 0);
|
| + ASSERT(changes <= kMaxChanges);
|
| + int status =
|
| + TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
|
| + if (status == -1) {
|
| + // kQueue does not accept the file descriptor. It could be due to
|
| + // already closed file descriptor, or unuspported devices, such
|
| + // as /dev/null. In such case, mark the file descriptor as closed,
|
| + // so dart will handle it accordingly.
|
| + DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
|
| + } else {
|
| + sd->set_tracked_by_kqueue(true);
|
| }
|
| }
|
|
|
| @@ -174,7 +152,8 @@ EventHandlerImplementation::~EventHandlerImplementation() {
|
| }
|
|
|
|
|
| -SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
|
| +SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
|
| + bool* is_new) {
|
| ASSERT(fd >= 0);
|
| HashMap::Entry* entry = socket_map_.Lookup(
|
| GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
|
| @@ -185,6 +164,7 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
|
| // new SocketData for the file descriptor is inserted.
|
| sd = new SocketData(fd);
|
| entry->value = sd;
|
| + *is_new = true;
|
| }
|
| ASSERT(fd == sd->fd());
|
| return sd;
|
| @@ -223,17 +203,19 @@ void EventHandlerImplementation::HandleInterruptFd() {
|
| } else if (msg[i].id == kShutdownId) {
|
| shutdown_ = true;
|
| } else {
|
| - SocketData* sd = GetSocketData(msg[i].id);
|
| + bool is_new = false;
|
| + SocketData* sd = GetSocketData(msg[i].id, &is_new);
|
| + if (is_new) {
|
| + sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
|
| + }
|
| if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
|
| ASSERT(msg[i].data == (1 << kShutdownReadCommand));
|
| // Close the socket for reading.
|
| sd->ShutdownRead();
|
| - UpdateKqueue(kqueue_fd_, sd);
|
| } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
|
| ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
|
| // Close the socket for writing.
|
| sd->ShutdownWrite();
|
| - UpdateKqueue(kqueue_fd_, sd);
|
| } else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
|
| ASSERT(msg[i].data == (1 << kCloseCommand));
|
| // Close the socket and free system resources.
|
| @@ -244,14 +226,8 @@ void EventHandlerImplementation::HandleInterruptFd() {
|
| delete sd;
|
| DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
|
| } else {
|
| - if ((msg[i].data & (1 << kInEvent)) != 0 && sd->IsClosedRead()) {
|
| - DartUtils::PostInt32(msg[i].dart_port, 1 << kCloseEvent);
|
| - } else {
|
| - // Setup events to wait for.
|
| - ASSERT((msg[i].data > 0) && (msg[i].data < kIntptrMax));
|
| - sd->SetPortAndMask(msg[i].dart_port,
|
| - static_cast<intptr_t>(msg[i].data));
|
| - UpdateKqueue(kqueue_fd_, sd);
|
| + if (is_new) {
|
| + AddToKqueue(kqueue_fd_, sd);
|
| }
|
| }
|
| }
|
| @@ -261,12 +237,17 @@ void EventHandlerImplementation::HandleInterruptFd() {
|
| #ifdef DEBUG_KQUEUE
|
| static void PrintEventMask(intptr_t fd, struct kevent* event) {
|
| Log::Print("%d ", static_cast<int>(fd));
|
| + Log::Print("filter=0x%x:", event->filter);
|
| if (event->filter == EVFILT_READ) Log::Print("EVFILT_READ ");
|
| if (event->filter == EVFILT_WRITE) Log::Print("EVFILT_WRITE ");
|
| Log::Print("flags: %x: ", event->flags);
|
| if ((event->flags & EV_EOF) != 0) Log::Print("EV_EOF ");
|
| if ((event->flags & EV_ERROR) != 0) Log::Print("EV_ERROR ");
|
| + if ((event->flags & EV_CLEAR) != 0) Log::Print("EV_CLEAR ");
|
| + if ((event->flags & EV_ADD) != 0) Log::Print("EV_ADD ");
|
| + if ((event->flags & EV_DELETE) != 0) Log::Print("EV_DELETE ");
|
| Log::Print("- fflags: %d ", event->fflags);
|
| + Log::Print("- data: %ld ", event->data);
|
| Log::Print("(available %d) ",
|
| static_cast<int>(FDUtils::AvailableBytes(fd)));
|
| Log::Print("\n");
|
| @@ -298,30 +279,20 @@ intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
|
| } else {
|
| // Prioritize data events over close and error events.
|
| if (event->filter == EVFILT_READ) {
|
| - if (FDUtils::AvailableBytes(sd->fd()) != 0) {
|
| - event_mask = (1 << kInEvent);
|
| - } else if ((event->flags & EV_EOF) != 0) {
|
| + event_mask = (1 << kInEvent);
|
| + if ((event->flags & EV_EOF) != 0) {
|
| if (event->fflags != 0) {
|
| - event_mask |= (1 << kErrorEvent);
|
| + event_mask = (1 << kErrorEvent);
|
| } else {
|
| event_mask |= (1 << kCloseEvent);
|
| }
|
| - sd->MarkClosedRead();
|
| }
|
| } else if (event->filter == EVFILT_WRITE) {
|
| + event_mask |= (1 << kOutEvent);
|
| if ((event->flags & EV_EOF) != 0) {
|
| if (event->fflags != 0) {
|
| - event_mask |= (1 << kErrorEvent);
|
| - } else {
|
| - event_mask |= (1 << kCloseEvent);
|
| + event_mask = (1 << kErrorEvent);
|
| }
|
| - // If the receiver closed for reading, close for writing,
|
| - // update the registration with kqueue, and do not report a
|
| - // write event.
|
| - sd->MarkClosedWrite();
|
| - UpdateKqueue(kqueue_fd_, sd);
|
| - } else {
|
| - event_mask |= (1 << kOutEvent);
|
| }
|
| } else {
|
| UNREACHABLE();
|
| @@ -347,13 +318,8 @@ void EventHandlerImplementation::HandleEvents(struct kevent* events,
|
| interrupt_seen = true;
|
| } else {
|
| SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
|
| - sd->set_write_tracked_by_kqueue(false);
|
| - sd->set_read_tracked_by_kqueue(false);
|
| intptr_t event_mask = GetEvents(events + i, sd);
|
| - if (event_mask == 0) {
|
| - // Event not handled, re-add to kqueue.
|
| - UpdateKqueue(kqueue_fd_, sd);
|
| - } else {
|
| + if (event_mask != 0) {
|
| Dart_Port port = sd->port();
|
| ASSERT(port != 0);
|
| DartUtils::PostInt32(port, event_mask);
|
|
|