Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: runtime/bin/eventhandler_fuchsia.cc

Issue 2515643004: Fuchsia: Partial implementation of dart:io sockets (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #if !defined(DART_IO_DISABLED) 5 #if !defined(DART_IO_DISABLED)
6 6
7 #include "platform/globals.h" 7 #include "platform/globals.h"
8 #if defined(TARGET_OS_FUCHSIA) 8 #if defined(TARGET_OS_FUCHSIA)
9 9
10 #include "bin/eventhandler.h" 10 #include "bin/eventhandler.h"
11 #include "bin/eventhandler_fuchsia.h" 11 #include "bin/eventhandler_fuchsia.h"
12 12
13 #include <magenta/status.h> 13 #include <errno.h> // NOLINT
14 #include <magenta/syscalls.h> 14 #include <fcntl.h> // NOLINT
15 #include <pthread.h> // NOLINT
16 #include <stdio.h> // NOLINT
17 #include <string.h> // NOLINT
18 #include <sys/epoll.h> // NOLINT
19 #include <sys/stat.h> // NOLINT
20 #include <unistd.h> // NOLINT
15 21
22 #include "bin/fdutils.h"
23 #include "bin/lockers.h"
16 #include "bin/log.h" 24 #include "bin/log.h"
25 #include "bin/socket.h"
17 #include "bin/thread.h" 26 #include "bin/thread.h"
18 #include "bin/utils.h" 27 #include "bin/utils.h"
28 #include "platform/hashmap.h"
29 #include "platform/utils.h"
19 30
31 // #define EVENTHANDLER_LOGGING 1
20 #if defined(EVENTHANDLER_LOGGING) 32 #if defined(EVENTHANDLER_LOGGING)
21 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) 33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__)
22 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) 34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__)
23 #else 35 #else
24 #define LOG_ERR(msg, ...) 36 #define LOG_ERR(msg, ...)
25 #define LOG_INFO(msg, ...) 37 #define LOG_INFO(msg, ...)
26 #endif // defined(EVENTHANDLER_LOGGING) 38 #endif // defined(EVENTHANDLER_LOGGING)
27 39
28 namespace dart { 40 namespace dart {
29 namespace bin { 41 namespace bin {
30 42
31 MagentaWaitManyInfo::MagentaWaitManyInfo() 43 #if defined(EVENTHANDLER_LOGGING)
32 : capacity_(kInitialCapacity), size_(0) { 44 static void PrintEventMask(intptr_t fd, intptr_t events) {
33 descriptor_infos_ = static_cast<DescriptorInfo**>( 45 Log::PrintErr("%d ", fd);
34 malloc(kInitialCapacity * sizeof(*descriptor_infos_))); 46 if ((events & EPOLLIN) != 0) {
35 if (descriptor_infos_ == NULL) { 47 Log::PrintErr("EPOLLIN ");
36 FATAL("Failed to allocate descriptor_infos array"); 48 }
37 } 49 if ((events & EPOLLPRI) != 0) {
38 items_ = 50 Log::PrintErr("EPOLLPRI ");
39 static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_))); 51 }
40 if (items_ == NULL) { 52 if ((events & EPOLLOUT) != 0) {
41 FATAL("Failed to allocate items array"); 53 Log::PrintErr("EPOLLOUT ");
42 } 54 }
43 } 55 if ((events & EPOLLERR) != 0) {
44 56 Log::PrintErr("EPOLLERR ");
45 57 }
46 MagentaWaitManyInfo::~MagentaWaitManyInfo() { 58 if ((events & EPOLLHUP) != 0) {
47 free(descriptor_infos_); 59 Log::PrintErr("EPOLLHUP ");
48 free(items_); 60 }
49 } 61 if ((events & EPOLLRDHUP) != 0) {
50 62 Log::PrintErr("EPOLLRDHUP ");
51 63 }
52 void MagentaWaitManyInfo::AddHandle(mx_handle_t handle, 64 int all_events = EPOLLIN | EPOLLPRI | EPOLLOUT |
53 mx_signals_t signals, 65 EPOLLERR | EPOLLHUP | EPOLLRDHUP;
66 if ((events & ~all_events) != 0) {
67 Log::PrintErr("(and %08x) ", events & ~all_events);
68 }
69
70 Log::PrintErr("\n");
71 }
72 #endif
73
74
75 intptr_t DescriptorInfo::GetPollEvents() {
76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
77 // triggered anyway.
78 intptr_t events = 0;
79 if ((Mask() & (1 << kInEvent)) != 0) {
80 events |= EPOLLIN;
81 }
82 if ((Mask() & (1 << kOutEvent)) != 0) {
83 events |= EPOLLOUT;
84 }
85 return events;
86 }
87
88
89 // Unregister the file descriptor for a DescriptorInfo structure with
90 // epoll.
91 static void RemoveFromEpollInstance(intptr_t epoll_fd_,
54 DescriptorInfo* di) { 92 DescriptorInfo* di) {
55 #if defined(DEBUG) 93 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd());
56 // Check that the handle is not already in the list. 94 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
57 for (intptr_t i = 0; i < size_; i++) { 95 EPOLL_CTL_DEL,
58 if (items_[i].handle == handle) { 96 di->fd(),
59 FATAL("The handle is already in the list!"); 97 NULL));
98 }
99
100
101 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
102 struct epoll_event event;
103 event.events = EPOLLRDHUP | di->GetPollEvents();
104 if (!di->IsListeningSocket()) {
105 event.events |= EPOLLET;
106 }
107 event.data.ptr = di;
108 LOG_INFO("AddToEpollInstance: fd = %ld\n",
109 di->fd());
110 int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
111 EPOLL_CTL_ADD,
112 di->fd(),
113 &event));
114 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n",
115 di->fd(), status);
116 #if defined(EVENTHANDLER_LOGGING)
117 PrintEventMask(di->fd(), event.events);
118 #endif
119 if (status == -1) {
120 // TODO(dart:io): Verify that the dart end is handling this correctly.
121
122 // Epoll does not accept the file descriptor. It could be due to
123 // already closed file descriptor, or unuspported devices, such
124 // as /dev/null. In such case, mark the file descriptor as closed,
125 // so dart will handle it accordingly.
126 di->NotifyAllDartPorts(1 << kCloseEvent);
127 }
128 }
129
130
131 EventHandlerImplementation::EventHandlerImplementation()
132 : socket_map_(&HashMap::SamePointerValue, 16) {
133 intptr_t result;
134 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
135 if (result != 0) {
136 FATAL("Pipe creation failed");
137 }
138 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
139 FATAL("Failed to set pipe fd non blocking\n");
140 }
141 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
142 FATAL("Failed to set pipe fd close on exec\n");
143 }
144 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
145 FATAL("Failed to set pipe fd close on exec\n");
146 }
147 shutdown_ = false;
148 // The initial size passed to epoll_create is ignore on newer (>=
149 // 2.6.8) Linux versions
150 static const int kEpollInitialSize = 64;
151 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
152 if (epoll_fd_ == -1) {
153 FATAL1("Failed creating epoll file descriptor: %i", errno);
154 }
155 if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
156 FATAL("Failed to set epoll fd close on exec\n");
157 }
158 // Register the interrupt_fd with the epoll instance.
159 struct epoll_event event;
160 event.events = EPOLLIN;
161 event.data.ptr = NULL;
162 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n",
163 epoll_fd_);
164 int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
165 EPOLL_CTL_ADD,
166 interrupt_fds_[0],
167 &event));
168 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
169 epoll_fd_, status);
170 if (status == -1) {
171 FATAL("Failed adding interrupt fd to epoll instance");
172 }
siva 2016/11/21 02:22:41 timer_fd is not being registered here, do we not h
zra 2016/11/21 05:56:24 AFAIK there are no timerfds in Fuchsia. Timers are
173 }
174
175
176 static void DeleteDescriptorInfo(void* info) {
177 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
178 di->Close();
179 LOG_INFO("Closed %d\n", di->fd());
180 delete di;
181 }
182
183
184 EventHandlerImplementation::~EventHandlerImplementation() {
185 socket_map_.Clear(DeleteDescriptorInfo);
186 VOID_NO_RETRY_EXPECTED(close(epoll_fd_));
siva 2016/11/21 02:22:41 Ditto comment about missing timer_fd_
zra 2016/11/21 05:56:24 Acknowledged.
187 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0]));
188 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1]));
189 }
190
191
192 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
193 DescriptorInfo *di) {
194 intptr_t new_mask = di->Mask();
195 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n",
196 di->fd(), old_mask, new_mask);
197 if ((old_mask != 0) && (new_mask == 0)) {
198 RemoveFromEpollInstance(epoll_fd_, di);
199 } else if ((old_mask == 0) && (new_mask != 0)) {
200 AddToEpollInstance(epoll_fd_, di);
201 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
202 ASSERT(!di->IsListeningSocket());
203 RemoveFromEpollInstance(epoll_fd_, di);
204 AddToEpollInstance(epoll_fd_, di);
205 }
206 }
207
208
209 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
210 intptr_t fd, bool is_listening) {
211 ASSERT(fd >= 0);
212 HashMap::Entry* entry = socket_map_.Lookup(
213 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
214 ASSERT(entry != NULL);
215 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
216 if (di == NULL) {
217 // If there is no data in the hash map for this file descriptor a
218 // new DescriptorInfo for the file descriptor is inserted.
219 if (is_listening) {
220 di = new DescriptorInfoMultiple(fd);
221 } else {
222 di = new DescriptorInfoSingle(fd);
60 } 223 }
61 } 224 entry->value = di;
62 #endif 225 }
63 intptr_t new_size = size_ + 1; 226 ASSERT(fd == di->fd());
64 GrowArraysIfNeeded(new_size); 227 return di;
65 descriptor_infos_[size_] = di; 228 }
66 items_[size_].handle = handle; 229
67 items_[size_].waitfor = signals; 230
68 items_[size_].pending = 0; 231 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
69 size_ = new_size; 232 size_t remaining = count;
70 LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_); 233 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
71 } 234 while (remaining > 0) {
72 235 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
73 236 if (bytes_written == 0) {
74 void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) { 237 return count - remaining;
75 intptr_t idx; 238 } else if (bytes_written == -1) {
76 for (idx = 1; idx < size_; idx++) { 239 ASSERT(EAGAIN == EWOULDBLOCK);
77 if (handle == items_[idx].handle) { 240 // Error code EWOULDBLOCK should only happen for non blocking
78 break; 241 // file descriptors.
242 ASSERT(errno != EWOULDBLOCK);
243 return -1;
244 } else {
245 ASSERT(bytes_written > 0);
246 remaining -= bytes_written;
247 buffer_pos += bytes_written;
79 } 248 }
80 } 249 }
81 if (idx == size_) { 250 return count;
82 FATAL("Handle is not in the list!"); 251 }
83 } 252
84 253
85 if (idx != (size_ - 1)) {
86 descriptor_infos_[idx] = descriptor_infos_[size_ - 1];
87 items_[idx] = items_[size_ - 1];
88 }
89 descriptor_infos_[size_ - 1] = NULL;
90 items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0};
91 size_ = size_ - 1;
92 LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_);
93 }
94
95
96 void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) {
97 if (desired_size < capacity_) {
98 return;
99 }
100 intptr_t new_capacity = desired_size + (desired_size >> 1);
101 descriptor_infos_ = static_cast<DescriptorInfo**>(
102 realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_)));
103 if (descriptor_infos_ == NULL) {
104 FATAL("Failed to grow descriptor_infos array");
105 }
106 items_ = static_cast<mx_wait_item_t*>(
107 realloc(items_, new_capacity * sizeof(*items_)));
108 if (items_ == NULL) {
109 FATAL("Failed to grow items array");
110 }
111 capacity_ = new_capacity;
112 LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size,
113 capacity_);
114 }
115
116
117 EventHandlerImplementation::EventHandlerImplementation() {
118 mx_status_t status =
119 mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]);
120 if (status != NO_ERROR) {
121 FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status));
122 }
123 shutdown_ = false;
124 info_.AddHandle(interrupt_handles_[0],
125 MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL);
126 LOG_INFO("EventHandlerImplementation initialized\n");
127 }
128
129
130 EventHandlerImplementation::~EventHandlerImplementation() {
131 mx_status_t status = mx_handle_close(interrupt_handles_[0]);
132 if (status != NO_ERROR) {
133 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
134 }
135 status = mx_handle_close(interrupt_handles_[1]);
136 if (status != NO_ERROR) {
137 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
138 }
139 LOG_INFO("EventHandlerImplementation destroyed\n");
140 }
141
142
143 void EventHandlerImplementation::WakeupHandler(intptr_t id, 254 void EventHandlerImplementation::WakeupHandler(intptr_t id,
144 Dart_Port dart_port, 255 Dart_Port dart_port,
145 int64_t data) { 256 int64_t data) {
146 InterruptMessage msg; 257 InterruptMessage msg;
147 msg.id = id; 258 msg.id = id;
148 msg.dart_port = dart_port; 259 msg.dart_port = dart_port;
149 msg.data = data; 260 msg.data = data;
150 261 // WriteToBlocking will write up to 512 bytes atomically, and since our msg
151 mx_status_t status = 262 // is smaller than 512, we don't need a thread lock.
152 mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0); 263 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
153 if (status != NO_ERROR) { 264 ASSERT(kInterruptMessageSize < PIPE_BUF);
154 FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status)); 265 intptr_t result =
266 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
267 if (result != kInterruptMessageSize) {
268 if (result == -1) {
269 perror("Interrupt message failure:");
270 }
271 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
155 } 272 }
156 LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data);
157 } 273 }
158 274
159 275
160 void EventHandlerImplementation::HandleInterruptFd() { 276 void EventHandlerImplementation::HandleInterruptFd() {
161 LOG_INFO("HandleInterruptFd entry\n"); 277 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
162 InterruptMessage msg; 278 InterruptMessage msg[MAX_MESSAGES];
163 uint32_t bytes = kInterruptMessageSize; 279 ssize_t bytes = NO_RETRY_EXPECTED(
164 mx_status_t status; 280 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
165 while (true) { 281 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes);
166 status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes, 282 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
167 NULL, 0, NULL); 283 if (msg[i].id == kTimerId) {
168 if (status != NO_ERROR) {
169 break;
170 }
171 ASSERT(bytes == kInterruptMessageSize);
172 if (msg.id == kTimerId) {
173 LOG_INFO("HandleInterruptFd read timer update\n"); 284 LOG_INFO("HandleInterruptFd read timer update\n");
174 timeout_queue_.UpdateTimeout(msg.dart_port, msg.data); 285 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
175 } else if (msg.id == kShutdownId) { 286 } else if (msg[i].id == kShutdownId) {
176 LOG_INFO("HandleInterruptFd read shutdown\n"); 287 LOG_INFO("HandleInterruptFd read shutdown\n");
177 shutdown_ = true; 288 shutdown_ = true;
178 } else { 289 } else {
179 // TODO(zra): Handle commands to add and remove handles from the 290 ASSERT((msg[i].data & COMMAND_MASK) != 0);
180 // MagentaWaitManyInfo. 291 LOG_INFO("HandleInterruptFd command\n");
181 UNIMPLEMENTED(); 292 DescriptorInfo* di = GetDescriptorInfo(
293 msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
294 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
295 ASSERT(!di->IsListeningSocket());
296 // Close the socket for reading.
297 LOG_INFO("\tSHUT_RD: %d\n", di->fd());
298 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
299 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
300 ASSERT(!di->IsListeningSocket());
301 // Close the socket for writing.
302 LOG_INFO("\tSHUT_WR: %d\n", di->fd());
303 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
304 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
305 // Close the socket and free system resources and move on to next
306 // message.
307 intptr_t old_mask = di->Mask();
308 Dart_Port port = msg[i].dart_port;
309 di->RemovePort(port);
310 intptr_t new_mask = di->Mask();
311 UpdateEpollInstance(old_mask, di);
312
313 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
314 intptr_t fd = di->fd();
315 if (di->IsListeningSocket()) {
316 // We only close the socket file descriptor from the operating
317 // system if there are no other dart socket objects which
318 // are listening on the same (address, port) combination.
319 ListeningSocketRegistry *registry =
320 ListeningSocketRegistry::Instance();
321
322 MutexLocker locker(registry->mutex());
323
324 if (registry->CloseSafe(fd)) {
325 ASSERT(new_mask == 0);
326 socket_map_.Remove(GetHashmapKeyFromFd(fd),
327 GetHashmapHashFromFd(fd));
328 di->Close();
329 LOG_INFO("Closed %d\n", di->fd());
330 delete di;
331 }
332 } else {
333 ASSERT(new_mask == 0);
334 socket_map_.Remove(
335 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
336 di->Close();
337 LOG_INFO("Closed %d\n", di->fd());
338 delete di;
339 }
340
341 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
342 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
343 int count = TOKEN_COUNT(msg[i].data);
344 intptr_t old_mask = di->Mask();
345 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
346 di->ReturnTokens(msg[i].dart_port, count);
347 UpdateEpollInstance(old_mask, di);
348 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
349 // `events` can only have kInEvent/kOutEvent flags set.
350 intptr_t events = msg[i].data & EVENT_MASK;
351 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
352
353 intptr_t old_mask = di->Mask();
354 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n",
355 di->fd(), old_mask, msg[i].data & EVENT_MASK);
356 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
357 UpdateEpollInstance(old_mask, di);
358 } else {
359 UNREACHABLE();
360 }
182 } 361 }
183 } 362 }
184 // status == ERR_SHOULD_WAIT when we try to read and there are no messages
185 // available, so it is an error if we get here and status != ERR_SHOULD_WAIT.
186 if (status != ERR_SHOULD_WAIT) {
187 FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status));
188 }
189 LOG_INFO("HandleInterruptFd exit\n"); 363 LOG_INFO("HandleInterruptFd exit\n");
190 } 364 }
191 365
192 366
193 void EventHandlerImplementation::HandleEvents() { 367 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
194 LOG_INFO("HandleEvents entry\n"); 368 DescriptorInfo* di) {
195 for (intptr_t i = 1; i < info_.size(); i++) { 369 #ifdef EVENTHANDLER_LOGGING
196 const mx_wait_item_t& wait_item = info_.items()[i]; 370 PrintEventMask(di->fd(), events);
197 if (wait_item.pending & wait_item.waitfor) { 371 #endif
198 // Only the control handle has no descriptor info. 372 if ((events & EPOLLERR) != 0) {
199 ASSERT(info_.descriptor_infos()[i] != NULL); 373 // Return error only if EPOLLIN is present.
200 ASSERT(wait_item.handle != interrupt_handles_[0]); 374 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
201 // TODO(zra): Handle events on other handles. At the moment we are 375 }
202 // only interrupted when there is a message on interrupt_handles_[0]. 376 intptr_t event_mask = 0;
203 UNIMPLEMENTED(); 377 if ((events & EPOLLIN) != 0) {
378 event_mask |= (1 << kInEvent);
379 }
380 if ((events & EPOLLOUT) != 0) {
381 event_mask |= (1 << kOutEvent);
382 }
383 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
384 event_mask |= (1 << kCloseEvent);
385 }
386 return event_mask;
387 }
388
389
390 void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
391 int size) {
392 bool interrupt_seen = false;
393 for (int i = 0; i < size; i++) {
394 if (events[i].data.ptr == NULL) {
395 interrupt_seen = true;
396 } else {
397 DescriptorInfo* di =
398 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
399 intptr_t event_mask = GetPollEvents(events[i].events, di);
400
401 if ((event_mask & (1 << kErrorEvent)) != 0) {
402 di->NotifyAllDartPorts(event_mask);
403 }
404 event_mask &= ~(1 << kErrorEvent);
405
406 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
407 if (event_mask != 0) {
408 intptr_t old_mask = di->Mask();
409 Dart_Port port = di->NextNotifyDartPort(event_mask);
410 ASSERT(port != 0);
411 UpdateEpollInstance(old_mask, di);
412 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n",
413 event_mask, port, di->fd());
414 bool success = DartUtils::PostInt32(port, event_mask);
415 if (!success) {
416 // This can happen if e.g. the isolate that owns the port has died
417 // for some reason.
418 FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port);
419 }
420 }
204 } 421 }
205 } 422 }
206 423 if (interrupt_seen) {
207 if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) { 424 // Handle after socket events, so we avoid closing a socket before we handle
208 FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n"); 425 // the current events.
209 }
210 if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) {
211 LOG_INFO("HandleEvents interrupt_handles_[0] readable\n");
212 HandleInterruptFd(); 426 HandleInterruptFd();
213 } else {
214 LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n");
215 } 427 }
216 } 428 }
217 429
218 430
219 int64_t EventHandlerImplementation::GetTimeout() const { 431 int64_t EventHandlerImplementation::GetTimeout() const {
220 if (!timeout_queue_.HasTimeout()) { 432 if (!timeout_queue_.HasTimeout()) {
221 return kInfinityTimeout; 433 return kInfinityTimeout;
222 } 434 }
223 int64_t millis = 435 int64_t millis = timeout_queue_.CurrentTimeout() -
224 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); 436 TimerUtils::GetCurrentMonotonicMillis();
225 return (millis < 0) ? 0 : millis; 437 return (millis < 0) ? 0 : millis;
226 } 438 }
227 439
228 440
229 void EventHandlerImplementation::HandleTimeout() { 441 void EventHandlerImplementation::HandleTimeout() {
230 if (timeout_queue_.HasTimeout()) { 442 if (timeout_queue_.HasTimeout()) {
231 int64_t millis = timeout_queue_.CurrentTimeout() - 443 int64_t millis = timeout_queue_.CurrentTimeout() -
232 TimerUtils::GetCurrentMonotonicMillis(); 444 TimerUtils::GetCurrentMonotonicMillis();
233 if (millis <= 0) { 445 if (millis <= 0) {
234 DartUtils::PostNull(timeout_queue_.CurrentPort()); 446 DartUtils::PostNull(timeout_queue_.CurrentPort());
235 timeout_queue_.RemoveCurrent(); 447 timeout_queue_.RemoveCurrent();
236 } 448 }
237 } 449 }
238 } 450 }
239 451
240 452
241 void EventHandlerImplementation::Poll(uword args) { 453 void EventHandlerImplementation::Poll(uword args) {
454 static const intptr_t kMaxEvents = 16;
455 struct epoll_event events[kMaxEvents];
242 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 456 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
243 EventHandlerImplementation* handler_impl = &handler->delegate_; 457 EventHandlerImplementation* handler_impl = &handler->delegate_;
244 ASSERT(handler_impl != NULL); 458 ASSERT(handler_impl != NULL);
245 459
246 while (!handler_impl->shutdown_) { 460 while (!handler_impl->shutdown_) {
247 int64_t millis = handler_impl->GetTimeout(); 461 int64_t millis = handler_impl->GetTimeout();
248 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); 462 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
249 mx_time_t timeout = 463 LOG_INFO("epoll_wait(millis = %ld)\n", millis);
250 millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond; 464 intptr_t result = NO_RETRY_EXPECTED(
251 const MagentaWaitManyInfo& info = handler_impl->info(); 465 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis));
252 LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(), 466 ASSERT(EAGAIN == EWOULDBLOCK);
253 timeout); 467 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result);
254 mx_status_t status = 468 if (result < 0) {
255 mx_handle_wait_many(info.items(), info.size(), timeout); 469 if (errno != EWOULDBLOCK) {
256 if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) { 470 perror("Poll failed");
257 FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status)); 471 }
258 } else { 472 } else {
259 LOG_INFO("mx_handle_wait_many returned: %ld\n", status);
260 handler_impl->HandleTimeout(); 473 handler_impl->HandleTimeout();
261 handler_impl->HandleEvents(); 474 handler_impl->HandleEvents(events, result);
262 } 475 }
263 } 476 }
264 handler->NotifyShutdownDone(); 477 handler->NotifyShutdownDone();
265 LOG_INFO("EventHandlerImplementation notifying about shutdown\n");
266 } 478 }
267 479
268 480
269 void EventHandlerImplementation::Start(EventHandler* handler) { 481 void EventHandlerImplementation::Start(EventHandler* handler) {
270 int result = Thread::Start(&EventHandlerImplementation::Poll, 482 int result = Thread::Start(&EventHandlerImplementation::Poll,
271 reinterpret_cast<uword>(handler)); 483 reinterpret_cast<uword>(handler));
272 if (result != 0) { 484 if (result != 0) {
273 FATAL1("Failed to start event handler thread %d", result); 485 FATAL1("Failed to start event handler thread %d", result);
274 } 486 }
275 } 487 }
276 488
277 489
278 void EventHandlerImplementation::Shutdown() { 490 void EventHandlerImplementation::Shutdown() {
279 SendData(kShutdownId, 0, 0); 491 SendData(kShutdownId, 0, 0);
280 } 492 }
281 493
282 494
283 void EventHandlerImplementation::SendData(intptr_t id, 495 void EventHandlerImplementation::SendData(intptr_t id,
284 Dart_Port dart_port, 496 Dart_Port dart_port,
285 int64_t data) { 497 int64_t data) {
286 WakeupHandler(id, dart_port, data); 498 WakeupHandler(id, dart_port, data);
287 } 499 }
288 500
501 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
502 // The hashmap does not support keys with value 0.
503 return reinterpret_cast<void*>(fd + 1);
504 }
505
506
507 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
508 // The hashmap does not support keys with value 0.
509 return dart::Utils::WordHash(fd + 1);
510 }
511
289 } // namespace bin 512 } // namespace bin
290 } // namespace dart 513 } // namespace dart
291 514
292 #endif // defined(TARGET_OS_FUCHSIA) 515 #endif // defined(TARGET_OS_FUCHSIA)
293 516
294 #endif // !defined(DART_IO_DISABLED) 517 #endif // !defined(DART_IO_DISABLED)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698