Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(677)

Side by Side Diff: runtime/bin/eventhandler_fuchsia.cc

Issue 2910853002: [Fuchsia] EventHandler: epoll -> ports v2 (Closed)
Patch Set: Format Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #if !defined(DART_IO_DISABLED) 5 #if !defined(DART_IO_DISABLED)
6 6
7 #include "platform/globals.h" 7 #include "platform/globals.h"
8 #if defined(HOST_OS_FUCHSIA) 8 #if defined(HOST_OS_FUCHSIA)
9 9
10 #include "bin/eventhandler.h" 10 #include "bin/eventhandler.h"
11 #include "bin/eventhandler_fuchsia.h" 11 #include "bin/eventhandler_fuchsia.h"
12 12
13 #include <errno.h> // NOLINT 13 #include <errno.h>
14 #include <fcntl.h> // NOLINT 14 #include <fcntl.h>
15 #include <pthread.h> // NOLINT 15 #include <magenta/status.h>
16 #include <stdio.h> // NOLINT 16 #include <magenta/syscalls.h>
17 #include <string.h> // NOLINT 17 #include <magenta/syscalls/object.h>
18 #include <sys/epoll.h> // NOLINT 18 #include <magenta/syscalls/port.h>
19 #include <sys/stat.h> // NOLINT 19 #include <mxio/private.h>
20 #include <unistd.h> // NOLINT 20 #include <pthread.h>
21 #include <stdio.h>
22 #include <string.h>
23 #include <sys/epoll.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
21 27
22 #include "bin/fdutils.h" 28 #include "bin/fdutils.h"
23 #include "bin/lockers.h" 29 #include "bin/lockers.h"
24 #include "bin/log.h" 30 #include "bin/log.h"
25 #include "bin/socket.h" 31 #include "bin/socket.h"
26 #include "bin/thread.h" 32 #include "bin/thread.h"
27 #include "bin/utils.h" 33 #include "bin/utils.h"
28 #include "platform/hashmap.h" 34 #include "platform/hashmap.h"
29 #include "platform/utils.h" 35 #include "platform/utils.h"
30 36
31 // #define EVENTHANDLER_LOGGING 1 37 // The EventHandler for Fuchsia uses its "ports v2" API:
32 #if defined(EVENTHANDLER_LOGGING) 38 // https://fuchsia.googlesource.com/magenta/+/HEAD/docs/syscalls/port_create.md
33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) 39 // This API does not have epoll()-like edge triggering (EPOLLET). Since clients
34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) 40 // of the EventHandler expect edge-triggered notifications, we must simulate it.
41 // When a packet from mx_port_wait() indicates that a signal is asserted for a
42 // handle, we unsubscribe from that signal until the event that asserted the
43 // signal can be processed. For example:
44 //
45 // 1. We get MX_SOCKET_WRITABLE from mx_port_wait() for a handle.
46 // 2. We send kOutEvent to the Dart thread.
47 // 3. We unsubscribe from further MX_SOCKET_WRITABLE signals for the handle.
48 // 4. Some time later the Dart thread actually does a write().
49 // 5. After writing, the Dart thread manually asserts SYNTHETIC_WRITE_SIGNAL
50 // on the handle.
51 // 6. The EventHandler thread gets SYNTHETIC_WRITE_SIGNAL from mx_port_wait()
52 // for the handle.
53 // 7. The EventHandler thread re-subscribes to MX_SOCKET_WRITABLE, and sends
54 // another kOutEvent to the Dart thread if it happens to be asserted.
55 //
56 // We use he same procedure for MX_SOCKET_READABLE, SYNTHETIC_READ_SIGNAL, and
57 // read()/accept().
58
59 // define EVENTHANDLER_LOG_ERROR to get log messages only for errors.
60 // define EVENTHANDLER_LOG_INFO to get log messages for both information and
61 // errors.
62 // #define EVENTHANDLER_LOG_INFO 1
63 #define EVENTHANDLER_LOG_ERROR 1
64 #if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
65 #define LOG_ERR(msg, ...) \
66 { \
67 int err = errno; \
68 Log::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, __LINE__, \
69 ##__VA_ARGS__); \
70 errno = err; \
71 }
72 #if defined(EVENTHANDLER_LOG_INFO)
73 #define LOG_INFO(msg, ...) \
74 Log::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \
75 ##__VA_ARGS__)
76 #else
77 #define LOG_INFO(msg, ...)
78 #endif // defined(EVENTHANDLER_LOG_INFO)
35 #else 79 #else
36 #define LOG_ERR(msg, ...) 80 #define LOG_ERR(msg, ...)
37 #define LOG_INFO(msg, ...) 81 #define LOG_INFO(msg, ...)
38 #endif // defined(EVENTHANDLER_LOGGING) 82 #endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
39 83
40 namespace dart { 84 namespace dart {
41 namespace bin { 85 namespace bin {
42 86
43 #if defined(EVENTHANDLER_LOGGING) 87 #define SYNTHETIC_READ_SIGNAL MX_USER_SIGNAL_6
44 static void PrintEventMask(intptr_t fd, intptr_t events) { 88 #define SYNTHETIC_WRITE_SIGNAL MX_USER_SIGNAL_7
45 Log::PrintErr("%d ", fd); 89
46 if ((events & EPOLLIN) != 0) { 90 intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) {
47 Log::PrintErr("EPOLLIN "); 91 MutexLocker ml(mutex_);
48 } 92 const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes));
49 if ((events & EPOLLPRI) != 0) { 93 LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes);
50 Log::PrintErr("EPOLLPRI "); 94
51 } 95 read_events_enabled_ = true;
52 if ((events & EPOLLOUT) != 0) { 96 mx_status_t status = mx_object_signal(handle_, 0, SYNTHETIC_READ_SIGNAL);
53 Log::PrintErr("EPOLLOUT "); 97 if (status != MX_OK) {
54 } 98 LOG_ERR("Failed to send user signal to handle: %s\n",
55 if ((events & EPOLLERR) != 0) { 99 mx_status_get_string(status));
56 Log::PrintErr("EPOLLERR "); 100 }
57 } 101
58 if ((events & EPOLLHUP) != 0) { 102 return read_bytes;
59 Log::PrintErr("EPOLLHUP "); 103 }
60 } 104
61 if ((events & EPOLLRDHUP) != 0) { 105
62 Log::PrintErr("EPOLLRDHUP "); 106 intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) {
63 } 107 MutexLocker ml(mutex_);
64 int all_events = 108 const ssize_t written_bytes =
65 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP; 109 NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes));
66 if ((events & ~all_events) != 0) { 110 LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes);
67 Log::PrintErr("(and %08x) ", events & ~all_events); 111
68 } 112 // Assert a signal on the handle indicating that a write() completed.
69 113 write_events_enabled_ = true;
70 Log::PrintErr("\n"); 114 mx_status_t status = mx_object_signal(handle_, 0, SYNTHETIC_WRITE_SIGNAL);
71 } 115 if (status != MX_OK) {
72 #endif 116 LOG_ERR("Failed to send user signal to handle: %s\n",
73 117 mx_status_get_string(status));
74 118 }
75 intptr_t DescriptorInfo::GetPollEvents() { 119
120 return written_bytes;
121 }
122
123
124 intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) {
125 MutexLocker ml(mutex_);
126 intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen));
127 LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket);
128
129 read_events_enabled_ = true;
130 mx_status_t status = mx_object_signal(handle_, 0, SYNTHETIC_READ_SIGNAL);
131 if (status != MX_OK) {
132 LOG_ERR("Failed to send user signal to handle: %s\n",
133 mx_status_get_string(status));
134 }
135
136 return socket;
137 }
138
139
140 void IOHandle::Close() {
141 MutexLocker ml(mutex_);
142 VOID_NO_RETRY_EXPECTED(close(fd_));
143 }
144
145
146 uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) {
147 MutexLocker ml(mutex_);
76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are 148 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
77 // triggered anyway. 149 // triggered anyway.
78 intptr_t events = 0; 150 uint32_t events = EPOLLRDHUP;
79 if ((Mask() & (1 << kInEvent)) != 0) { 151 if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) {
80 events |= EPOLLIN; 152 events |= EPOLLIN;
81 } 153 }
82 if ((Mask() & (1 << kOutEvent)) != 0) { 154 if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) {
83 events |= EPOLLOUT; 155 events |= EPOLLOUT;
84 } 156 }
85 return events; 157 return events;
86 } 158 }
87 159
88 160
89 // Unregister the file descriptor for a DescriptorInfo structure with 161 intptr_t IOHandle::EpollEventsToMask(intptr_t events) {
90 // epoll. 162 if ((events & EPOLLERR) != 0) {
91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { 163 // Return error only if EPOLLIN is present.
92 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); 164 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
93 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL)); 165 }
94 } 166 intptr_t event_mask = 0;
95 167 if ((events & EPOLLIN) != 0) {
96 168 event_mask |= (1 << kInEvent);
97 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { 169 }
98 struct epoll_event event; 170 if ((events & EPOLLOUT) != 0) {
99 event.events = EPOLLRDHUP | di->GetPollEvents(); 171 event_mask |= (1 << kOutEvent);
100 if (!di->IsListeningSocket()) { 172 }
101 event.events |= EPOLLET; 173 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
102 } 174 event_mask |= (1 << kCloseEvent);
103 event.data.ptr = di; 175 }
104 LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd()); 176 return event_mask;
105 int status = 177 }
106 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event)); 178
107 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status); 179
108 #if defined(EVENTHANDLER_LOGGING) 180 bool IOHandle::AsyncWait(mx_handle_t port, uint32_t events, uint64_t key) {
109 PrintEventMask(di->fd(), event.events); 181 MutexLocker ml(mutex_);
110 #endif 182 LOG_INFO("IOHandle::AsyncWait: fd = %ld\n", fd_);
111 if (status == -1) { 183 // The call to __mxio_fd_to_io() in the DescriptorInfo constructor may have
112 // TODO(dart:io): Verify that the dart end is handling this correctly. 184 // returned NULL. If it did, propagate the problem up to Dart.
113 185 if (mxio_ == NULL) {
114 // Epoll does not accept the file descriptor. It could be due to 186 LOG_ERR("__mxio_fd_to_io(%d) returned NULL\n", fd_);
115 // already closed file descriptor, or unuspported devices, such 187 return false;
116 // as /dev/null. In such case, mark the file descriptor as closed, 188 }
117 // so dart will handle it accordingly. 189
190 mx_handle_t handle;
191 mx_signals_t signals;
192 __mxio_wait_begin(mxio_, events, &handle, &signals);
193 if (handle == MX_HANDLE_INVALID) {
194 LOG_ERR("fd = %ld __mxio_wait_begin returned an invalid handle\n", fd_);
195 return false;
196 }
197
198 // These signals are used to wake us up if we should start listening for
199 // the real readable and writable signals again.
200 signals |= SYNTHETIC_READ_SIGNAL | SYNTHETIC_WRITE_SIGNAL;
201
202 handle_ = handle;
203 LOG_INFO("mx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals);
204 mx_status_t status =
205 mx_object_wait_async(handle_, port, key, signals, MX_WAIT_ASYNC_ONCE);
206 if (status != MX_OK) {
207 LOG_ERR("mx_object_wait_async failed: %s\n", mx_status_get_string(status));
208 return false;
209 }
210
211 return true;
212 }
213
214
215 void IOHandle::CancelWait(mx_handle_t port, uint64_t key) {
216 MutexLocker ml(mutex_);
217 LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_);
218 ASSERT(port != MX_HANDLE_INVALID);
219 ASSERT(handle_ != MX_HANDLE_INVALID);
220 mx_status_t status = mx_port_cancel(port, handle_, key);
221 if ((status != MX_OK) && (status != MX_ERR_NOT_FOUND)) {
222 LOG_ERR("mx_port_cancel failed: %s\n", mx_status_get_string(status));
223 }
224 }
225
226
227 mx_status_t IOHandle::ReadySignalsLocked(mx_signals_t* ready_signals) {
228 const mx_signals_t watched_signals = MX_SOCKET_READABLE | MX_SOCKET_WRITABLE;
229 mx_status_t status = mx_object_wait_one(handle_, watched_signals,
230 /* deadline */ 0, ready_signals);
231 if ((status != MX_OK) && (status != MX_ERR_TIMED_OUT)) {
232 LOG_ERR("mx_object_wait_one failed: %s\n", mx_status_get_string(status));
233 return status;
234 }
235 return MX_OK;
236 }
237
238
239 uint32_t IOHandle::WaitEnd(mx_signals_t observed) {
240 MutexLocker ml(mutex_);
241 const bool synthetic_read = (observed & SYNTHETIC_READ_SIGNAL) != 0;
242 const bool synthetic_write = (observed & SYNTHETIC_WRITE_SIGNAL) != 0;
243 if (synthetic_read || synthetic_write) {
244 mx_signals_t read_write_signals = MX_SIGNAL_NONE;
245 mx_status_t status = ReadySignalsLocked(&read_write_signals);
246 ASSERT(status == MX_OK);
247 const bool real_read = (read_write_signals & MX_SOCKET_READABLE) != 0;
248 const bool real_write = (read_write_signals & MX_SOCKET_WRITABLE) != 0;
249 if (synthetic_read) {
250 LOG_INFO("IOHandle::WaitEnd: fd = %ld got synthetic read signal\n", fd_);
251 observed |= (real_read ? MX_SOCKET_READABLE : 0);
252 observed &= ~SYNTHETIC_READ_SIGNAL;
253 status = mx_object_signal(handle_, SYNTHETIC_READ_SIGNAL, 0);
254 if (status != MX_OK) {
255 LOG_ERR("Failed to clear SYNTHETIC_READ_SIGNAL on fd=%ld\n", fd_);
256 }
257 }
258
259 if (synthetic_write) {
260 LOG_INFO("IOHandle::WaitEnd: fd = %ld got synthetic write signal\n", fd_);
261 observed |= (real_write ? MX_SOCKET_WRITABLE : 0);
262 observed &= ~SYNTHETIC_WRITE_SIGNAL;
263 status = mx_object_signal(handle_, SYNTHETIC_WRITE_SIGNAL, 0);
264 if (status != MX_OK) {
265 LOG_ERR("Failed to clear SYNTHETIC_WRITE_SIGNAL on fd=%ld\n", fd_);
266 }
267 }
268 }
269
270 uint32_t events = 0;
271 __mxio_wait_end(mxio_, observed, &events);
272 return events;
273 }
274
275
276 intptr_t IOHandle::ToggleEvents(intptr_t event_mask) {
277 MutexLocker ml(mutex_);
278 if (!write_events_enabled_) {
279 LOG_INFO("IOHandle::ToggleEvents: fd = %ld de-asserting write\n", fd_);
280 event_mask = event_mask & ~(1 << kOutEvent);
281 }
282 if ((event_mask & (1 << kOutEvent)) != 0) {
283 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting write and disabling\n",
284 fd_);
285 write_events_enabled_ = false;
286 }
287 if (!read_events_enabled_) {
288 LOG_INFO("IOHandle::ToggleEvents: fd=%ld de-asserting read\n", fd_);
289 event_mask = event_mask & ~(1 << kInEvent);
290 }
291 if ((event_mask & (1 << kInEvent)) != 0) {
292 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting read and disabling\n",
293 fd_);
294 read_events_enabled_ = false;
295 }
296 return event_mask;
297 }
298
299
300 void EventHandlerImplementation::AddToPort(mx_handle_t port_handle,
301 DescriptorInfo* di) {
302 const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask());
303 const uint64_t key = reinterpret_cast<uint64_t>(di);
304 if (!di->io_handle()->AsyncWait(port_handle, events, key)) {
118 di->NotifyAllDartPorts(1 << kCloseEvent); 305 di->NotifyAllDartPorts(1 << kCloseEvent);
119 } 306 }
120 } 307 }
121 308
122 309
310 void EventHandlerImplementation::RemoveFromPort(mx_handle_t port_handle,
311 DescriptorInfo* di) {
312 const uint64_t key = reinterpret_cast<uint64_t>(di);
313 di->io_handle()->CancelWait(port_handle, key);
314 }
315
316
123 EventHandlerImplementation::EventHandlerImplementation() 317 EventHandlerImplementation::EventHandlerImplementation()
124 : socket_map_(&HashMap::SamePointerValue, 16) { 318 : socket_map_(&HashMap::SamePointerValue, 16) {
125 intptr_t result;
126 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
127 if (result != 0) {
128 FATAL("Pipe creation failed");
129 }
130 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
131 FATAL("Failed to set pipe fd non blocking\n");
132 }
133 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
134 FATAL("Failed to set pipe fd close on exec\n");
135 }
136 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
137 FATAL("Failed to set pipe fd close on exec\n");
138 }
139 shutdown_ = false; 319 shutdown_ = false;
140 // The initial size passed to epoll_create is ignore on newer (>= 320 // Create the port.
141 // 2.6.8) Linux versions 321 port_handle_ = MX_HANDLE_INVALID;
142 static const int kEpollInitialSize = 64; 322 mx_status_t status = mx_port_create(MX_PORT_OPT_V2, &port_handle_);
143 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); 323 if (status != MX_OK) {
144 if (epoll_fd_ == -1) { 324 // This is a FATAL because the VM won't work at all if we can't create this
145 FATAL1("Failed creating epoll file descriptor: %i", errno); 325 // port.
146 } 326 FATAL1("mx_port_create failed: %s\n", mx_status_get_string(status));
147 if (!FDUtils::SetCloseOnExec(epoll_fd_)) { 327 }
148 FATAL("Failed to set epoll fd close on exec\n"); 328 ASSERT(port_handle_ != MX_HANDLE_INVALID);
149 }
150 // Register the interrupt_fd with the epoll instance.
151 struct epoll_event event;
152 event.events = EPOLLIN;
153 event.data.ptr = NULL;
154 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_);
155 int status = NO_RETRY_EXPECTED(
156 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
157 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
158 epoll_fd_, status);
159 if (status == -1) {
160 FATAL("Failed adding interrupt fd to epoll instance");
161 }
162 } 329 }
163 330
164 331
165 static void DeleteDescriptorInfo(void* info) { 332 static void DeleteDescriptorInfo(void* info) {
166 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); 333 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
334 LOG_INFO("Closed %ld\n", di->io_handle()->fd());
167 di->Close(); 335 di->Close();
168 LOG_INFO("Closed %d\n", di->fd());
169 delete di; 336 delete di;
170 } 337 }
171 338
172 339
173 EventHandlerImplementation::~EventHandlerImplementation() { 340 EventHandlerImplementation::~EventHandlerImplementation() {
174 socket_map_.Clear(DeleteDescriptorInfo); 341 socket_map_.Clear(DeleteDescriptorInfo);
175 VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); 342 mx_handle_close(port_handle_);
176 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); 343 port_handle_ = MX_HANDLE_INVALID;
177 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); 344 }
178 } 345
179 346
180 347 void EventHandlerImplementation::UpdatePort(intptr_t old_mask,
181 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, 348 DescriptorInfo* di) {
182 DescriptorInfo* di) { 349 const intptr_t new_mask = di->Mask();
183 intptr_t new_mask = di->Mask();
184 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask,
185 new_mask);
186 if ((old_mask != 0) && (new_mask == 0)) { 350 if ((old_mask != 0) && (new_mask == 0)) {
187 RemoveFromEpollInstance(epoll_fd_, di); 351 RemoveFromPort(port_handle_, di);
188 } else if ((old_mask == 0) && (new_mask != 0)) { 352 } else if ((old_mask == 0) && (new_mask != 0)) {
189 AddToEpollInstance(epoll_fd_, di); 353 AddToPort(port_handle_, di);
190 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { 354 } else if ((old_mask != 0) && (new_mask != 0)) {
191 ASSERT(!di->IsListeningSocket()); 355 ASSERT(!di->IsListeningSocket());
192 RemoveFromEpollInstance(epoll_fd_, di); 356 RemoveFromPort(port_handle_, di);
193 AddToEpollInstance(epoll_fd_, di); 357 AddToPort(port_handle_, di);
194 } 358 }
195 } 359 }
196 360
197 361
198 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( 362 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
199 intptr_t fd, 363 intptr_t fd,
200 bool is_listening) { 364 bool is_listening) {
201 ASSERT(fd >= 0); 365 IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
202 HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd), 366 ASSERT(handle->fd() >= 0);
203 GetHashmapHashFromFd(fd), true); 367 HashMap::Entry* entry =
368 socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()),
369 GetHashmapHashFromFd(handle->fd()), true);
204 ASSERT(entry != NULL); 370 ASSERT(entry != NULL);
205 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); 371 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
206 if (di == NULL) { 372 if (di == NULL) {
207 // If there is no data in the hash map for this file descriptor a 373 // If there is no data in the hash map for this file descriptor a
208 // new DescriptorInfo for the file descriptor is inserted. 374 // new DescriptorInfo for the file descriptor is inserted.
209 if (is_listening) { 375 if (is_listening) {
210 di = new DescriptorInfoMultiple(fd); 376 di = new DescriptorInfoMultiple(fd);
211 } else { 377 } else {
212 di = new DescriptorInfoSingle(fd); 378 di = new DescriptorInfoSingle(fd);
213 } 379 }
214 entry->value = di; 380 entry->value = di;
215 } 381 }
216 ASSERT(fd == di->fd()); 382 ASSERT(fd == di->fd());
217 return di; 383 return di;
218 } 384 }
219 385
220 386
221 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
222 size_t remaining = count;
223 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
224 while (remaining > 0) {
225 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
226 if (bytes_written == 0) {
227 return count - remaining;
228 } else if (bytes_written == -1) {
229 ASSERT(EAGAIN == EWOULDBLOCK);
230 // Error code EWOULDBLOCK should only happen for non blocking
231 // file descriptors.
232 ASSERT(errno != EWOULDBLOCK);
233 return -1;
234 } else {
235 ASSERT(bytes_written > 0);
236 remaining -= bytes_written;
237 buffer_pos += bytes_written;
238 }
239 }
240 return count;
241 }
242
243
244 void EventHandlerImplementation::WakeupHandler(intptr_t id, 387 void EventHandlerImplementation::WakeupHandler(intptr_t id,
245 Dart_Port dart_port, 388 Dart_Port dart_port,
246 int64_t data) { 389 int64_t data) {
247 InterruptMessage msg; 390 COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(mx_packet_user_t));
248 msg.id = id; 391 mx_port_packet_t pkt;
249 msg.dart_port = dart_port; 392 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user);
250 msg.data = data; 393 pkt.key = kInterruptPacketKey;
251 // WriteToBlocking will write up to 512 bytes atomically, and since our msg 394 msg->id = id;
252 // is smaller than 512, we don't need a thread lock. 395 msg->dart_port = dart_port;
253 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. 396 msg->data = data;
254 ASSERT(kInterruptMessageSize < PIPE_BUF); 397 mx_status_t status =
255 intptr_t result = 398 mx_port_queue(port_handle_, reinterpret_cast<void*>(&pkt), 0);
256 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); 399 if (status != MX_OK) {
257 if (result != kInterruptMessageSize) { 400 // This is a FATAL because the VM won't work at all if we can't send any
258 if (result == -1) { 401 // messages to the EventHandler thread.
259 perror("Interrupt message failure:"); 402 FATAL1("mx_port_queue failed: %s\n", mx_status_get_string(status));
260 }
261 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
262 } 403 }
263 } 404 }
264 405
265 406
266 void EventHandlerImplementation::HandleInterruptFd() { 407 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
267 const intptr_t MAX_MESSAGES = kInterruptMessageSize; 408 if (msg->id == kTimerId) {
268 InterruptMessage msg[MAX_MESSAGES]; 409 LOG_INFO("HandleInterrupt read timer update\n");
269 ssize_t bytes = NO_RETRY_EXPECTED( 410 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
270 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); 411 return;
271 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); 412 } else if (msg->id == kShutdownId) {
272 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { 413 LOG_INFO("HandleInterrupt read shutdown\n");
273 if (msg[i].id == kTimerId) { 414 shutdown_ = true;
274 LOG_INFO("HandleInterruptFd read timer update\n"); 415 return;
275 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); 416 }
276 } else if (msg[i].id == kShutdownId) { 417 ASSERT((msg->data & COMMAND_MASK) != 0);
277 LOG_INFO("HandleInterruptFd read shutdown\n"); 418 LOG_INFO("HandleInterrupt command:\n");
278 shutdown_ = true; 419 Socket* socket = reinterpret_cast<Socket*>(msg->id);
420 RefCntReleaseScope<Socket> rs(socket);
421 if (socket->fd() == -1) {
422 return;
423 }
424 IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd());
425 const intptr_t fd = io_handle->fd();
426 DescriptorInfo* di =
427 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data));
428 ASSERT(io_handle == di->io_handle());
429 if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
430 ASSERT(!di->IsListeningSocket());
431 // Close the socket for reading.
432 LOG_INFO("\tSHUT_RD: %ld\n", fd);
433 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD));
434 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
435 ASSERT(!di->IsListeningSocket());
436 // Close the socket for writing.
437 LOG_INFO("\tSHUT_WR: %ld\n", fd);
438 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR));
439 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
440 // Close the socket and free system resources and move on to next
441 // message.
442 const intptr_t old_mask = di->Mask();
443 Dart_Port port = msg->dart_port;
444 di->RemovePort(port);
445 const intptr_t new_mask = di->Mask();
446 UpdatePort(old_mask, di);
447
448 LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask);
449 if (di->IsListeningSocket()) {
450 // We only close the socket file descriptor from the operating
451 // system if there are no other dart socket objects which
452 // are listening on the same (address, port) combination.
453 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
454
455 MutexLocker locker(registry->mutex());
456
457 if (registry->CloseSafe(socket)) {
458 ASSERT(new_mask == 0);
459 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
460 di->Close();
461 delete di;
462 socket->SetClosedFd();
463 }
279 } else { 464 } else {
280 ASSERT((msg[i].data & COMMAND_MASK) != 0); 465 ASSERT(new_mask == 0);
281 LOG_INFO("HandleInterruptFd command\n"); 466 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
282 Socket* socket = reinterpret_cast<Socket*>(msg[i].id); 467 di->Close();
283 RefCntReleaseScope<Socket> rs(socket); 468 delete di;
284 if (socket->fd() == -1) { 469 socket->SetClosedFd();
285 continue; 470 }
286 } 471 if (port != 0) {
287 DescriptorInfo* di = 472 const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
288 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data)); 473 if (!success) {
289 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { 474 LOG_ERR("Failed to post destroy event to port %ld\n", port);
290 ASSERT(!di->IsListeningSocket());
291 // Close the socket for reading.
292 LOG_INFO("\tSHUT_RD: %d\n", di->fd());
293 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
294 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
295 ASSERT(!di->IsListeningSocket());
296 // Close the socket for writing.
297 LOG_INFO("\tSHUT_WR: %d\n", di->fd());
298 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
299 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
300 // Close the socket and free system resources and move on to next
301 // message.
302 intptr_t old_mask = di->Mask();
303 Dart_Port port = msg[i].dart_port;
304 di->RemovePort(port);
305 intptr_t new_mask = di->Mask();
306 UpdateEpollInstance(old_mask, di);
307
308 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
309 intptr_t fd = di->fd();
310 if (di->IsListeningSocket()) {
311 // We only close the socket file descriptor from the operating
312 // system if there are no other dart socket objects which
313 // are listening on the same (address, port) combination.
314 ListeningSocketRegistry* registry =
315 ListeningSocketRegistry::Instance();
316
317 MutexLocker locker(registry->mutex());
318
319 if (registry->CloseSafe(socket)) {
320 ASSERT(new_mask == 0);
321 socket_map_.Remove(GetHashmapKeyFromFd(fd),
322 GetHashmapHashFromFd(fd));
323 di->Close();
324 LOG_INFO("Closed %d\n", di->fd());
325 delete di;
326 socket->SetClosedFd();
327 }
328 } else {
329 ASSERT(new_mask == 0);
330 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
331 di->Close();
332 LOG_INFO("Closed %d\n", di->fd());
333 delete di;
334 socket->SetClosedFd();
335 }
336
337 bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
338 if (!success) {
339 LOG_ERR("Failed to post destroy event to port %ld", port);
340 }
341 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
342 int count = TOKEN_COUNT(msg[i].data);
343 intptr_t old_mask = di->Mask();
344 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
345 di->ReturnTokens(msg[i].dart_port, count);
346 UpdateEpollInstance(old_mask, di);
347 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
348 // `events` can only have kInEvent/kOutEvent flags set.
349 intptr_t events = msg[i].data & EVENT_MASK;
350 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
351
352 intptr_t old_mask = di->Mask();
353 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask,
354 msg[i].data & EVENT_MASK);
355 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
356 UpdateEpollInstance(old_mask, di);
357 } else {
358 UNREACHABLE();
359 } 475 }
360 } 476 }
361 } 477 } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
362 LOG_INFO("HandleInterruptFd exit\n"); 478 const int count = TOKEN_COUNT(msg->data);
363 } 479 const intptr_t old_mask = di->Mask();
480 LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask);
481 di->ReturnTokens(msg->dart_port, count);
482 UpdatePort(old_mask, di);
483 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
484 // `events` can only have kInEvent/kOutEvent flags set.
485 const intptr_t events = msg->data & EVENT_MASK;
486 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
364 487
365 488 const intptr_t old_mask = di->Mask();
366 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, 489 LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask,
367 DescriptorInfo* di) { 490 msg->data & EVENT_MASK);
368 #ifdef EVENTHANDLER_LOGGING 491 di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
369 PrintEventMask(di->fd(), events); 492 UpdatePort(old_mask, di);
370 #endif 493 } else {
371 if ((events & EPOLLERR) != 0) { 494 UNREACHABLE();
372 // Return error only if EPOLLIN is present.
373 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
374 }
375 intptr_t event_mask = 0;
376 if ((events & EPOLLIN) != 0) {
377 event_mask |= (1 << kInEvent);
378 }
379 if ((events & EPOLLOUT) != 0) {
380 event_mask |= (1 << kOutEvent);
381 }
382 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
383 event_mask |= (1 << kCloseEvent);
384 }
385 return event_mask;
386 }
387
388
389 void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
390 int size) {
391 bool interrupt_seen = false;
392 for (int i = 0; i < size; i++) {
393 if (events[i].data.ptr == NULL) {
394 interrupt_seen = true;
395 } else {
396 DescriptorInfo* di =
397 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
398 const intptr_t old_mask = di->Mask();
399 const intptr_t event_mask = GetPollEvents(events[i].events, di);
400 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
401 if ((event_mask & (1 << kErrorEvent)) != 0) {
402 di->NotifyAllDartPorts(event_mask);
403 UpdateEpollInstance(old_mask, di);
404 } else if (event_mask != 0) {
405 Dart_Port port = di->NextNotifyDartPort(event_mask);
406 ASSERT(port != 0);
407 UpdateEpollInstance(old_mask, di);
408 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask,
409 port, di->fd());
410 bool success = DartUtils::PostInt32(port, event_mask);
411 if (!success) {
412 // This can happen if e.g. the isolate that owns the port has died
413 // for some reason.
414 LOG_ERR("Failed to post event for fd %ld to port %ld", di->fd(),
415 port);
416 }
417 }
418 }
419 }
420 if (interrupt_seen) {
421 // Handle after socket events, so we avoid closing a socket before we handle
422 // the current events.
423 HandleInterruptFd();
424 } 495 }
425 } 496 }
426 497
427 498
499 void EventHandlerImplementation::HandlePacket(mx_port_packet_t* pkt) {
500 LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key);
501 LOG_INFO("HandlePacket: Got event packet: type=%lx\n", pkt->type);
502 LOG_INFO("HandlePacket: Got event packet: status=%ld\n", pkt->status);
503 if (pkt->type == MX_PKT_TYPE_USER) {
504 ASSERT(pkt->key == kInterruptPacketKey);
505 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user);
506 HandleInterrupt(msg);
507 return;
508 }
509 LOG_INFO("HandlePacket: Got event packet: observed = %lx\n",
510 pkt->signal.observed);
511 LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);
512
513 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key);
514 mx_signals_t observed = pkt->signal.observed;
515 const intptr_t old_mask = di->Mask();
516 const uint32_t epoll_event = di->io_handle()->WaitEnd(observed);
517 intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event);
518 if ((event_mask & (1 << kErrorEvent)) != 0) {
519 di->NotifyAllDartPorts(event_mask);
520 } else if (event_mask != 0) {
521 event_mask = di->io_handle()->ToggleEvents(event_mask);
522 if (event_mask != 0) {
523 Dart_Port port = di->NextNotifyDartPort(event_mask);
524 ASSERT(port != 0);
525 bool success = DartUtils::PostInt32(port, event_mask);
526 if (!success) {
527 // This can happen if e.g. the isolate that owns the port has died
528 // for some reason.
529 LOG_ERR("Failed to post event to port %ld\n", port);
530 }
531 }
532 }
533 UpdatePort(old_mask, di);
534 }
535
536
428 int64_t EventHandlerImplementation::GetTimeout() const { 537 int64_t EventHandlerImplementation::GetTimeout() const {
429 if (!timeout_queue_.HasTimeout()) { 538 if (!timeout_queue_.HasTimeout()) {
430 return kInfinityTimeout; 539 return kInfinityTimeout;
431 } 540 }
432 int64_t millis = 541 int64_t millis =
433 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); 542 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
434 return (millis < 0) ? 0 : millis; 543 return (millis < 0) ? 0 : millis;
435 } 544 }
436 545
437 546
438 void EventHandlerImplementation::HandleTimeout() { 547 void EventHandlerImplementation::HandleTimeout() {
439 if (timeout_queue_.HasTimeout()) { 548 if (timeout_queue_.HasTimeout()) {
440 int64_t millis = timeout_queue_.CurrentTimeout() - 549 int64_t millis = timeout_queue_.CurrentTimeout() -
441 TimerUtils::GetCurrentMonotonicMillis(); 550 TimerUtils::GetCurrentMonotonicMillis();
442 if (millis <= 0) { 551 if (millis <= 0) {
443 DartUtils::PostNull(timeout_queue_.CurrentPort()); 552 DartUtils::PostNull(timeout_queue_.CurrentPort());
444 timeout_queue_.RemoveCurrent(); 553 timeout_queue_.RemoveCurrent();
445 } 554 }
446 } 555 }
447 } 556 }
448 557
449 558
450 void EventHandlerImplementation::Poll(uword args) { 559 void EventHandlerImplementation::Poll(uword args) {
451 static const intptr_t kMaxEvents = 16;
452 struct epoll_event events[kMaxEvents];
453 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 560 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
454 EventHandlerImplementation* handler_impl = &handler->delegate_; 561 EventHandlerImplementation* handler_impl = &handler->delegate_;
455 ASSERT(handler_impl != NULL); 562 ASSERT(handler_impl != NULL);
456 563
564 mx_port_packet_t pkt;
457 while (!handler_impl->shutdown_) { 565 while (!handler_impl->shutdown_) {
458 int64_t millis = handler_impl->GetTimeout(); 566 int64_t millis = handler_impl->GetTimeout();
459 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); 567 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
460 // TODO(US-109): When the epoll implementation is properly edge-triggered, 568
461 // remove this sleep, which prevents the message queue from being 569 LOG_INFO("mx_port_wait(millis = %ld)\n", millis);
462 // overwhelmed and leading to memory exhaustion. 570 mx_status_t status = mx_port_wait(handler_impl->port_handle_,
463 usleep(5000); 571 millis == kInfinityTimeout
464 LOG_INFO("epoll_wait(millis = %ld)\n", millis); 572 ? MX_TIME_INFINITE
465 intptr_t result = NO_RETRY_EXPECTED( 573 : mx_deadline_after(MX_MSEC(millis)),
466 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); 574 reinterpret_cast<void*>(&pkt), 0);
467 ASSERT(EAGAIN == EWOULDBLOCK); 575 if (status == MX_ERR_TIMED_OUT) {
468 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); 576 handler_impl->HandleTimeout();
469 if (result < 0) { 577 } else if (status != MX_OK) {
470 if (errno != EWOULDBLOCK) { 578 FATAL1("mx_port_wait failed: %s\n", mx_status_get_string(status));
471 perror("Poll failed");
472 }
473 } else { 579 } else {
474 handler_impl->HandleTimeout(); 580 handler_impl->HandleTimeout();
475 handler_impl->HandleEvents(events, result); 581 handler_impl->HandlePacket(&pkt);
476 } 582 }
477 } 583 }
478 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); 584 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
479 handler->NotifyShutdownDone(); 585 handler->NotifyShutdownDone();
480 } 586 }
481 587
482 588
483 void EventHandlerImplementation::Start(EventHandler* handler) { 589 void EventHandlerImplementation::Start(EventHandler* handler) {
484 int result = Thread::Start(&EventHandlerImplementation::Poll, 590 int result = Thread::Start(&EventHandlerImplementation::Poll,
485 reinterpret_cast<uword>(handler)); 591 reinterpret_cast<uword>(handler));
(...skipping 24 matching lines...) Expand all
510 // The hashmap does not support keys with value 0. 616 // The hashmap does not support keys with value 0.
511 return dart::Utils::WordHash(fd + 1); 617 return dart::Utils::WordHash(fd + 1);
512 } 618 }
513 619
514 } // namespace bin 620 } // namespace bin
515 } // namespace dart 621 } // namespace dart
516 622
517 #endif // defined(HOST_OS_FUCHSIA) 623 #endif // defined(HOST_OS_FUCHSIA)
518 624
519 #endif // !defined(DART_IO_DISABLED) 625 #endif // !defined(DART_IO_DISABLED)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698