Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(222)

Side by Side Diff: runtime/bin/eventhandler_fuchsia.cc

Issue 2515643004: Fuchsia: Partial implementation of dart:io sockets (Closed)
Patch Set: Formatting Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/bin/eventhandler_fuchsia.h ('k') | runtime/bin/fdutils.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #if !defined(DART_IO_DISABLED) 5 #if !defined(DART_IO_DISABLED)
6 6
7 #include "platform/globals.h" 7 #include "platform/globals.h"
8 #if defined(TARGET_OS_FUCHSIA) 8 #if defined(TARGET_OS_FUCHSIA)
9 9
10 #include "bin/eventhandler.h" 10 #include "bin/eventhandler.h"
11 #include "bin/eventhandler_fuchsia.h" 11 #include "bin/eventhandler_fuchsia.h"
12 12
13 #include <magenta/status.h> 13 #include <errno.h> // NOLINT
14 #include <magenta/syscalls.h> 14 #include <fcntl.h> // NOLINT
15 #include <pthread.h> // NOLINT
16 #include <stdio.h> // NOLINT
17 #include <string.h> // NOLINT
18 #include <sys/epoll.h> // NOLINT
19 #include <sys/stat.h> // NOLINT
20 #include <unistd.h> // NOLINT
15 21
22 #include "bin/fdutils.h"
23 #include "bin/lockers.h"
16 #include "bin/log.h" 24 #include "bin/log.h"
25 #include "bin/socket.h"
17 #include "bin/thread.h" 26 #include "bin/thread.h"
18 #include "bin/utils.h" 27 #include "bin/utils.h"
28 #include "platform/hashmap.h"
29 #include "platform/utils.h"
19 30
31 // #define EVENTHANDLER_LOGGING 1
20 #if defined(EVENTHANDLER_LOGGING) 32 #if defined(EVENTHANDLER_LOGGING)
21 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__) 33 #define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__)
22 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__) 34 #define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__)
23 #else 35 #else
24 #define LOG_ERR(msg, ...) 36 #define LOG_ERR(msg, ...)
25 #define LOG_INFO(msg, ...) 37 #define LOG_INFO(msg, ...)
26 #endif // defined(EVENTHANDLER_LOGGING) 38 #endif // defined(EVENTHANDLER_LOGGING)
27 39
28 namespace dart { 40 namespace dart {
29 namespace bin { 41 namespace bin {
30 42
31 MagentaWaitManyInfo::MagentaWaitManyInfo() 43 #if defined(EVENTHANDLER_LOGGING)
32 : capacity_(kInitialCapacity), size_(0) { 44 static void PrintEventMask(intptr_t fd, intptr_t events) {
33 descriptor_infos_ = static_cast<DescriptorInfo**>( 45 Log::PrintErr("%d ", fd);
34 malloc(kInitialCapacity * sizeof(*descriptor_infos_))); 46 if ((events & EPOLLIN) != 0) {
35 if (descriptor_infos_ == NULL) { 47 Log::PrintErr("EPOLLIN ");
36 FATAL("Failed to allocate descriptor_infos array"); 48 }
37 } 49 if ((events & EPOLLPRI) != 0) {
38 items_ = 50 Log::PrintErr("EPOLLPRI ");
39 static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_))); 51 }
40 if (items_ == NULL) { 52 if ((events & EPOLLOUT) != 0) {
41 FATAL("Failed to allocate items array"); 53 Log::PrintErr("EPOLLOUT ");
42 } 54 }
43 } 55 if ((events & EPOLLERR) != 0) {
44 56 Log::PrintErr("EPOLLERR ");
45 57 }
46 MagentaWaitManyInfo::~MagentaWaitManyInfo() { 58 if ((events & EPOLLHUP) != 0) {
47 free(descriptor_infos_); 59 Log::PrintErr("EPOLLHUP ");
48 free(items_); 60 }
49 } 61 if ((events & EPOLLRDHUP) != 0) {
50 62 Log::PrintErr("EPOLLRDHUP ");
51 63 }
52 void MagentaWaitManyInfo::AddHandle(mx_handle_t handle, 64 int all_events =
53 mx_signals_t signals, 65 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
54 DescriptorInfo* di) { 66 if ((events & ~all_events) != 0) {
55 #if defined(DEBUG) 67 Log::PrintErr("(and %08x) ", events & ~all_events);
56 // Check that the handle is not already in the list. 68 }
57 for (intptr_t i = 0; i < size_; i++) { 69
58 if (items_[i].handle == handle) { 70 Log::PrintErr("\n");
59 FATAL("The handle is already in the list!"); 71 }
72 #endif
73
74
75 intptr_t DescriptorInfo::GetPollEvents() {
76 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
77 // triggered anyway.
78 intptr_t events = 0;
79 if ((Mask() & (1 << kInEvent)) != 0) {
80 events |= EPOLLIN;
81 }
82 if ((Mask() & (1 << kOutEvent)) != 0) {
83 events |= EPOLLOUT;
84 }
85 return events;
86 }
87
88
89 // Unregister the file descriptor for a DescriptorInfo structure with
90 // epoll.
91 static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
92 LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd());
93 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL));
94 }
95
96
97 static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
98 struct epoll_event event;
99 event.events = EPOLLRDHUP | di->GetPollEvents();
100 if (!di->IsListeningSocket()) {
101 event.events |= EPOLLET;
102 }
103 event.data.ptr = di;
104 LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd());
105 int status =
106 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
107 LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status);
108 #if defined(EVENTHANDLER_LOGGING)
109 PrintEventMask(di->fd(), event.events);
110 #endif
111 if (status == -1) {
112 // TODO(dart:io): Verify that the dart end is handling this correctly.
113
114 // Epoll does not accept the file descriptor. It could be due to
115 // already closed file descriptor, or unuspported devices, such
116 // as /dev/null. In such case, mark the file descriptor as closed,
117 // so dart will handle it accordingly.
118 di->NotifyAllDartPorts(1 << kCloseEvent);
119 }
120 }
121
122
123 EventHandlerImplementation::EventHandlerImplementation()
124 : socket_map_(&HashMap::SamePointerValue, 16) {
125 intptr_t result;
126 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
127 if (result != 0) {
128 FATAL("Pipe creation failed");
129 }
130 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
131 FATAL("Failed to set pipe fd non blocking\n");
132 }
133 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
134 FATAL("Failed to set pipe fd close on exec\n");
135 }
136 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
137 FATAL("Failed to set pipe fd close on exec\n");
138 }
139 shutdown_ = false;
140 // The initial size passed to epoll_create is ignore on newer (>=
141 // 2.6.8) Linux versions
142 static const int kEpollInitialSize = 64;
143 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
144 if (epoll_fd_ == -1) {
145 FATAL1("Failed creating epoll file descriptor: %i", errno);
146 }
147 if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
148 FATAL("Failed to set epoll fd close on exec\n");
149 }
150 // Register the interrupt_fd with the epoll instance.
151 struct epoll_event event;
152 event.events = EPOLLIN;
153 event.data.ptr = NULL;
154 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_);
155 int status = NO_RETRY_EXPECTED(
156 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
157 LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
158 epoll_fd_, status);
159 if (status == -1) {
160 FATAL("Failed adding interrupt fd to epoll instance");
161 }
162 }
163
164
165 static void DeleteDescriptorInfo(void* info) {
166 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
167 di->Close();
168 LOG_INFO("Closed %d\n", di->fd());
169 delete di;
170 }
171
172
173 EventHandlerImplementation::~EventHandlerImplementation() {
174 socket_map_.Clear(DeleteDescriptorInfo);
175 VOID_NO_RETRY_EXPECTED(close(epoll_fd_));
176 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0]));
177 VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1]));
178 }
179
180
181 void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
182 DescriptorInfo* di) {
183 intptr_t new_mask = di->Mask();
184 LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask,
185 new_mask);
186 if ((old_mask != 0) && (new_mask == 0)) {
187 RemoveFromEpollInstance(epoll_fd_, di);
188 } else if ((old_mask == 0) && (new_mask != 0)) {
189 AddToEpollInstance(epoll_fd_, di);
190 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
191 ASSERT(!di->IsListeningSocket());
192 RemoveFromEpollInstance(epoll_fd_, di);
193 AddToEpollInstance(epoll_fd_, di);
194 }
195 }
196
197
198 DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
199 intptr_t fd,
200 bool is_listening) {
201 ASSERT(fd >= 0);
202 HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd),
203 GetHashmapHashFromFd(fd), true);
204 ASSERT(entry != NULL);
205 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
206 if (di == NULL) {
207 // If there is no data in the hash map for this file descriptor a
208 // new DescriptorInfo for the file descriptor is inserted.
209 if (is_listening) {
210 di = new DescriptorInfoMultiple(fd);
211 } else {
212 di = new DescriptorInfoSingle(fd);
60 } 213 }
61 } 214 entry->value = di;
62 #endif 215 }
63 intptr_t new_size = size_ + 1; 216 ASSERT(fd == di->fd());
64 GrowArraysIfNeeded(new_size); 217 return di;
65 descriptor_infos_[size_] = di; 218 }
66 items_[size_].handle = handle; 219
67 items_[size_].waitfor = signals; 220
68 items_[size_].pending = 0; 221 static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
69 size_ = new_size; 222 size_t remaining = count;
70 LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_); 223 char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
71 } 224 while (remaining > 0) {
72 225 ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
73 226 if (bytes_written == 0) {
74 void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) { 227 return count - remaining;
75 intptr_t idx; 228 } else if (bytes_written == -1) {
76 for (idx = 1; idx < size_; idx++) { 229 ASSERT(EAGAIN == EWOULDBLOCK);
77 if (handle == items_[idx].handle) { 230 // Error code EWOULDBLOCK should only happen for non blocking
78 break; 231 // file descriptors.
232 ASSERT(errno != EWOULDBLOCK);
233 return -1;
234 } else {
235 ASSERT(bytes_written > 0);
236 remaining -= bytes_written;
237 buffer_pos += bytes_written;
79 } 238 }
80 } 239 }
81 if (idx == size_) { 240 return count;
82 FATAL("Handle is not in the list!"); 241 }
83 } 242
84 243
85 if (idx != (size_ - 1)) {
86 descriptor_infos_[idx] = descriptor_infos_[size_ - 1];
87 items_[idx] = items_[size_ - 1];
88 }
89 descriptor_infos_[size_ - 1] = NULL;
90 items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0};
91 size_ = size_ - 1;
92 LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_);
93 }
94
95
96 void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) {
97 if (desired_size < capacity_) {
98 return;
99 }
100 intptr_t new_capacity = desired_size + (desired_size >> 1);
101 descriptor_infos_ = static_cast<DescriptorInfo**>(
102 realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_)));
103 if (descriptor_infos_ == NULL) {
104 FATAL("Failed to grow descriptor_infos array");
105 }
106 items_ = static_cast<mx_wait_item_t*>(
107 realloc(items_, new_capacity * sizeof(*items_)));
108 if (items_ == NULL) {
109 FATAL("Failed to grow items array");
110 }
111 capacity_ = new_capacity;
112 LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size,
113 capacity_);
114 }
115
116
117 EventHandlerImplementation::EventHandlerImplementation() {
118 mx_status_t status =
119 mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]);
120 if (status != NO_ERROR) {
121 FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status));
122 }
123 shutdown_ = false;
124 info_.AddHandle(interrupt_handles_[0],
125 MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL);
126 LOG_INFO("EventHandlerImplementation initialized\n");
127 }
128
129
130 EventHandlerImplementation::~EventHandlerImplementation() {
131 mx_status_t status = mx_handle_close(interrupt_handles_[0]);
132 if (status != NO_ERROR) {
133 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
134 }
135 status = mx_handle_close(interrupt_handles_[1]);
136 if (status != NO_ERROR) {
137 FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
138 }
139 LOG_INFO("EventHandlerImplementation destroyed\n");
140 }
141
142
143 void EventHandlerImplementation::WakeupHandler(intptr_t id, 244 void EventHandlerImplementation::WakeupHandler(intptr_t id,
144 Dart_Port dart_port, 245 Dart_Port dart_port,
145 int64_t data) { 246 int64_t data) {
146 InterruptMessage msg; 247 InterruptMessage msg;
147 msg.id = id; 248 msg.id = id;
148 msg.dart_port = dart_port; 249 msg.dart_port = dart_port;
149 msg.data = data; 250 msg.data = data;
150 251 // WriteToBlocking will write up to 512 bytes atomically, and since our msg
151 mx_status_t status = 252 // is smaller than 512, we don't need a thread lock.
152 mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0); 253 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
153 if (status != NO_ERROR) { 254 ASSERT(kInterruptMessageSize < PIPE_BUF);
154 FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status)); 255 intptr_t result =
256 WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
257 if (result != kInterruptMessageSize) {
258 if (result == -1) {
259 perror("Interrupt message failure:");
260 }
261 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
155 } 262 }
156 LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data);
157 } 263 }
158 264
159 265
160 void EventHandlerImplementation::HandleInterruptFd() { 266 void EventHandlerImplementation::HandleInterruptFd() {
161 LOG_INFO("HandleInterruptFd entry\n"); 267 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
162 InterruptMessage msg; 268 InterruptMessage msg[MAX_MESSAGES];
163 uint32_t bytes = kInterruptMessageSize; 269 ssize_t bytes = NO_RETRY_EXPECTED(
164 mx_status_t status; 270 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
165 while (true) { 271 LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes);
166 status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes, 272 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
167 NULL, 0, NULL); 273 if (msg[i].id == kTimerId) {
168 if (status != NO_ERROR) {
169 break;
170 }
171 ASSERT(bytes == kInterruptMessageSize);
172 if (msg.id == kTimerId) {
173 LOG_INFO("HandleInterruptFd read timer update\n"); 274 LOG_INFO("HandleInterruptFd read timer update\n");
174 timeout_queue_.UpdateTimeout(msg.dart_port, msg.data); 275 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
175 } else if (msg.id == kShutdownId) { 276 } else if (msg[i].id == kShutdownId) {
176 LOG_INFO("HandleInterruptFd read shutdown\n"); 277 LOG_INFO("HandleInterruptFd read shutdown\n");
177 shutdown_ = true; 278 shutdown_ = true;
178 } else { 279 } else {
179 // TODO(zra): Handle commands to add and remove handles from the 280 ASSERT((msg[i].data & COMMAND_MASK) != 0);
180 // MagentaWaitManyInfo. 281 LOG_INFO("HandleInterruptFd command\n");
181 UNIMPLEMENTED(); 282 DescriptorInfo* di =
283 GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
284 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
285 ASSERT(!di->IsListeningSocket());
286 // Close the socket for reading.
287 LOG_INFO("\tSHUT_RD: %d\n", di->fd());
288 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
289 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
290 ASSERT(!di->IsListeningSocket());
291 // Close the socket for writing.
292 LOG_INFO("\tSHUT_WR: %d\n", di->fd());
293 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
294 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
295 // Close the socket and free system resources and move on to next
296 // message.
297 intptr_t old_mask = di->Mask();
298 Dart_Port port = msg[i].dart_port;
299 di->RemovePort(port);
300 intptr_t new_mask = di->Mask();
301 UpdateEpollInstance(old_mask, di);
302
303 LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
304 intptr_t fd = di->fd();
305 if (di->IsListeningSocket()) {
306 // We only close the socket file descriptor from the operating
307 // system if there are no other dart socket objects which
308 // are listening on the same (address, port) combination.
309 ListeningSocketRegistry* registry =
310 ListeningSocketRegistry::Instance();
311
312 MutexLocker locker(registry->mutex());
313
314 if (registry->CloseSafe(fd)) {
315 ASSERT(new_mask == 0);
316 socket_map_.Remove(GetHashmapKeyFromFd(fd),
317 GetHashmapHashFromFd(fd));
318 di->Close();
319 LOG_INFO("Closed %d\n", di->fd());
320 delete di;
321 }
322 } else {
323 ASSERT(new_mask == 0);
324 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
325 di->Close();
326 LOG_INFO("Closed %d\n", di->fd());
327 delete di;
328 }
329
330 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
331 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
332 int count = TOKEN_COUNT(msg[i].data);
333 intptr_t old_mask = di->Mask();
334 LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
335 di->ReturnTokens(msg[i].dart_port, count);
336 UpdateEpollInstance(old_mask, di);
337 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
338 // `events` can only have kInEvent/kOutEvent flags set.
339 intptr_t events = msg[i].data & EVENT_MASK;
340 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
341
342 intptr_t old_mask = di->Mask();
343 LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask,
344 msg[i].data & EVENT_MASK);
345 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
346 UpdateEpollInstance(old_mask, di);
347 } else {
348 UNREACHABLE();
349 }
182 } 350 }
183 } 351 }
184 // status == ERR_SHOULD_WAIT when we try to read and there are no messages
185 // available, so it is an error if we get here and status != ERR_SHOULD_WAIT.
186 if (status != ERR_SHOULD_WAIT) {
187 FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status));
188 }
189 LOG_INFO("HandleInterruptFd exit\n"); 352 LOG_INFO("HandleInterruptFd exit\n");
190 } 353 }
191 354
192 355
193 void EventHandlerImplementation::HandleEvents() { 356 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
194 LOG_INFO("HandleEvents entry\n"); 357 DescriptorInfo* di) {
195 for (intptr_t i = 1; i < info_.size(); i++) { 358 #ifdef EVENTHANDLER_LOGGING
196 const mx_wait_item_t& wait_item = info_.items()[i]; 359 PrintEventMask(di->fd(), events);
197 if (wait_item.pending & wait_item.waitfor) { 360 #endif
198 // Only the control handle has no descriptor info. 361 if ((events & EPOLLERR) != 0) {
199 ASSERT(info_.descriptor_infos()[i] != NULL); 362 // Return error only if EPOLLIN is present.
200 ASSERT(wait_item.handle != interrupt_handles_[0]); 363 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
201 // TODO(zra): Handle events on other handles. At the moment we are 364 }
202 // only interrupted when there is a message on interrupt_handles_[0]. 365 intptr_t event_mask = 0;
203 UNIMPLEMENTED(); 366 if ((events & EPOLLIN) != 0) {
367 event_mask |= (1 << kInEvent);
368 }
369 if ((events & EPOLLOUT) != 0) {
370 event_mask |= (1 << kOutEvent);
371 }
372 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
373 event_mask |= (1 << kCloseEvent);
374 }
375 return event_mask;
376 }
377
378
379 void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
380 int size) {
381 bool interrupt_seen = false;
382 for (int i = 0; i < size; i++) {
383 if (events[i].data.ptr == NULL) {
384 interrupt_seen = true;
385 } else {
386 DescriptorInfo* di =
387 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
388 intptr_t event_mask = GetPollEvents(events[i].events, di);
389
390 if ((event_mask & (1 << kErrorEvent)) != 0) {
391 di->NotifyAllDartPorts(event_mask);
392 }
393 event_mask &= ~(1 << kErrorEvent);
394
395 LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
396 if (event_mask != 0) {
397 intptr_t old_mask = di->Mask();
398 Dart_Port port = di->NextNotifyDartPort(event_mask);
399 ASSERT(port != 0);
400 UpdateEpollInstance(old_mask, di);
401 LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask,
402 port, di->fd());
403 bool success = DartUtils::PostInt32(port, event_mask);
404 if (!success) {
405 // This can happen if e.g. the isolate that owns the port has died
406 // for some reason.
407 FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port);
408 }
409 }
204 } 410 }
205 } 411 }
206 412 if (interrupt_seen) {
207 if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) { 413 // Handle after socket events, so we avoid closing a socket before we handle
208 FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n"); 414 // the current events.
209 }
210 if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) {
211 LOG_INFO("HandleEvents interrupt_handles_[0] readable\n");
212 HandleInterruptFd(); 415 HandleInterruptFd();
213 } else {
214 LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n");
215 } 416 }
216 } 417 }
217 418
218 419
219 int64_t EventHandlerImplementation::GetTimeout() const { 420 int64_t EventHandlerImplementation::GetTimeout() const {
220 if (!timeout_queue_.HasTimeout()) { 421 if (!timeout_queue_.HasTimeout()) {
221 return kInfinityTimeout; 422 return kInfinityTimeout;
222 } 423 }
223 int64_t millis = 424 int64_t millis =
224 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); 425 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
225 return (millis < 0) ? 0 : millis; 426 return (millis < 0) ? 0 : millis;
226 } 427 }
227 428
228 429
229 void EventHandlerImplementation::HandleTimeout() { 430 void EventHandlerImplementation::HandleTimeout() {
230 if (timeout_queue_.HasTimeout()) { 431 if (timeout_queue_.HasTimeout()) {
231 int64_t millis = timeout_queue_.CurrentTimeout() - 432 int64_t millis = timeout_queue_.CurrentTimeout() -
232 TimerUtils::GetCurrentMonotonicMillis(); 433 TimerUtils::GetCurrentMonotonicMillis();
233 if (millis <= 0) { 434 if (millis <= 0) {
234 DartUtils::PostNull(timeout_queue_.CurrentPort()); 435 DartUtils::PostNull(timeout_queue_.CurrentPort());
235 timeout_queue_.RemoveCurrent(); 436 timeout_queue_.RemoveCurrent();
236 } 437 }
237 } 438 }
238 } 439 }
239 440
240 441
241 void EventHandlerImplementation::Poll(uword args) { 442 void EventHandlerImplementation::Poll(uword args) {
443 static const intptr_t kMaxEvents = 16;
444 struct epoll_event events[kMaxEvents];
242 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 445 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
243 EventHandlerImplementation* handler_impl = &handler->delegate_; 446 EventHandlerImplementation* handler_impl = &handler->delegate_;
244 ASSERT(handler_impl != NULL); 447 ASSERT(handler_impl != NULL);
245 448
246 while (!handler_impl->shutdown_) { 449 while (!handler_impl->shutdown_) {
247 int64_t millis = handler_impl->GetTimeout(); 450 int64_t millis = handler_impl->GetTimeout();
248 ASSERT((millis == kInfinityTimeout) || (millis >= 0)); 451 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
249 mx_time_t timeout = 452 LOG_INFO("epoll_wait(millis = %ld)\n", millis);
250 millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond; 453 intptr_t result = NO_RETRY_EXPECTED(
251 const MagentaWaitManyInfo& info = handler_impl->info(); 454 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis));
252 LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(), 455 ASSERT(EAGAIN == EWOULDBLOCK);
253 timeout); 456 LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result);
254 mx_status_t status = 457 if (result < 0) {
255 mx_handle_wait_many(info.items(), info.size(), timeout); 458 if (errno != EWOULDBLOCK) {
256 if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) { 459 perror("Poll failed");
257 FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status)); 460 }
258 } else { 461 } else {
259 LOG_INFO("mx_handle_wait_many returned: %ld\n", status);
260 handler_impl->HandleTimeout(); 462 handler_impl->HandleTimeout();
261 handler_impl->HandleEvents(); 463 handler_impl->HandleEvents(events, result);
262 } 464 }
263 } 465 }
264 handler->NotifyShutdownDone(); 466 handler->NotifyShutdownDone();
265 LOG_INFO("EventHandlerImplementation notifying about shutdown\n");
266 } 467 }
267 468
268 469
269 void EventHandlerImplementation::Start(EventHandler* handler) { 470 void EventHandlerImplementation::Start(EventHandler* handler) {
270 int result = Thread::Start(&EventHandlerImplementation::Poll, 471 int result = Thread::Start(&EventHandlerImplementation::Poll,
271 reinterpret_cast<uword>(handler)); 472 reinterpret_cast<uword>(handler));
272 if (result != 0) { 473 if (result != 0) {
273 FATAL1("Failed to start event handler thread %d", result); 474 FATAL1("Failed to start event handler thread %d", result);
274 } 475 }
275 } 476 }
276 477
277 478
278 void EventHandlerImplementation::Shutdown() { 479 void EventHandlerImplementation::Shutdown() {
279 SendData(kShutdownId, 0, 0); 480 SendData(kShutdownId, 0, 0);
280 } 481 }
281 482
282 483
283 void EventHandlerImplementation::SendData(intptr_t id, 484 void EventHandlerImplementation::SendData(intptr_t id,
284 Dart_Port dart_port, 485 Dart_Port dart_port,
285 int64_t data) { 486 int64_t data) {
286 WakeupHandler(id, dart_port, data); 487 WakeupHandler(id, dart_port, data);
287 } 488 }
288 489
490 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
491 // The hashmap does not support keys with value 0.
492 return reinterpret_cast<void*>(fd + 1);
493 }
494
495
496 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
497 // The hashmap does not support keys with value 0.
498 return dart::Utils::WordHash(fd + 1);
499 }
500
289 } // namespace bin 501 } // namespace bin
290 } // namespace dart 502 } // namespace dart
291 503
292 #endif // defined(TARGET_OS_FUCHSIA) 504 #endif // defined(TARGET_OS_FUCHSIA)
293 505
294 #endif // !defined(DART_IO_DISABLED) 506 #endif // !defined(DART_IO_DISABLED)
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_fuchsia.h ('k') | runtime/bin/fdutils.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698