Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(602)

Side by Side Diff: runtime/bin/eventhandler_fuchsia.cc

Issue 2910853002: [Fuchsia] EventHandler: epoll -> ports v2 (Closed)
Patch Set: Fix typo Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/bin/eventhandler_fuchsia.h ('k') | runtime/bin/process_fuchsia.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #if !defined(DART_IO_DISABLED) 5 #if !defined(DART_IO_DISABLED)
6 6
7 #include "platform/globals.h" 7 #include "platform/globals.h"
8 #if defined(HOST_OS_FUCHSIA) 8 #if defined(HOST_OS_FUCHSIA)
9 9
10 #include "bin/eventhandler.h" 10 #include "bin/eventhandler.h"
11 #include "bin/eventhandler_fuchsia.h" 11 #include "bin/eventhandler_fuchsia.h"
12 12
13 #include <errno.h> // NOLINT 13 #include <errno.h>
14 #include <fcntl.h> // NOLINT 14 #include <fcntl.h>
15 #include <pthread.h> // NOLINT 15 #include <magenta/status.h>
16 #include <stdio.h> // NOLINT 16 #include <magenta/syscalls.h>
17 #include <string.h> // NOLINT 17 #include <magenta/syscalls/object.h>
18 #include <sys/epoll.h> // NOLINT 18 #include <magenta/syscalls/port.h>
19 #include <sys/stat.h> // NOLINT 19 #include <mxio/private.h>
20 #include <unistd.h> // NOLINT 20 #include <pthread.h>
21 #include <stdio.h>
22 #include <string.h>
23 #include <sys/epoll.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
21 27
22 #include "bin/fdutils.h" 28 #include "bin/fdutils.h"
23 #include "bin/lockers.h" 29 #include "bin/lockers.h"
24 #include "bin/log.h" 30 #include "bin/log.h"
25 #include "bin/socket.h" 31 #include "bin/socket.h"
26 #include "bin/thread.h" 32 #include "bin/thread.h"
27 #include "bin/utils.h" 33 #include "bin/utils.h"
28 #include "platform/hashmap.h" 34 #include "platform/hashmap.h"
29 #include "platform/utils.h" 35 #include "platform/utils.h"
30 36
31 // #define EVENTHANDLER_LOGGING 1 37 // The EventHandler for Fuchsia uses its "ports v2" API:
32 #if defined(EVENTHANDLER_LOGGING) 38 // https://fuchsia.googlesource.com/magenta/+/HEAD/docs/syscalls/port_create.md
33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) 39 // This API does not have epoll()-like edge triggering (EPOLLET). Since clients
34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) 40 // of the EventHandler expect edge-triggered notifications, we must simulate it.
41 // When a packet from mx_port_wait() indicates that a signal is asserted for a
42 // handle, we unsubscribe from that signal until the event that asserted the
43 // signal can be processed. For example:
44 //
45 // 1. We get MX_SOCKET_WRITABLE from mx_port_wait() for a handle.
46 // 2. We send kOutEvent to the Dart thread.
47 // 3. We unsubscribe from further MX_SOCKET_WRITABLE signals for the handle.
48 // 4. Some time later the Dart thread actually does a write().
49 // 5. After writing, the Dart thread resubscribes to write events.
50 //
51 // We use he same procedure for MX_SOCKET_READABLE, and read()/accept().
52
53 // define EVENTHANDLER_LOG_ERROR to get log messages only for errors.
54 // define EVENTHANDLER_LOG_INFO to get log messages for both information and
55 // errors.
56 // #define EVENTHANDLER_LOG_INFO 1
57 #define EVENTHANDLER_LOG_ERROR 1
58 #if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
59 #define LOG_ERR(msg, ...) \
60 { \
61 int err = errno; \
62 Log::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, __LINE__, \
63 ##__VA_ARGS__); \
64 errno = err; \
65 }
66 #if defined(EVENTHANDLER_LOG_INFO)
67 #define LOG_INFO(msg, ...) \
68 Log::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \
69 ##__VA_ARGS__)
70 #else
71 #define LOG_INFO(msg, ...)
72 #endif // defined(EVENTHANDLER_LOG_INFO)
35 #else 73 #else
36 #define LOG_ERR(msg, ...) 74 #define LOG_ERR(msg, ...)
37 #define LOG_INFO(msg, ...) 75 #define LOG_INFO(msg, ...)
38 #endif // defined(EVENTHANDLER_LOGGING) 76 #endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
39 77
40 namespace dart { 78 namespace dart {
41 namespace bin { 79 namespace bin {
42 80
43 #if defined(EVENTHANDLER_LOGGING) 81 intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) {
44 static void PrintEventMask(intptr_t fd, intptr_t events) { 82 MutexLocker ml(mutex_);
45 Log::PrintErr("%d ", fd); 83 const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes));
46 if ((events & EPOLLIN) != 0) { 84 const int err = errno;
47 Log::PrintErr("EPOLLIN "); 85 LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes);
48 } 86
49 if ((events & EPOLLPRI) != 0) { 87 // Resubscribe to read events. We resubscribe to events even if read() returns
50 Log::PrintErr("EPOLLPRI "); 88 // an error. The error might be, e.g. EWOULDBLOCK, in which case
51 } 89 // re-subscription is necessary. Logic in the caller decides which errors are
52 if ((events & EPOLLOUT) != 0) { 90 // real, and which are ignore-and-continue.
53 Log::PrintErr("EPOLLOUT "); 91 read_events_enabled_ = true;
54 } 92 if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) {
55 if ((events & EPOLLERR) != 0) { 93 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
56 Log::PrintErr("EPOLLERR "); 94 }
57 } 95
58 if ((events & EPOLLHUP) != 0) { 96 errno = err;
59 Log::PrintErr("EPOLLHUP "); 97 return read_bytes;
60 } 98 }
61 if ((events & EPOLLRDHUP) != 0) { 99
62 Log::PrintErr("EPOLLRDHUP "); 100
63 } 101 intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) {
64 int all_events = 102 MutexLocker ml(mutex_);
65 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP; 103 const ssize_t written_bytes =
66 if ((events & ~all_events) != 0) { 104 NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes));
67 Log::PrintErr("(and %08x) ", events & ~all_events); 105 const int err = errno;
68 } 106 LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes);
69 107
70 Log::PrintErr("\n"); 108 // Resubscribe to write events.
71 } 109 write_events_enabled_ = true;
72 #endif 110 if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLOUT, wait_key_)) {
73 111 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
74 112 }
75 intptr_t DescriptorInfo::GetPollEvents() { 113
114 errno = err;
115 return written_bytes;
116 }
117
118
119 intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) {
120 MutexLocker ml(mutex_);
121 const intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen));
122 const int err = errno;
123 LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket);
124
125 // Re-subscribe to read events.
126 read_events_enabled_ = true;
127 if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) {
128 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
129 }
130
131 errno = err;
132 return socket;
133 }
134
135
136 void IOHandle::Close() {
137 MutexLocker ml(mutex_);
138 VOID_NO_RETRY_EXPECTED(close(fd_));
139 }
140
141
142 uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) {
143 MutexLocker ml(mutex_);
76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are 144 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
77 // triggered anyway. 145 // triggered anyway.
78 intptr_t events = 0; 146 uint32_t events = EPOLLRDHUP;
79 if ((Mask() & (1 << kInEvent)) != 0) { 147 if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) {
80 events |= EPOLLIN; 148 events |= EPOLLIN;
81 } 149 }
82 if ((Mask() & (1 << kOutEvent)) != 0) { 150 if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) {
83 events |= EPOLLOUT; 151 events |= EPOLLOUT;
84 } 152 }
85 return events; 153 return events;
86 } 154 }
87 155
88 156
89 // Unregister the file descriptor for a DescriptorInfo structure with 157 intptr_t IOHandle::EpollEventsToMask(intptr_t events) {
90 // epoll. 158 if ((events & EPOLLERR) != 0) {
91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { 159 // Return error only if EPOLLIN is present.
92 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); 160 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
93 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL)); 161 }
94 } 162 intptr_t event_mask = 0;
95 163 if ((events & EPOLLIN) != 0) {
96 164 event_mask |= (1 << kInEvent);
97 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { 165 }
98 struct epoll_event event; 166 if ((events & EPOLLOUT) != 0) {
99 event.events = EPOLLRDHUP | di->GetPollEvents(); 167 event_mask |= (1 << kOutEvent);
100 if (!di->IsListeningSocket()) { 168 }
101 event.events |= EPOLLET; 169 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
102 } 170 event_mask |= (1 << kCloseEvent);
103 event.data.ptr = di; 171 }
104 LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd()); 172 return event_mask;
105 int status = 173 }
106 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event)); 174
107 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status); 175
108 #if defined(EVENTHANDLER_LOGGING) 176 bool IOHandle::AsyncWaitLocked(mx_handle_t port,
109 PrintEventMask(di->fd(), event.events); 177 uint32_t events,
110 #endif 178 uint64_t key) {
111 if (status == -1) { 179 LOG_INFO("IOHandle::AsyncWait: fd = %ld\n", fd_);
112 // TODO(dart:io): Verify that the dart end is handling this correctly. 180 // The call to __mxio_fd_to_io() in the DescriptorInfo constructor may have
113 181 // returned NULL. If it did, propagate the problem up to Dart.
114 // Epoll does not accept the file descriptor. It could be due to 182 if (mxio_ == NULL) {
115 // already closed file descriptor, or unuspported devices, such 183 LOG_ERR("__mxio_fd_to_io(%d) returned NULL\n", fd_);
116 // as /dev/null. In such case, mark the file descriptor as closed, 184 return false;
117 // so dart will handle it accordingly. 185 }
186
187 mx_handle_t handle;
188 mx_signals_t signals;
189 __mxio_wait_begin(mxio_, events, &handle, &signals);
190 if (handle == MX_HANDLE_INVALID) {
191 LOG_ERR("fd = %ld __mxio_wait_begin returned an invalid handle\n", fd_);
192 return false;
193 }
194
195 // Remember the port. Use the remembered port if the argument "port" is
196 // MX_HANDLE_INVALID.
197 ASSERT((port != MX_HANDLE_INVALID) || (port_ != MX_HANDLE_INVALID));
198 if ((port_ == MX_HANDLE_INVALID) || (port != MX_HANDLE_INVALID)) {
199 port_ = port;
200 }
201
202 handle_ = handle;
203 wait_key_ = key;
204 LOG_INFO("mx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals);
205 mx_status_t status =
206 mx_object_wait_async(handle_, port_, key, signals, MX_WAIT_ASYNC_ONCE);
207 if (status != MX_OK) {
208 LOG_ERR("mx_object_wait_async failed: %s\n", mx_status_get_string(status));
209 return false;
210 }
211
212 return true;
213 }
214
215
216 bool IOHandle::AsyncWait(mx_handle_t port, uint32_t events, uint64_t key) {
217 MutexLocker ml(mutex_);
218 return AsyncWaitLocked(port, events, key);
219 }
220
221
222 void IOHandle::CancelWait(mx_handle_t port, uint64_t key) {
223 MutexLocker ml(mutex_);
224 LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_);
225 ASSERT(port != MX_HANDLE_INVALID);
226 ASSERT(handle_ != MX_HANDLE_INVALID);
227 mx_status_t status = mx_port_cancel(port, handle_, key);
228 if ((status != MX_OK) && (status != MX_ERR_NOT_FOUND)) {
229 LOG_ERR("mx_port_cancel failed: %s\n", mx_status_get_string(status));
230 }
231 }
232
233
234 uint32_t IOHandle::WaitEnd(mx_signals_t observed) {
235 MutexLocker ml(mutex_);
236 uint32_t events = 0;
237 __mxio_wait_end(mxio_, observed, &events);
238 return events;
239 }
240
241
242 intptr_t IOHandle::ToggleEvents(intptr_t event_mask) {
243 MutexLocker ml(mutex_);
244 if (!write_events_enabled_) {
245 LOG_INFO("IOHandle::ToggleEvents: fd = %ld de-asserting write\n", fd_);
246 event_mask = event_mask & ~(1 << kOutEvent);
247 }
248 if ((event_mask & (1 << kOutEvent)) != 0) {
249 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting write and disabling\n",
250 fd_);
251 write_events_enabled_ = false;
252 }
253 if (!read_events_enabled_) {
254 LOG_INFO("IOHandle::ToggleEvents: fd=%ld de-asserting read\n", fd_);
255 event_mask = event_mask & ~(1 << kInEvent);
256 }
257 if ((event_mask & (1 << kInEvent)) != 0) {
258 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting read and disabling\n",
259 fd_);
260 read_events_enabled_ = false;
261 }
262 return event_mask;
263 }
264
265
266 void EventHandlerImplementation::AddToPort(mx_handle_t port_handle,
267 DescriptorInfo* di) {
268 const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask());
269 const uint64_t key = reinterpret_cast<uint64_t>(di);
270 if (!di->io_handle()->AsyncWait(port_handle, events, key)) {
118 di->NotifyAllDartPorts(1 << kCloseEvent); 271 di->NotifyAllDartPorts(1 << kCloseEvent);
119 } 272 }
120 } 273 }
121 274
122 275
276 void EventHandlerImplementation::RemoveFromPort(mx_handle_t port_handle,
277 DescriptorInfo* di) {
278 const uint64_t key = reinterpret_cast<uint64_t>(di);
279 di->io_handle()->CancelWait(port_handle, key);
280 }
281
282
123 EventHandlerImplementation::EventHandlerImplementation() 283 EventHandlerImplementation::EventHandlerImplementation()
124 : socket_map_(&HashMap::SamePointerValue, 16) { 284 : socket_map_(&HashMap::SamePointerValue, 16) {
125 intptr_t result;
126 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
127 if (result != 0) {
128 FATAL("Pipe creation failed");
129 }
130 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
131 FATAL("Failed to set pipe fd non blocking\n");
132 }
133 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
134 FATAL("Failed to set pipe fd close on exec\n");
135 }
136 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
137 FATAL("Failed to set pipe fd close on exec\n");
138 }
139 shutdown_ = false; 285 shutdown_ = false;
140 // The initial size passed to epoll_create is ignore on newer (>= 286 // Create the port.
141 // 2.6.8) Linux versions 287 port_handle_ = MX_HANDLE_INVALID;
142 static const int kEpollInitialSize = 64; 288 mx_status_t status = mx_port_create(MX_PORT_OPT_V2, &port_handle_);
143 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); 289 if (status != MX_OK) {
144 if (epoll_fd_ == -1) { 290 // This is a FATAL because the VM won't work at all if we can't create this
145 FATAL1("Failed creating epoll file descriptor: %i", errno); 291 // port.
146 } 292 FATAL1("mx_port_create failed: %s\n", mx_status_get_string(status));
147 if (!FDUtils::SetCloseOnExec(epoll_fd_)) { 293 }
148 FATAL("Failed to set epoll fd close on exec\n"); 294 ASSERT(port_handle_ != MX_HANDLE_INVALID);
149 }
150 // Register the interrupt_fd with the epoll instance.
151 struct epoll_event event;
152 event.events = EPOLLIN;
153 event.data.ptr = NULL;
154 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_);
155 int status = NO_RETRY_EXPECTED(
156 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
157 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
158 epoll_fd_, status);
159 if (status == -1) {
160 FATAL("Failed adding interrupt fd to epoll instance");
161 }
162 } 295 }
163 296
164 297
165 static void DeleteDescriptorInfo(void* info) { 298 static void DeleteDescriptorInfo(void* info) {
166 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); 299 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
300 LOG_INFO("Closed %ld\n", di->io_handle()->fd());
167 di->Close(); 301 di->Close();
168 LOG_INFO("Closed %d\n", di->fd());
169 delete di; 302 delete di;
170 } 303 }
171 304
172 305
173 EventHandlerImplementation::~EventHandlerImplementation() { 306 EventHandlerImplementation::~EventHandlerImplementation() {
174 socket_map_.Clear(DeleteDescriptorInfo); 307 socket_map_.Clear(DeleteDescriptorInfo);
175 VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); 308 mx_handle_close(port_handle_);
176 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); 309 port_handle_ = MX_HANDLE_INVALID;
177 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); 310 }
178 } 311
179 312
180 313 void EventHandlerImplementation::UpdatePort(intptr_t old_mask,
181 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, 314 DescriptorInfo* di) {
182 DescriptorInfo* di) { 315 const intptr_t new_mask = di->Mask();
183 intptr_t new_mask = di->Mask();
184 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask,
185 new_mask);
186 if ((old_mask != 0) && (new_mask == 0)) { 316 if ((old_mask != 0) && (new_mask == 0)) {
187 RemoveFromEpollInstance(epoll_fd_, di); 317 RemoveFromPort(port_handle_, di);
188 } else if ((old_mask == 0) && (new_mask != 0)) { 318 } else if ((old_mask == 0) && (new_mask != 0)) {
189 AddToEpollInstance(epoll_fd_, di); 319 AddToPort(port_handle_, di);
190 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { 320 } else if ((old_mask != 0) && (new_mask != 0)) {
191 ASSERT(!di->IsListeningSocket()); 321 ASSERT(!di->IsListeningSocket());
192 RemoveFromEpollInstance(epoll_fd_, di); 322 RemoveFromPort(port_handle_, di);
193 AddToEpollInstance(epoll_fd_, di); 323 AddToPort(port_handle_, di);
194 } 324 }
195 } 325 }
196 326
197 327
198 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( 328 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
199 intptr_t fd, 329 intptr_t fd,
200 bool is_listening) { 330 bool is_listening) {
201 ASSERT(fd >= 0); 331 IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
202 HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd), 332 ASSERT(handle->fd() >= 0);
203 GetHashmapHashFromFd(fd), true); 333 HashMap::Entry* entry =
334 socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()),
335 GetHashmapHashFromFd(handle->fd()), true);
204 ASSERT(entry != NULL); 336 ASSERT(entry != NULL);
205 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); 337 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
206 if (di == NULL) { 338 if (di == NULL) {
207 // If there is no data in the hash map for this file descriptor a 339 // If there is no data in the hash map for this file descriptor a
208 // new DescriptorInfo for the file descriptor is inserted. 340 // new DescriptorInfo for the file descriptor is inserted.
209 if (is_listening) { 341 if (is_listening) {
210 di = new DescriptorInfoMultiple(fd); 342 di = new DescriptorInfoMultiple(fd);
211 } else { 343 } else {
212 di = new DescriptorInfoSingle(fd); 344 di = new DescriptorInfoSingle(fd);
213 } 345 }
214 entry->value = di; 346 entry->value = di;
215 } 347 }
216 ASSERT(fd == di->fd()); 348 ASSERT(fd == di->fd());
217 return di; 349 return di;
218 } 350 }
219 351
220 352
221 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
222 size_t remaining = count;
223 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
224 while (remaining > 0) {
225 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
226 if (bytes_written == 0) {
227 return count - remaining;
228 } else if (bytes_written == -1) {
229 ASSERT(EAGAIN == EWOULDBLOCK);
230 // Error code EWOULDBLOCK should only happen for non blocking
231 // file descriptors.
232 ASSERT(errno != EWOULDBLOCK);
233 return -1;
234 } else {
235 ASSERT(bytes_written > 0);
236 remaining -= bytes_written;
237 buffer_pos += bytes_written;
238 }
239 }
240 return count;
241 }
242
243
244 void EventHandlerImplementation::WakeupHandler(intptr_t id, 353 void EventHandlerImplementation::WakeupHandler(intptr_t id,
245 Dart_Port dart_port, 354 Dart_Port dart_port,
246 int64_t data) { 355 int64_t data) {
247 InterruptMessage msg; 356 COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(mx_packet_user_t));
248 msg.id = id; 357 mx_port_packet_t pkt;
249 msg.dart_port = dart_port; 358 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user);
250 msg.data = data; 359 pkt.key = kInterruptPacketKey;
251 // WriteToBlocking will write up to 512 bytes atomically, and since our msg 360 msg->id = id;
252 // is smaller than 512, we don't need a thread lock. 361 msg->dart_port = dart_port;
253 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. 362 msg->data = data;
254 ASSERT(kInterruptMessageSize < PIPE_BUF); 363 mx_status_t status =
255 intptr_t result = 364 mx_port_queue(port_handle_, reinterpret_cast<void*>(&pkt), 0);
256 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); 365 if (status != MX_OK) {
257 if (result != kInterruptMessageSize) { 366 // This is a FATAL because the VM won't work at all if we can't send any
258 if (result == -1) { 367 // messages to the EventHandler thread.
259 perror("Interrupt message failure:"); 368 FATAL1("mx_port_queue failed: %s\n", mx_status_get_string(status));
260 }
261 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
262 } 369 }
263 } 370 }
264 371
265 372
266 void EventHandlerImplementation::HandleInterruptFd() { 373 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
267 const intptr_t MAX_MESSAGES = kInterruptMessageSize; 374 if (msg->id == kTimerId) {
268 InterruptMessage msg[MAX_MESSAGES]; 375 LOG_INFO("HandleInterrupt read timer update\n");
269 ssize_t bytes = NO_RETRY_EXPECTED( 376 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
270 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); 377 return;
271 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); 378 } else if (msg->id == kShutdownId) {
272 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { 379 LOG_INFO("HandleInterrupt read shutdown\n");
273 if (msg[i].id == kTimerId) { 380 shutdown_ = true;
274 LOG_INFO("HandleInterruptFd read timer update\n"); 381 return;
275 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); 382 }
276 } else if (msg[i].id == kShutdownId) { 383 ASSERT((msg->data & COMMAND_MASK) != 0);
277 LOG_INFO("HandleInterruptFd read shutdown\n"); 384 LOG_INFO("HandleInterrupt command:\n");
278 shutdown_ = true; 385 Socket* socket = reinterpret_cast<Socket*>(msg->id);
386 RefCntReleaseScope<Socket> rs(socket);
387 if (socket->fd() == -1) {
388 return;
389 }
390 IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd());
391 const intptr_t fd = io_handle->fd();
392 DescriptorInfo* di =
393 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data));
394 ASSERT(io_handle == di->io_handle());
395 if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
396 ASSERT(!di->IsListeningSocket());
397 // Close the socket for reading.
398 LOG_INFO("\tSHUT_RD: %ld\n", fd);
399 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD));
400 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
401 ASSERT(!di->IsListeningSocket());
402 // Close the socket for writing.
403 LOG_INFO("\tSHUT_WR: %ld\n", fd);
404 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR));
405 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
406 // Close the socket and free system resources and move on to next
407 // message.
408 const intptr_t old_mask = di->Mask();
409 Dart_Port port = msg->dart_port;
410 di->RemovePort(port);
411 const intptr_t new_mask = di->Mask();
412 UpdatePort(old_mask, di);
413
414 LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask);
415 if (di->IsListeningSocket()) {
416 // We only close the socket file descriptor from the operating
417 // system if there are no other dart socket objects which
418 // are listening on the same (address, port) combination.
419 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
420
421 MutexLocker locker(registry->mutex());
422
423 if (registry->CloseSafe(socket)) {
424 ASSERT(new_mask == 0);
425 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
426 di->Close();
427 delete di;
428 socket->SetClosedFd();
429 }
279 } else { 430 } else {
280 ASSERT((msg[i].data & COMMAND_MASK) != 0); 431 ASSERT(new_mask == 0);
281 LOG_INFO("HandleInterruptFd command\n"); 432 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
282 Socket* socket = reinterpret_cast<Socket*>(msg[i].id); 433 di->Close();
283 RefCntReleaseScope<Socket> rs(socket); 434 delete di;
284 if (socket->fd() == -1) { 435 socket->SetClosedFd();
285 continue; 436 }
286 } 437 if (port != 0) {
287 DescriptorInfo* di = 438 const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
288 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data)); 439 if (!success) {
289 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { 440 LOG_ERR("Failed to post destroy event to port %ld\n", port);
290 ASSERT(!di->IsListeningSocket());
291 // Close the socket for reading.
292 LOG_INFO("\tSHUT_RD: %d\n", di->fd());
293 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
294 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
295 ASSERT(!di->IsListeningSocket());
296 // Close the socket for writing.
297 LOG_INFO("\tSHUT_WR: %d\n", di->fd());
298 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
299 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
300 // Close the socket and free system resources and move on to next
301 // message.
302 intptr_t old_mask = di->Mask();
303 Dart_Port port = msg[i].dart_port;
304 di->RemovePort(port);
305 intptr_t new_mask = di->Mask();
306 UpdateEpollInstance(old_mask, di);
307
308 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
309 intptr_t fd = di->fd();
310 if (di->IsListeningSocket()) {
311 // We only close the socket file descriptor from the operating
312 // system if there are no other dart socket objects which
313 // are listening on the same (address, port) combination.
314 ListeningSocketRegistry* registry =
315 ListeningSocketRegistry::Instance();
316
317 MutexLocker locker(registry->mutex());
318
319 if (registry->CloseSafe(socket)) {
320 ASSERT(new_mask == 0);
321 socket_map_.Remove(GetHashmapKeyFromFd(fd),
322 GetHashmapHashFromFd(fd));
323 di->Close();
324 LOG_INFO("Closed %d\n", di->fd());
325 delete di;
326 socket->SetClosedFd();
327 }
328 } else {
329 ASSERT(new_mask == 0);
330 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
331 di->Close();
332 LOG_INFO("Closed %d\n", di->fd());
333 delete di;
334 socket->SetClosedFd();
335 }
336
337 bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
338 if (!success) {
339 LOG_ERR("Failed to post destroy event to port %ld", port);
340 }
341 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
342 int count = TOKEN_COUNT(msg[i].data);
343 intptr_t old_mask = di->Mask();
344 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
345 di->ReturnTokens(msg[i].dart_port, count);
346 UpdateEpollInstance(old_mask, di);
347 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
348 // `events` can only have kInEvent/kOutEvent flags set.
349 intptr_t events = msg[i].data & EVENT_MASK;
350 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
351
352 intptr_t old_mask = di->Mask();
353 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask,
354 msg[i].data & EVENT_MASK);
355 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
356 UpdateEpollInstance(old_mask, di);
357 } else {
358 UNREACHABLE();
359 } 441 }
360 } 442 }
361 } 443 } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
362 LOG_INFO("HandleInterruptFd exit\n"); 444 const int count = TOKEN_COUNT(msg->data);
363 } 445 const intptr_t old_mask = di->Mask();
446 LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask);
447 di->ReturnTokens(msg->dart_port, count);
448 UpdatePort(old_mask, di);
449 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
450 // `events` can only have kInEvent/kOutEvent flags set.
451 const intptr_t events = msg->data & EVENT_MASK;
452 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
364 453
365 454 const intptr_t old_mask = di->Mask();
366 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, 455 LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask,
367 DescriptorInfo* di) { 456 msg->data & EVENT_MASK);
368 #ifdef EVENTHANDLER_LOGGING 457 di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
369 PrintEventMask(di->fd(), events); 458 UpdatePort(old_mask, di);
370 #endif 459 } else {
371 if ((events & EPOLLERR) != 0) { 460 UNREACHABLE();
372 // Return error only if EPOLLIN is present.
373 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
374 }
375 intptr_t event_mask = 0;
376 if ((events & EPOLLIN) != 0) {
377 event_mask |= (1 << kInEvent);
378 }
379 if ((events & EPOLLOUT) != 0) {
380 event_mask |= (1 << kOutEvent);
381 }
382 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
383 event_mask |= (1 << kCloseEvent);
384 }
385 return event_mask;
386 }
387
388
389 void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
390 int size) {
391 bool interrupt_seen = false;
392 for (int i = 0; i < size; i++) {
393 if (events[i].data.ptr == NULL) {
394 interrupt_seen = true;
395 } else {
396 DescriptorInfo* di =
397 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
398 const intptr_t old_mask = di->Mask();
399 const intptr_t event_mask = GetPollEvents(events[i].events, di);
400 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
401 if ((event_mask & (1 << kErrorEvent)) != 0) {
402 di->NotifyAllDartPorts(event_mask);
403 UpdateEpollInstance(old_mask, di);
404 } else if (event_mask != 0) {
405 Dart_Port port = di->NextNotifyDartPort(event_mask);
406 ASSERT(port != 0);
407 UpdateEpollInstance(old_mask, di);
408 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask,
409 port, di->fd());
410 bool success = DartUtils::PostInt32(port, event_mask);
411 if (!success) {
412 // This can happen if e.g. the isolate that owns the port has died
413 // for some reason.
414 LOG_ERR("Failed to post event for fd %ld to port %ld", di->fd(),
415 port);
416 }
417 }
418 }
419 }
420 if (interrupt_seen) {
421 // Handle after socket events, so we avoid closing a socket before we handle
422 // the current events.
423 HandleInterruptFd();
424 } 461 }
425 } 462 }
426 463
427 464
465 void EventHandlerImplementation::HandlePacket(mx_port_packet_t* pkt) {
466 LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key);
467 LOG_INFO("HandlePacket: Got event packet: type=%lx\n", pkt->type);
468 LOG_INFO("HandlePacket: Got event packet: status=%ld\n", pkt->status);
469 if (pkt->type == MX_PKT_TYPE_USER) {
470 ASSERT(pkt->key == kInterruptPacketKey);
471 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user);
472 HandleInterrupt(msg);
473 return;
474 }
475 LOG_INFO("HandlePacket: Got event packet: observed = %lx\n",
476 pkt->signal.observed);
477 LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);
478
479 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key);
480 mx_signals_t observed = pkt->signal.observed;
481 const intptr_t old_mask = di->Mask();
482 const uint32_t epoll_event = di->io_handle()->WaitEnd(observed);
483 intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event);
484 if ((event_mask & (1 << kErrorEvent)) != 0) {
485 di->NotifyAllDartPorts(event_mask);
486 } else if (event_mask != 0) {
487 event_mask = di->io_handle()->ToggleEvents(event_mask);
488 if (event_mask != 0) {
489 Dart_Port port = di->NextNotifyDartPort(event_mask);
490 ASSERT(port != 0);
491 bool success = DartUtils::PostInt32(port, event_mask);
492 if (!success) {
493 // This can happen if e.g. the isolate that owns the port has died
494 // for some reason.
495 LOG_ERR("Failed to post event to port %ld\n", port);
496 }
497 }
498 }
499 UpdatePort(old_mask, di);
500 }
501
502
428 int64_t EventHandlerImplementation::GetTimeout() const { 503 int64_t EventHandlerImplementation::GetTimeout() const {
429 if (!timeout_queue_.HasTimeout()) { 504 if (!timeout_queue_.HasTimeout()) {
430 return kInfinityTimeout; 505 return kInfinityTimeout;
431 } 506 }
432 int64_t millis = 507 int64_t millis =
433 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); 508 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
434 return (millis < 0) ? 0 : millis; 509 return (millis < 0) ? 0 : millis;
435 } 510 }
436 511
437 512
438 void EventHandlerImplementation::HandleTimeout() { 513 void EventHandlerImplementation::HandleTimeout() {
439 if (timeout_queue_.HasTimeout()) { 514 if (timeout_queue_.HasTimeout()) {
440 int64_t millis = timeout_queue_.CurrentTimeout() - 515 int64_t millis = timeout_queue_.CurrentTimeout() -
441 TimerUtils::GetCurrentMonotonicMillis(); 516 TimerUtils::GetCurrentMonotonicMillis();
442 if (millis <= 0) { 517 if (millis <= 0) {
443 DartUtils::PostNull(timeout_queue_.CurrentPort()); 518 DartUtils::PostNull(timeout_queue_.CurrentPort());
444 timeout_queue_.RemoveCurrent(); 519 timeout_queue_.RemoveCurrent();
445 } 520 }
446 } 521 }
447 } 522 }
448 523
449 524
450 void EventHandlerImplementation::Poll(uword args) { 525 void EventHandlerImplementation::Poll(uword args) {
451 static const intptr_t kMaxEvents = 16;
452 struct epoll_event events[kMaxEvents];
453 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 526 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
454 EventHandlerImplementation* handler_impl = &handler->delegate_; 527 EventHandlerImplementation* handler_impl = &handler->delegate_;
455 ASSERT(handler_impl != NULL); 528 ASSERT(handler_impl != NULL);
456 529
530 mx_port_packet_t pkt;
457 while (!handler_impl->shutdown_) { 531 while (!handler_impl->shutdown_) {
458 int64_t millis = handler_impl->GetTimeout(); 532 int64_t millis = handler_impl->GetTimeout();
459 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); 533 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
460 // TODO(US-109): When the epoll implementation is properly edge-triggered, 534
461 // remove this sleep, which prevents the message queue from being 535 LOG_INFO("mx_port_wait(millis = %ld)\n", millis);
462 // overwhelmed and leading to memory exhaustion. 536 mx_status_t status = mx_port_wait(handler_impl->port_handle_,
463 usleep(5000); 537 millis == kInfinityTimeout
464 LOG_INFO("epoll_wait(millis = %ld)\n", millis); 538 ? MX_TIME_INFINITE
465 intptr_t result = NO_RETRY_EXPECTED( 539 : mx_deadline_after(MX_MSEC(millis)),
466 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); 540 reinterpret_cast<void*>(&pkt), 0);
467 ASSERT(EAGAIN == EWOULDBLOCK); 541 if (status == MX_ERR_TIMED_OUT) {
468 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); 542 handler_impl->HandleTimeout();
469 if (result < 0) { 543 } else if (status != MX_OK) {
470 if (errno != EWOULDBLOCK) { 544 FATAL1("mx_port_wait failed: %s\n", mx_status_get_string(status));
471 perror("Poll failed");
472 }
473 } else { 545 } else {
474 handler_impl->HandleTimeout(); 546 handler_impl->HandleTimeout();
475 handler_impl->HandleEvents(events, result); 547 handler_impl->HandlePacket(&pkt);
476 } 548 }
477 } 549 }
478 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); 550 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
479 handler->NotifyShutdownDone(); 551 handler->NotifyShutdownDone();
480 } 552 }
481 553
482 554
483 void EventHandlerImplementation::Start(EventHandler* handler) { 555 void EventHandlerImplementation::Start(EventHandler* handler) {
484 int result = Thread::Start(&EventHandlerImplementation::Poll, 556 int result = Thread::Start(&EventHandlerImplementation::Poll,
485 reinterpret_cast<uword>(handler)); 557 reinterpret_cast<uword>(handler));
(...skipping 24 matching lines...) Expand all
510 // The hashmap does not support keys with value 0. 582 // The hashmap does not support keys with value 0.
511 return dart::Utils::WordHash(fd + 1); 583 return dart::Utils::WordHash(fd + 1);
512 } 584 }
513 585
514 } // namespace bin 586 } // namespace bin
515 } // namespace dart 587 } // namespace dart
516 588
517 #endif // defined(HOST_OS_FUCHSIA) 589 #endif // defined(HOST_OS_FUCHSIA)
518 590
519 #endif // !defined(DART_IO_DISABLED) 591 #endif // !defined(DART_IO_DISABLED)
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_fuchsia.h ('k') | runtime/bin/process_fuchsia.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698