Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(396)

Side by Side Diff: runtime/bin/eventhandler_fuchsia.cc

Issue 2910853002: [Fuchsia] EventHandler: epoll -> ports v2 (Closed)
Patch Set: Remove unused function Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #if !defined(DART_IO_DISABLED) 5 #if !defined(DART_IO_DISABLED)
6 6
7 #include "platform/globals.h" 7 #include "platform/globals.h"
8 #if defined(HOST_OS_FUCHSIA) 8 #if defined(HOST_OS_FUCHSIA)
9 9
10 #include "bin/eventhandler.h" 10 #include "bin/eventhandler.h"
11 #include "bin/eventhandler_fuchsia.h" 11 #include "bin/eventhandler_fuchsia.h"
12 12
13 #include <errno.h> // NOLINT 13 #include <errno.h>
14 #include <fcntl.h> // NOLINT 14 #include <fcntl.h>
15 #include <pthread.h> // NOLINT 15 #include <magenta/status.h>
16 #include <stdio.h> // NOLINT 16 #include <magenta/syscalls.h>
17 #include <string.h> // NOLINT 17 #include <magenta/syscalls/object.h>
18 #include <sys/epoll.h> // NOLINT 18 #include <magenta/syscalls/port.h>
19 #include <sys/stat.h> // NOLINT 19 #include <mxio/private.h>
20 #include <unistd.h> // NOLINT 20 #include <pthread.h>
21 #include <stdio.h>
22 #include <string.h>
23 #include <sys/epoll.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
21 27
22 #include "bin/fdutils.h" 28 #include "bin/fdutils.h"
23 #include "bin/lockers.h" 29 #include "bin/lockers.h"
24 #include "bin/log.h" 30 #include "bin/log.h"
25 #include "bin/socket.h" 31 #include "bin/socket.h"
26 #include "bin/thread.h" 32 #include "bin/thread.h"
27 #include "bin/utils.h" 33 #include "bin/utils.h"
28 #include "platform/hashmap.h" 34 #include "platform/hashmap.h"
29 #include "platform/utils.h" 35 #include "platform/utils.h"
30 36
31 // #define EVENTHANDLER_LOGGING 1 37 // The EventHandler for Fuchsia uses its "ports v2" API:
32 #if defined(EVENTHANDLER_LOGGING) 38 // https://fuchsia.googlesource.com/magenta/+/HEAD/docs/syscalls/port_create.md
33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) 39 // This API does not have epoll()-like edge triggering (EPOLLET). Since clients
34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) 40 // of the EventHandler expect edge-triggered notifications, we must simulate it.
41 // When a packet from mx_port_wait() indicates that a signal is asserted for a
42 // handle, we unsubscribe from that signal until the event that asserted the
43 // signal can be processed. For example:
44 //
45 // 1. We get MX_SOCKET_WRITABLE from mx_port_wait() for a handle.
46 // 2. We send kOutEvent to the Dart thread.
47 // 3. We unsubscribe from further MX_SOCKET_WRITABLE signals for the handle.
48 // 4. Some time later the Dart thread actually does a write().
49 // 5. After writing, the Dart thread resubscribes to write events.
50 //
51 // We use he same procedure for MX_SOCKET_READABLE, and read()/accept().
52
53 // define EVENTHANDLER_LOG_ERROR to get log messages only for errors.
54 // define EVENTHANDLER_LOG_INFO to get log messages for both information and
55 // errors.
56 // #define EVENTHANDLER_LOG_INFO 1
57 #define EVENTHANDLER_LOG_ERROR 1
58 #if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
59 #define LOG_ERR(msg, ...) \
60 { \
61 int err = errno; \
62 Log::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, __LINE__, \
63 ##__VA_ARGS__); \
64 errno = err; \
65 }
66 #if defined(EVENTHANDLER_LOG_INFO)
67 #define LOG_INFO(msg, ...) \
68 Log::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \
69 ##__VA_ARGS__)
70 #else
71 #define LOG_INFO(msg, ...)
72 #endif // defined(EVENTHANDLER_LOG_INFO)
35 #else 73 #else
36 #define LOG_ERR(msg, ...) 74 #define LOG_ERR(msg, ...)
37 #define LOG_INFO(msg, ...) 75 #define LOG_INFO(msg, ...)
38 #endif // defined(EVENTHANDLER_LOGGING) 76 #endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
39 77
40 namespace dart { 78 namespace dart {
41 namespace bin { 79 namespace bin {
42 80
43 #if defined(EVENTHANDLER_LOGGING) 81 intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) {
44 static void PrintEventMask(intptr_t fd, intptr_t events) { 82 MutexLocker ml(mutex_);
45 Log::PrintErr("%d ", fd); 83 const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes));
46 if ((events & EPOLLIN) != 0) { 84 LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes);
siva 2017/06/21 00:48:10 Should we check for error from read? For errors s
zra 2017/06/21 06:43:38 read(), write(), and accept() can "fail" for async
47 Log::PrintErr("EPOLLIN "); 85
48 } 86 // Resubscribe to read events.
49 if ((events & EPOLLPRI) != 0) { 87 read_events_enabled_ = true;
50 Log::PrintErr("EPOLLPRI "); 88 if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) {
51 } 89 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
52 if ((events & EPOLLOUT) != 0) { 90 }
53 Log::PrintErr("EPOLLOUT "); 91 return read_bytes;
54 } 92 }
55 if ((events & EPOLLERR) != 0) { 93
56 Log::PrintErr("EPOLLERR "); 94
57 } 95 intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) {
58 if ((events & EPOLLHUP) != 0) { 96 MutexLocker ml(mutex_);
59 Log::PrintErr("EPOLLHUP "); 97 const ssize_t written_bytes =
60 } 98 NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes));
61 if ((events & EPOLLRDHUP) != 0) { 99 LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes);
siva 2017/06/21 00:48:10 Ditto comment here about errors from write.
zra 2017/06/21 06:43:39 Acknowledged.
62 Log::PrintErr("EPOLLRDHUP "); 100
63 } 101 // Resubscribe to to write events.
64 int all_events = 102 write_events_enabled_ = true;
65 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP; 103 if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLOUT, wait_key_)) {
66 if ((events & ~all_events) != 0) { 104 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
67 Log::PrintErr("(and %08x) ", events & ~all_events); 105 }
68 } 106 return written_bytes;
69 107 }
70 Log::PrintErr("\n"); 108
71 } 109
72 #endif 110 intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) {
73 111 MutexLocker ml(mutex_);
74 112 intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen));
75 intptr_t DescriptorInfo::GetPollEvents() { 113 LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket);
siva 2017/06/21 00:48:10 ditto comment about errors returned from accept.
zra 2017/06/21 06:43:38 Acknowledged.
114
115 // Re-subscribe to read events.
116 read_events_enabled_ = true;
117 if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) {
118 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
119 }
120 return socket;
121 }
122
123
124 void IOHandle::Close() {
125 MutexLocker ml(mutex_);
126 VOID_NO_RETRY_EXPECTED(close(fd_));
127 }
128
129
130 uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) {
131 MutexLocker ml(mutex_);
76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are 132 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
77 // triggered anyway. 133 // triggered anyway.
78 intptr_t events = 0; 134 uint32_t events = EPOLLRDHUP;
79 if ((Mask() & (1 << kInEvent)) != 0) { 135 if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) {
80 events |= EPOLLIN; 136 events |= EPOLLIN;
81 } 137 }
82 if ((Mask() & (1 << kOutEvent)) != 0) { 138 if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) {
83 events |= EPOLLOUT; 139 events |= EPOLLOUT;
84 } 140 }
85 return events; 141 return events;
86 } 142 }
87 143
88 144
89 // Unregister the file descriptor for a DescriptorInfo structure with 145 intptr_t IOHandle::EpollEventsToMask(intptr_t events) {
90 // epoll. 146 if ((events & EPOLLERR) != 0) {
91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { 147 // Return error only if EPOLLIN is present.
92 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); 148 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
93 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL)); 149 }
94 } 150 intptr_t event_mask = 0;
95 151 if ((events & EPOLLIN) != 0) {
96 152 event_mask |= (1 << kInEvent);
97 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { 153 }
98 struct epoll_event event; 154 if ((events & EPOLLOUT) != 0) {
99 event.events = EPOLLRDHUP | di->GetPollEvents(); 155 event_mask |= (1 << kOutEvent);
100 if (!di->IsListeningSocket()) { 156 }
101 event.events |= EPOLLET; 157 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
102 } 158 event_mask |= (1 << kCloseEvent);
103 event.data.ptr = di; 159 }
104 LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd()); 160 return event_mask;
105 int status = 161 }
106 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event)); 162
107 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status); 163
108 #if defined(EVENTHANDLER_LOGGING) 164 bool IOHandle::AsyncWaitLocked(mx_handle_t port,
109 PrintEventMask(di->fd(), event.events); 165 uint32_t events,
110 #endif 166 uint64_t key) {
siva 2017/06/21 00:48:10 ASSERT that the thread holds the mutex lock.
zra 2017/06/21 06:43:38 The Mutex under bin/ doesn't track the owner. It l
111 if (status == -1) { 167 LOG_INFO("IOHandle::AsyncWait: fd = %ld\n", fd_);
112 // TODO(dart:io): Verify that the dart end is handling this correctly. 168 // The call to __mxio_fd_to_io() in the DescriptorInfo constructor may have
113 169 // returned NULL. If it did, propagate the problem up to Dart.
114 // Epoll does not accept the file descriptor. It could be due to 170 if (mxio_ == NULL) {
115 // already closed file descriptor, or unuspported devices, such 171 LOG_ERR("__mxio_fd_to_io(%d) returned NULL\n", fd_);
116 // as /dev/null. In such case, mark the file descriptor as closed, 172 return false;
117 // so dart will handle it accordingly. 173 }
174
175 mx_handle_t handle;
176 mx_signals_t signals;
177 __mxio_wait_begin(mxio_, events, &handle, &signals);
178 if (handle == MX_HANDLE_INVALID) {
179 LOG_ERR("fd = %ld __mxio_wait_begin returned an invalid handle\n", fd_);
180 return false;
181 }
182
183 // Remember the port. Use the remembered port if the argument "port" is
184 // MX_HANDLE_INVALID.
185 ASSERT((port != MX_HANDLE_INVALID) || (port_ != MX_HANDLE_INVALID));
186 if ((port_ == MX_HANDLE_INVALID) || (port != MX_HANDLE_INVALID)) {
187 port_ = port;
188 }
189
190 handle_ = handle;
191 wait_key_ = key;
192 LOG_INFO("mx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals);
193 mx_status_t status =
194 mx_object_wait_async(handle_, port_, key, signals, MX_WAIT_ASYNC_ONCE);
195 if (status != MX_OK) {
196 LOG_ERR("mx_object_wait_async failed: %s\n", mx_status_get_string(status));
197 return false;
198 }
199
200 return true;
201 }
202
203
204 bool IOHandle::AsyncWait(mx_handle_t port, uint32_t events, uint64_t key) {
205 MutexLocker ml(mutex_);
206 return AsyncWaitLocked(port, events, key);
207 }
208
209
210 void IOHandle::CancelWait(mx_handle_t port, uint64_t key) {
211 MutexLocker ml(mutex_);
212 LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_);
213 ASSERT(port != MX_HANDLE_INVALID);
214 ASSERT(handle_ != MX_HANDLE_INVALID);
215 mx_status_t status = mx_port_cancel(port, handle_, key);
216 if ((status != MX_OK) && (status != MX_ERR_NOT_FOUND)) {
swetland 2017/06/21 00:09:51 I haven't charted out everything here, but just an
zra 2017/06/21 06:43:38 Just to make sure I understand: If there is a pend
swetland 2017/06/21 20:07:28 Yes. Whenever you object_wait_async(), if any of t
217 LOG_ERR("mx_port_cancel failed: %s\n", mx_status_get_string(status));
218 }
219 }
220
221
222 uint32_t IOHandle::WaitEnd(mx_signals_t observed) {
223 MutexLocker ml(mutex_);
224 uint32_t events = 0;
225 __mxio_wait_end(mxio_, observed, &events);
226 return events;
227 }
228
229
230 intptr_t IOHandle::ToggleEvents(intptr_t event_mask) {
231 MutexLocker ml(mutex_);
232 if (!write_events_enabled_) {
233 LOG_INFO("IOHandle::ToggleEvents: fd = %ld de-asserting write\n", fd_);
234 event_mask = event_mask & ~(1 << kOutEvent);
235 }
236 if ((event_mask & (1 << kOutEvent)) != 0) {
237 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting write and disabling\n",
238 fd_);
239 write_events_enabled_ = false;
240 }
241 if (!read_events_enabled_) {
242 LOG_INFO("IOHandle::ToggleEvents: fd=%ld de-asserting read\n", fd_);
243 event_mask = event_mask & ~(1 << kInEvent);
244 }
245 if ((event_mask & (1 << kInEvent)) != 0) {
246 LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting read and disabling\n",
247 fd_);
248 read_events_enabled_ = false;
249 }
250 return event_mask;
251 }
252
253
254 void EventHandlerImplementation::AddToPort(mx_handle_t port_handle,
255 DescriptorInfo* di) {
256 const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask());
257 const uint64_t key = reinterpret_cast<uint64_t>(di);
258 if (!di->io_handle()->AsyncWait(port_handle, events, key)) {
118 di->NotifyAllDartPorts(1 << kCloseEvent); 259 di->NotifyAllDartPorts(1 << kCloseEvent);
119 } 260 }
120 } 261 }
121 262
122 263
264 void EventHandlerImplementation::RemoveFromPort(mx_handle_t port_handle,
265 DescriptorInfo* di) {
266 const uint64_t key = reinterpret_cast<uint64_t>(di);
267 di->io_handle()->CancelWait(port_handle, key);
268 }
269
270
123 EventHandlerImplementation::EventHandlerImplementation() 271 EventHandlerImplementation::EventHandlerImplementation()
124 : socket_map_(&HashMap::SamePointerValue, 16) { 272 : socket_map_(&HashMap::SamePointerValue, 16) {
125 intptr_t result;
126 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
127 if (result != 0) {
128 FATAL("Pipe creation failed");
129 }
130 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
131 FATAL("Failed to set pipe fd non blocking\n");
132 }
133 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
134 FATAL("Failed to set pipe fd close on exec\n");
135 }
136 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
137 FATAL("Failed to set pipe fd close on exec\n");
138 }
139 shutdown_ = false; 273 shutdown_ = false;
140 // The initial size passed to epoll_create is ignore on newer (>= 274 // Create the port.
141 // 2.6.8) Linux versions 275 port_handle_ = MX_HANDLE_INVALID;
142 static const int kEpollInitialSize = 64; 276 mx_status_t status = mx_port_create(MX_PORT_OPT_V2, &port_handle_);
143 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); 277 if (status != MX_OK) {
144 if (epoll_fd_ == -1) { 278 // This is a FATAL because the VM won't work at all if we can't create this
145 FATAL1("Failed creating epoll file descriptor: %i", errno); 279 // port.
146 } 280 FATAL1("mx_port_create failed: %s\n", mx_status_get_string(status));
147 if (!FDUtils::SetCloseOnExec(epoll_fd_)) { 281 }
148 FATAL("Failed to set epoll fd close on exec\n"); 282 ASSERT(port_handle_ != MX_HANDLE_INVALID);
149 }
150 // Register the interrupt_fd with the epoll instance.
151 struct epoll_event event;
152 event.events = EPOLLIN;
153 event.data.ptr = NULL;
154 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_);
155 int status = NO_RETRY_EXPECTED(
156 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
157 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
158 epoll_fd_, status);
159 if (status == -1) {
160 FATAL("Failed adding interrupt fd to epoll instance");
161 }
162 } 283 }
163 284
164 285
165 static void DeleteDescriptorInfo(void* info) { 286 static void DeleteDescriptorInfo(void* info) {
166 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); 287 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
288 LOG_INFO("Closed %ld\n", di->io_handle()->fd());
167 di->Close(); 289 di->Close();
168 LOG_INFO("Closed %d\n", di->fd());
169 delete di; 290 delete di;
170 } 291 }
171 292
172 293
173 EventHandlerImplementation::~EventHandlerImplementation() { 294 EventHandlerImplementation::~EventHandlerImplementation() {
174 socket_map_.Clear(DeleteDescriptorInfo); 295 socket_map_.Clear(DeleteDescriptorInfo);
175 VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); 296 mx_handle_close(port_handle_);
176 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); 297 port_handle_ = MX_HANDLE_INVALID;
177 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); 298 }
178 } 299
179 300
180 301 void EventHandlerImplementation::UpdatePort(intptr_t old_mask,
181 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, 302 DescriptorInfo* di) {
182 DescriptorInfo* di) { 303 const intptr_t new_mask = di->Mask();
183 intptr_t new_mask = di->Mask();
184 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask,
185 new_mask);
186 if ((old_mask != 0) && (new_mask == 0)) { 304 if ((old_mask != 0) && (new_mask == 0)) {
187 RemoveFromEpollInstance(epoll_fd_, di); 305 RemoveFromPort(port_handle_, di);
188 } else if ((old_mask == 0) && (new_mask != 0)) { 306 } else if ((old_mask == 0) && (new_mask != 0)) {
189 AddToEpollInstance(epoll_fd_, di); 307 AddToPort(port_handle_, di);
190 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { 308 } else if ((old_mask != 0) && (new_mask != 0)) {
191 ASSERT(!di->IsListeningSocket()); 309 ASSERT(!di->IsListeningSocket());
192 RemoveFromEpollInstance(epoll_fd_, di); 310 RemoveFromPort(port_handle_, di);
193 AddToEpollInstance(epoll_fd_, di); 311 AddToPort(port_handle_, di);
194 } 312 }
195 } 313 }
196 314
197 315
198 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( 316 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
199 intptr_t fd, 317 intptr_t fd,
200 bool is_listening) { 318 bool is_listening) {
201 ASSERT(fd >= 0); 319 IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
202 HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd), 320 ASSERT(handle->fd() >= 0);
203 GetHashmapHashFromFd(fd), true); 321 HashMap::Entry* entry =
322 socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()),
323 GetHashmapHashFromFd(handle->fd()), true);
204 ASSERT(entry != NULL); 324 ASSERT(entry != NULL);
205 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); 325 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
206 if (di == NULL) { 326 if (di == NULL) {
207 // If there is no data in the hash map for this file descriptor a 327 // If there is no data in the hash map for this file descriptor a
208 // new DescriptorInfo for the file descriptor is inserted. 328 // new DescriptorInfo for the file descriptor is inserted.
209 if (is_listening) { 329 if (is_listening) {
210 di = new DescriptorInfoMultiple(fd); 330 di = new DescriptorInfoMultiple(fd);
211 } else { 331 } else {
212 di = new DescriptorInfoSingle(fd); 332 di = new DescriptorInfoSingle(fd);
213 } 333 }
214 entry->value = di; 334 entry->value = di;
215 } 335 }
216 ASSERT(fd == di->fd()); 336 ASSERT(fd == di->fd());
217 return di; 337 return di;
218 } 338 }
219 339
220 340
221 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
222 size_t remaining = count;
223 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
224 while (remaining > 0) {
225 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
226 if (bytes_written == 0) {
227 return count - remaining;
228 } else if (bytes_written == -1) {
229 ASSERT(EAGAIN == EWOULDBLOCK);
230 // Error code EWOULDBLOCK should only happen for non blocking
231 // file descriptors.
232 ASSERT(errno != EWOULDBLOCK);
233 return -1;
234 } else {
235 ASSERT(bytes_written > 0);
236 remaining -= bytes_written;
237 buffer_pos += bytes_written;
238 }
239 }
240 return count;
241 }
242
243
244 void EventHandlerImplementation::WakeupHandler(intptr_t id, 341 void EventHandlerImplementation::WakeupHandler(intptr_t id,
245 Dart_Port dart_port, 342 Dart_Port dart_port,
246 int64_t data) { 343 int64_t data) {
247 InterruptMessage msg; 344 COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(mx_packet_user_t));
248 msg.id = id; 345 mx_port_packet_t pkt;
249 msg.dart_port = dart_port; 346 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user);
250 msg.data = data; 347 pkt.key = kInterruptPacketKey;
251 // WriteToBlocking will write up to 512 bytes atomically, and since our msg 348 msg->id = id;
252 // is smaller than 512, we don't need a thread lock. 349 msg->dart_port = dart_port;
253 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. 350 msg->data = data;
254 ASSERT(kInterruptMessageSize < PIPE_BUF); 351 mx_status_t status =
255 intptr_t result = 352 mx_port_queue(port_handle_, reinterpret_cast<void*>(&pkt), 0);
256 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); 353 if (status != MX_OK) {
257 if (result != kInterruptMessageSize) { 354 // This is a FATAL because the VM won't work at all if we can't send any
258 if (result == -1) { 355 // messages to the EventHandler thread.
259 perror("Interrupt message failure:"); 356 FATAL1("mx_port_queue failed: %s\n", mx_status_get_string(status));
260 }
261 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
262 } 357 }
263 } 358 }
264 359
265 360
266 void EventHandlerImplementation::HandleInterruptFd() { 361 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
267 const intptr_t MAX_MESSAGES = kInterruptMessageSize; 362 if (msg->id == kTimerId) {
268 InterruptMessage msg[MAX_MESSAGES]; 363 LOG_INFO("HandleInterrupt read timer update\n");
269 ssize_t bytes = NO_RETRY_EXPECTED( 364 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
270 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); 365 return;
271 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); 366 } else if (msg->id == kShutdownId) {
272 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { 367 LOG_INFO("HandleInterrupt read shutdown\n");
273 if (msg[i].id == kTimerId) { 368 shutdown_ = true;
274 LOG_INFO("HandleInterruptFd read timer update\n"); 369 return;
275 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); 370 }
276 } else if (msg[i].id == kShutdownId) { 371 ASSERT((msg->data & COMMAND_MASK) != 0);
277 LOG_INFO("HandleInterruptFd read shutdown\n"); 372 LOG_INFO("HandleInterrupt command:\n");
278 shutdown_ = true; 373 Socket* socket = reinterpret_cast<Socket*>(msg->id);
374 RefCntReleaseScope<Socket> rs(socket);
375 if (socket->fd() == -1) {
376 return;
377 }
378 IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd());
379 const intptr_t fd = io_handle->fd();
380 DescriptorInfo* di =
381 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data));
382 ASSERT(io_handle == di->io_handle());
383 if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
384 ASSERT(!di->IsListeningSocket());
385 // Close the socket for reading.
386 LOG_INFO("\tSHUT_RD: %ld\n", fd);
387 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD));
388 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
389 ASSERT(!di->IsListeningSocket());
390 // Close the socket for writing.
391 LOG_INFO("\tSHUT_WR: %ld\n", fd);
392 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR));
393 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
394 // Close the socket and free system resources and move on to next
395 // message.
396 const intptr_t old_mask = di->Mask();
397 Dart_Port port = msg->dart_port;
398 di->RemovePort(port);
399 const intptr_t new_mask = di->Mask();
400 UpdatePort(old_mask, di);
401
402 LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask);
403 if (di->IsListeningSocket()) {
404 // We only close the socket file descriptor from the operating
405 // system if there are no other dart socket objects which
406 // are listening on the same (address, port) combination.
407 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
408
409 MutexLocker locker(registry->mutex());
410
411 if (registry->CloseSafe(socket)) {
412 ASSERT(new_mask == 0);
413 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
414 di->Close();
415 delete di;
416 socket->SetClosedFd();
417 }
279 } else { 418 } else {
280 ASSERT((msg[i].data & COMMAND_MASK) != 0); 419 ASSERT(new_mask == 0);
281 LOG_INFO("HandleInterruptFd command\n"); 420 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
282 Socket* socket = reinterpret_cast<Socket*>(msg[i].id); 421 di->Close();
283 RefCntReleaseScope<Socket> rs(socket); 422 delete di;
284 if (socket->fd() == -1) { 423 socket->SetClosedFd();
285 continue; 424 }
286 } 425 if (port != 0) {
287 DescriptorInfo* di = 426 const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
288 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data)); 427 if (!success) {
289 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { 428 LOG_ERR("Failed to post destroy event to port %ld\n", port);
290 ASSERT(!di->IsListeningSocket());
291 // Close the socket for reading.
292 LOG_INFO("\tSHUT_RD: %d\n", di->fd());
293 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
294 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
295 ASSERT(!di->IsListeningSocket());
296 // Close the socket for writing.
297 LOG_INFO("\tSHUT_WR: %d\n", di->fd());
298 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
299 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
300 // Close the socket and free system resources and move on to next
301 // message.
302 intptr_t old_mask = di->Mask();
303 Dart_Port port = msg[i].dart_port;
304 di->RemovePort(port);
305 intptr_t new_mask = di->Mask();
306 UpdateEpollInstance(old_mask, di);
307
308 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
309 intptr_t fd = di->fd();
310 if (di->IsListeningSocket()) {
311 // We only close the socket file descriptor from the operating
312 // system if there are no other dart socket objects which
313 // are listening on the same (address, port) combination.
314 ListeningSocketRegistry* registry =
315 ListeningSocketRegistry::Instance();
316
317 MutexLocker locker(registry->mutex());
318
319 if (registry->CloseSafe(socket)) {
320 ASSERT(new_mask == 0);
321 socket_map_.Remove(GetHashmapKeyFromFd(fd),
322 GetHashmapHashFromFd(fd));
323 di->Close();
324 LOG_INFO("Closed %d\n", di->fd());
325 delete di;
326 socket->SetClosedFd();
327 }
328 } else {
329 ASSERT(new_mask == 0);
330 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
331 di->Close();
332 LOG_INFO("Closed %d\n", di->fd());
333 delete di;
334 socket->SetClosedFd();
335 }
336
337 bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
338 if (!success) {
339 LOG_ERR("Failed to post destroy event to port %ld", port);
340 }
341 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
342 int count = TOKEN_COUNT(msg[i].data);
343 intptr_t old_mask = di->Mask();
344 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
345 di->ReturnTokens(msg[i].dart_port, count);
346 UpdateEpollInstance(old_mask, di);
347 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
348 // `events` can only have kInEvent/kOutEvent flags set.
349 intptr_t events = msg[i].data & EVENT_MASK;
350 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
351
352 intptr_t old_mask = di->Mask();
353 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask,
354 msg[i].data & EVENT_MASK);
355 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
356 UpdateEpollInstance(old_mask, di);
357 } else {
358 UNREACHABLE();
359 } 429 }
360 } 430 }
361 } 431 } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
362 LOG_INFO("HandleInterruptFd exit\n"); 432 const int count = TOKEN_COUNT(msg->data);
363 } 433 const intptr_t old_mask = di->Mask();
434 LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask);
435 di->ReturnTokens(msg->dart_port, count);
436 UpdatePort(old_mask, di);
437 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
438 // `events` can only have kInEvent/kOutEvent flags set.
439 const intptr_t events = msg->data & EVENT_MASK;
440 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
364 441
365 442 const intptr_t old_mask = di->Mask();
366 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, 443 LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask,
367 DescriptorInfo* di) { 444 msg->data & EVENT_MASK);
368 #ifdef EVENTHANDLER_LOGGING 445 di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
369 PrintEventMask(di->fd(), events); 446 UpdatePort(old_mask, di);
370 #endif 447 } else {
371 if ((events & EPOLLERR) != 0) { 448 UNREACHABLE();
372 // Return error only if EPOLLIN is present.
373 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
374 }
375 intptr_t event_mask = 0;
376 if ((events & EPOLLIN) != 0) {
377 event_mask |= (1 << kInEvent);
378 }
379 if ((events & EPOLLOUT) != 0) {
380 event_mask |= (1 << kOutEvent);
381 }
382 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
383 event_mask |= (1 << kCloseEvent);
384 }
385 return event_mask;
386 }
387
388
389 void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
390 int size) {
391 bool interrupt_seen = false;
392 for (int i = 0; i < size; i++) {
393 if (events[i].data.ptr == NULL) {
394 interrupt_seen = true;
395 } else {
396 DescriptorInfo* di =
397 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
398 const intptr_t old_mask = di->Mask();
399 const intptr_t event_mask = GetPollEvents(events[i].events, di);
400 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
401 if ((event_mask & (1 << kErrorEvent)) != 0) {
402 di->NotifyAllDartPorts(event_mask);
403 UpdateEpollInstance(old_mask, di);
404 } else if (event_mask != 0) {
405 Dart_Port port = di->NextNotifyDartPort(event_mask);
406 ASSERT(port != 0);
407 UpdateEpollInstance(old_mask, di);
408 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask,
409 port, di->fd());
410 bool success = DartUtils::PostInt32(port, event_mask);
411 if (!success) {
412 // This can happen if e.g. the isolate that owns the port has died
413 // for some reason.
414 LOG_ERR("Failed to post event for fd %ld to port %ld", di->fd(),
415 port);
416 }
417 }
418 }
419 }
420 if (interrupt_seen) {
421 // Handle after socket events, so we avoid closing a socket before we handle
422 // the current events.
423 HandleInterruptFd();
424 } 449 }
425 } 450 }
426 451
427 452
453 void EventHandlerImplementation::HandlePacket(mx_port_packet_t* pkt) {
454 LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key);
455 LOG_INFO("HandlePacket: Got event packet: type=%lx\n", pkt->type);
456 LOG_INFO("HandlePacket: Got event packet: status=%ld\n", pkt->status);
457 if (pkt->type == MX_PKT_TYPE_USER) {
458 ASSERT(pkt->key == kInterruptPacketKey);
459 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user);
460 HandleInterrupt(msg);
461 return;
462 }
463 LOG_INFO("HandlePacket: Got event packet: observed = %lx\n",
464 pkt->signal.observed);
465 LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);
466
467 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key);
468 mx_signals_t observed = pkt->signal.observed;
469 const intptr_t old_mask = di->Mask();
470 const uint32_t epoll_event = di->io_handle()->WaitEnd(observed);
471 intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event);
472 if ((event_mask & (1 << kErrorEvent)) != 0) {
473 di->NotifyAllDartPorts(event_mask);
474 } else if (event_mask != 0) {
475 event_mask = di->io_handle()->ToggleEvents(event_mask);
476 if (event_mask != 0) {
477 Dart_Port port = di->NextNotifyDartPort(event_mask);
478 ASSERT(port != 0);
479 bool success = DartUtils::PostInt32(port, event_mask);
480 if (!success) {
481 // This can happen if e.g. the isolate that owns the port has died
482 // for some reason.
483 LOG_ERR("Failed to post event to port %ld\n", port);
484 }
485 }
486 }
487 UpdatePort(old_mask, di);
488 }
489
490
428 int64_t EventHandlerImplementation::GetTimeout() const { 491 int64_t EventHandlerImplementation::GetTimeout() const {
429 if (!timeout_queue_.HasTimeout()) { 492 if (!timeout_queue_.HasTimeout()) {
430 return kInfinityTimeout; 493 return kInfinityTimeout;
431 } 494 }
432 int64_t millis = 495 int64_t millis =
433 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); 496 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
434 return (millis < 0) ? 0 : millis; 497 return (millis < 0) ? 0 : millis;
435 } 498 }
436 499
437 500
438 void EventHandlerImplementation::HandleTimeout() { 501 void EventHandlerImplementation::HandleTimeout() {
439 if (timeout_queue_.HasTimeout()) { 502 if (timeout_queue_.HasTimeout()) {
440 int64_t millis = timeout_queue_.CurrentTimeout() - 503 int64_t millis = timeout_queue_.CurrentTimeout() -
441 TimerUtils::GetCurrentMonotonicMillis(); 504 TimerUtils::GetCurrentMonotonicMillis();
442 if (millis <= 0) { 505 if (millis <= 0) {
443 DartUtils::PostNull(timeout_queue_.CurrentPort()); 506 DartUtils::PostNull(timeout_queue_.CurrentPort());
444 timeout_queue_.RemoveCurrent(); 507 timeout_queue_.RemoveCurrent();
445 } 508 }
446 } 509 }
447 } 510 }
448 511
449 512
450 void EventHandlerImplementation::Poll(uword args) { 513 void EventHandlerImplementation::Poll(uword args) {
451 static const intptr_t kMaxEvents = 16;
452 struct epoll_event events[kMaxEvents];
453 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 514 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
454 EventHandlerImplementation* handler_impl = &handler->delegate_; 515 EventHandlerImplementation* handler_impl = &handler->delegate_;
455 ASSERT(handler_impl != NULL); 516 ASSERT(handler_impl != NULL);
456 517
518 mx_port_packet_t pkt;
457 while (!handler_impl->shutdown_) { 519 while (!handler_impl->shutdown_) {
458 int64_t millis = handler_impl->GetTimeout(); 520 int64_t millis = handler_impl->GetTimeout();
459 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); 521 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
460 // TODO(US-109): When the epoll implementation is properly edge-triggered, 522
461 // remove this sleep, which prevents the message queue from being 523 LOG_INFO("mx_port_wait(millis = %ld)\n", millis);
462 // overwhelmed and leading to memory exhaustion. 524 mx_status_t status = mx_port_wait(handler_impl->port_handle_,
463 usleep(5000); 525 millis == kInfinityTimeout
464 LOG_INFO("epoll_wait(millis = %ld)\n", millis); 526 ? MX_TIME_INFINITE
465 intptr_t result = NO_RETRY_EXPECTED( 527 : mx_deadline_after(MX_MSEC(millis)),
466 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); 528 reinterpret_cast<void*>(&pkt), 0);
467 ASSERT(EAGAIN == EWOULDBLOCK); 529 if (status == MX_ERR_TIMED_OUT) {
468 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); 530 handler_impl->HandleTimeout();
469 if (result < 0) { 531 } else if (status != MX_OK) {
470 if (errno != EWOULDBLOCK) { 532 FATAL1("mx_port_wait failed: %s\n", mx_status_get_string(status));
471 perror("Poll failed");
472 }
473 } else { 533 } else {
474 handler_impl->HandleTimeout(); 534 handler_impl->HandleTimeout();
475 handler_impl->HandleEvents(events, result); 535 handler_impl->HandlePacket(&pkt);
476 } 536 }
477 } 537 }
478 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); 538 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
479 handler->NotifyShutdownDone(); 539 handler->NotifyShutdownDone();
480 } 540 }
481 541
482 542
483 void EventHandlerImplementation::Start(EventHandler* handler) { 543 void EventHandlerImplementation::Start(EventHandler* handler) {
484 int result = Thread::Start(&EventHandlerImplementation::Poll, 544 int result = Thread::Start(&EventHandlerImplementation::Poll,
485 reinterpret_cast<uword>(handler)); 545 reinterpret_cast<uword>(handler));
(...skipping 24 matching lines...) Expand all
510 // The hashmap does not support keys with value 0. 570 // The hashmap does not support keys with value 0.
511 return dart::Utils::WordHash(fd + 1); 571 return dart::Utils::WordHash(fd + 1);
512 } 572 }
513 573
514 } // namespace bin 574 } // namespace bin
515 } // namespace dart 575 } // namespace dart
516 576
517 #endif // defined(HOST_OS_FUCHSIA) 577 #endif // defined(HOST_OS_FUCHSIA)
518 578
519 #endif // !defined(DART_IO_DISABLED) 579 #endif // !defined(DART_IO_DISABLED)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698