OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #if !defined(DART_IO_DISABLED) | 5 #if !defined(DART_IO_DISABLED) |
6 | 6 |
7 #include "platform/globals.h" | 7 #include "platform/globals.h" |
8 #if defined(HOST_OS_FUCHSIA) | 8 #if defined(HOST_OS_FUCHSIA) |
9 | 9 |
10 #include "bin/eventhandler.h" | 10 #include "bin/eventhandler.h" |
11 #include "bin/eventhandler_fuchsia.h" | 11 #include "bin/eventhandler_fuchsia.h" |
12 | 12 |
13 #include <errno.h> // NOLINT | 13 #include <errno.h> |
14 #include <fcntl.h> // NOLINT | 14 #include <fcntl.h> |
15 #include <pthread.h> // NOLINT | 15 #include <magenta/status.h> |
16 #include <stdio.h> // NOLINT | 16 #include <magenta/syscalls.h> |
17 #include <string.h> // NOLINT | 17 #include <magenta/syscalls/object.h> |
18 #include <sys/epoll.h> // NOLINT | 18 #include <magenta/syscalls/port.h> |
19 #include <sys/stat.h> // NOLINT | 19 #include <mxio/private.h> |
20 #include <unistd.h> // NOLINT | 20 #include <pthread.h> |
| 21 #include <stdio.h> |
| 22 #include <string.h> |
| 23 #include <sys/epoll.h> |
| 24 #include <sys/socket.h> |
| 25 #include <sys/stat.h> |
| 26 #include <unistd.h> |
21 | 27 |
22 #include "bin/fdutils.h" | 28 #include "bin/fdutils.h" |
23 #include "bin/lockers.h" | 29 #include "bin/lockers.h" |
24 #include "bin/log.h" | 30 #include "bin/log.h" |
25 #include "bin/socket.h" | 31 #include "bin/socket.h" |
26 #include "bin/thread.h" | 32 #include "bin/thread.h" |
27 #include "bin/utils.h" | 33 #include "bin/utils.h" |
28 #include "platform/hashmap.h" | 34 #include "platform/hashmap.h" |
29 #include "platform/utils.h" | 35 #include "platform/utils.h" |
30 | 36 |
31 // #define EVENTHANDLER_LOGGING 1 | 37 // The EventHandler for Fuchsia uses its "ports v2" API: |
32 #if defined(EVENTHANDLER_LOGGING) | 38 // https://fuchsia.googlesource.com/magenta/+/HEAD/docs/syscalls/port_create.md |
33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) | 39 // This API does not have epoll()-like edge triggering (EPOLLET). Since clients |
34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) | 40 // of the EventHandler expect edge-triggered notifications, we must simulate it. |
| 41 // When a packet from mx_port_wait() indicates that a signal is asserted for a |
| 42 // handle, we unsubscribe from that signal until the event that asserted the |
| 43 // signal can be processed. For example: |
| 44 // |
| 45 // 1. We get MX_SOCKET_WRITABLE from mx_port_wait() for a handle. |
| 46 // 2. We send kOutEvent to the Dart thread. |
| 47 // 3. We unsubscribe from further MX_SOCKET_WRITABLE signals for the handle. |
| 48 // 4. Some time later the Dart thread actually does a write(). |
| 49 // 5. After writing, the Dart thread manually asserts SYNTHETIC_WRITE_SIGNAL |
| 50 // on the handle. |
| 51 // 6. The EventHandler thread gets SYNTHETIC_WRITE_SIGNAL from mx_port_wait() |
| 52 // for the handle. |
| 53 // 7. The EventHandler thread re-subscribes to MX_SOCKET_WRITABLE, and sends |
| 54 // another kOutEvent to the Dart thread if it happens to be asserted. |
| 55 // |
| 56 // We use he same procedure for MX_SOCKET_READABLE, SYNTHETIC_READ_SIGNAL, and |
| 57 // read()/accept(). |
| 58 |
| 59 // define EVENTHANDLER_LOG_ERROR to get log messages only for errors. |
| 60 // define EVENTHANDLER_LOG_INFO to get log messages for both information and |
| 61 // errors. |
| 62 // #define EVENTHANDLER_LOG_INFO 1 |
| 63 #define EVENTHANDLER_LOG_ERROR 1 |
| 64 #if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR) |
| 65 #define LOG_ERR(msg, ...) \ |
| 66 { \ |
| 67 int err = errno; \ |
| 68 Log::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, __LINE__, \ |
| 69 ##__VA_ARGS__); \ |
| 70 errno = err; \ |
| 71 } |
| 72 #if defined(EVENTHANDLER_LOG_INFO) |
| 73 #define LOG_INFO(msg, ...) \ |
| 74 Log::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \ |
| 75 ##__VA_ARGS__) |
| 76 #else |
| 77 #define LOG_INFO(msg, ...) |
| 78 #endif // defined(EVENTHANDLER_LOG_INFO) |
35 #else | 79 #else |
36 #define LOG_ERR(msg, ...) | 80 #define LOG_ERR(msg, ...) |
37 #define LOG_INFO(msg, ...) | 81 #define LOG_INFO(msg, ...) |
38 #endif // defined(EVENTHANDLER_LOGGING) | 82 #endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR) |
39 | 83 |
40 namespace dart { | 84 namespace dart { |
41 namespace bin { | 85 namespace bin { |
42 | 86 |
43 #if defined(EVENTHANDLER_LOGGING) | 87 #define SYNTHETIC_READ_SIGNAL MX_USER_SIGNAL_6 |
44 static void PrintEventMask(intptr_t fd, intptr_t events) { | 88 #define SYNTHETIC_WRITE_SIGNAL MX_USER_SIGNAL_7 |
45 Log::PrintErr("%d ", fd); | 89 |
46 if ((events & EPOLLIN) != 0) { | 90 intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) { |
47 Log::PrintErr("EPOLLIN "); | 91 MutexLocker ml(mutex_); |
48 } | 92 const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes)); |
49 if ((events & EPOLLPRI) != 0) { | 93 LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes); |
50 Log::PrintErr("EPOLLPRI "); | 94 |
51 } | 95 read_events_enabled_ = true; |
52 if ((events & EPOLLOUT) != 0) { | 96 mx_status_t status = mx_object_signal(handle_, 0, SYNTHETIC_READ_SIGNAL); |
53 Log::PrintErr("EPOLLOUT "); | 97 if (status != MX_OK) { |
54 } | 98 LOG_ERR("Failed to send user signal to handle: %s\n", |
55 if ((events & EPOLLERR) != 0) { | 99 mx_status_get_string(status)); |
56 Log::PrintErr("EPOLLERR "); | 100 } |
57 } | 101 |
58 if ((events & EPOLLHUP) != 0) { | 102 return read_bytes; |
59 Log::PrintErr("EPOLLHUP "); | 103 } |
60 } | 104 |
61 if ((events & EPOLLRDHUP) != 0) { | 105 |
62 Log::PrintErr("EPOLLRDHUP "); | 106 intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) { |
63 } | 107 MutexLocker ml(mutex_); |
64 int all_events = | 108 const ssize_t written_bytes = |
65 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP; | 109 NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes)); |
66 if ((events & ~all_events) != 0) { | 110 LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes); |
67 Log::PrintErr("(and %08x) ", events & ~all_events); | 111 |
68 } | 112 // Assert a signal on the handle indicating that a write() completed. |
69 | 113 write_events_enabled_ = true; |
70 Log::PrintErr("\n"); | 114 mx_status_t status = mx_object_signal(handle_, 0, SYNTHETIC_WRITE_SIGNAL); |
71 } | 115 if (status != MX_OK) { |
72 #endif | 116 LOG_ERR("Failed to send user signal to handle: %s\n", |
73 | 117 mx_status_get_string(status)); |
74 | 118 } |
75 intptr_t DescriptorInfo::GetPollEvents() { | 119 |
| 120 return written_bytes; |
| 121 } |
| 122 |
| 123 |
| 124 intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) { |
| 125 MutexLocker ml(mutex_); |
| 126 intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen)); |
| 127 LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket); |
| 128 |
| 129 read_events_enabled_ = true; |
| 130 mx_status_t status = mx_object_signal(handle_, 0, SYNTHETIC_READ_SIGNAL); |
| 131 if (status != MX_OK) { |
| 132 LOG_ERR("Failed to send user signal to handle: %s\n", |
| 133 mx_status_get_string(status)); |
| 134 } |
| 135 |
| 136 return socket; |
| 137 } |
| 138 |
| 139 |
| 140 void IOHandle::Close() { |
| 141 MutexLocker ml(mutex_); |
| 142 VOID_NO_RETRY_EXPECTED(close(fd_)); |
| 143 } |
| 144 |
| 145 |
| 146 uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) { |
| 147 MutexLocker ml(mutex_); |
76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are | 148 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are |
77 // triggered anyway. | 149 // triggered anyway. |
78 intptr_t events = 0; | 150 uint32_t events = EPOLLRDHUP; |
79 if ((Mask() & (1 << kInEvent)) != 0) { | 151 if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) { |
80 events |= EPOLLIN; | 152 events |= EPOLLIN; |
81 } | 153 } |
82 if ((Mask() & (1 << kOutEvent)) != 0) { | 154 if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) { |
83 events |= EPOLLOUT; | 155 events |= EPOLLOUT; |
84 } | 156 } |
85 return events; | 157 return events; |
86 } | 158 } |
87 | 159 |
88 | 160 |
89 // Unregister the file descriptor for a DescriptorInfo structure with | 161 intptr_t IOHandle::EpollEventsToMask(intptr_t events) { |
90 // epoll. | 162 if ((events & EPOLLERR) != 0) { |
91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { | 163 // Return error only if EPOLLIN is present. |
92 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); | 164 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; |
93 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL)); | 165 } |
94 } | 166 intptr_t event_mask = 0; |
95 | 167 if ((events & EPOLLIN) != 0) { |
96 | 168 event_mask |= (1 << kInEvent); |
97 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { | 169 } |
98 struct epoll_event event; | 170 if ((events & EPOLLOUT) != 0) { |
99 event.events = EPOLLRDHUP | di->GetPollEvents(); | 171 event_mask |= (1 << kOutEvent); |
100 if (!di->IsListeningSocket()) { | 172 } |
101 event.events |= EPOLLET; | 173 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { |
102 } | 174 event_mask |= (1 << kCloseEvent); |
103 event.data.ptr = di; | 175 } |
104 LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd()); | 176 return event_mask; |
105 int status = | 177 } |
106 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event)); | 178 |
107 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status); | 179 |
108 #if defined(EVENTHANDLER_LOGGING) | 180 bool IOHandle::AsyncWait(mx_handle_t port, uint32_t events, uint64_t key) { |
109 PrintEventMask(di->fd(), event.events); | 181 MutexLocker ml(mutex_); |
110 #endif | 182 LOG_INFO("IOHandle::AsyncWait: fd = %ld\n", fd_); |
111 if (status == -1) { | 183 // The call to __mxio_fd_to_io() in the DescriptorInfo constructor may have |
112 // TODO(dart:io): Verify that the dart end is handling this correctly. | 184 // returned NULL. If it did, propagate the problem up to Dart. |
113 | 185 if (mxio_ == NULL) { |
114 // Epoll does not accept the file descriptor. It could be due to | 186 LOG_ERR("__mxio_fd_to_io(%d) returned NULL\n", fd_); |
115 // already closed file descriptor, or unuspported devices, such | 187 return false; |
116 // as /dev/null. In such case, mark the file descriptor as closed, | 188 } |
117 // so dart will handle it accordingly. | 189 |
| 190 mx_handle_t handle; |
| 191 mx_signals_t signals; |
| 192 __mxio_wait_begin(mxio_, events, &handle, &signals); |
| 193 if (handle == MX_HANDLE_INVALID) { |
| 194 LOG_ERR("fd = %ld __mxio_wait_begin returned an invalid handle\n", fd_); |
| 195 return false; |
| 196 } |
| 197 |
| 198 // These signals are used to wake us up if we should start listening for |
| 199 // the real readable and writable signals again. |
| 200 signals |= SYNTHETIC_READ_SIGNAL | SYNTHETIC_WRITE_SIGNAL; |
| 201 |
| 202 handle_ = handle; |
| 203 LOG_INFO("mx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals); |
| 204 mx_status_t status = |
| 205 mx_object_wait_async(handle_, port, key, signals, MX_WAIT_ASYNC_ONCE); |
| 206 if (status != MX_OK) { |
| 207 LOG_ERR("mx_object_wait_async failed: %s\n", mx_status_get_string(status)); |
| 208 return false; |
| 209 } |
| 210 |
| 211 return true; |
| 212 } |
| 213 |
| 214 |
| 215 void IOHandle::CancelWait(mx_handle_t port, uint64_t key) { |
| 216 MutexLocker ml(mutex_); |
| 217 LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_); |
| 218 ASSERT(port != MX_HANDLE_INVALID); |
| 219 ASSERT(handle_ != MX_HANDLE_INVALID); |
| 220 mx_status_t status = mx_port_cancel(port, handle_, key); |
| 221 if ((status != MX_OK) && (status != MX_ERR_NOT_FOUND)) { |
| 222 LOG_ERR("mx_port_cancel failed: %s\n", mx_status_get_string(status)); |
| 223 } |
| 224 } |
| 225 |
| 226 |
| 227 mx_status_t IOHandle::ReadySignalsLocked(mx_signals_t* ready_signals) { |
| 228 const mx_signals_t watched_signals = MX_SOCKET_READABLE | MX_SOCKET_WRITABLE; |
| 229 mx_status_t status = mx_object_wait_one(handle_, watched_signals, |
| 230 /* deadline */ 0, ready_signals); |
| 231 if ((status != MX_OK) && (status != MX_ERR_TIMED_OUT)) { |
| 232 LOG_ERR("mx_object_wait_one failed: %s\n", mx_status_get_string(status)); |
| 233 return status; |
| 234 } |
| 235 return MX_OK; |
| 236 } |
| 237 |
| 238 |
| 239 uint32_t IOHandle::WaitEnd(mx_signals_t observed) { |
| 240 MutexLocker ml(mutex_); |
| 241 const bool synthetic_read = (observed & SYNTHETIC_READ_SIGNAL) != 0; |
| 242 const bool synthetic_write = (observed & SYNTHETIC_WRITE_SIGNAL) != 0; |
| 243 if (synthetic_read || synthetic_write) { |
| 244 mx_signals_t read_write_signals = MX_SIGNAL_NONE; |
| 245 mx_status_t status = ReadySignalsLocked(&read_write_signals); |
| 246 ASSERT(status == MX_OK); |
| 247 const bool real_read = (read_write_signals & MX_SOCKET_READABLE) != 0; |
| 248 const bool real_write = (read_write_signals & MX_SOCKET_WRITABLE) != 0; |
| 249 if (synthetic_read) { |
| 250 LOG_INFO("IOHandle::WaitEnd: fd = %ld got synthetic read signal\n", fd_); |
| 251 observed |= (real_read ? MX_SOCKET_READABLE : 0); |
| 252 observed &= ~SYNTHETIC_READ_SIGNAL; |
| 253 status = mx_object_signal(handle_, SYNTHETIC_READ_SIGNAL, 0); |
| 254 if (status != MX_OK) { |
| 255 LOG_ERR("Failed to clear SYNTHETIC_READ_SIGNAL on fd=%ld\n", fd_); |
| 256 } |
| 257 } |
| 258 |
| 259 if (synthetic_write) { |
| 260 LOG_INFO("IOHandle::WaitEnd: fd = %ld got synthetic write signal\n", fd_); |
| 261 observed |= (real_write ? MX_SOCKET_WRITABLE : 0); |
| 262 observed &= ~SYNTHETIC_WRITE_SIGNAL; |
| 263 status = mx_object_signal(handle_, SYNTHETIC_WRITE_SIGNAL, 0); |
| 264 if (status != MX_OK) { |
| 265 LOG_ERR("Failed to clear SYNTHETIC_WRITE_SIGNAL on fd=%ld\n", fd_); |
| 266 } |
| 267 } |
| 268 } |
| 269 |
| 270 uint32_t events = 0; |
| 271 __mxio_wait_end(mxio_, observed, &events); |
| 272 return events; |
| 273 } |
| 274 |
| 275 |
| 276 intptr_t IOHandle::ToggleEvents(intptr_t event_mask) { |
| 277 MutexLocker ml(mutex_); |
| 278 if (!write_events_enabled_) { |
| 279 LOG_INFO("IOHandle::ToggleEvents: fd = %ld de-asserting write\n", fd_); |
| 280 event_mask = event_mask & ~(1 << kOutEvent); |
| 281 } |
| 282 if ((event_mask & (1 << kOutEvent)) != 0) { |
| 283 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting write and disabling\n", |
| 284 fd_); |
| 285 write_events_enabled_ = false; |
| 286 } |
| 287 if (!read_events_enabled_) { |
| 288 LOG_INFO("IOHandle::ToggleEvents: fd=%ld de-asserting read\n", fd_); |
| 289 event_mask = event_mask & ~(1 << kInEvent); |
| 290 } |
| 291 if ((event_mask & (1 << kInEvent)) != 0) { |
| 292 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting read and disabling\n", |
| 293 fd_); |
| 294 read_events_enabled_ = false; |
| 295 } |
| 296 return event_mask; |
| 297 } |
| 298 |
| 299 |
| 300 void EventHandlerImplementation::AddToPort(mx_handle_t port_handle, |
| 301 DescriptorInfo* di) { |
| 302 const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask()); |
| 303 const uint64_t key = reinterpret_cast<uint64_t>(di); |
| 304 if (!di->io_handle()->AsyncWait(port_handle, events, key)) { |
118 di->NotifyAllDartPorts(1 << kCloseEvent); | 305 di->NotifyAllDartPorts(1 << kCloseEvent); |
119 } | 306 } |
120 } | 307 } |
121 | 308 |
122 | 309 |
| 310 void EventHandlerImplementation::RemoveFromPort(mx_handle_t port_handle, |
| 311 DescriptorInfo* di) { |
| 312 const uint64_t key = reinterpret_cast<uint64_t>(di); |
| 313 di->io_handle()->CancelWait(port_handle, key); |
| 314 } |
| 315 |
| 316 |
123 EventHandlerImplementation::EventHandlerImplementation() | 317 EventHandlerImplementation::EventHandlerImplementation() |
124 : socket_map_(&HashMap::SamePointerValue, 16) { | 318 : socket_map_(&HashMap::SamePointerValue, 16) { |
125 intptr_t result; | |
126 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_)); | |
127 if (result != 0) { | |
128 FATAL("Pipe creation failed"); | |
129 } | |
130 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) { | |
131 FATAL("Failed to set pipe fd non blocking\n"); | |
132 } | |
133 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) { | |
134 FATAL("Failed to set pipe fd close on exec\n"); | |
135 } | |
136 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) { | |
137 FATAL("Failed to set pipe fd close on exec\n"); | |
138 } | |
139 shutdown_ = false; | 319 shutdown_ = false; |
140 // The initial size passed to epoll_create is ignore on newer (>= | 320 // Create the port. |
141 // 2.6.8) Linux versions | 321 port_handle_ = MX_HANDLE_INVALID; |
142 static const int kEpollInitialSize = 64; | 322 mx_status_t status = mx_port_create(MX_PORT_OPT_V2, &port_handle_); |
143 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); | 323 if (status != MX_OK) { |
144 if (epoll_fd_ == -1) { | 324 // This is a FATAL because the VM won't work at all if we can't create this |
145 FATAL1("Failed creating epoll file descriptor: %i", errno); | 325 // port. |
146 } | 326 FATAL1("mx_port_create failed: %s\n", mx_status_get_string(status)); |
147 if (!FDUtils::SetCloseOnExec(epoll_fd_)) { | 327 } |
148 FATAL("Failed to set epoll fd close on exec\n"); | 328 ASSERT(port_handle_ != MX_HANDLE_INVALID); |
149 } | |
150 // Register the interrupt_fd with the epoll instance. | |
151 struct epoll_event event; | |
152 event.events = EPOLLIN; | |
153 event.data.ptr = NULL; | |
154 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_); | |
155 int status = NO_RETRY_EXPECTED( | |
156 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event)); | |
157 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n", | |
158 epoll_fd_, status); | |
159 if (status == -1) { | |
160 FATAL("Failed adding interrupt fd to epoll instance"); | |
161 } | |
162 } | 329 } |
163 | 330 |
164 | 331 |
165 static void DeleteDescriptorInfo(void* info) { | 332 static void DeleteDescriptorInfo(void* info) { |
166 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); | 333 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); |
| 334 LOG_INFO("Closed %ld\n", di->io_handle()->fd()); |
167 di->Close(); | 335 di->Close(); |
168 LOG_INFO("Closed %d\n", di->fd()); | |
169 delete di; | 336 delete di; |
170 } | 337 } |
171 | 338 |
172 | 339 |
173 EventHandlerImplementation::~EventHandlerImplementation() { | 340 EventHandlerImplementation::~EventHandlerImplementation() { |
174 socket_map_.Clear(DeleteDescriptorInfo); | 341 socket_map_.Clear(DeleteDescriptorInfo); |
175 VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); | 342 mx_handle_close(port_handle_); |
176 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); | 343 port_handle_ = MX_HANDLE_INVALID; |
177 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); | 344 } |
178 } | 345 |
179 | 346 |
180 | 347 void EventHandlerImplementation::UpdatePort(intptr_t old_mask, |
181 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, | 348 DescriptorInfo* di) { |
182 DescriptorInfo* di) { | 349 const intptr_t new_mask = di->Mask(); |
183 intptr_t new_mask = di->Mask(); | |
184 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask, | |
185 new_mask); | |
186 if ((old_mask != 0) && (new_mask == 0)) { | 350 if ((old_mask != 0) && (new_mask == 0)) { |
187 RemoveFromEpollInstance(epoll_fd_, di); | 351 RemoveFromPort(port_handle_, di); |
188 } else if ((old_mask == 0) && (new_mask != 0)) { | 352 } else if ((old_mask == 0) && (new_mask != 0)) { |
189 AddToEpollInstance(epoll_fd_, di); | 353 AddToPort(port_handle_, di); |
190 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { | 354 } else if ((old_mask != 0) && (new_mask != 0)) { |
191 ASSERT(!di->IsListeningSocket()); | 355 ASSERT(!di->IsListeningSocket()); |
192 RemoveFromEpollInstance(epoll_fd_, di); | 356 RemoveFromPort(port_handle_, di); |
193 AddToEpollInstance(epoll_fd_, di); | 357 AddToPort(port_handle_, di); |
194 } | 358 } |
195 } | 359 } |
196 | 360 |
197 | 361 |
198 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( | 362 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( |
199 intptr_t fd, | 363 intptr_t fd, |
200 bool is_listening) { | 364 bool is_listening) { |
201 ASSERT(fd >= 0); | 365 IOHandle* handle = reinterpret_cast<IOHandle*>(fd); |
202 HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd), | 366 ASSERT(handle->fd() >= 0); |
203 GetHashmapHashFromFd(fd), true); | 367 HashMap::Entry* entry = |
| 368 socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()), |
| 369 GetHashmapHashFromFd(handle->fd()), true); |
204 ASSERT(entry != NULL); | 370 ASSERT(entry != NULL); |
205 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); | 371 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); |
206 if (di == NULL) { | 372 if (di == NULL) { |
207 // If there is no data in the hash map for this file descriptor a | 373 // If there is no data in the hash map for this file descriptor a |
208 // new DescriptorInfo for the file descriptor is inserted. | 374 // new DescriptorInfo for the file descriptor is inserted. |
209 if (is_listening) { | 375 if (is_listening) { |
210 di = new DescriptorInfoMultiple(fd); | 376 di = new DescriptorInfoMultiple(fd); |
211 } else { | 377 } else { |
212 di = new DescriptorInfoSingle(fd); | 378 di = new DescriptorInfoSingle(fd); |
213 } | 379 } |
214 entry->value = di; | 380 entry->value = di; |
215 } | 381 } |
216 ASSERT(fd == di->fd()); | 382 ASSERT(fd == di->fd()); |
217 return di; | 383 return di; |
218 } | 384 } |
219 | 385 |
220 | 386 |
221 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) { | |
222 size_t remaining = count; | |
223 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer)); | |
224 while (remaining > 0) { | |
225 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining)); | |
226 if (bytes_written == 0) { | |
227 return count - remaining; | |
228 } else if (bytes_written == -1) { | |
229 ASSERT(EAGAIN == EWOULDBLOCK); | |
230 // Error code EWOULDBLOCK should only happen for non blocking | |
231 // file descriptors. | |
232 ASSERT(errno != EWOULDBLOCK); | |
233 return -1; | |
234 } else { | |
235 ASSERT(bytes_written > 0); | |
236 remaining -= bytes_written; | |
237 buffer_pos += bytes_written; | |
238 } | |
239 } | |
240 return count; | |
241 } | |
242 | |
243 | |
244 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 387 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
245 Dart_Port dart_port, | 388 Dart_Port dart_port, |
246 int64_t data) { | 389 int64_t data) { |
247 InterruptMessage msg; | 390 COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(mx_packet_user_t)); |
248 msg.id = id; | 391 mx_port_packet_t pkt; |
249 msg.dart_port = dart_port; | 392 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user); |
250 msg.data = data; | 393 pkt.key = kInterruptPacketKey; |
251 // WriteToBlocking will write up to 512 bytes atomically, and since our msg | 394 msg->id = id; |
252 // is smaller than 512, we don't need a thread lock. | 395 msg->dart_port = dart_port; |
253 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. | 396 msg->data = data; |
254 ASSERT(kInterruptMessageSize < PIPE_BUF); | 397 mx_status_t status = |
255 intptr_t result = | 398 mx_port_queue(port_handle_, reinterpret_cast<void*>(&pkt), 0); |
256 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | 399 if (status != MX_OK) { |
257 if (result != kInterruptMessageSize) { | 400 // This is a FATAL because the VM won't work at all if we can't send any |
258 if (result == -1) { | 401 // messages to the EventHandler thread. |
259 perror("Interrupt message failure:"); | 402 FATAL1("mx_port_queue failed: %s\n", mx_status_get_string(status)); |
260 } | |
261 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); | |
262 } | 403 } |
263 } | 404 } |
264 | 405 |
265 | 406 |
266 void EventHandlerImplementation::HandleInterruptFd() { | 407 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
267 const intptr_t MAX_MESSAGES = kInterruptMessageSize; | 408 if (msg->id == kTimerId) { |
268 InterruptMessage msg[MAX_MESSAGES]; | 409 LOG_INFO("HandleInterrupt read timer update\n"); |
269 ssize_t bytes = NO_RETRY_EXPECTED( | 410 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); |
270 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); | 411 return; |
271 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); | 412 } else if (msg->id == kShutdownId) { |
272 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { | 413 LOG_INFO("HandleInterrupt read shutdown\n"); |
273 if (msg[i].id == kTimerId) { | 414 shutdown_ = true; |
274 LOG_INFO("HandleInterruptFd read timer update\n"); | 415 return; |
275 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); | 416 } |
276 } else if (msg[i].id == kShutdownId) { | 417 ASSERT((msg->data & COMMAND_MASK) != 0); |
277 LOG_INFO("HandleInterruptFd read shutdown\n"); | 418 LOG_INFO("HandleInterrupt command:\n"); |
278 shutdown_ = true; | 419 Socket* socket = reinterpret_cast<Socket*>(msg->id); |
| 420 RefCntReleaseScope<Socket> rs(socket); |
| 421 if (socket->fd() == -1) { |
| 422 return; |
| 423 } |
| 424 IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd()); |
| 425 const intptr_t fd = io_handle->fd(); |
| 426 DescriptorInfo* di = |
| 427 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data)); |
| 428 ASSERT(io_handle == di->io_handle()); |
| 429 if (IS_COMMAND(msg->data, kShutdownReadCommand)) { |
| 430 ASSERT(!di->IsListeningSocket()); |
| 431 // Close the socket for reading. |
| 432 LOG_INFO("\tSHUT_RD: %ld\n", fd); |
| 433 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD)); |
| 434 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { |
| 435 ASSERT(!di->IsListeningSocket()); |
| 436 // Close the socket for writing. |
| 437 LOG_INFO("\tSHUT_WR: %ld\n", fd); |
| 438 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR)); |
| 439 } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
| 440 // Close the socket and free system resources and move on to next |
| 441 // message. |
| 442 const intptr_t old_mask = di->Mask(); |
| 443 Dart_Port port = msg->dart_port; |
| 444 di->RemovePort(port); |
| 445 const intptr_t new_mask = di->Mask(); |
| 446 UpdatePort(old_mask, di); |
| 447 |
| 448 LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask); |
| 449 if (di->IsListeningSocket()) { |
| 450 // We only close the socket file descriptor from the operating |
| 451 // system if there are no other dart socket objects which |
| 452 // are listening on the same (address, port) combination. |
| 453 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); |
| 454 |
| 455 MutexLocker locker(registry->mutex()); |
| 456 |
| 457 if (registry->CloseSafe(socket)) { |
| 458 ASSERT(new_mask == 0); |
| 459 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| 460 di->Close(); |
| 461 delete di; |
| 462 socket->SetClosedFd(); |
| 463 } |
279 } else { | 464 } else { |
280 ASSERT((msg[i].data & COMMAND_MASK) != 0); | 465 ASSERT(new_mask == 0); |
281 LOG_INFO("HandleInterruptFd command\n"); | 466 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
282 Socket* socket = reinterpret_cast<Socket*>(msg[i].id); | 467 di->Close(); |
283 RefCntReleaseScope<Socket> rs(socket); | 468 delete di; |
284 if (socket->fd() == -1) { | 469 socket->SetClosedFd(); |
285 continue; | 470 } |
286 } | 471 if (port != 0) { |
287 DescriptorInfo* di = | 472 const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
288 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data)); | 473 if (!success) { |
289 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { | 474 LOG_ERR("Failed to post destroy event to port %ld\n", port); |
290 ASSERT(!di->IsListeningSocket()); | |
291 // Close the socket for reading. | |
292 LOG_INFO("\tSHUT_RD: %d\n", di->fd()); | |
293 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD)); | |
294 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) { | |
295 ASSERT(!di->IsListeningSocket()); | |
296 // Close the socket for writing. | |
297 LOG_INFO("\tSHUT_WR: %d\n", di->fd()); | |
298 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR)); | |
299 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) { | |
300 // Close the socket and free system resources and move on to next | |
301 // message. | |
302 intptr_t old_mask = di->Mask(); | |
303 Dart_Port port = msg[i].dart_port; | |
304 di->RemovePort(port); | |
305 intptr_t new_mask = di->Mask(); | |
306 UpdateEpollInstance(old_mask, di); | |
307 | |
308 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask); | |
309 intptr_t fd = di->fd(); | |
310 if (di->IsListeningSocket()) { | |
311 // We only close the socket file descriptor from the operating | |
312 // system if there are no other dart socket objects which | |
313 // are listening on the same (address, port) combination. | |
314 ListeningSocketRegistry* registry = | |
315 ListeningSocketRegistry::Instance(); | |
316 | |
317 MutexLocker locker(registry->mutex()); | |
318 | |
319 if (registry->CloseSafe(socket)) { | |
320 ASSERT(new_mask == 0); | |
321 socket_map_.Remove(GetHashmapKeyFromFd(fd), | |
322 GetHashmapHashFromFd(fd)); | |
323 di->Close(); | |
324 LOG_INFO("Closed %d\n", di->fd()); | |
325 delete di; | |
326 socket->SetClosedFd(); | |
327 } | |
328 } else { | |
329 ASSERT(new_mask == 0); | |
330 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); | |
331 di->Close(); | |
332 LOG_INFO("Closed %d\n", di->fd()); | |
333 delete di; | |
334 socket->SetClosedFd(); | |
335 } | |
336 | |
337 bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent); | |
338 if (!success) { | |
339 LOG_ERR("Failed to post destroy event to port %ld", port); | |
340 } | |
341 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) { | |
342 int count = TOKEN_COUNT(msg[i].data); | |
343 intptr_t old_mask = di->Mask(); | |
344 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask); | |
345 di->ReturnTokens(msg[i].dart_port, count); | |
346 UpdateEpollInstance(old_mask, di); | |
347 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) { | |
348 // `events` can only have kInEvent/kOutEvent flags set. | |
349 intptr_t events = msg[i].data & EVENT_MASK; | |
350 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | |
351 | |
352 intptr_t old_mask = di->Mask(); | |
353 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask, | |
354 msg[i].data & EVENT_MASK); | |
355 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK); | |
356 UpdateEpollInstance(old_mask, di); | |
357 } else { | |
358 UNREACHABLE(); | |
359 } | 475 } |
360 } | 476 } |
361 } | 477 } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
362 LOG_INFO("HandleInterruptFd exit\n"); | 478 const int count = TOKEN_COUNT(msg->data); |
363 } | 479 const intptr_t old_mask = di->Mask(); |
| 480 LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask); |
| 481 di->ReturnTokens(msg->dart_port, count); |
| 482 UpdatePort(old_mask, di); |
| 483 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
| 484 // `events` can only have kInEvent/kOutEvent flags set. |
| 485 const intptr_t events = msg->data & EVENT_MASK; |
| 486 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
364 | 487 |
365 | 488 const intptr_t old_mask = di->Mask(); |
366 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, | 489 LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask, |
367 DescriptorInfo* di) { | 490 msg->data & EVENT_MASK); |
368 #ifdef EVENTHANDLER_LOGGING | 491 di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); |
369 PrintEventMask(di->fd(), events); | 492 UpdatePort(old_mask, di); |
370 #endif | 493 } else { |
371 if ((events & EPOLLERR) != 0) { | 494 UNREACHABLE(); |
372 // Return error only if EPOLLIN is present. | |
373 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; | |
374 } | |
375 intptr_t event_mask = 0; | |
376 if ((events & EPOLLIN) != 0) { | |
377 event_mask |= (1 << kInEvent); | |
378 } | |
379 if ((events & EPOLLOUT) != 0) { | |
380 event_mask |= (1 << kOutEvent); | |
381 } | |
382 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { | |
383 event_mask |= (1 << kCloseEvent); | |
384 } | |
385 return event_mask; | |
386 } | |
387 | |
388 | |
389 void EventHandlerImplementation::HandleEvents(struct epoll_event* events, | |
390 int size) { | |
391 bool interrupt_seen = false; | |
392 for (int i = 0; i < size; i++) { | |
393 if (events[i].data.ptr == NULL) { | |
394 interrupt_seen = true; | |
395 } else { | |
396 DescriptorInfo* di = | |
397 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr); | |
398 const intptr_t old_mask = di->Mask(); | |
399 const intptr_t event_mask = GetPollEvents(events[i].events, di); | |
400 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask); | |
401 if ((event_mask & (1 << kErrorEvent)) != 0) { | |
402 di->NotifyAllDartPorts(event_mask); | |
403 UpdateEpollInstance(old_mask, di); | |
404 } else if (event_mask != 0) { | |
405 Dart_Port port = di->NextNotifyDartPort(event_mask); | |
406 ASSERT(port != 0); | |
407 UpdateEpollInstance(old_mask, di); | |
408 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask, | |
409 port, di->fd()); | |
410 bool success = DartUtils::PostInt32(port, event_mask); | |
411 if (!success) { | |
412 // This can happen if e.g. the isolate that owns the port has died | |
413 // for some reason. | |
414 LOG_ERR("Failed to post event for fd %ld to port %ld", di->fd(), | |
415 port); | |
416 } | |
417 } | |
418 } | |
419 } | |
420 if (interrupt_seen) { | |
421 // Handle after socket events, so we avoid closing a socket before we handle | |
422 // the current events. | |
423 HandleInterruptFd(); | |
424 } | 495 } |
425 } | 496 } |
426 | 497 |
427 | 498 |
| 499 void EventHandlerImplementation::HandlePacket(mx_port_packet_t* pkt) { |
| 500 LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key); |
| 501 LOG_INFO("HandlePacket: Got event packet: type=%lx\n", pkt->type); |
| 502 LOG_INFO("HandlePacket: Got event packet: status=%ld\n", pkt->status); |
| 503 if (pkt->type == MX_PKT_TYPE_USER) { |
| 504 ASSERT(pkt->key == kInterruptPacketKey); |
| 505 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user); |
| 506 HandleInterrupt(msg); |
| 507 return; |
| 508 } |
| 509 LOG_INFO("HandlePacket: Got event packet: observed = %lx\n", |
| 510 pkt->signal.observed); |
| 511 LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count); |
| 512 |
| 513 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key); |
| 514 mx_signals_t observed = pkt->signal.observed; |
| 515 const intptr_t old_mask = di->Mask(); |
| 516 const uint32_t epoll_event = di->io_handle()->WaitEnd(observed); |
| 517 intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event); |
| 518 if ((event_mask & (1 << kErrorEvent)) != 0) { |
| 519 di->NotifyAllDartPorts(event_mask); |
| 520 } else if (event_mask != 0) { |
| 521 event_mask = di->io_handle()->ToggleEvents(event_mask); |
| 522 if (event_mask != 0) { |
| 523 Dart_Port port = di->NextNotifyDartPort(event_mask); |
| 524 ASSERT(port != 0); |
| 525 bool success = DartUtils::PostInt32(port, event_mask); |
| 526 if (!success) { |
| 527 // This can happen if e.g. the isolate that owns the port has died |
| 528 // for some reason. |
| 529 LOG_ERR("Failed to post event to port %ld\n", port); |
| 530 } |
| 531 } |
| 532 } |
| 533 UpdatePort(old_mask, di); |
| 534 } |
| 535 |
| 536 |
428 int64_t EventHandlerImplementation::GetTimeout() const { | 537 int64_t EventHandlerImplementation::GetTimeout() const { |
429 if (!timeout_queue_.HasTimeout()) { | 538 if (!timeout_queue_.HasTimeout()) { |
430 return kInfinityTimeout; | 539 return kInfinityTimeout; |
431 } | 540 } |
432 int64_t millis = | 541 int64_t millis = |
433 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); | 542 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); |
434 return (millis < 0) ? 0 : millis; | 543 return (millis < 0) ? 0 : millis; |
435 } | 544 } |
436 | 545 |
437 | 546 |
438 void EventHandlerImplementation::HandleTimeout() { | 547 void EventHandlerImplementation::HandleTimeout() { |
439 if (timeout_queue_.HasTimeout()) { | 548 if (timeout_queue_.HasTimeout()) { |
440 int64_t millis = timeout_queue_.CurrentTimeout() - | 549 int64_t millis = timeout_queue_.CurrentTimeout() - |
441 TimerUtils::GetCurrentMonotonicMillis(); | 550 TimerUtils::GetCurrentMonotonicMillis(); |
442 if (millis <= 0) { | 551 if (millis <= 0) { |
443 DartUtils::PostNull(timeout_queue_.CurrentPort()); | 552 DartUtils::PostNull(timeout_queue_.CurrentPort()); |
444 timeout_queue_.RemoveCurrent(); | 553 timeout_queue_.RemoveCurrent(); |
445 } | 554 } |
446 } | 555 } |
447 } | 556 } |
448 | 557 |
449 | 558 |
450 void EventHandlerImplementation::Poll(uword args) { | 559 void EventHandlerImplementation::Poll(uword args) { |
451 static const intptr_t kMaxEvents = 16; | |
452 struct epoll_event events[kMaxEvents]; | |
453 EventHandler* handler = reinterpret_cast<EventHandler*>(args); | 560 EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
454 EventHandlerImplementation* handler_impl = &handler->delegate_; | 561 EventHandlerImplementation* handler_impl = &handler->delegate_; |
455 ASSERT(handler_impl != NULL); | 562 ASSERT(handler_impl != NULL); |
456 | 563 |
| 564 mx_port_packet_t pkt; |
457 while (!handler_impl->shutdown_) { | 565 while (!handler_impl->shutdown_) { |
458 int64_t millis = handler_impl->GetTimeout(); | 566 int64_t millis = handler_impl->GetTimeout(); |
459 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); | 567 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); |
460 // TODO(US-109): When the epoll implementation is properly edge-triggered, | 568 |
461 // remove this sleep, which prevents the message queue from being | 569 LOG_INFO("mx_port_wait(millis = %ld)\n", millis); |
462 // overwhelmed and leading to memory exhaustion. | 570 mx_status_t status = mx_port_wait(handler_impl->port_handle_, |
463 usleep(5000); | 571 millis == kInfinityTimeout |
464 LOG_INFO("epoll_wait(millis = %ld)\n", millis); | 572 ? MX_TIME_INFINITE |
465 intptr_t result = NO_RETRY_EXPECTED( | 573 : mx_deadline_after(MX_MSEC(millis)), |
466 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); | 574 reinterpret_cast<void*>(&pkt), 0); |
467 ASSERT(EAGAIN == EWOULDBLOCK); | 575 if (status == MX_ERR_TIMED_OUT) { |
468 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); | 576 handler_impl->HandleTimeout(); |
469 if (result < 0) { | 577 } else if (status != MX_OK) { |
470 if (errno != EWOULDBLOCK) { | 578 FATAL1("mx_port_wait failed: %s\n", mx_status_get_string(status)); |
471 perror("Poll failed"); | |
472 } | |
473 } else { | 579 } else { |
474 handler_impl->HandleTimeout(); | 580 handler_impl->HandleTimeout(); |
475 handler_impl->HandleEvents(events, result); | 581 handler_impl->HandlePacket(&pkt); |
476 } | 582 } |
477 } | 583 } |
478 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); | 584 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); |
479 handler->NotifyShutdownDone(); | 585 handler->NotifyShutdownDone(); |
480 } | 586 } |
481 | 587 |
482 | 588 |
483 void EventHandlerImplementation::Start(EventHandler* handler) { | 589 void EventHandlerImplementation::Start(EventHandler* handler) { |
484 int result = Thread::Start(&EventHandlerImplementation::Poll, | 590 int result = Thread::Start(&EventHandlerImplementation::Poll, |
485 reinterpret_cast<uword>(handler)); | 591 reinterpret_cast<uword>(handler)); |
(...skipping 24 matching lines...) Expand all Loading... |
510 // The hashmap does not support keys with value 0. | 616 // The hashmap does not support keys with value 0. |
511 return dart::Utils::WordHash(fd + 1); | 617 return dart::Utils::WordHash(fd + 1); |
512 } | 618 } |
513 | 619 |
514 } // namespace bin | 620 } // namespace bin |
515 } // namespace dart | 621 } // namespace dart |
516 | 622 |
517 #endif // defined(HOST_OS_FUCHSIA) | 623 #endif // defined(HOST_OS_FUCHSIA) |
518 | 624 |
519 #endif // !defined(DART_IO_DISABLED) | 625 #endif // !defined(DART_IO_DISABLED) |
OLD | NEW |