OLD | NEW |
---|---|
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #if !defined(DART_IO_DISABLED) | 5 #if !defined(DART_IO_DISABLED) |
6 | 6 |
7 #include "platform/globals.h" | 7 #include "platform/globals.h" |
8 #if defined(TARGET_OS_FUCHSIA) | 8 #if defined(TARGET_OS_FUCHSIA) |
9 | 9 |
10 #include "bin/eventhandler.h" | 10 #include "bin/eventhandler.h" |
11 #include "bin/eventhandler_fuchsia.h" | 11 #include "bin/eventhandler_fuchsia.h" |
12 | 12 |
13 #include <magenta/status.h> | 13 #include <errno.h> // NOLINT |
14 #include <magenta/syscalls.h> | 14 #include <fcntl.h> // NOLINT |
15 #include <pthread.h> // NOLINT | |
16 #include <stdio.h> // NOLINT | |
17 #include <string.h> // NOLINT | |
18 #include <sys/epoll.h> // NOLINT | |
19 #include <sys/stat.h> // NOLINT | |
20 #include <unistd.h> // NOLINT | |
15 | 21 |
22 #include "bin/fdutils.h" | |
23 #include "bin/lockers.h" | |
16 #include "bin/log.h" | 24 #include "bin/log.h" |
25 #include "bin/socket.h" | |
17 #include "bin/thread.h" | 26 #include "bin/thread.h" |
18 #include "bin/utils.h" | 27 #include "bin/utils.h" |
28 #include "platform/hashmap.h" | |
29 #include "platform/utils.h" | |
19 | 30 |
31 // #define EVENTHANDLER_LOGGING 1 | |
20 #if defined(EVENTHANDLER_LOGGING) | 32 #if defined(EVENTHANDLER_LOGGING) |
21 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) | 33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) |
22 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) | 34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) |
23 #else | 35 #else |
24 #define LOG_ERR(msg, ...) | 36 #define LOG_ERR(msg, ...) |
25 #define LOG_INFO(msg, ...) | 37 #define LOG_INFO(msg, ...) |
26 #endif // defined(EVENTHANDLER_LOGGING) | 38 #endif // defined(EVENTHANDLER_LOGGING) |
27 | 39 |
28 namespace dart { | 40 namespace dart { |
29 namespace bin { | 41 namespace bin { |
30 | 42 |
31 MagentaWaitManyInfo::MagentaWaitManyInfo() | 43 #if defined(EVENTHANDLER_LOGGING) |
32 : capacity_(kInitialCapacity), size_(0) { | 44 static void PrintEventMask(intptr_t fd, intptr_t events) { |
33 descriptor_infos_ = static_cast<DescriptorInfo**>( | 45 Log::PrintErr("%d ", fd); |
34 malloc(kInitialCapacity * sizeof(*descriptor_infos_))); | 46 if ((events & EPOLLIN) != 0) { |
35 if (descriptor_infos_ == NULL) { | 47 Log::PrintErr("EPOLLIN "); |
36 FATAL("Failed to allocate descriptor_infos array"); | 48 } |
37 } | 49 if ((events & EPOLLPRI) != 0) { |
38 items_ = | 50 Log::PrintErr("EPOLLPRI "); |
39 static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_))); | 51 } |
40 if (items_ == NULL) { | 52 if ((events & EPOLLOUT) != 0) { |
41 FATAL("Failed to allocate items array"); | 53 Log::PrintErr("EPOLLOUT "); |
42 } | 54 } |
43 } | 55 if ((events & EPOLLERR) != 0) { |
44 | 56 Log::PrintErr("EPOLLERR "); |
45 | 57 } |
46 MagentaWaitManyInfo::~MagentaWaitManyInfo() { | 58 if ((events & EPOLLHUP) != 0) { |
47 free(descriptor_infos_); | 59 Log::PrintErr("EPOLLHUP "); |
48 free(items_); | 60 } |
49 } | 61 if ((events & EPOLLRDHUP) != 0) { |
50 | 62 Log::PrintErr("EPOLLRDHUP "); |
51 | 63 } |
52 void MagentaWaitManyInfo::AddHandle(mx_handle_t handle, | 64 int all_events = EPOLLIN | EPOLLPRI | EPOLLOUT | |
53 mx_signals_t signals, | 65 EPOLLERR | EPOLLHUP | EPOLLRDHUP; |
66 if ((events & ~all_events) != 0) { | |
67 Log::PrintErr("(and %08x) ", events & ~all_events); | |
68 } | |
69 | |
70 Log::PrintErr("\n"); | |
71 } | |
72 #endif | |
73 | |
74 | |
75 intptr_t DescriptorInfo::GetPollEvents() { | |
76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are | |
77 // triggered anyway. | |
78 intptr_t events = 0; | |
79 if ((Mask() & (1 << kInEvent)) != 0) { | |
80 events |= EPOLLIN; | |
81 } | |
82 if ((Mask() & (1 << kOutEvent)) != 0) { | |
83 events |= EPOLLOUT; | |
84 } | |
85 return events; | |
86 } | |
87 | |
88 | |
89 // Unregister the file descriptor for a DescriptorInfo structure with | |
90 // epoll. | |
91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, | |
54 DescriptorInfo* di) { | 92 DescriptorInfo* di) { |
55 #if defined(DEBUG) | 93 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd()); |
56 // Check that the handle is not already in the list. | 94 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, |
57 for (intptr_t i = 0; i < size_; i++) { | 95 EPOLL_CTL_DEL, |
58 if (items_[i].handle == handle) { | 96 di->fd(), |
59 FATAL("The handle is already in the list!"); | 97 NULL)); |
98 } | |
99 | |
100 | |
101 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) { | |
102 struct epoll_event event; | |
103 event.events = EPOLLRDHUP | di->GetPollEvents(); | |
104 if (!di->IsListeningSocket()) { | |
105 event.events |= EPOLLET; | |
106 } | |
107 event.data.ptr = di; | |
108 LOG_INFO("AddToEpollInstance: fd = %ld\n", | |
109 di->fd()); | |
110 int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, | |
111 EPOLL_CTL_ADD, | |
112 di->fd(), | |
113 &event)); | |
114 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", | |
115 di->fd(), status); | |
116 #if defined(EVENTHANDLER_LOGGING) | |
117 PrintEventMask(di->fd(), event.events); | |
118 #endif | |
119 if (status == -1) { | |
120 // TODO(dart:io): Verify that the dart end is handling this correctly. | |
121 | |
122 // Epoll does not accept the file descriptor. It could be due to | |
123 // already closed file descriptor, or unuspported devices, such | |
124 // as /dev/null. In such case, mark the file descriptor as closed, | |
125 // so dart will handle it accordingly. | |
126 di->NotifyAllDartPorts(1 << kCloseEvent); | |
127 } | |
128 } | |
129 | |
130 | |
131 EventHandlerImplementation::EventHandlerImplementation() | |
132 : socket_map_(&HashMap::SamePointerValue, 16) { | |
133 intptr_t result; | |
134 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_)); | |
135 if (result != 0) { | |
136 FATAL("Pipe creation failed"); | |
137 } | |
138 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) { | |
139 FATAL("Failed to set pipe fd non blocking\n"); | |
140 } | |
141 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) { | |
142 FATAL("Failed to set pipe fd close on exec\n"); | |
143 } | |
144 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) { | |
145 FATAL("Failed to set pipe fd close on exec\n"); | |
146 } | |
147 shutdown_ = false; | |
148 // The initial size passed to epoll_create is ignore on newer (>= | |
149 // 2.6.8) Linux versions | |
150 static const int kEpollInitialSize = 64; | |
151 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize)); | |
152 if (epoll_fd_ == -1) { | |
153 FATAL1("Failed creating epoll file descriptor: %i", errno); | |
154 } | |
155 if (!FDUtils::SetCloseOnExec(epoll_fd_)) { | |
156 FATAL("Failed to set epoll fd close on exec\n"); | |
157 } | |
158 // Register the interrupt_fd with the epoll instance. | |
159 struct epoll_event event; | |
160 event.events = EPOLLIN; | |
161 event.data.ptr = NULL; | |
162 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", | |
163 epoll_fd_); | |
164 int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, | |
165 EPOLL_CTL_ADD, | |
166 interrupt_fds_[0], | |
167 &event)); | |
168 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n", | |
169 epoll_fd_, status); | |
170 if (status == -1) { | |
171 FATAL("Failed adding interrupt fd to epoll instance"); | |
172 } | |
siva
2016/11/21 02:22:41
timer_fd is not being registered here, do we not h
zra
2016/11/21 05:56:24
AFAIK there are no timerfds in Fuchsia. Timers are
| |
173 } | |
174 | |
175 | |
176 static void DeleteDescriptorInfo(void* info) { | |
177 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); | |
178 di->Close(); | |
179 LOG_INFO("Closed %d\n", di->fd()); | |
180 delete di; | |
181 } | |
182 | |
183 | |
184 EventHandlerImplementation::~EventHandlerImplementation() { | |
185 socket_map_.Clear(DeleteDescriptorInfo); | |
186 VOID_NO_RETRY_EXPECTED(close(epoll_fd_)); | |
siva
2016/11/21 02:22:41
Ditto comment about missing timer_fd_
zra
2016/11/21 05:56:24
Acknowledged.
| |
187 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0])); | |
188 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1])); | |
189 } | |
190 | |
191 | |
192 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask, | |
193 DescriptorInfo *di) { | |
194 intptr_t new_mask = di->Mask(); | |
195 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", | |
196 di->fd(), old_mask, new_mask); | |
197 if ((old_mask != 0) && (new_mask == 0)) { | |
198 RemoveFromEpollInstance(epoll_fd_, di); | |
199 } else if ((old_mask == 0) && (new_mask != 0)) { | |
200 AddToEpollInstance(epoll_fd_, di); | |
201 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) { | |
202 ASSERT(!di->IsListeningSocket()); | |
203 RemoveFromEpollInstance(epoll_fd_, di); | |
204 AddToEpollInstance(epoll_fd_, di); | |
205 } | |
206 } | |
207 | |
208 | |
209 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( | |
210 intptr_t fd, bool is_listening) { | |
211 ASSERT(fd >= 0); | |
212 HashMap::Entry* entry = socket_map_.Lookup( | |
213 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); | |
214 ASSERT(entry != NULL); | |
215 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); | |
216 if (di == NULL) { | |
217 // If there is no data in the hash map for this file descriptor a | |
218 // new DescriptorInfo for the file descriptor is inserted. | |
219 if (is_listening) { | |
220 di = new DescriptorInfoMultiple(fd); | |
221 } else { | |
222 di = new DescriptorInfoSingle(fd); | |
60 } | 223 } |
61 } | 224 entry->value = di; |
62 #endif | 225 } |
63 intptr_t new_size = size_ + 1; | 226 ASSERT(fd == di->fd()); |
64 GrowArraysIfNeeded(new_size); | 227 return di; |
65 descriptor_infos_[size_] = di; | 228 } |
66 items_[size_].handle = handle; | 229 |
67 items_[size_].waitfor = signals; | 230 |
68 items_[size_].pending = 0; | 231 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) { |
69 size_ = new_size; | 232 size_t remaining = count; |
70 LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_); | 233 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer)); |
71 } | 234 while (remaining > 0) { |
72 | 235 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining)); |
73 | 236 if (bytes_written == 0) { |
74 void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) { | 237 return count - remaining; |
75 intptr_t idx; | 238 } else if (bytes_written == -1) { |
76 for (idx = 1; idx < size_; idx++) { | 239 ASSERT(EAGAIN == EWOULDBLOCK); |
77 if (handle == items_[idx].handle) { | 240 // Error code EWOULDBLOCK should only happen for non blocking |
78 break; | 241 // file descriptors. |
242 ASSERT(errno != EWOULDBLOCK); | |
243 return -1; | |
244 } else { | |
245 ASSERT(bytes_written > 0); | |
246 remaining -= bytes_written; | |
247 buffer_pos += bytes_written; | |
79 } | 248 } |
80 } | 249 } |
81 if (idx == size_) { | 250 return count; |
82 FATAL("Handle is not in the list!"); | 251 } |
83 } | 252 |
84 | 253 |
85 if (idx != (size_ - 1)) { | |
86 descriptor_infos_[idx] = descriptor_infos_[size_ - 1]; | |
87 items_[idx] = items_[size_ - 1]; | |
88 } | |
89 descriptor_infos_[size_ - 1] = NULL; | |
90 items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0}; | |
91 size_ = size_ - 1; | |
92 LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_); | |
93 } | |
94 | |
95 | |
96 void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) { | |
97 if (desired_size < capacity_) { | |
98 return; | |
99 } | |
100 intptr_t new_capacity = desired_size + (desired_size >> 1); | |
101 descriptor_infos_ = static_cast<DescriptorInfo**>( | |
102 realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_))); | |
103 if (descriptor_infos_ == NULL) { | |
104 FATAL("Failed to grow descriptor_infos array"); | |
105 } | |
106 items_ = static_cast<mx_wait_item_t*>( | |
107 realloc(items_, new_capacity * sizeof(*items_))); | |
108 if (items_ == NULL) { | |
109 FATAL("Failed to grow items array"); | |
110 } | |
111 capacity_ = new_capacity; | |
112 LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size, | |
113 capacity_); | |
114 } | |
115 | |
116 | |
117 EventHandlerImplementation::EventHandlerImplementation() { | |
118 mx_status_t status = | |
119 mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]); | |
120 if (status != NO_ERROR) { | |
121 FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status)); | |
122 } | |
123 shutdown_ = false; | |
124 info_.AddHandle(interrupt_handles_[0], | |
125 MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL); | |
126 LOG_INFO("EventHandlerImplementation initialized\n"); | |
127 } | |
128 | |
129 | |
130 EventHandlerImplementation::~EventHandlerImplementation() { | |
131 mx_status_t status = mx_handle_close(interrupt_handles_[0]); | |
132 if (status != NO_ERROR) { | |
133 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); | |
134 } | |
135 status = mx_handle_close(interrupt_handles_[1]); | |
136 if (status != NO_ERROR) { | |
137 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status)); | |
138 } | |
139 LOG_INFO("EventHandlerImplementation destroyed\n"); | |
140 } | |
141 | |
142 | |
143 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 254 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
144 Dart_Port dart_port, | 255 Dart_Port dart_port, |
145 int64_t data) { | 256 int64_t data) { |
146 InterruptMessage msg; | 257 InterruptMessage msg; |
147 msg.id = id; | 258 msg.id = id; |
148 msg.dart_port = dart_port; | 259 msg.dart_port = dart_port; |
149 msg.data = data; | 260 msg.data = data; |
150 | 261 // WriteToBlocking will write up to 512 bytes atomically, and since our msg |
151 mx_status_t status = | 262 // is smaller than 512, we don't need a thread lock. |
152 mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0); | 263 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'. |
153 if (status != NO_ERROR) { | 264 ASSERT(kInterruptMessageSize < PIPE_BUF); |
154 FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status)); | 265 intptr_t result = |
266 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | |
267 if (result != kInterruptMessageSize) { | |
268 if (result == -1) { | |
269 perror("Interrupt message failure:"); | |
270 } | |
271 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result); | |
155 } | 272 } |
156 LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data); | |
157 } | 273 } |
158 | 274 |
159 | 275 |
160 void EventHandlerImplementation::HandleInterruptFd() { | 276 void EventHandlerImplementation::HandleInterruptFd() { |
161 LOG_INFO("HandleInterruptFd entry\n"); | 277 const intptr_t MAX_MESSAGES = kInterruptMessageSize; |
162 InterruptMessage msg; | 278 InterruptMessage msg[MAX_MESSAGES]; |
163 uint32_t bytes = kInterruptMessageSize; | 279 ssize_t bytes = NO_RETRY_EXPECTED( |
164 mx_status_t status; | 280 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); |
165 while (true) { | 281 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes); |
166 status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes, | 282 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { |
167 NULL, 0, NULL); | 283 if (msg[i].id == kTimerId) { |
168 if (status != NO_ERROR) { | |
169 break; | |
170 } | |
171 ASSERT(bytes == kInterruptMessageSize); | |
172 if (msg.id == kTimerId) { | |
173 LOG_INFO("HandleInterruptFd read timer update\n"); | 284 LOG_INFO("HandleInterruptFd read timer update\n"); |
174 timeout_queue_.UpdateTimeout(msg.dart_port, msg.data); | 285 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); |
175 } else if (msg.id == kShutdownId) { | 286 } else if (msg[i].id == kShutdownId) { |
176 LOG_INFO("HandleInterruptFd read shutdown\n"); | 287 LOG_INFO("HandleInterruptFd read shutdown\n"); |
177 shutdown_ = true; | 288 shutdown_ = true; |
178 } else { | 289 } else { |
179 // TODO(zra): Handle commands to add and remove handles from the | 290 ASSERT((msg[i].data & COMMAND_MASK) != 0); |
180 // MagentaWaitManyInfo. | 291 LOG_INFO("HandleInterruptFd command\n"); |
181 UNIMPLEMENTED(); | 292 DescriptorInfo* di = GetDescriptorInfo( |
293 msg[i].id, IS_LISTENING_SOCKET(msg[i].data)); | |
294 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) { | |
295 ASSERT(!di->IsListeningSocket()); | |
296 // Close the socket for reading. | |
297 LOG_INFO("\tSHUT_RD: %d\n", di->fd()); | |
298 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD)); | |
299 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) { | |
300 ASSERT(!di->IsListeningSocket()); | |
301 // Close the socket for writing. | |
302 LOG_INFO("\tSHUT_WR: %d\n", di->fd()); | |
303 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR)); | |
304 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) { | |
305 // Close the socket and free system resources and move on to next | |
306 // message. | |
307 intptr_t old_mask = di->Mask(); | |
308 Dart_Port port = msg[i].dart_port; | |
309 di->RemovePort(port); | |
310 intptr_t new_mask = di->Mask(); | |
311 UpdateEpollInstance(old_mask, di); | |
312 | |
313 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask); | |
314 intptr_t fd = di->fd(); | |
315 if (di->IsListeningSocket()) { | |
316 // We only close the socket file descriptor from the operating | |
317 // system if there are no other dart socket objects which | |
318 // are listening on the same (address, port) combination. | |
319 ListeningSocketRegistry *registry = | |
320 ListeningSocketRegistry::Instance(); | |
321 | |
322 MutexLocker locker(registry->mutex()); | |
323 | |
324 if (registry->CloseSafe(fd)) { | |
325 ASSERT(new_mask == 0); | |
326 socket_map_.Remove(GetHashmapKeyFromFd(fd), | |
327 GetHashmapHashFromFd(fd)); | |
328 di->Close(); | |
329 LOG_INFO("Closed %d\n", di->fd()); | |
330 delete di; | |
331 } | |
332 } else { | |
333 ASSERT(new_mask == 0); | |
334 socket_map_.Remove( | |
335 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); | |
336 di->Close(); | |
337 LOG_INFO("Closed %d\n", di->fd()); | |
338 delete di; | |
339 } | |
340 | |
341 DartUtils::PostInt32(port, 1 << kDestroyedEvent); | |
342 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) { | |
343 int count = TOKEN_COUNT(msg[i].data); | |
344 intptr_t old_mask = di->Mask(); | |
345 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask); | |
346 di->ReturnTokens(msg[i].dart_port, count); | |
347 UpdateEpollInstance(old_mask, di); | |
348 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) { | |
349 // `events` can only have kInEvent/kOutEvent flags set. | |
350 intptr_t events = msg[i].data & EVENT_MASK; | |
351 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | |
352 | |
353 intptr_t old_mask = di->Mask(); | |
354 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", | |
355 di->fd(), old_mask, msg[i].data & EVENT_MASK); | |
356 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK); | |
357 UpdateEpollInstance(old_mask, di); | |
358 } else { | |
359 UNREACHABLE(); | |
360 } | |
182 } | 361 } |
183 } | 362 } |
184 // status == ERR_SHOULD_WAIT when we try to read and there are no messages | |
185 // available, so it is an error if we get here and status != ERR_SHOULD_WAIT. | |
186 if (status != ERR_SHOULD_WAIT) { | |
187 FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status)); | |
188 } | |
189 LOG_INFO("HandleInterruptFd exit\n"); | 363 LOG_INFO("HandleInterruptFd exit\n"); |
190 } | 364 } |
191 | 365 |
192 | 366 |
193 void EventHandlerImplementation::HandleEvents() { | 367 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, |
194 LOG_INFO("HandleEvents entry\n"); | 368 DescriptorInfo* di) { |
195 for (intptr_t i = 1; i < info_.size(); i++) { | 369 #ifdef EVENTHANDLER_LOGGING |
196 const mx_wait_item_t& wait_item = info_.items()[i]; | 370 PrintEventMask(di->fd(), events); |
197 if (wait_item.pending & wait_item.waitfor) { | 371 #endif |
198 // Only the control handle has no descriptor info. | 372 if ((events & EPOLLERR) != 0) { |
199 ASSERT(info_.descriptor_infos()[i] != NULL); | 373 // Return error only if EPOLLIN is present. |
200 ASSERT(wait_item.handle != interrupt_handles_[0]); | 374 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0; |
201 // TODO(zra): Handle events on other handles. At the moment we are | 375 } |
202 // only interrupted when there is a message on interrupt_handles_[0]. | 376 intptr_t event_mask = 0; |
203 UNIMPLEMENTED(); | 377 if ((events & EPOLLIN) != 0) { |
378 event_mask |= (1 << kInEvent); | |
379 } | |
380 if ((events & EPOLLOUT) != 0) { | |
381 event_mask |= (1 << kOutEvent); | |
382 } | |
383 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) { | |
384 event_mask |= (1 << kCloseEvent); | |
385 } | |
386 return event_mask; | |
387 } | |
388 | |
389 | |
390 void EventHandlerImplementation::HandleEvents(struct epoll_event* events, | |
391 int size) { | |
392 bool interrupt_seen = false; | |
393 for (int i = 0; i < size; i++) { | |
394 if (events[i].data.ptr == NULL) { | |
395 interrupt_seen = true; | |
396 } else { | |
397 DescriptorInfo* di = | |
398 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr); | |
399 intptr_t event_mask = GetPollEvents(events[i].events, di); | |
400 | |
401 if ((event_mask & (1 << kErrorEvent)) != 0) { | |
402 di->NotifyAllDartPorts(event_mask); | |
403 } | |
404 event_mask &= ~(1 << kErrorEvent); | |
405 | |
406 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask); | |
407 if (event_mask != 0) { | |
408 intptr_t old_mask = di->Mask(); | |
409 Dart_Port port = di->NextNotifyDartPort(event_mask); | |
410 ASSERT(port != 0); | |
411 UpdateEpollInstance(old_mask, di); | |
412 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", | |
413 event_mask, port, di->fd()); | |
414 bool success = DartUtils::PostInt32(port, event_mask); | |
415 if (!success) { | |
416 // This can happen if e.g. the isolate that owns the port has died | |
417 // for some reason. | |
418 FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port); | |
419 } | |
420 } | |
204 } | 421 } |
205 } | 422 } |
206 | 423 if (interrupt_seen) { |
207 if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) { | 424 // Handle after socket events, so we avoid closing a socket before we handle |
208 FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n"); | 425 // the current events. |
209 } | |
210 if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) { | |
211 LOG_INFO("HandleEvents interrupt_handles_[0] readable\n"); | |
212 HandleInterruptFd(); | 426 HandleInterruptFd(); |
213 } else { | |
214 LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n"); | |
215 } | 427 } |
216 } | 428 } |
217 | 429 |
218 | 430 |
219 int64_t EventHandlerImplementation::GetTimeout() const { | 431 int64_t EventHandlerImplementation::GetTimeout() const { |
220 if (!timeout_queue_.HasTimeout()) { | 432 if (!timeout_queue_.HasTimeout()) { |
221 return kInfinityTimeout; | 433 return kInfinityTimeout; |
222 } | 434 } |
223 int64_t millis = | 435 int64_t millis = timeout_queue_.CurrentTimeout() - |
224 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); | 436 TimerUtils::GetCurrentMonotonicMillis(); |
225 return (millis < 0) ? 0 : millis; | 437 return (millis < 0) ? 0 : millis; |
226 } | 438 } |
227 | 439 |
228 | 440 |
229 void EventHandlerImplementation::HandleTimeout() { | 441 void EventHandlerImplementation::HandleTimeout() { |
230 if (timeout_queue_.HasTimeout()) { | 442 if (timeout_queue_.HasTimeout()) { |
231 int64_t millis = timeout_queue_.CurrentTimeout() - | 443 int64_t millis = timeout_queue_.CurrentTimeout() - |
232 TimerUtils::GetCurrentMonotonicMillis(); | 444 TimerUtils::GetCurrentMonotonicMillis(); |
233 if (millis <= 0) { | 445 if (millis <= 0) { |
234 DartUtils::PostNull(timeout_queue_.CurrentPort()); | 446 DartUtils::PostNull(timeout_queue_.CurrentPort()); |
235 timeout_queue_.RemoveCurrent(); | 447 timeout_queue_.RemoveCurrent(); |
236 } | 448 } |
237 } | 449 } |
238 } | 450 } |
239 | 451 |
240 | 452 |
241 void EventHandlerImplementation::Poll(uword args) { | 453 void EventHandlerImplementation::Poll(uword args) { |
454 static const intptr_t kMaxEvents = 16; | |
455 struct epoll_event events[kMaxEvents]; | |
242 EventHandler* handler = reinterpret_cast<EventHandler*>(args); | 456 EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
243 EventHandlerImplementation* handler_impl = &handler->delegate_; | 457 EventHandlerImplementation* handler_impl = &handler->delegate_; |
244 ASSERT(handler_impl != NULL); | 458 ASSERT(handler_impl != NULL); |
245 | 459 |
246 while (!handler_impl->shutdown_) { | 460 while (!handler_impl->shutdown_) { |
247 int64_t millis = handler_impl->GetTimeout(); | 461 int64_t millis = handler_impl->GetTimeout(); |
248 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); | 462 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); |
249 mx_time_t timeout = | 463 LOG_INFO("epoll_wait(millis = %ld)\n", millis); |
250 millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond; | 464 intptr_t result = NO_RETRY_EXPECTED( |
251 const MagentaWaitManyInfo& info = handler_impl->info(); | 465 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis)); |
252 LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(), | 466 ASSERT(EAGAIN == EWOULDBLOCK); |
253 timeout); | 467 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result); |
254 mx_status_t status = | 468 if (result < 0) { |
255 mx_handle_wait_many(info.items(), info.size(), timeout); | 469 if (errno != EWOULDBLOCK) { |
256 if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) { | 470 perror("Poll failed"); |
257 FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status)); | 471 } |
258 } else { | 472 } else { |
259 LOG_INFO("mx_handle_wait_many returned: %ld\n", status); | |
260 handler_impl->HandleTimeout(); | 473 handler_impl->HandleTimeout(); |
261 handler_impl->HandleEvents(); | 474 handler_impl->HandleEvents(events, result); |
262 } | 475 } |
263 } | 476 } |
264 handler->NotifyShutdownDone(); | 477 handler->NotifyShutdownDone(); |
265 LOG_INFO("EventHandlerImplementation notifying about shutdown\n"); | |
266 } | 478 } |
267 | 479 |
268 | 480 |
269 void EventHandlerImplementation::Start(EventHandler* handler) { | 481 void EventHandlerImplementation::Start(EventHandler* handler) { |
270 int result = Thread::Start(&EventHandlerImplementation::Poll, | 482 int result = Thread::Start(&EventHandlerImplementation::Poll, |
271 reinterpret_cast<uword>(handler)); | 483 reinterpret_cast<uword>(handler)); |
272 if (result != 0) { | 484 if (result != 0) { |
273 FATAL1("Failed to start event handler thread %d", result); | 485 FATAL1("Failed to start event handler thread %d", result); |
274 } | 486 } |
275 } | 487 } |
276 | 488 |
277 | 489 |
278 void EventHandlerImplementation::Shutdown() { | 490 void EventHandlerImplementation::Shutdown() { |
279 SendData(kShutdownId, 0, 0); | 491 SendData(kShutdownId, 0, 0); |
280 } | 492 } |
281 | 493 |
282 | 494 |
283 void EventHandlerImplementation::SendData(intptr_t id, | 495 void EventHandlerImplementation::SendData(intptr_t id, |
284 Dart_Port dart_port, | 496 Dart_Port dart_port, |
285 int64_t data) { | 497 int64_t data) { |
286 WakeupHandler(id, dart_port, data); | 498 WakeupHandler(id, dart_port, data); |
287 } | 499 } |
288 | 500 |
501 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { | |
502 // The hashmap does not support keys with value 0. | |
503 return reinterpret_cast<void*>(fd + 1); | |
504 } | |
505 | |
506 | |
507 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { | |
508 // The hashmap does not support keys with value 0. | |
509 return dart::Utils::WordHash(fd + 1); | |
510 } | |
511 | |
289 } // namespace bin | 512 } // namespace bin |
290 } // namespace dart | 513 } // namespace dart |
291 | 514 |
292 #endif // defined(TARGET_OS_FUCHSIA) | 515 #endif // defined(TARGET_OS_FUCHSIA) |
293 | 516 |
294 #endif // !defined(DART_IO_DISABLED) | 517 #endif // !defined(DART_IO_DISABLED) |
OLD | NEW |