Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(280)

Side by Side Diff: runtime/bin/eventhandler_linux.cc

Issue 171503009: Remove SocketData and now only pass the dart port to epoll. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_LINUX) 6 #if defined(TARGET_OS_LINUX)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 9
10 #include <errno.h> // NOLINT 10 #include <errno.h> // NOLINT
11 #include <pthread.h> // NOLINT 11 #include <pthread.h> // NOLINT
12 #include <stdio.h> // NOLINT 12 #include <stdio.h> // NOLINT
13 #include <string.h> // NOLINT 13 #include <string.h> // NOLINT
14 #include <sys/epoll.h> // NOLINT 14 #include <sys/epoll.h> // NOLINT
15 #include <sys/stat.h> // NOLINT 15 #include <sys/stat.h> // NOLINT
16 #include <sys/timerfd.h> // NOLINT 16 #include <sys/timerfd.h> // NOLINT
17 #include <unistd.h> // NOLINT 17 #include <unistd.h> // NOLINT
18 #include <fcntl.h> // NOLINT 18 #include <fcntl.h> // NOLINT
19 19
20 #include "bin/dartutils.h" 20 #include "bin/dartutils.h"
21 #include "bin/fdutils.h" 21 #include "bin/fdutils.h"
22 #include "bin/log.h" 22 #include "bin/log.h"
23 #include "bin/utils.h" 23 #include "bin/socket.h"
24 #include "platform/hashmap.h" 24 #include "platform/hashmap.h"
25 #include "platform/thread.h" 25 #include "platform/thread.h"
26 #include "platform/utils.h" 26 #include "platform/utils.h"
27 27
28 28
29 namespace dart { 29 namespace dart {
30 namespace bin { 30 namespace bin {
31 31
32 static const int kInterruptMessageSize = sizeof(InterruptMessage);
33 static const int kTimerId = -1; 32 static const int kTimerId = -1;
34 static const int kShutdownId = -2; 33
34 // MSB is used to mark the timer fd.
35 static const uint64_t TIMER_BIT = 0x8000000000000000;
35 36
36 37
37 // Unregister the file descriptor for a SocketData structure with epoll. 38 static void AddToEpollInstance(intptr_t epoll_fd_, int fd, Dart_Port port,
Søren Gjesse 2014/02/20 08:15:54 Move all arguments to the secone line for better r
Anders Johnsen 2014/02/20 08:56:21 Done.
38 static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) { 39 int mask) {
39 if (!sd->tracked_by_epoll()) return;
40 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
41 EPOLL_CTL_DEL,
42 sd->fd(),
43 NULL));
44 if (status == -1) {
45 FATAL("Failed unregistering events for file descriptor");
46 }
47 sd->set_tracked_by_epoll(false);
48 }
49
50
51 static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd, int mask) {
52 ASSERT(!sd->tracked_by_epoll());
53 struct epoll_event event; 40 struct epoll_event event;
54 event.events = EPOLLET | EPOLLRDHUP; 41 event.events = EPOLLET | EPOLLRDHUP;
55 if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN; 42 if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN;
56 if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT; 43 if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT;
57 event.data.ptr = sd; 44 // Be sure we don't collide with the TIMER_BIT.
45 if (port & TIMER_BIT) {
46 FATAL("Port not in expected range");
47 }
48 event.data.u64 = port;
58 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, 49 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
59 EPOLL_CTL_ADD, 50 EPOLL_CTL_ADD,
60 sd->fd(), 51 fd,
61 &event)); 52 &event));
62 if (status == -1) { 53 if (status == -1) {
63 // Epoll does not accept the file descriptor. It could be due to 54 // Epoll does not accept the file descriptor. It could be due to
64 // already closed file descriptor, or unuspported devices, such 55 // already closed file descriptor, or unuspported devices, such
65 // as /dev/null. In such case, mark the file descriptor as closed, 56 // as /dev/null. In such case, mark the file descriptor as closed,
66 // so dart will handle it accordingly. 57 // so dart will handle it accordingly.
67 DartUtils::PostInt32(sd->port(), 1 << kCloseEvent); 58 DartUtils::PostInt32(port, 1 << kCloseEvent);
68 } else {
69 sd->set_tracked_by_epoll(true);
70 } 59 }
71 } 60 }
72 61
73 62
74 EventHandlerImplementation::EventHandlerImplementation() 63 EventHandlerImplementation::EventHandlerImplementation()
75 : socket_map_(&HashMap::SamePointerValue, 16) { 64 : shutdown_(false) {
Søren Gjesse 2014/02/20 08:15:54 Does this fit on the previous line?
Anders Johnsen 2014/02/20 08:56:21 Done.
76 intptr_t result;
77 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
78 if (result != 0) {
79 FATAL("Pipe creation failed");
80 }
81 FDUtils::SetNonBlocking(interrupt_fds_[0]);
82 FDUtils::SetCloseOnExec(interrupt_fds_[0]);
83 FDUtils::SetCloseOnExec(interrupt_fds_[1]);
84 shutdown_ = false;
85 // The initial size passed to epoll_create is ignore on newer (>= 65 // The initial size passed to epoll_create is ignore on newer (>=
86 // 2.6.8) Linux versions 66 // 2.6.8) Linux versions
87 static const int kEpollInitialSize = 64; 67 static const int kEpollInitialSize = 64;
88 epoll_fd_ = TEMP_FAILURE_RETRY(epoll_create(kEpollInitialSize)); 68 epoll_fd_ = TEMP_FAILURE_RETRY(epoll_create(kEpollInitialSize));
89 if (epoll_fd_ == -1) { 69 if (epoll_fd_ == -1) {
90 FATAL("Failed creating epoll file descriptor"); 70 FATAL("Failed creating epoll file descriptor");
91 } 71 }
92 FDUtils::SetCloseOnExec(epoll_fd_); 72 FDUtils::SetCloseOnExec(epoll_fd_);
93 // Register the interrupt_fd with the epoll instance.
94 struct epoll_event event;
95 event.events = EPOLLIN;
96 event.data.ptr = NULL;
97 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
98 EPOLL_CTL_ADD,
99 interrupt_fds_[0],
100 &event));
101 if (status == -1) {
102 FATAL("Failed adding interrupt fd to epoll instance");
103 }
104 timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC)); 73 timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC));
105 if (epoll_fd_ == -1) { 74 if (epoll_fd_ == -1) {
106 FATAL("Failed creating timerfd file descriptor"); 75 FATAL("Failed creating timerfd file descriptor");
107 } 76 }
108 // Register the timer_fd_ with the epoll instance. 77 // Register the timer_fd_ with the epoll instance.
78 struct epoll_event event;
109 event.events = EPOLLIN; 79 event.events = EPOLLIN;
110 event.data.fd = timer_fd_; 80 event.data.u64 = timer_fd_ | TIMER_BIT;
Søren Gjesse 2014/02/20 08:15:54 Can't we just use ILLEGAL_PORT (value 0) here, and
Anders Johnsen 2014/02/20 08:56:21 Done.
111 status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_, 81 int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
112 EPOLL_CTL_ADD, 82 EPOLL_CTL_ADD,
113 timer_fd_, 83 timer_fd_,
114 &event)); 84 &event));
115 if (status == -1) { 85 if (status == -1) {
116 FATAL2( 86 FATAL2(
117 "Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno); 87 "Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno);
118 } 88 }
119 } 89 }
120 90
121 91
122 EventHandlerImplementation::~EventHandlerImplementation() { 92 EventHandlerImplementation::~EventHandlerImplementation() {
123 TEMP_FAILURE_RETRY(close(epoll_fd_)); 93 TEMP_FAILURE_RETRY(close(epoll_fd_));
124 TEMP_FAILURE_RETRY(close(timer_fd_)); 94 TEMP_FAILURE_RETRY(close(timer_fd_));
125 TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
126 TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
127 }
128
129
130 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
131 bool* is_new) {
132 ASSERT(fd >= 0);
133 HashMap::Entry* entry = socket_map_.Lookup(
134 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
135 ASSERT(entry != NULL);
136 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
137 if (sd == NULL) {
138 // If there is no data in the hash map for this file descriptor a
139 // new SocketData for the file descriptor is inserted.
140 sd = new SocketData(fd);
141 entry->value = sd;
142 *is_new = true;
143 }
144 ASSERT(fd == sd->fd());
145 return sd;
146 }
147
148
149 void EventHandlerImplementation::WakeupHandler(intptr_t id,
150 Dart_Port dart_port,
151 int64_t data) {
152 InterruptMessage msg;
153 msg.id = id;
154 msg.dart_port = dart_port;
155 msg.data = data;
156 // WriteToBlocking will write up to 512 bytes atomically, and since our msg
157 // is smaller than 512, we don't need a thread lock.
158 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
159 ASSERT(kInterruptMessageSize < PIPE_BUF);
160 intptr_t result =
161 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
162 if (result != kInterruptMessageSize) {
163 if (result == -1) {
164 perror("Interrupt message failure:");
165 }
166 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
167 }
168 }
169
170
171 void EventHandlerImplementation::HandleInterruptFd() {
172 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
173 InterruptMessage msg[MAX_MESSAGES];
174 ssize_t bytes = TEMP_FAILURE_RETRY(
175 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
176 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
177 if (msg[i].id == kTimerId) {
178 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
179 struct itimerspec it;
180 memset(&it, 0, sizeof(it));
181 if (timeout_queue_.HasTimeout()) {
182 int64_t millis = timeout_queue_.CurrentTimeout();
183 it.it_value.tv_sec = millis / 1000;
184 it.it_value.tv_nsec = (millis % 1000) * 1000000;
185 }
186 timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
187 } else if (msg[i].id == kShutdownId) {
188 shutdown_ = true;
189 } else {
190 bool is_new = false;
191 SocketData* sd = GetSocketData(msg[i].id, &is_new);
192 if (is_new) {
193 sd->SetPort(msg[i].dart_port);
194 AddToEpollInstance(epoll_fd_, sd, msg[i].data);
195 }
196 if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
197 ASSERT(msg[i].data == (1 << kShutdownReadCommand));
198 // Close the socket for reading.
199 sd->ShutdownRead();
200 } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
201 ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
202 // Close the socket for writing.
203 sd->ShutdownWrite();
204 } else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
205 ASSERT(msg[i].data == (1 << kCloseCommand));
206 // Close the socket and free system resources and move on to
207 // next message.
208 RemoveFromEpollInstance(epoll_fd_, sd);
209 intptr_t fd = sd->fd();
210 sd->Close();
211 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
212 delete sd;
213 DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
214 }
215 }
216 }
217 } 95 }
218 96
219 #ifdef DEBUG_POLL 97 #ifdef DEBUG_POLL
220 static void PrintEventMask(intptr_t fd, intptr_t events) { 98 static void PrintEventMask(intptr_t events) {
221 Log::Print("%d ", fd); 99 // TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the
100 // epoll-data as well.
222 if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN "); 101 if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN ");
223 if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI "); 102 if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI ");
224 if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT "); 103 if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT ");
225 if ((events & EPOLLERR) != 0) Log::Print("EPOLLERR "); 104 if ((events & EPOLLERR) != 0) Log::Print("EPOLLERR ");
226 if ((events & EPOLLHUP) != 0) Log::Print("EPOLLHUP "); 105 if ((events & EPOLLHUP) != 0) Log::Print("EPOLLHUP ");
227 if ((events & EPOLLRDHUP) != 0) Log::Print("EPOLLRDHUP "); 106 if ((events & EPOLLRDHUP) != 0) Log::Print("EPOLLRDHUP ");
228 int all_events = EPOLLIN | EPOLLPRI | EPOLLOUT | 107 int all_events = EPOLLIN | EPOLLPRI | EPOLLOUT |
229 EPOLLERR | EPOLLHUP | EPOLLRDHUP; 108 EPOLLERR | EPOLLHUP | EPOLLRDHUP;
230 if ((events & ~all_events) != 0) { 109 if ((events & ~all_events) != 0) {
231 Log::Print("(and %08x) ", events & ~all_events); 110 Log::Print("(and %08x) ", events & ~all_events);
232 } 111 }
233 Log::Print("(available %d) ", FDUtils::AvailableBytes(fd));
234 112
235 Log::Print("\n"); 113 Log::Print("\n");
236 } 114 }
237 #endif 115 #endif
238 116
239 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events, 117 intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
240 SocketData* sd) {
241 #ifdef DEBUG_POLL 118 #ifdef DEBUG_POLL
242 PrintEventMask(sd->fd(), events); 119 PrintEventMask(events);
243 #endif 120 #endif
244 if (events & EPOLLERR) { 121 if (events & EPOLLERR) {
245 // Return only error if EPOLLIN is present. 122 // Return only error if EPOLLIN is present.
246 return (events & EPOLLIN) ? (1 << kErrorEvent) : 0; 123 return (events & EPOLLIN) ? (1 << kErrorEvent) : 0;
247 } 124 }
248 intptr_t event_mask = 0; 125 intptr_t event_mask = 0;
249 if (events & EPOLLIN) event_mask |= (1 << kInEvent); 126 if (events & EPOLLIN) event_mask |= (1 << kInEvent);
250 if (events & EPOLLOUT) event_mask |= (1 << kOutEvent); 127 if (events & EPOLLOUT) event_mask |= (1 << kOutEvent);
251 if (events & (EPOLLHUP | EPOLLRDHUP)) event_mask |= (1 << kCloseEvent); 128 if (events & (EPOLLHUP | EPOLLRDHUP)) event_mask |= (1 << kCloseEvent);
252 return event_mask; 129 return event_mask;
253 } 130 }
254 131
255 132
256 void EventHandlerImplementation::HandleEvents(struct epoll_event* events, 133 void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
257 int size) { 134 int size) {
258 bool interrupt_seen = false;
259 for (int i = 0; i < size; i++) { 135 for (int i = 0; i < size; i++) {
260 if (events[i].data.ptr == NULL) { 136 uint64_t data = events[i].data.u64;
261 interrupt_seen = true; 137 if (data & TIMER_BIT) {
262 } else if (events[i].data.fd == timer_fd_) {
263 int64_t val; 138 int64_t val;
264 VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val))); 139 VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val)));
265 if (timeout_queue_.HasTimeout()) { 140 if (timeout_queue_.HasTimeout()) {
266 DartUtils::PostNull(timeout_queue_.CurrentPort()); 141 DartUtils::PostNull(timeout_queue_.CurrentPort());
267 timeout_queue_.RemoveCurrent(); 142 timeout_queue_.RemoveCurrent();
268 } 143 }
269 } else { 144 } else {
270 SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr); 145 int32_t event_mask = GetPollEvents(events[i].events);
271 int32_t event_mask = GetPollEvents(events[i].events, sd);
272 if (event_mask != 0) { 146 if (event_mask != 0) {
273 Dart_Port port = sd->port(); 147 Dart_Port port = data;
274 ASSERT(port != 0); 148 ASSERT(port != 0);
275 DartUtils::PostInt32(port, event_mask); 149 DartUtils::PostInt32(port, event_mask);
276 } 150 }
277 } 151 }
278 } 152 }
279 if (interrupt_seen) {
280 // Handle after socket events, so we avoid closing a socket before we handle
281 // the current events.
282 HandleInterruptFd();
283 }
284 } 153 }
285 154
286 155
287 void EventHandlerImplementation::Poll(uword args) { 156 void EventHandlerImplementation::Poll(uword args) {
288 static const intptr_t kMaxEvents = 16; 157 static const intptr_t kMaxEvents = 16;
289 struct epoll_event events[kMaxEvents]; 158 struct epoll_event events[kMaxEvents];
290 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 159 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
291 EventHandlerImplementation* handler_impl = &handler->delegate_; 160 EventHandlerImplementation* handler_impl = &handler->delegate_;
292 ASSERT(handler_impl != NULL); 161 ASSERT(handler_impl != NULL);
293 while (!handler_impl->shutdown_) { 162 while (!handler_impl->shutdown_) {
(...skipping 17 matching lines...) Expand all
311 void EventHandlerImplementation::Start(EventHandler* handler) { 180 void EventHandlerImplementation::Start(EventHandler* handler) {
312 int result = dart::Thread::Start(&EventHandlerImplementation::Poll, 181 int result = dart::Thread::Start(&EventHandlerImplementation::Poll,
313 reinterpret_cast<uword>(handler)); 182 reinterpret_cast<uword>(handler));
314 if (result != 0) { 183 if (result != 0) {
315 FATAL1("Failed to start event handler thread %d", result); 184 FATAL1("Failed to start event handler thread %d", result);
316 } 185 }
317 } 186 }
318 187
319 188
320 void EventHandlerImplementation::Shutdown() { 189 void EventHandlerImplementation::Shutdown() {
321 SendData(kShutdownId, 0, 0); 190 shutdown_ = true;
322 } 191 }
323 192
324 193
325 void EventHandlerImplementation::SendData(intptr_t id, 194 void EventHandlerImplementation::SendData(intptr_t id,
Søren Gjesse 2014/02/20 08:15:54 This method is no longer sending data, but just ma
Anders Johnsen 2014/02/20 08:56:21 Done.
326 Dart_Port dart_port, 195 Dart_Port dart_port,
327 int64_t data) { 196 int64_t data) {
328 WakeupHandler(id, dart_port, data); 197 if (id == kTimerId) {
329 } 198 // Lock this region, as multiple isolates may attempt to update
330 199 // timeout_queue_.
331 200 // TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock.
332 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { 201 timer_mutex_.Lock();
333 // The hashmap does not support keys with value 0. 202 timeout_queue_.UpdateTimeout(dart_port, data);
334 return reinterpret_cast<void*>(fd + 1); 203 struct itimerspec it;
335 } 204 memset(&it, 0, sizeof(it));
336 205 if (timeout_queue_.HasTimeout()) {
337 206 int64_t millis = timeout_queue_.CurrentTimeout();
338 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { 207 it.it_value.tv_sec = millis / 1000;
339 // The hashmap does not support keys with value 0. 208 it.it_value.tv_nsec = (millis % 1000) * 1000000;
340 return dart::Utils::WordHash(fd + 1); 209 }
210 timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
211 timer_mutex_.Unlock();
212 } else {
213 if ((data & (1 << kShutdownReadCommand)) != 0) {
214 ASSERT(data == (1 << kShutdownReadCommand));
215 // Close the socket for reading.
216 shutdown(id, SHUT_RD);
217 } else if ((data & (1 << kShutdownWriteCommand)) != 0) {
218 ASSERT(data == (1 << kShutdownWriteCommand));
219 // Close the socket for writing.
220 shutdown(id, SHUT_WR);
221 } else if ((data & (1 << kCloseCommand)) != 0) {
222 ASSERT(data == (1 << kCloseCommand));
223 // Close the socket and free system resources and move on to
224 // next message.
225 // This will also remove the file descriptor from epoll.
226 Socket::Close(id);
227 DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent);
228 } else {
229 // Add to epoll - this is the first time we see it.
230 AddToEpollInstance(epoll_fd_, id, dart_port, data);
231 }
232 }
341 } 233 }
342 234
343 } // namespace bin 235 } // namespace bin
344 } // namespace dart 236 } // namespace dart
345 237
346 #endif // defined(TARGET_OS_LINUX) 238 #endif // defined(TARGET_OS_LINUX)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698