OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #if !defined(DART_IO_DISABLED) | 5 #if !defined(DART_IO_DISABLED) |
6 | 6 |
7 #include "platform/globals.h" | 7 #include "platform/globals.h" |
8 #if defined(TARGET_OS_FUCHSIA) | 8 #if defined(TARGET_OS_FUCHSIA) |
9 | 9 |
10 #include "bin/eventhandler.h" | 10 #include "bin/eventhandler.h" |
11 #include "bin/eventhandler_fuchsia.h" | 11 #include "bin/eventhandler_fuchsia.h" |
12 | 12 |
13 #include <magenta/status.h> | 13 #include <errno.h> // NOLINT |
14 #include <magenta/syscalls.h> | 14 #include <fcntl.h> // NOLINT |
| 15 #include <pthread.h> // NOLINT |
| 16 #include <stdio.h> // NOLINT |
| 17 #include <string.h> // NOLINT |
| 18 #include <sys/epoll.h> // NOLINT |
| 19 #include <sys/stat.h> // NOLINT |
| 20 #include <unistd.h> // NOLINT |
15 | 21 |
| 22 #include "bin/fdutils.h" |
| 23 #include "bin/lockers.h" |
16 #include "bin/log.h" | 24 #include "bin/log.h" |
| 25 #include "bin/socket.h" |
17 #include "bin/thread.h" | 26 #include "bin/thread.h" |
18 #include "bin/utils.h" | 27 #include "bin/utils.h" |
| 28 #include "platform/hashmap.h" |
| 29 #include "platform/utils.h" |
19 | 30 |
| 31 // #define EVENTHANDLER_LOGGING 1 |
20 #if defined(EVENTHANDLER_LOGGING) | 32 #if defined(EVENTHANDLER_LOGGING) |
21 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) | 33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) |
22 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) | 34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) |
23 #else | 35 #else |
24 #define LOG_ERR(msg, ...) | 36 #define LOG_ERR(msg, ...) |
25 #define LOG_INFO(msg, ...) | 37 #define LOG_INFO(msg, ...) |
26 #endif // defined(EVENTHANDLER_LOGGING) | 38 #endif // defined(EVENTHANDLER_LOGGING) |
27 | 39 |
28 namespace dart { | 40 namespace dart { |
29 namespace bin { | 41 namespace bin { |
30 | 42 |
31 MagentaWaitManyInfo::MagentaWaitManyInfo() | 43 #if defined(EVENTHANDLER_LOGGING) |
32 : capacity_(kInitialCapacity), size_(0) { | 44 static void PrintEventMask(intptr_t fd, intptr_t events) { |
33 descriptor_infos_ = static_cast<DescriptorInfo**>( | 45 Log::PrintErr("%d ", fd); |
34 malloc(kInitialCapacity * sizeof(*descriptor_infos_))); | 46 if ((events & EPOLLIN) != 0) { |
35 if (descriptor_infos_ == NULL) { | 47 Log::PrintErr("EPOLLIN "); |
36 FATAL("Failed to allocate descriptor_infos array"); | 48 } |
37 } | 49 if ((events & EPOLLPRI) != 0) { |
38 items_ = | 50 Log::PrintErr("EPOLLPRI "); |
39 static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_))); | 51 } |
40 if (items_ == NULL) { | 52 if ((events & EPOLLOUT) != 0) { |
41 FATAL("Failed to allocate items array"); | 53 Log::PrintErr("EPOLLOUT "); |
42 } | 54 } |
43 } | 55 if ((events & EPOLLERR) != 0) { |
44 | 56 Log::PrintErr("EPOLLERR "); |
45 | 57 } |
46 MagentaWaitManyInfo::~MagentaWaitManyInfo() { | 58 if ((events & EPOLLHUP) != 0) { |
47 free(descriptor_infos_); | 59 Log::PrintErr("EPOLLHUP "); |
48 free(items_); | 60 } |
49 } | 61 if ((events & EPOLLRDHUP) != 0) { |
50 | 62 Log::PrintErr("EPOLLRDHUP "); |
51 | 63 } |
52 void MagentaWaitManyInfo::AddHandle(mx_handle_t handle, | 64 int all_events = |
53 mx_signals_t signals, | 65 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP; |
54 DescriptorInfo* di) { | 66 if ((events & ~all_events) != 0) { |
55 #if defined(DEBUG) | 67 Log::PrintErr("(and %08x) ", events & ~all_events); |
56 // Check that the handle is not already in the list. | 68 } |
57 for (intptr_t i = 0; i < size_; i++) { | 69 |
58 if (items_[i].handle == handle) { | 70 Log::PrintErr("\n"); |
59 FATAL("The handle is already in the list!"); | 71 } |
| 72 #endif |
| 73 |
| 74 |
| 75 intptr_t DescriptorInfo::GetPollEvents() { |
| 76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are |
| 77 // triggered anyway. |
| 78 intptr_t events = 0; |
| 79 if ((Mask() & (1 << kInEvent)) != 0) { |
| 80 events |= EPOLLIN; |
| 81 } |
| 82 if ((Mask() & (1 << kOutEvent)) != 0) { |
| 83 events |= EPOLLOUT; |
| 84 } |
| 85 return events; |
| 86 } |
| 87 |
| 88 |
| 89 // Unregister the file descriptor for a DescriptorInfo structure with |
| 90 // epoll. |
| 91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { |
| 92 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); |
| 93 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL)); |
| 94 } |
| 95 |
| 96 |
| 97 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { |
| 98 struct epoll_event event; |
| 99 event.events = EPOLLRDHUP | di->GetPollEvents(); |
| 100 if (!di->IsListeningSocket()) { |
| 101 event.events |= EPOLLET; |
| 102 } |
| 103 event.data.ptr = di; |
| 104 LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd()); |
| 105 int status = |
| 106 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event)); |
| 107 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status); |
| 108 #if defined(EVENTHANDLER_LOGGING) |
| 109 PrintEventMask(di->fd(), event.events); |
| 110 #endif |
| 111 if (status == -1) { |
| 112 // TODO(dart:io): Verify that the dart end is handling this correctly. |
| 113 |
| 114 // Epoll does not accept the file descriptor. It could be due to |
| 115 // already closed file descriptor, or unuspported devices, such |
| 116 // as /dev/null. In such case, mark the file descriptor as closed, |
| 117 // so dart will handle it accordingly. |
| 118 di->NotifyAllDartPorts(1 << kCloseEvent); |
| 119 } |
| 120 } |
| 121 |
| 122 |
| 123 EventHandlerImplementation::EventHandlerImplementation() |
| 124 : socket_map_(&HashMap::SamePointerValue, 16) { |
| 125 intptr_t result; |
| 126 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_)); |
| 127 if (result != 0) { |
| 128 FATAL("Pipe creation failed"); |
| 129 } |
| 130 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) { |
| 131 FATAL("Failed to set pipe fd non blocking\n"); |
| 132 } |
| 133 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) { |
| 134 FATAL("Failed to set pipe fd close on exec\n"); |
| 135 } |
| 136 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) { |
| 137 FATAL("Failed to set pipe fd close on exec\n"); |
| 138 } |
| 139 shutdown_ = false; |
| 140 // The initial size passed to epoll_create is ignore on newer (>= |
| 141 // 2.6.8) Linux versions |
| 142 static const int kEpollInitialSize = 64; |
| 143 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); |
| 144 if (epoll_fd_ == -1) { |
| 145 FATAL1("Failed creating epoll file descriptor: %i", errno); |
| 146 } |
| 147 if (!FDUtils::SetCloseOnExec(epoll_fd_)) { |
| 148 FATAL("Failed to set epoll fd close on exec\n"); |
| 149 } |
| 150 // Register the interrupt_fd with the epoll instance. |
| 151 struct epoll_event event; |
| 152 event.events = EPOLLIN; |
| 153 event.data.ptr = NULL; |
| 154 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_); |
| 155 int status = NO_RETRY_EXPECTED( |
| 156 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event)); |
| 157 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n", |
| 158 epoll_fd_, status); |
| 159 if (status == -1) { |
| 160 FATAL("Failed adding interrupt fd to epoll instance"); |
| 161 } |
| 162 } |
| 163 |
| 164 |
| 165 static void DeleteDescriptorInfo(void* info) { |
| 166 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); |
| 167 di->Close(); |
| 168 LOG_INFO("Closed %d\n", di->fd()); |
| 169 delete di; |
| 170 } |
| 171 |
| 172 |
| 173 EventHandlerImplementation::~EventHandlerImplementation() { |
| 174 socket_map_.Clear(DeleteDescriptorInfo); |
| 175 VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); |
| 176 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); |
| 177 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); |
| 178 } |
| 179 |
| 180 |
| 181 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, |
| 182 DescriptorInfo* di) { |
| 183 intptr_t new_mask = di->Mask(); |
| 184 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask, |
| 185 new_mask); |
| 186 if ((old_mask != 0) && (new_mask == 0)) { |
| 187 RemoveFromEpollInstance(epoll_fd_, di); |
| 188 } else if ((old_mask == 0) && (new_mask != 0)) { |
| 189 AddToEpollInstance(epoll_fd_, di); |
| 190 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { |
| 191 ASSERT(!di->IsListeningSocket()); |
| 192 RemoveFromEpollInstance(epoll_fd_, di); |
| 193 AddToEpollInstance(epoll_fd_, di); |
| 194 } |
| 195 } |
| 196 |
| 197 |
| 198 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( |
| 199 intptr_t fd, |
| 200 bool is_listening) { |
| 201 ASSERT(fd >= 0); |
| 202 HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd), |
| 203 GetHashmapHashFromFd(fd), true); |
| 204 ASSERT(entry != NULL); |
| 205 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); |
| 206 if (di == NULL) { |
| 207 // If there is no data in the hash map for this file descriptor a |
| 208 // new DescriptorInfo for the file descriptor is inserted. |
| 209 if (is_listening) { |
| 210 di = new DescriptorInfoMultiple(fd); |
| 211 } else { |
| 212 di = new DescriptorInfoSingle(fd); |
60 } | 213 } |
61 } | 214 entry->value = di; |
62 #endif | 215 } |
63 intptr_t new_size = size_ + 1; | 216 ASSERT(fd == di->fd()); |
64 GrowArraysIfNeeded(new_size); | 217 return di; |
65 descriptor_infos_[size_] = di; | 218 } |
66 items_[size_].handle = handle; | 219 |
67 items_[size_].waitfor = signals; | 220 |
68 items_[size_].pending = 0; | 221 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) { |
69 size_ = new_size; | 222 size_t remaining = count; |
70 LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_); | 223 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer)); |
71 } | 224 while (remaining > 0) { |
72 | 225 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining)); |
73 | 226 if (bytes_written == 0) { |
74 void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) { | 227 return count - remaining; |
75 intptr_t idx; | 228 } else if (bytes_written == -1) { |
76 for (idx = 1; idx < size_; idx++) { | 229 ASSERT(EAGAIN == EWOULDBLOCK); |
77 if (handle == items_[idx].handle) { | 230 // Error code EWOULDBLOCK should only happen for non blocking |
78 break; | 231 // file descriptors. |
| 232 ASSERT(errno != EWOULDBLOCK); |
| 233 return -1; |
| 234 } else { |
| 235 ASSERT(bytes_written > 0); |
| 236 remaining -= bytes_written; |
| 237 buffer_pos += bytes_written; |
79 } | 238 } |
80 } | 239 } |
81 if (idx == size_) { | 240 return count; |
82 FATAL("Handle is not in the list!"); | 241 } |
83 } | 242 |
84 | 243 |
85 if (idx != (size_ - 1)) { | |
86 descriptor_infos_[idx] = descriptor_infos_[size_ - 1]; | |
87 items_[idx] = items_[size_ - 1]; | |
88 } | |
89 descriptor_infos_[size_ - 1] = NULL; | |
90 items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0}; | |
91 size_ = size_ - 1; | |
92 LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_); | |
93 } | |
94 | |
95 | |
96 void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) { | |
97 if (desired_size < capacity_) { | |
98 return; | |
99 } | |
100 intptr_t new_capacity = desired_size + (desired_size >> 1); | |
101 descriptor_infos_ = static_cast<DescriptorInfo**>( | |
102 realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_))); | |
103 if (descriptor_infos_ == NULL) { | |
104 FATAL("Failed to grow descriptor_infos array"); | |
105 } | |
106 items_ = static_cast<mx_wait_item_t*>( | |
107 realloc(items_, new_capacity * sizeof(*items_))); | |
108 if (items_ == NULL) { | |
109 FATAL("Failed to grow items array"); | |
110 } | |
111 capacity_ = new_capacity; | |
112 LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size, | |
113 capacity_); | |
114 } | |
115 | |
116 | |
117 EventHandlerImplementation::EventHandlerImplementation() { | |
118 mx_status_t status = | |
119 mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]); | |
120 if (status != NO_ERROR) { | |
121 FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status)); | |
122 } | |
123 shutdown_ = false; | |
124 info_.AddHandle(interrupt_handles_[0], | |
125 MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL); | |
126 LOG_INFO("EventHandlerImplementation initialized\n"); | |
127 } | |
128 | |
129 | |
130 EventHandlerImplementation::~EventHandlerImplementation() { | |
131 mx_status_t status = mx_handle_close(interrupt_handles_[0]); | |
132 if (status != NO_ERROR) { | |
133 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); | |
134 } | |
135 status = mx_handle_close(interrupt_handles_[1]); | |
136 if (status != NO_ERROR) { | |
137 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); | |
138 } | |
139 LOG_INFO("EventHandlerImplementation destroyed\n"); | |
140 } | |
141 | |
142 | |
143 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 244 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
144 Dart_Port dart_port, | 245 Dart_Port dart_port, |
145 int64_t data) { | 246 int64_t data) { |
146 InterruptMessage msg; | 247 InterruptMessage msg; |
147 msg.id = id; | 248 msg.id = id; |
148 msg.dart_port = dart_port; | 249 msg.dart_port = dart_port; |
149 msg.data = data; | 250 msg.data = data; |
150 | 251 // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
151 mx_status_t status = | 252 // is smaller than 512, we don't need a thread lock. |
152 mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0); | 253 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
153 if (status != NO_ERROR) { | 254 ASSERT(kInterruptMessageSize < PIPE_BUF); |
154 FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status)); | 255 intptr_t result = |
| 256 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
| 257 if (result != kInterruptMessageSize) { |
| 258 if (result == -1) { |
| 259 perror("Interrupt message failure:"); |
| 260 } |
| 261 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); |
155 } | 262 } |
156 LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data); | |
157 } | 263 } |
158 | 264 |
159 | 265 |
160 void EventHandlerImplementation::HandleInterruptFd() { | 266 void EventHandlerImplementation::HandleInterruptFd() { |
161 LOG_INFO("HandleInterruptFd entry\n"); | 267 const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
162 InterruptMessage msg; | 268 InterruptMessage msg[MAX_MESSAGES]; |
163 uint32_t bytes = kInterruptMessageSize; | 269 ssize_t bytes = NO_RETRY_EXPECTED( |
164 mx_status_t status; | 270 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
165 while (true) { | 271 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); |
166 status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes, | 272 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
167 NULL, 0, NULL); | 273 if (msg[i].id == kTimerId) { |
168 if (status != NO_ERROR) { | |
169 break; | |
170 } | |
171 ASSERT(bytes == kInterruptMessageSize); | |
172 if (msg.id == kTimerId) { | |
173 LOG_INFO("HandleInterruptFd read timer update\n"); | 274 LOG_INFO("HandleInterruptFd read timer update\n"); |
174 timeout_queue_.UpdateTimeout(msg.dart_port, msg.data); | 275 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
175 } else if (msg.id == kShutdownId) { | 276 } else if (msg[i].id == kShutdownId) { |
176 LOG_INFO("HandleInterruptFd read shutdown\n"); | 277 LOG_INFO("HandleInterruptFd read shutdown\n"); |
177 shutdown_ = true; | 278 shutdown_ = true; |
178 } else { | 279 } else { |
179 // TODO(zra): Handle commands to add and remove handles from the | 280 ASSERT((msg[i].data & COMMAND_MASK) != 0); |
180 // MagentaWaitManyInfo. | 281 LOG_INFO("HandleInterruptFd command\n"); |
181 UNIMPLEMENTED(); | 282 DescriptorInfo* di = |
| 283 GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data)); |
| 284 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { |
| 285 ASSERT(!di->IsListeningSocket()); |
| 286 // Close the socket for reading. |
| 287 LOG_INFO("\tSHUT_RD: %d\n", di->fd()); |
| 288 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD)); |
| 289 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) { |
| 290 ASSERT(!di->IsListeningSocket()); |
| 291 // Close the socket for writing. |
| 292 LOG_INFO("\tSHUT_WR: %d\n", di->fd()); |
| 293 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR)); |
| 294 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) { |
| 295 // Close the socket and free system resources and move on to next |
| 296 // message. |
| 297 intptr_t old_mask = di->Mask(); |
| 298 Dart_Port port = msg[i].dart_port; |
| 299 di->RemovePort(port); |
| 300 intptr_t new_mask = di->Mask(); |
| 301 UpdateEpollInstance(old_mask, di); |
| 302 |
| 303 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask); |
| 304 intptr_t fd = di->fd(); |
| 305 if (di->IsListeningSocket()) { |
| 306 // We only close the socket file descriptor from the operating |
| 307 // system if there are no other dart socket objects which |
| 308 // are listening on the same (address, port) combination. |
| 309 ListeningSocketRegistry* registry = |
| 310 ListeningSocketRegistry::Instance(); |
| 311 |
| 312 MutexLocker locker(registry->mutex()); |
| 313 |
| 314 if (registry->CloseSafe(fd)) { |
| 315 ASSERT(new_mask == 0); |
| 316 socket_map_.Remove(GetHashmapKeyFromFd(fd), |
| 317 GetHashmapHashFromFd(fd)); |
| 318 di->Close(); |
| 319 LOG_INFO("Closed %d\n", di->fd()); |
| 320 delete di; |
| 321 } |
| 322 } else { |
| 323 ASSERT(new_mask == 0); |
| 324 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| 325 di->Close(); |
| 326 LOG_INFO("Closed %d\n", di->fd()); |
| 327 delete di; |
| 328 } |
| 329 |
| 330 DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
| 331 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) { |
| 332 int count = TOKEN_COUNT(msg[i].data); |
| 333 intptr_t old_mask = di->Mask(); |
| 334 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask); |
| 335 di->ReturnTokens(msg[i].dart_port, count); |
| 336 UpdateEpollInstance(old_mask, di); |
| 337 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) { |
| 338 // `events` can only have kInEvent/kOutEvent flags set. |
| 339 intptr_t events = msg[i].data & EVENT_MASK; |
| 340 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
| 341 |
| 342 intptr_t old_mask = di->Mask(); |
| 343 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask, |
| 344 msg[i].data & EVENT_MASK); |
| 345 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK); |
| 346 UpdateEpollInstance(old_mask, di); |
| 347 } else { |
| 348 UNREACHABLE(); |
| 349 } |
182 } | 350 } |
183 } | 351 } |
184 // status == ERR_SHOULD_WAIT when we try to read and there are no messages | |
185 // available, so it is an error if we get here and status != ERR_SHOULD_WAIT. | |
186 if (status != ERR_SHOULD_WAIT) { | |
187 FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status)); | |
188 } | |
189 LOG_INFO("HandleInterruptFd exit\n"); | 352 LOG_INFO("HandleInterruptFd exit\n"); |
190 } | 353 } |
191 | 354 |
192 | 355 |
193 void EventHandlerImplementation::HandleEvents() { | 356 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
194 LOG_INFO("HandleEvents entry\n"); | 357 DescriptorInfo* di) { |
195 for (intptr_t i = 1; i < info_.size(); i++) { | 358 #ifdef EVENTHANDLER_LOGGING |
196 const mx_wait_item_t& wait_item = info_.items()[i]; | 359 PrintEventMask(di->fd(), events); |
197 if (wait_item.pending & wait_item.waitfor) { | 360 #endif |
198 // Only the control handle has no descriptor info. | 361 if ((events & EPOLLERR) != 0) { |
199 ASSERT(info_.descriptor_infos()[i] != NULL); | 362 // Return error only if EPOLLIN is present. |
200 ASSERT(wait_item.handle != interrupt_handles_[0]); | 363 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; |
201 // TODO(zra): Handle events on other handles. At the moment we are | 364 } |
202 // only interrupted when there is a message on interrupt_handles_[0]. | 365 intptr_t event_mask = 0; |
203 UNIMPLEMENTED(); | 366 if ((events & EPOLLIN) != 0) { |
| 367 event_mask |= (1 << kInEvent); |
| 368 } |
| 369 if ((events & EPOLLOUT) != 0) { |
| 370 event_mask |= (1 << kOutEvent); |
| 371 } |
| 372 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { |
| 373 event_mask |= (1 << kCloseEvent); |
| 374 } |
| 375 return event_mask; |
| 376 } |
| 377 |
| 378 |
| 379 void EventHandlerImplementation::HandleEvents(struct epoll_event* events, |
| 380 int size) { |
| 381 bool interrupt_seen = false; |
| 382 for (int i = 0; i < size; i++) { |
| 383 if (events[i].data.ptr == NULL) { |
| 384 interrupt_seen = true; |
| 385 } else { |
| 386 DescriptorInfo* di = |
| 387 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr); |
| 388 intptr_t event_mask = GetPollEvents(events[i].events, di); |
| 389 |
| 390 if ((event_mask & (1 << kErrorEvent)) != 0) { |
| 391 di->NotifyAllDartPorts(event_mask); |
| 392 } |
| 393 event_mask &= ~(1 << kErrorEvent); |
| 394 |
| 395 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask); |
| 396 if (event_mask != 0) { |
| 397 intptr_t old_mask = di->Mask(); |
| 398 Dart_Port port = di->NextNotifyDartPort(event_mask); |
| 399 ASSERT(port != 0); |
| 400 UpdateEpollInstance(old_mask, di); |
| 401 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask, |
| 402 port, di->fd()); |
| 403 bool success = DartUtils::PostInt32(port, event_mask); |
| 404 if (!success) { |
| 405 // This can happen if e.g. the isolate that owns the port has died |
| 406 // for some reason. |
| 407 FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port); |
| 408 } |
| 409 } |
204 } | 410 } |
205 } | 411 } |
206 | 412 if (interrupt_seen) { |
207 if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) { | 413 // Handle after socket events, so we avoid closing a socket before we handle |
208 FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n"); | 414 // the current events. |
209 } | |
210 if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) { | |
211 LOG_INFO("HandleEvents interrupt_handles_[0] readable\n"); | |
212 HandleInterruptFd(); | 415 HandleInterruptFd(); |
213 } else { | |
214 LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n"); | |
215 } | 416 } |
216 } | 417 } |
217 | 418 |
218 | 419 |
219 int64_t EventHandlerImplementation::GetTimeout() const { | 420 int64_t EventHandlerImplementation::GetTimeout() const { |
220 if (!timeout_queue_.HasTimeout()) { | 421 if (!timeout_queue_.HasTimeout()) { |
221 return kInfinityTimeout; | 422 return kInfinityTimeout; |
222 } | 423 } |
223 int64_t millis = | 424 int64_t millis = |
224 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); | 425 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); |
225 return (millis < 0) ? 0 : millis; | 426 return (millis < 0) ? 0 : millis; |
226 } | 427 } |
227 | 428 |
228 | 429 |
229 void EventHandlerImplementation::HandleTimeout() { | 430 void EventHandlerImplementation::HandleTimeout() { |
230 if (timeout_queue_.HasTimeout()) { | 431 if (timeout_queue_.HasTimeout()) { |
231 int64_t millis = timeout_queue_.CurrentTimeout() - | 432 int64_t millis = timeout_queue_.CurrentTimeout() - |
232 TimerUtils::GetCurrentMonotonicMillis(); | 433 TimerUtils::GetCurrentMonotonicMillis(); |
233 if (millis <= 0) { | 434 if (millis <= 0) { |
234 DartUtils::PostNull(timeout_queue_.CurrentPort()); | 435 DartUtils::PostNull(timeout_queue_.CurrentPort()); |
235 timeout_queue_.RemoveCurrent(); | 436 timeout_queue_.RemoveCurrent(); |
236 } | 437 } |
237 } | 438 } |
238 } | 439 } |
239 | 440 |
240 | 441 |
241 void EventHandlerImplementation::Poll(uword args) { | 442 void EventHandlerImplementation::Poll(uword args) { |
| 443 static const intptr_t kMaxEvents = 16; |
| 444 struct epoll_event events[kMaxEvents]; |
242 EventHandler* handler = reinterpret_cast<EventHandler*>(args); | 445 EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
243 EventHandlerImplementation* handler_impl = &handler->delegate_; | 446 EventHandlerImplementation* handler_impl = &handler->delegate_; |
244 ASSERT(handler_impl != NULL); | 447 ASSERT(handler_impl != NULL); |
245 | 448 |
246 while (!handler_impl->shutdown_) { | 449 while (!handler_impl->shutdown_) { |
247 int64_t millis = handler_impl->GetTimeout(); | 450 int64_t millis = handler_impl->GetTimeout(); |
248 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); | 451 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); |
249 mx_time_t timeout = | 452 LOG_INFO("epoll_wait(millis = %ld)\n", millis); |
250 millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond; | 453 intptr_t result = NO_RETRY_EXPECTED( |
251 const MagentaWaitManyInfo& info = handler_impl->info(); | 454 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); |
252 LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(), | 455 ASSERT(EAGAIN == EWOULDBLOCK); |
253 timeout); | 456 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); |
254 mx_status_t status = | 457 if (result < 0) { |
255 mx_handle_wait_many(info.items(), info.size(), timeout); | 458 if (errno != EWOULDBLOCK) { |
256 if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) { | 459 perror("Poll failed"); |
257 FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status)); | 460 } |
258 } else { | 461 } else { |
259 LOG_INFO("mx_handle_wait_many returned: %ld\n", status); | |
260 handler_impl->HandleTimeout(); | 462 handler_impl->HandleTimeout(); |
261 handler_impl->HandleEvents(); | 463 handler_impl->HandleEvents(events, result); |
262 } | 464 } |
263 } | 465 } |
264 handler->NotifyShutdownDone(); | 466 handler->NotifyShutdownDone(); |
265 LOG_INFO("EventHandlerImplementation notifying about shutdown\n"); | |
266 } | 467 } |
267 | 468 |
268 | 469 |
269 void EventHandlerImplementation::Start(EventHandler* handler) { | 470 void EventHandlerImplementation::Start(EventHandler* handler) { |
270 int result = Thread::Start(&EventHandlerImplementation::Poll, | 471 int result = Thread::Start(&EventHandlerImplementation::Poll, |
271 reinterpret_cast<uword>(handler)); | 472 reinterpret_cast<uword>(handler)); |
272 if (result != 0) { | 473 if (result != 0) { |
273 FATAL1("Failed to start event handler thread %d", result); | 474 FATAL1("Failed to start event handler thread %d", result); |
274 } | 475 } |
275 } | 476 } |
276 | 477 |
277 | 478 |
278 void EventHandlerImplementation::Shutdown() { | 479 void EventHandlerImplementation::Shutdown() { |
279 SendData(kShutdownId, 0, 0); | 480 SendData(kShutdownId, 0, 0); |
280 } | 481 } |
281 | 482 |
282 | 483 |
283 void EventHandlerImplementation::SendData(intptr_t id, | 484 void EventHandlerImplementation::SendData(intptr_t id, |
284 Dart_Port dart_port, | 485 Dart_Port dart_port, |
285 int64_t data) { | 486 int64_t data) { |
286 WakeupHandler(id, dart_port, data); | 487 WakeupHandler(id, dart_port, data); |
287 } | 488 } |
288 | 489 |
| 490 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
| 491 // The hashmap does not support keys with value 0. |
| 492 return reinterpret_cast<void*>(fd + 1); |
| 493 } |
| 494 |
| 495 |
| 496 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
| 497 // The hashmap does not support keys with value 0. |
| 498 return dart::Utils::WordHash(fd + 1); |
| 499 } |
| 500 |
289 } // namespace bin | 501 } // namespace bin |
290 } // namespace dart | 502 } // namespace dart |
291 | 503 |
292 #endif // defined(TARGET_OS_FUCHSIA) | 504 #endif // defined(TARGET_OS_FUCHSIA) |
293 | 505 |
294 #endif // !defined(DART_IO_DISABLED) | 506 #endif // !defined(DART_IO_DISABLED) |
OLD | NEW |