Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(463)

Side by Side Diff: runtime/bin/eventhandler_linux.cc

Issue 169383003: Make event-handlers edge-triggered and move socket-state to Dart. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/eventhandler_linux.h ('k') | runtime/bin/eventhandler_macos.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_LINUX) 6 #if defined(TARGET_OS_LINUX)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 9
10 #include <errno.h> // NOLINT 10 #include <errno.h> // NOLINT
11 #include <pthread.h> // NOLINT 11 #include <pthread.h> // NOLINT
12 #include <stdio.h> // NOLINT 12 #include <stdio.h> // NOLINT
13 #include <string.h> // NOLINT 13 #include <string.h> // NOLINT
14 #include <sys/epoll.h> // NOLINT 14 #include <sys/epoll.h> // NOLINT
15 #include <sys/stat.h> // NOLINT 15 #include <sys/stat.h> // NOLINT
16 #include <sys/timerfd.h> // NOLINT 16 #include <sys/timerfd.h> // NOLINT
17 #include <unistd.h> // NOLINT 17 #include <unistd.h> // NOLINT
18 #include <fcntl.h> // NOLINT 18 #include <fcntl.h> // NOLINT
19 19
20 #include "bin/dartutils.h" 20 #include "bin/dartutils.h"
21 #include "bin/fdutils.h" 21 #include "bin/fdutils.h"
22 #include "bin/log.h" 22 #include "bin/log.h"
23 #include "bin/utils.h" 23 #include "bin/utils.h"
24 #include "platform/hashmap.h" 24 #include "platform/hashmap.h"
25 #include "platform/thread.h" 25 #include "platform/thread.h"
26 #include "platform/utils.h" 26 #include "platform/utils.h"
27 #include "vm/thread.h"
27 28
28 29
29 namespace dart { 30 namespace dart {
30 namespace bin { 31 namespace bin {
31 32
32 static const int kInterruptMessageSize = sizeof(InterruptMessage); 33 static const int kInterruptMessageSize = sizeof(InterruptMessage);
33 static const int kTimerId = -1; 34 static const int kTimerId = -1;
34 static const int kShutdownId = -2; 35 static const int kShutdownId = -2;
35 36
36 37
37 intptr_t SocketData::GetPollEvents() {
38 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
39 // triggered anyway.
40 intptr_t events = 0;
41 if (!IsClosedRead()) {
42 if ((mask_ & (1 << kInEvent)) != 0) {
43 events |= EPOLLIN;
44 }
45 }
46 if (!IsClosedWrite()) {
47 if ((mask_ & (1 << kOutEvent)) != 0) {
48 events |= EPOLLOUT;
49 }
50 }
51 return events;
52 }
53
54
55 // Unregister the file descriptor for a SocketData structure with epoll. 38 // Unregister the file descriptor for a SocketData structure with epoll.
56 static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) { 39 static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
57 if (sd->tracked_by_epoll()) { 40 if (!sd->tracked_by_epoll()) return;
58 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, 41 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
59 EPOLL_CTL_DEL, 42 EPOLL_CTL_DEL,
60 sd->fd(), 43 sd->fd(),
61 NULL)); 44 NULL));
62 if (status == -1) { 45 if (status == -1) {
63 FATAL("Failed unregistering events for file descriptor"); 46 FATAL("Failed unregistering events for file descriptor");
64 } 47 }
65 sd->set_tracked_by_epoll(false); 48 sd->set_tracked_by_epoll(false);
49 }
50
51
52 static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
53 ASSERT(!sd->tracked_by_epoll());
54 struct epoll_event event;
55 event.events = EPOLLET | EPOLLRDHUP;
56 if ((sd->mask() & (1 << kInEvent)) != 0) event.events |= EPOLLIN;
57 if ((sd->mask() & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT;
58 event.data.ptr = sd;
59 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
60 EPOLL_CTL_ADD,
61 sd->fd(),
62 &event));
63 if (status == -1) {
64 // Epoll does not accept the file descriptor. It could be due to
65 // already closed file descriptor, or unuspported devices, such
66 // as /dev/null. In such case, mark the file descriptor as closed,
67 // so dart will handle it accordingly.
68 DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
69 } else {
70 sd->set_tracked_by_epoll(true);
66 } 71 }
67 } 72 }
68 73
69
70 // Register the file descriptor for a SocketData structure with epoll
71 // if events are requested.
72 static void UpdateEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
73 struct epoll_event event;
74 event.events = sd->GetPollEvents();
75 event.data.ptr = sd;
76 if (sd->port() != 0 && event.events != 0) {
77 // Only report events once and wait for them to be re-enabled after the
78 // event has been handled by the Dart code.
79 event.events |= EPOLLONESHOT;
80 int status = 0;
81 if (sd->tracked_by_epoll()) {
82 status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
83 EPOLL_CTL_MOD,
84 sd->fd(),
85 &event));
86 } else {
87 status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
88 EPOLL_CTL_ADD,
89 sd->fd(),
90 &event));
91 sd->set_tracked_by_epoll(true);
92 }
93 if (status == -1) {
94 // Epoll does not accept the file descriptor. It could be due to
95 // already closed file descriptor, or unuspported devices, such
96 // as /dev/null. In such case, mark the file descriptor as closed,
97 // so dart will handle it accordingly.
98 sd->set_tracked_by_epoll(false);
99 sd->ShutdownRead();
100 sd->ShutdownWrite();
101 DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
102 }
103 }
104 }
105
106 74
107 EventHandlerImplementation::EventHandlerImplementation() 75 EventHandlerImplementation::EventHandlerImplementation()
108 : socket_map_(&HashMap::SamePointerValue, 16) { 76 : socket_map_(&HashMap::SamePointerValue, 16) {
109 intptr_t result; 77 intptr_t result;
110 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); 78 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
111 if (result != 0) { 79 if (result != 0) {
112 FATAL("Pipe creation failed"); 80 FATAL("Pipe creation failed");
113 } 81 }
114 FDUtils::SetNonBlocking(interrupt_fds_[0]); 82 FDUtils::SetNonBlocking(interrupt_fds_[0]);
115 FDUtils::SetCloseOnExec(interrupt_fds_[0]); 83 FDUtils::SetCloseOnExec(interrupt_fds_[0]);
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
153 121
154 122
155 EventHandlerImplementation::~EventHandlerImplementation() { 123 EventHandlerImplementation::~EventHandlerImplementation() {
156 TEMP_FAILURE_RETRY(close(epoll_fd_)); 124 TEMP_FAILURE_RETRY(close(epoll_fd_));
157 TEMP_FAILURE_RETRY(close(timer_fd_)); 125 TEMP_FAILURE_RETRY(close(timer_fd_));
158 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); 126 TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
159 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); 127 TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
160 } 128 }
161 129
162 130
163 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { 131 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
132 bool* is_new) {
164 ASSERT(fd >= 0); 133 ASSERT(fd >= 0);
165 HashMap::Entry* entry = socket_map_.Lookup( 134 HashMap::Entry* entry = socket_map_.Lookup(
166 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); 135 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
167 ASSERT(entry != NULL); 136 ASSERT(entry != NULL);
168 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); 137 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
169 if (sd == NULL) { 138 if (sd == NULL) {
170 // If there is no data in the hash map for this file descriptor a 139 // If there is no data in the hash map for this file descriptor a
171 // new SocketData for the file descriptor is inserted. 140 // new SocketData for the file descriptor is inserted.
172 sd = new SocketData(fd); 141 sd = new SocketData(fd);
173 entry->value = sd; 142 entry->value = sd;
143 *is_new = true;
174 } 144 }
175 ASSERT(fd == sd->fd()); 145 ASSERT(fd == sd->fd());
176 return sd; 146 return sd;
177 } 147 }
178 148
179 149
180 void EventHandlerImplementation::WakeupHandler(intptr_t id, 150 void EventHandlerImplementation::WakeupHandler(intptr_t id,
181 Dart_Port dart_port, 151 Dart_Port dart_port,
182 int64_t data) { 152 int64_t data) {
183 InterruptMessage msg; 153 InterruptMessage msg;
(...skipping 27 matching lines...) Expand all
211 memset(&it, 0, sizeof(it)); 181 memset(&it, 0, sizeof(it));
212 if (timeout_queue_.HasTimeout()) { 182 if (timeout_queue_.HasTimeout()) {
213 int64_t millis = timeout_queue_.CurrentTimeout(); 183 int64_t millis = timeout_queue_.CurrentTimeout();
214 it.it_value.tv_sec = millis / 1000; 184 it.it_value.tv_sec = millis / 1000;
215 it.it_value.tv_nsec = (millis % 1000) * 1000000; 185 it.it_value.tv_nsec = (millis % 1000) * 1000000;
216 } 186 }
217 timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL); 187 timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
218 } else if (msg[i].id == kShutdownId) { 188 } else if (msg[i].id == kShutdownId) {
219 shutdown_ = true; 189 shutdown_ = true;
220 } else { 190 } else {
221 SocketData* sd = GetSocketData(msg[i].id); 191 bool is_new = false;
192 SocketData* sd = GetSocketData(msg[i].id, &is_new);
193 if (is_new) {
194 sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
195 AddToEpollInstance(epoll_fd_, sd);
196 }
222 if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) { 197 if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
223 ASSERT(msg[i].data == (1 << kShutdownReadCommand)); 198 ASSERT(msg[i].data == (1 << kShutdownReadCommand));
224 // Close the socket for reading. 199 // Close the socket for reading.
225 sd->ShutdownRead(); 200 sd->ShutdownRead();
226 UpdateEpollInstance(epoll_fd_, sd);
227 } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) { 201 } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
228 ASSERT(msg[i].data == (1 << kShutdownWriteCommand)); 202 ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
229 // Close the socket for writing. 203 // Close the socket for writing.
230 sd->ShutdownWrite(); 204 sd->ShutdownWrite();
231 UpdateEpollInstance(epoll_fd_, sd);
232 } else if ((msg[i].data & (1 << kCloseCommand)) != 0) { 205 } else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
233 ASSERT(msg[i].data == (1 << kCloseCommand)); 206 ASSERT(msg[i].data == (1 << kCloseCommand));
234 // Close the socket and free system resources and move on to 207 // Close the socket and free system resources and move on to
235 // next message. 208 // next message.
236 RemoveFromEpollInstance(epoll_fd_, sd); 209 RemoveFromEpollInstance(epoll_fd_, sd);
237 intptr_t fd = sd->fd(); 210 intptr_t fd = sd->fd();
238 sd->Close(); 211 sd->Close();
239 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); 212 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
240 delete sd; 213 delete sd;
241 DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent); 214 DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
242 } else {
243 if ((msg[i].data & (1 << kInEvent)) != 0 && sd->IsClosedRead()) {
244 DartUtils::PostInt32(msg[i].dart_port, 1 << kCloseEvent);
245 } else {
246 // Setup events to wait for.
247 sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
248 UpdateEpollInstance(epoll_fd_, sd);
249 }
250 } 215 }
251 } 216 }
252 } 217 }
253 } 218 }
254 219
255 #ifdef DEBUG_POLL 220 #ifdef DEBUG_POLL
256 static void PrintEventMask(intptr_t fd, intptr_t events) { 221 static void PrintEventMask(intptr_t fd, intptr_t events) {
257 Log::Print("%d ", fd); 222 Log::Print("%d ", fd);
258 if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN "); 223 if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN ");
259 if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI "); 224 if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI ");
(...skipping 17 matching lines...) Expand all
277 #ifdef DEBUG_POLL 242 #ifdef DEBUG_POLL
278 PrintEventMask(sd->fd(), events); 243 PrintEventMask(sd->fd(), events);
279 #endif 244 #endif
280 intptr_t event_mask = 0; 245 intptr_t event_mask = 0;
281 if (sd->IsListeningSocket()) { 246 if (sd->IsListeningSocket()) {
282 // For listening sockets the EPOLLIN event indicate that there are 247 // For listening sockets the EPOLLIN event indicate that there are
283 // connections ready for accept unless accompanied with one of the 248 // connections ready for accept unless accompanied with one of the
284 // other flags. 249 // other flags.
285 if ((events & EPOLLIN) != 0) { 250 if ((events & EPOLLIN) != 0) {
286 if ((events & EPOLLHUP) != 0) event_mask |= (1 << kCloseEvent); 251 if ((events & EPOLLHUP) != 0) event_mask |= (1 << kCloseEvent);
287 if ((events & EPOLLERR) != 0) event_mask |= (1 << kErrorEvent); 252 if ((events & EPOLLERR) != 0) {
253 event_mask |= (1 << kErrorEvent);
254 }
288 if (event_mask == 0) event_mask |= (1 << kInEvent); 255 if (event_mask == 0) event_mask |= (1 << kInEvent);
289 } 256 }
290 } else { 257 } else {
291 // Prioritize data events over close and error events. 258 // Prioritize data events over close and error events.
292 if ((events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) { 259 if ((events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP)) != 0) {
293 // If we have EPOLLIN and we have available bytes, report that. 260 // If we have EPOLLIN and we have available bytes, report that.
294 if ((events & EPOLLIN) && FDUtils::AvailableBytes(sd->fd()) != 0) { 261 if ((events & EPOLLIN) != 0) {
295 event_mask = (1 << kInEvent); 262 event_mask = (1 << kInEvent);
296 } else if ((events & EPOLLHUP) != 0) { 263 }
264 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
297 // If both EPOLLHUP and EPOLLERR are reported treat it as an 265 // If both EPOLLHUP and EPOLLERR are reported treat it as an
298 // error. 266 // error.
299 if ((events & EPOLLERR) != 0) { 267 if ((events & EPOLLERR) != 0) {
300 event_mask = (1 << kErrorEvent); 268 event_mask = (1 << kErrorEvent);
301 } else { 269 } else {
302 event_mask = (1 << kCloseEvent); 270 event_mask |= (1 << kCloseEvent);
303 } 271 }
304 sd->MarkClosedRead();
305 } else if ((events & EPOLLERR) != 0) { 272 } else if ((events & EPOLLERR) != 0) {
306 event_mask = (1 << kErrorEvent); 273 event_mask = (1 << kErrorEvent);
307 } else {
308 if (sd->IsPipe()) {
309 // When reading from stdin (either from a terminal or piped
310 // input) treat EPOLLIN with 0 available bytes as
311 // end-of-file.
312 if (sd->fd() == STDIN_FILENO) {
313 event_mask = (1 << kCloseEvent);
314 sd->MarkClosedRead();
315 }
316 } else {
317 // If EPOLLIN is set with no available data and no EPOLLHUP use
318 // recv to peek for whether the other end of the socket
319 // actually closed.
320 char buffer;
321 ssize_t bytesPeeked =
322 TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK));
323 ASSERT(EAGAIN == EWOULDBLOCK);
324 if (bytesPeeked == 0) {
325 event_mask = (1 << kCloseEvent);
326 sd->MarkClosedRead();
327 } else if (errno != EWOULDBLOCK) {
328 const int kBufferSize = 1024;
329 char error_buf[kBufferSize];
330 Log::PrintErr("Error recv: %s\n",
331 strerror_r(errno, error_buf, kBufferSize));
332 }
333 }
334 } 274 }
335 } 275 }
336 276
337 if ((events & EPOLLOUT) != 0) { 277 if ((events & EPOLLOUT) != 0) {
338 if ((events & EPOLLERR) != 0) { 278 if ((events & EPOLLERR) != 0) {
339 event_mask = (1 << kErrorEvent); 279 if (!sd->IsPipe()) {
340 sd->MarkClosedWrite(); 280 event_mask = (1 << kErrorEvent);
281 }
341 } else { 282 } else {
342 event_mask |= (1 << kOutEvent); 283 event_mask |= (1 << kOutEvent);
343 } 284 }
344 } 285 }
345 } 286 }
346 287
347 return event_mask; 288 return event_mask;
348 } 289 }
349 290
350 291
351 void EventHandlerImplementation::HandleEvents(struct epoll_event* events, 292 void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
352 int size) { 293 int size) {
353 bool interrupt_seen = false; 294 bool interrupt_seen = false;
354 for (int i = 0; i < size; i++) { 295 for (int i = 0; i < size; i++) {
355 if (events[i].data.ptr == NULL) { 296 if (events[i].data.ptr == NULL) {
356 interrupt_seen = true; 297 interrupt_seen = true;
357 } else if (events[i].data.fd == timer_fd_) { 298 } else if (events[i].data.fd == timer_fd_) {
358 int64_t val; 299 int64_t val;
359 VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val))); 300 VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val)));
360 if (timeout_queue_.HasTimeout()) { 301 if (timeout_queue_.HasTimeout()) {
361 DartUtils::PostNull(timeout_queue_.CurrentPort()); 302 DartUtils::PostNull(timeout_queue_.CurrentPort());
362 timeout_queue_.RemoveCurrent(); 303 timeout_queue_.RemoveCurrent();
363 } 304 }
364 } else { 305 } else {
365 SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr); 306 SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
366 intptr_t event_mask = GetPollEvents(events[i].events, sd); 307 int32_t event_mask = GetPollEvents(events[i].events, sd);
367 if (event_mask == 0) { 308 if (event_mask != 0) {
368 // Event not handled, re-add to epoll.
369 UpdateEpollInstance(epoll_fd_, sd);
370 } else {
371 Dart_Port port = sd->port(); 309 Dart_Port port = sd->port();
372 ASSERT(port != 0); 310 ASSERT(port != 0);
373 DartUtils::PostInt32(port, event_mask); 311 DartUtils::PostInt32(port, event_mask);
374 } 312 }
375 } 313 }
376 } 314 }
377 if (interrupt_seen) { 315 if (interrupt_seen) {
378 // Handle after socket events, so we avoid closing a socket before we handle 316 // Handle after socket events, so we avoid closing a socket before we handle
379 // the current events. 317 // the current events.
380 HandleInterruptFd(); 318 HandleInterruptFd();
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
435 373
436 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { 374 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
437 // The hashmap does not support keys with value 0. 375 // The hashmap does not support keys with value 0.
438 return dart::Utils::WordHash(fd + 1); 376 return dart::Utils::WordHash(fd + 1);
439 } 377 }
440 378
441 } // namespace bin 379 } // namespace bin
442 } // namespace dart 380 } // namespace dart
443 381
444 #endif // defined(TARGET_OS_LINUX) 382 #endif // defined(TARGET_OS_LINUX)
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_linux.h ('k') | runtime/bin/eventhandler_macos.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698