Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #if !defined(DART_IO_DISABLED) | 5 #if !defined(DART_IO_DISABLED) |
| 6 | 6 |
| 7 #include "platform/globals.h" | 7 #include "platform/globals.h" |
| 8 #if defined(TARGET_OS_FUCHSIA) | 8 #if defined(TARGET_OS_FUCHSIA) |
| 9 | 9 |
| 10 #include "bin/eventhandler.h" | 10 #include "bin/eventhandler.h" |
| 11 #include "bin/eventhandler_fuchsia.h" | 11 #include "bin/eventhandler_fuchsia.h" |
| 12 | 12 |
| 13 #include <magenta/status.h> | 13 #include <errno.h> // NOLINT |
| 14 #include <magenta/syscalls.h> | 14 #include <fcntl.h> // NOLINT |
| 15 #include <pthread.h> // NOLINT | |
| 16 #include <stdio.h> // NOLINT | |
| 17 #include <string.h> // NOLINT | |
| 18 #include <sys/epoll.h> // NOLINT | |
| 19 #include <sys/stat.h> // NOLINT | |
| 20 #include <unistd.h> // NOLINT | |
| 15 | 21 |
| 22 #include "bin/fdutils.h" | |
| 23 #include "bin/lockers.h" | |
| 16 #include "bin/log.h" | 24 #include "bin/log.h" |
| 25 #include "bin/socket.h" | |
| 17 #include "bin/thread.h" | 26 #include "bin/thread.h" |
| 18 #include "bin/utils.h" | 27 #include "bin/utils.h" |
| 28 #include "platform/hashmap.h" | |
| 29 #include "platform/utils.h" | |
| 19 | 30 |
| 31 // #define EVENTHANDLER_LOGGING 1 | |
| 20 #if defined(EVENTHANDLER_LOGGING) | 32 #if defined(EVENTHANDLER_LOGGING) |
| 21 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) | 33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) |
| 22 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) | 34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) |
| 23 #else | 35 #else |
| 24 #define LOG_ERR(msg, ...) | 36 #define LOG_ERR(msg, ...) |
| 25 #define LOG_INFO(msg, ...) | 37 #define LOG_INFO(msg, ...) |
| 26 #endif // defined(EVENTHANDLER_LOGGING) | 38 #endif // defined(EVENTHANDLER_LOGGING) |
| 27 | 39 |
| 28 namespace dart { | 40 namespace dart { |
| 29 namespace bin { | 41 namespace bin { |
| 30 | 42 |
| 31 MagentaWaitManyInfo::MagentaWaitManyInfo() | 43 #if defined(EVENTHANDLER_LOGGING) |
| 32 : capacity_(kInitialCapacity), size_(0) { | 44 static void PrintEventMask(intptr_t fd, intptr_t events) { |
| 33 descriptor_infos_ = static_cast<DescriptorInfo**>( | 45 Log::PrintErr("%d ", fd); |
| 34 malloc(kInitialCapacity * sizeof(*descriptor_infos_))); | 46 if ((events & EPOLLIN) != 0) { |
| 35 if (descriptor_infos_ == NULL) { | 47 Log::PrintErr("EPOLLIN "); |
| 36 FATAL("Failed to allocate descriptor_infos array"); | 48 } |
| 37 } | 49 if ((events & EPOLLPRI) != 0) { |
| 38 items_ = | 50 Log::PrintErr("EPOLLPRI "); |
| 39 static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_))); | 51 } |
| 40 if (items_ == NULL) { | 52 if ((events & EPOLLOUT) != 0) { |
| 41 FATAL("Failed to allocate items array"); | 53 Log::PrintErr("EPOLLOUT "); |
| 42 } | 54 } |
| 43 } | 55 if ((events & EPOLLERR) != 0) { |
| 44 | 56 Log::PrintErr("EPOLLERR "); |
| 45 | 57 } |
| 46 MagentaWaitManyInfo::~MagentaWaitManyInfo() { | 58 if ((events & EPOLLHUP) != 0) { |
| 47 free(descriptor_infos_); | 59 Log::PrintErr("EPOLLHUP "); |
| 48 free(items_); | 60 } |
| 49 } | 61 if ((events & EPOLLRDHUP) != 0) { |
| 50 | 62 Log::PrintErr("EPOLLRDHUP "); |
| 51 | 63 } |
| 52 void MagentaWaitManyInfo::AddHandle(mx_handle_t handle, | 64 int all_events = EPOLLIN | EPOLLPRI | EPOLLOUT | |
| 53 mx_signals_t signals, | 65 EPOLLERR | EPOLLHUP | EPOLLRDHUP; |
| 66 if ((events & ~all_events) != 0) { | |
| 67 Log::PrintErr("(and %08x) ", events & ~all_events); | |
| 68 } | |
| 69 | |
| 70 Log::PrintErr("\n"); | |
| 71 } | |
| 72 #endif | |
| 73 | |
| 74 | |
| 75 intptr_t DescriptorInfo::GetPollEvents() { | |
| 76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are | |
| 77 // triggered anyway. | |
| 78 intptr_t events = 0; | |
| 79 if ((Mask() & (1 << kInEvent)) != 0) { | |
| 80 events |= EPOLLIN; | |
| 81 } | |
| 82 if ((Mask() & (1 << kOutEvent)) != 0) { | |
| 83 events |= EPOLLOUT; | |
| 84 } | |
| 85 return events; | |
| 86 } | |
| 87 | |
| 88 | |
| 89 // Unregister the file descriptor for a DescriptorInfo structure with | |
| 90 // epoll. | |
| 91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, | |
| 54 DescriptorInfo* di) { | 92 DescriptorInfo* di) { |
| 55 #if defined(DEBUG) | 93 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); |
| 56 // Check that the handle is not already in the list. | 94 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, |
| 57 for (intptr_t i = 0; i < size_; i++) { | 95 EPOLL_CTL_DEL, |
| 58 if (items_[i].handle == handle) { | 96 di->fd(), |
| 59 FATAL("The handle is already in the list!"); | 97 NULL)); |
| 98 } | |
| 99 | |
| 100 | |
| 101 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { | |
| 102 struct epoll_event event; | |
| 103 event.events = EPOLLRDHUP | di->GetPollEvents(); | |
| 104 if (!di->IsListeningSocket()) { | |
| 105 event.events |= EPOLLET; | |
| 106 } | |
| 107 event.data.ptr = di; | |
| 108 LOG_INFO("AddToEpollInstance: fd = %ld\n", | |
| 109 di->fd()); | |
| 110 int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, | |
| 111 EPOLL_CTL_ADD, | |
| 112 di->fd(), | |
| 113 &event)); | |
| 114 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", | |
| 115 di->fd(), status); | |
| 116 #if defined(EVENTHANDLER_LOGGING) | |
| 117 PrintEventMask(di->fd(), event.events); | |
| 118 #endif | |
| 119 if (status == -1) { | |
| 120 // TODO(dart:io): Verify that the dart end is handling this correctly. | |
| 121 | |
| 122 // Epoll does not accept the file descriptor. It could be due to | |
| 123 // already closed file descriptor, or unuspported devices, such | |
| 124 // as /dev/null. In such case, mark the file descriptor as closed, | |
| 125 // so dart will handle it accordingly. | |
| 126 di->NotifyAllDartPorts(1 << kCloseEvent); | |
| 127 } | |
| 128 } | |
| 129 | |
| 130 | |
| 131 EventHandlerImplementation::EventHandlerImplementation() | |
| 132 : socket_map_(&HashMap::SamePointerValue, 16) { | |
| 133 intptr_t result; | |
| 134 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_)); | |
| 135 if (result != 0) { | |
| 136 FATAL("Pipe creation failed"); | |
| 137 } | |
| 138 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) { | |
| 139 FATAL("Failed to set pipe fd non blocking\n"); | |
| 140 } | |
| 141 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) { | |
| 142 FATAL("Failed to set pipe fd close on exec\n"); | |
| 143 } | |
| 144 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) { | |
| 145 FATAL("Failed to set pipe fd close on exec\n"); | |
| 146 } | |
| 147 shutdown_ = false; | |
| 148 // The initial size passed to epoll_create is ignore on newer (>= | |
| 149 // 2.6.8) Linux versions | |
| 150 static const int kEpollInitialSize = 64; | |
| 151 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); | |
| 152 if (epoll_fd_ == -1) { | |
| 153 FATAL1("Failed creating epoll file descriptor: %i", errno); | |
| 154 } | |
| 155 if (!FDUtils::SetCloseOnExec(epoll_fd_)) { | |
| 156 FATAL("Failed to set epoll fd close on exec\n"); | |
| 157 } | |
| 158 // Register the interrupt_fd with the epoll instance. | |
| 159 struct epoll_event event; | |
| 160 event.events = EPOLLIN; | |
| 161 event.data.ptr = NULL; | |
| 162 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", | |
| 163 epoll_fd_); | |
| 164 int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, | |
| 165 EPOLL_CTL_ADD, | |
| 166 interrupt_fds_[0], | |
| 167 &event)); | |
| 168 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n", | |
| 169 epoll_fd_, status); | |
| 170 if (status == -1) { | |
| 171 FATAL("Failed adding interrupt fd to epoll instance"); | |
| 172 } | |
|
siva
2016/11/21 02:22:41
timer_fd is not being registered here, do we not h
zra
2016/11/21 05:56:24
AFAIK there are no timerfds in Fuchsia. Timers are
| |
| 173 } | |
| 174 | |
| 175 | |
| 176 static void DeleteDescriptorInfo(void* info) { | |
| 177 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); | |
| 178 di->Close(); | |
| 179 LOG_INFO("Closed %d\n", di->fd()); | |
| 180 delete di; | |
| 181 } | |
| 182 | |
| 183 | |
| 184 EventHandlerImplementation::~EventHandlerImplementation() { | |
| 185 socket_map_.Clear(DeleteDescriptorInfo); | |
| 186 VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); | |
|
siva
2016/11/21 02:22:41
Ditto comment about missing timer_fd_
zra
2016/11/21 05:56:24
Acknowledged.
| |
| 187 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); | |
| 188 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); | |
| 189 } | |
| 190 | |
| 191 | |
| 192 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, | |
| 193 DescriptorInfo *di) { | |
| 194 intptr_t new_mask = di->Mask(); | |
| 195 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", | |
| 196 di->fd(), old_mask, new_mask); | |
| 197 if ((old_mask != 0) && (new_mask == 0)) { | |
| 198 RemoveFromEpollInstance(epoll_fd_, di); | |
| 199 } else if ((old_mask == 0) && (new_mask != 0)) { | |
| 200 AddToEpollInstance(epoll_fd_, di); | |
| 201 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { | |
| 202 ASSERT(!di->IsListeningSocket()); | |
| 203 RemoveFromEpollInstance(epoll_fd_, di); | |
| 204 AddToEpollInstance(epoll_fd_, di); | |
| 205 } | |
| 206 } | |
| 207 | |
| 208 | |
| 209 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( | |
| 210 intptr_t fd, bool is_listening) { | |
| 211 ASSERT(fd >= 0); | |
| 212 HashMap::Entry* entry = socket_map_.Lookup( | |
| 213 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); | |
| 214 ASSERT(entry != NULL); | |
| 215 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); | |
| 216 if (di == NULL) { | |
| 217 // If there is no data in the hash map for this file descriptor a | |
| 218 // new DescriptorInfo for the file descriptor is inserted. | |
| 219 if (is_listening) { | |
| 220 di = new DescriptorInfoMultiple(fd); | |
| 221 } else { | |
| 222 di = new DescriptorInfoSingle(fd); | |
| 60 } | 223 } |
| 61 } | 224 entry->value = di; |
| 62 #endif | 225 } |
| 63 intptr_t new_size = size_ + 1; | 226 ASSERT(fd == di->fd()); |
| 64 GrowArraysIfNeeded(new_size); | 227 return di; |
| 65 descriptor_infos_[size_] = di; | 228 } |
| 66 items_[size_].handle = handle; | 229 |
| 67 items_[size_].waitfor = signals; | 230 |
| 68 items_[size_].pending = 0; | 231 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) { |
| 69 size_ = new_size; | 232 size_t remaining = count; |
| 70 LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_); | 233 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer)); |
| 71 } | 234 while (remaining > 0) { |
| 72 | 235 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining)); |
| 73 | 236 if (bytes_written == 0) { |
| 74 void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) { | 237 return count - remaining; |
| 75 intptr_t idx; | 238 } else if (bytes_written == -1) { |
| 76 for (idx = 1; idx < size_; idx++) { | 239 ASSERT(EAGAIN == EWOULDBLOCK); |
| 77 if (handle == items_[idx].handle) { | 240 // Error code EWOULDBLOCK should only happen for non blocking |
| 78 break; | 241 // file descriptors. |
| 242 ASSERT(errno != EWOULDBLOCK); | |
| 243 return -1; | |
| 244 } else { | |
| 245 ASSERT(bytes_written > 0); | |
| 246 remaining -= bytes_written; | |
| 247 buffer_pos += bytes_written; | |
| 79 } | 248 } |
| 80 } | 249 } |
| 81 if (idx == size_) { | 250 return count; |
| 82 FATAL("Handle is not in the list!"); | 251 } |
| 83 } | 252 |
| 84 | 253 |
| 85 if (idx != (size_ - 1)) { | |
| 86 descriptor_infos_[idx] = descriptor_infos_[size_ - 1]; | |
| 87 items_[idx] = items_[size_ - 1]; | |
| 88 } | |
| 89 descriptor_infos_[size_ - 1] = NULL; | |
| 90 items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0}; | |
| 91 size_ = size_ - 1; | |
| 92 LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_); | |
| 93 } | |
| 94 | |
| 95 | |
| 96 void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) { | |
| 97 if (desired_size < capacity_) { | |
| 98 return; | |
| 99 } | |
| 100 intptr_t new_capacity = desired_size + (desired_size >> 1); | |
| 101 descriptor_infos_ = static_cast<DescriptorInfo**>( | |
| 102 realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_))); | |
| 103 if (descriptor_infos_ == NULL) { | |
| 104 FATAL("Failed to grow descriptor_infos array"); | |
| 105 } | |
| 106 items_ = static_cast<mx_wait_item_t*>( | |
| 107 realloc(items_, new_capacity * sizeof(*items_))); | |
| 108 if (items_ == NULL) { | |
| 109 FATAL("Failed to grow items array"); | |
| 110 } | |
| 111 capacity_ = new_capacity; | |
| 112 LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size, | |
| 113 capacity_); | |
| 114 } | |
| 115 | |
| 116 | |
| 117 EventHandlerImplementation::EventHandlerImplementation() { | |
| 118 mx_status_t status = | |
| 119 mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]); | |
| 120 if (status != NO_ERROR) { | |
| 121 FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status)); | |
| 122 } | |
| 123 shutdown_ = false; | |
| 124 info_.AddHandle(interrupt_handles_[0], | |
| 125 MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL); | |
| 126 LOG_INFO("EventHandlerImplementation initialized\n"); | |
| 127 } | |
| 128 | |
| 129 | |
| 130 EventHandlerImplementation::~EventHandlerImplementation() { | |
| 131 mx_status_t status = mx_handle_close(interrupt_handles_[0]); | |
| 132 if (status != NO_ERROR) { | |
| 133 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); | |
| 134 } | |
| 135 status = mx_handle_close(interrupt_handles_[1]); | |
| 136 if (status != NO_ERROR) { | |
| 137 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); | |
| 138 } | |
| 139 LOG_INFO("EventHandlerImplementation destroyed\n"); | |
| 140 } | |
| 141 | |
| 142 | |
| 143 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 254 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
| 144 Dart_Port dart_port, | 255 Dart_Port dart_port, |
| 145 int64_t data) { | 256 int64_t data) { |
| 146 InterruptMessage msg; | 257 InterruptMessage msg; |
| 147 msg.id = id; | 258 msg.id = id; |
| 148 msg.dart_port = dart_port; | 259 msg.dart_port = dart_port; |
| 149 msg.data = data; | 260 msg.data = data; |
| 150 | 261 // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
| 151 mx_status_t status = | 262 // is smaller than 512, we don't need a thread lock. |
| 152 mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0); | 263 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
| 153 if (status != NO_ERROR) { | 264 ASSERT(kInterruptMessageSize < PIPE_BUF); |
| 154 FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status)); | 265 intptr_t result = |
| 266 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | |
| 267 if (result != kInterruptMessageSize) { | |
| 268 if (result == -1) { | |
| 269 perror("Interrupt message failure:"); | |
| 270 } | |
| 271 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); | |
| 155 } | 272 } |
| 156 LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data); | |
| 157 } | 273 } |
| 158 | 274 |
| 159 | 275 |
| 160 void EventHandlerImplementation::HandleInterruptFd() { | 276 void EventHandlerImplementation::HandleInterruptFd() { |
| 161 LOG_INFO("HandleInterruptFd entry\n"); | 277 const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
| 162 InterruptMessage msg; | 278 InterruptMessage msg[MAX_MESSAGES]; |
| 163 uint32_t bytes = kInterruptMessageSize; | 279 ssize_t bytes = NO_RETRY_EXPECTED( |
| 164 mx_status_t status; | 280 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
| 165 while (true) { | 281 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); |
| 166 status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes, | 282 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
| 167 NULL, 0, NULL); | 283 if (msg[i].id == kTimerId) { |
| 168 if (status != NO_ERROR) { | |
| 169 break; | |
| 170 } | |
| 171 ASSERT(bytes == kInterruptMessageSize); | |
| 172 if (msg.id == kTimerId) { | |
| 173 LOG_INFO("HandleInterruptFd read timer update\n"); | 284 LOG_INFO("HandleInterruptFd read timer update\n"); |
| 174 timeout_queue_.UpdateTimeout(msg.dart_port, msg.data); | 285 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
| 175 } else if (msg.id == kShutdownId) { | 286 } else if (msg[i].id == kShutdownId) { |
| 176 LOG_INFO("HandleInterruptFd read shutdown\n"); | 287 LOG_INFO("HandleInterruptFd read shutdown\n"); |
| 177 shutdown_ = true; | 288 shutdown_ = true; |
| 178 } else { | 289 } else { |
| 179 // TODO(zra): Handle commands to add and remove handles from the | 290 ASSERT((msg[i].data & COMMAND_MASK) != 0); |
| 180 // MagentaWaitManyInfo. | 291 LOG_INFO("HandleInterruptFd command\n"); |
| 181 UNIMPLEMENTED(); | 292 DescriptorInfo* di = GetDescriptorInfo( |
| 293 msg[i].id, IS_LISTENING_SOCKET(msg[i].data)); | |
| 294 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { | |
| 295 ASSERT(!di->IsListeningSocket()); | |
| 296 // Close the socket for reading. | |
| 297 LOG_INFO("\tSHUT_RD: %d\n", di->fd()); | |
| 298 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD)); | |
| 299 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) { | |
| 300 ASSERT(!di->IsListeningSocket()); | |
| 301 // Close the socket for writing. | |
| 302 LOG_INFO("\tSHUT_WR: %d\n", di->fd()); | |
| 303 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR)); | |
| 304 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) { | |
| 305 // Close the socket and free system resources and move on to next | |
| 306 // message. | |
| 307 intptr_t old_mask = di->Mask(); | |
| 308 Dart_Port port = msg[i].dart_port; | |
| 309 di->RemovePort(port); | |
| 310 intptr_t new_mask = di->Mask(); | |
| 311 UpdateEpollInstance(old_mask, di); | |
| 312 | |
| 313 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask); | |
| 314 intptr_t fd = di->fd(); | |
| 315 if (di->IsListeningSocket()) { | |
| 316 // We only close the socket file descriptor from the operating | |
| 317 // system if there are no other dart socket objects which | |
| 318 // are listening on the same (address, port) combination. | |
| 319 ListeningSocketRegistry *registry = | |
| 320 ListeningSocketRegistry::Instance(); | |
| 321 | |
| 322 MutexLocker locker(registry->mutex()); | |
| 323 | |
| 324 if (registry->CloseSafe(fd)) { | |
| 325 ASSERT(new_mask == 0); | |
| 326 socket_map_.Remove(GetHashmapKeyFromFd(fd), | |
| 327 GetHashmapHashFromFd(fd)); | |
| 328 di->Close(); | |
| 329 LOG_INFO("Closed %d\n", di->fd()); | |
| 330 delete di; | |
| 331 } | |
| 332 } else { | |
| 333 ASSERT(new_mask == 0); | |
| 334 socket_map_.Remove( | |
| 335 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); | |
| 336 di->Close(); | |
| 337 LOG_INFO("Closed %d\n", di->fd()); | |
| 338 delete di; | |
| 339 } | |
| 340 | |
| 341 DartUtils::PostInt32(port, 1 << kDestroyedEvent); | |
| 342 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) { | |
| 343 int count = TOKEN_COUNT(msg[i].data); | |
| 344 intptr_t old_mask = di->Mask(); | |
| 345 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask); | |
| 346 di->ReturnTokens(msg[i].dart_port, count); | |
| 347 UpdateEpollInstance(old_mask, di); | |
| 348 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) { | |
| 349 // `events` can only have kInEvent/kOutEvent flags set. | |
| 350 intptr_t events = msg[i].data & EVENT_MASK; | |
| 351 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | |
| 352 | |
| 353 intptr_t old_mask = di->Mask(); | |
| 354 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", | |
| 355 di->fd(), old_mask, msg[i].data & EVENT_MASK); | |
| 356 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK); | |
| 357 UpdateEpollInstance(old_mask, di); | |
| 358 } else { | |
| 359 UNREACHABLE(); | |
| 360 } | |
| 182 } | 361 } |
| 183 } | 362 } |
| 184 // status == ERR_SHOULD_WAIT when we try to read and there are no messages | |
| 185 // available, so it is an error if we get here and status != ERR_SHOULD_WAIT. | |
| 186 if (status != ERR_SHOULD_WAIT) { | |
| 187 FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status)); | |
| 188 } | |
| 189 LOG_INFO("HandleInterruptFd exit\n"); | 363 LOG_INFO("HandleInterruptFd exit\n"); |
| 190 } | 364 } |
| 191 | 365 |
| 192 | 366 |
| 193 void EventHandlerImplementation::HandleEvents() { | 367 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
| 194 LOG_INFO("HandleEvents entry\n"); | 368 DescriptorInfo* di) { |
| 195 for (intptr_t i = 1; i < info_.size(); i++) { | 369 #ifdef EVENTHANDLER_LOGGING |
| 196 const mx_wait_item_t& wait_item = info_.items()[i]; | 370 PrintEventMask(di->fd(), events); |
| 197 if (wait_item.pending & wait_item.waitfor) { | 371 #endif |
| 198 // Only the control handle has no descriptor info. | 372 if ((events & EPOLLERR) != 0) { |
| 199 ASSERT(info_.descriptor_infos()[i] != NULL); | 373 // Return error only if EPOLLIN is present. |
| 200 ASSERT(wait_item.handle != interrupt_handles_[0]); | 374 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; |
| 201 // TODO(zra): Handle events on other handles. At the moment we are | 375 } |
| 202 // only interrupted when there is a message on interrupt_handles_[0]. | 376 intptr_t event_mask = 0; |
| 203 UNIMPLEMENTED(); | 377 if ((events & EPOLLIN) != 0) { |
| 378 event_mask |= (1 << kInEvent); | |
| 379 } | |
| 380 if ((events & EPOLLOUT) != 0) { | |
| 381 event_mask |= (1 << kOutEvent); | |
| 382 } | |
| 383 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { | |
| 384 event_mask |= (1 << kCloseEvent); | |
| 385 } | |
| 386 return event_mask; | |
| 387 } | |
| 388 | |
| 389 | |
| 390 void EventHandlerImplementation::HandleEvents(struct epoll_event* events, | |
| 391 int size) { | |
| 392 bool interrupt_seen = false; | |
| 393 for (int i = 0; i < size; i++) { | |
| 394 if (events[i].data.ptr == NULL) { | |
| 395 interrupt_seen = true; | |
| 396 } else { | |
| 397 DescriptorInfo* di = | |
| 398 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr); | |
| 399 intptr_t event_mask = GetPollEvents(events[i].events, di); | |
| 400 | |
| 401 if ((event_mask & (1 << kErrorEvent)) != 0) { | |
| 402 di->NotifyAllDartPorts(event_mask); | |
| 403 } | |
| 404 event_mask &= ~(1 << kErrorEvent); | |
| 405 | |
| 406 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask); | |
| 407 if (event_mask != 0) { | |
| 408 intptr_t old_mask = di->Mask(); | |
| 409 Dart_Port port = di->NextNotifyDartPort(event_mask); | |
| 410 ASSERT(port != 0); | |
| 411 UpdateEpollInstance(old_mask, di); | |
| 412 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", | |
| 413 event_mask, port, di->fd()); | |
| 414 bool success = DartUtils::PostInt32(port, event_mask); | |
| 415 if (!success) { | |
| 416 // This can happen if e.g. the isolate that owns the port has died | |
| 417 // for some reason. | |
| 418 FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port); | |
| 419 } | |
| 420 } | |
| 204 } | 421 } |
| 205 } | 422 } |
| 206 | 423 if (interrupt_seen) { |
| 207 if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) { | 424 // Handle after socket events, so we avoid closing a socket before we handle |
| 208 FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n"); | 425 // the current events. |
| 209 } | |
| 210 if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) { | |
| 211 LOG_INFO("HandleEvents interrupt_handles_[0] readable\n"); | |
| 212 HandleInterruptFd(); | 426 HandleInterruptFd(); |
| 213 } else { | |
| 214 LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n"); | |
| 215 } | 427 } |
| 216 } | 428 } |
| 217 | 429 |
| 218 | 430 |
| 219 int64_t EventHandlerImplementation::GetTimeout() const { | 431 int64_t EventHandlerImplementation::GetTimeout() const { |
| 220 if (!timeout_queue_.HasTimeout()) { | 432 if (!timeout_queue_.HasTimeout()) { |
| 221 return kInfinityTimeout; | 433 return kInfinityTimeout; |
| 222 } | 434 } |
| 223 int64_t millis = | 435 int64_t millis = timeout_queue_.CurrentTimeout() - |
| 224 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); | 436 TimerUtils::GetCurrentMonotonicMillis(); |
| 225 return (millis < 0) ? 0 : millis; | 437 return (millis < 0) ? 0 : millis; |
| 226 } | 438 } |
| 227 | 439 |
| 228 | 440 |
| 229 void EventHandlerImplementation::HandleTimeout() { | 441 void EventHandlerImplementation::HandleTimeout() { |
| 230 if (timeout_queue_.HasTimeout()) { | 442 if (timeout_queue_.HasTimeout()) { |
| 231 int64_t millis = timeout_queue_.CurrentTimeout() - | 443 int64_t millis = timeout_queue_.CurrentTimeout() - |
| 232 TimerUtils::GetCurrentMonotonicMillis(); | 444 TimerUtils::GetCurrentMonotonicMillis(); |
| 233 if (millis <= 0) { | 445 if (millis <= 0) { |
| 234 DartUtils::PostNull(timeout_queue_.CurrentPort()); | 446 DartUtils::PostNull(timeout_queue_.CurrentPort()); |
| 235 timeout_queue_.RemoveCurrent(); | 447 timeout_queue_.RemoveCurrent(); |
| 236 } | 448 } |
| 237 } | 449 } |
| 238 } | 450 } |
| 239 | 451 |
| 240 | 452 |
| 241 void EventHandlerImplementation::Poll(uword args) { | 453 void EventHandlerImplementation::Poll(uword args) { |
| 454 static const intptr_t kMaxEvents = 16; | |
| 455 struct epoll_event events[kMaxEvents]; | |
| 242 EventHandler* handler = reinterpret_cast<EventHandler*>(args); | 456 EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
| 243 EventHandlerImplementation* handler_impl = &handler->delegate_; | 457 EventHandlerImplementation* handler_impl = &handler->delegate_; |
| 244 ASSERT(handler_impl != NULL); | 458 ASSERT(handler_impl != NULL); |
| 245 | 459 |
| 246 while (!handler_impl->shutdown_) { | 460 while (!handler_impl->shutdown_) { |
| 247 int64_t millis = handler_impl->GetTimeout(); | 461 int64_t millis = handler_impl->GetTimeout(); |
| 248 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); | 462 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); |
| 249 mx_time_t timeout = | 463 LOG_INFO("epoll_wait(millis = %ld)\n", millis); |
| 250 millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond; | 464 intptr_t result = NO_RETRY_EXPECTED( |
| 251 const MagentaWaitManyInfo& info = handler_impl->info(); | 465 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); |
| 252 LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(), | 466 ASSERT(EAGAIN == EWOULDBLOCK); |
| 253 timeout); | 467 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); |
| 254 mx_status_t status = | 468 if (result < 0) { |
| 255 mx_handle_wait_many(info.items(), info.size(), timeout); | 469 if (errno != EWOULDBLOCK) { |
| 256 if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) { | 470 perror("Poll failed"); |
| 257 FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status)); | 471 } |
| 258 } else { | 472 } else { |
| 259 LOG_INFO("mx_handle_wait_many returned: %ld\n", status); | |
| 260 handler_impl->HandleTimeout(); | 473 handler_impl->HandleTimeout(); |
| 261 handler_impl->HandleEvents(); | 474 handler_impl->HandleEvents(events, result); |
| 262 } | 475 } |
| 263 } | 476 } |
| 264 handler->NotifyShutdownDone(); | 477 handler->NotifyShutdownDone(); |
| 265 LOG_INFO("EventHandlerImplementation notifying about shutdown\n"); | |
| 266 } | 478 } |
| 267 | 479 |
| 268 | 480 |
| 269 void EventHandlerImplementation::Start(EventHandler* handler) { | 481 void EventHandlerImplementation::Start(EventHandler* handler) { |
| 270 int result = Thread::Start(&EventHandlerImplementation::Poll, | 482 int result = Thread::Start(&EventHandlerImplementation::Poll, |
| 271 reinterpret_cast<uword>(handler)); | 483 reinterpret_cast<uword>(handler)); |
| 272 if (result != 0) { | 484 if (result != 0) { |
| 273 FATAL1("Failed to start event handler thread %d", result); | 485 FATAL1("Failed to start event handler thread %d", result); |
| 274 } | 486 } |
| 275 } | 487 } |
| 276 | 488 |
| 277 | 489 |
| 278 void EventHandlerImplementation::Shutdown() { | 490 void EventHandlerImplementation::Shutdown() { |
| 279 SendData(kShutdownId, 0, 0); | 491 SendData(kShutdownId, 0, 0); |
| 280 } | 492 } |
| 281 | 493 |
| 282 | 494 |
| 283 void EventHandlerImplementation::SendData(intptr_t id, | 495 void EventHandlerImplementation::SendData(intptr_t id, |
| 284 Dart_Port dart_port, | 496 Dart_Port dart_port, |
| 285 int64_t data) { | 497 int64_t data) { |
| 286 WakeupHandler(id, dart_port, data); | 498 WakeupHandler(id, dart_port, data); |
| 287 } | 499 } |
| 288 | 500 |
| 501 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { | |
| 502 // The hashmap does not support keys with value 0. | |
| 503 return reinterpret_cast<void*>(fd + 1); | |
| 504 } | |
| 505 | |
| 506 | |
| 507 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { | |
| 508 // The hashmap does not support keys with value 0. | |
| 509 return dart::Utils::WordHash(fd + 1); | |
| 510 } | |
| 511 | |
| 289 } // namespace bin | 512 } // namespace bin |
| 290 } // namespace dart | 513 } // namespace dart |
| 291 | 514 |
| 292 #endif // defined(TARGET_OS_FUCHSIA) | 515 #endif // defined(TARGET_OS_FUCHSIA) |
| 293 | 516 |
| 294 #endif // !defined(DART_IO_DISABLED) | 517 #endif // !defined(DART_IO_DISABLED) |
| OLD | NEW |