OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include <errno.h> | 5 #include <errno.h> |
6 #include <poll.h> | 6 #include <poll.h> |
7 #include <pthread.h> | 7 #include <pthread.h> |
8 #include <stdio.h> | 8 #include <stdio.h> |
9 #include <string.h> | 9 #include <string.h> |
10 #include <sys/time.h> | 10 #include <sys/time.h> |
11 #include <unistd.h> | 11 #include <unistd.h> |
12 | 12 |
13 #include "bin/eventhandler.h" | 13 #include "bin/eventhandler.h" |
14 #include "bin/fdutils.h" | 14 #include "bin/fdutils.h" |
15 | 15 |
16 | 16 |
17 int64_t GetCurrentTimeMilliseconds() { | 17 int64_t GetCurrentTimeMilliseconds() { |
18 struct timeval tv; | 18 struct timeval tv; |
19 if (gettimeofday(&tv, NULL) < 0) { | 19 if (gettimeofday(&tv, NULL) < 0) { |
20 UNREACHABLE(); | 20 UNREACHABLE(); |
21 return 0; | 21 return 0; |
22 } | 22 } |
23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
24 } | 24 } |
25 | 25 |
26 | 26 |
27 static const int kInitialPortMapSize = 128; | 27 static const int kInitialPortMapSize = 16; |
28 static const int kPortMapGrowingFactor = 2; | 28 static const int kPortMapGrowingFactor = 2; |
29 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
30 static const int kInfinityTimeout = -1; | 30 static const int kInfinityTimeout = -1; |
31 static const int kTimerId = -1; | 31 static const int kTimerId = -1; |
32 | 32 |
33 | 33 |
34 | 34 intptr_t SocketData::GetPollEvents() { |
35 void SocketData::FillPollEvents(struct pollfd* pollfds) { | |
36 // Do not ask for POLLERR and POLLHUP explicitly as they are | 35 // Do not ask for POLLERR and POLLHUP explicitly as they are |
37 // triggered anyway. | 36 // triggered anyway. |
38 if ((_mask & (1 << kInEvent)) != 0) { | 37 intptr_t events = 0; |
39 pollfds->events |= POLLIN; | 38 if (!IsClosedRead()) { |
| 39 if ((mask_ & (1 << kInEvent)) != 0) { |
| 40 events |= POLLIN; |
| 41 } |
40 } | 42 } |
41 if ((_mask & (1 << kOutEvent)) != 0) { | 43 if (!IsClosedWrite()) { |
42 pollfds->events |= POLLOUT; | 44 if ((mask_ & (1 << kOutEvent)) != 0) { |
| 45 events |= POLLOUT; |
| 46 } |
43 } | 47 } |
44 pollfds->events |= POLLRDHUP; | 48 return events; |
45 } | 49 } |
46 | 50 |
47 | 51 |
48 EventHandlerImplementation::EventHandlerImplementation() { | 52 EventHandlerImplementation::EventHandlerImplementation() { |
49 intptr_t result; | 53 intptr_t result; |
50 socket_map_entries_ = 0; | |
51 socket_map_size_ = kInitialPortMapSize; | 54 socket_map_size_ = kInitialPortMapSize; |
52 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, | 55 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, |
53 sizeof(SocketData))); | 56 sizeof(SocketData))); |
54 ASSERT(socket_map_ != NULL); | 57 ASSERT(socket_map_ != NULL); |
55 result = pipe(interrupt_fds_); | 58 result = pipe(interrupt_fds_); |
56 if (result != 0) { | 59 if (result != 0) { |
57 FATAL("Pipe creation failed"); | 60 FATAL("Pipe creation failed"); |
58 } | 61 } |
59 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 62 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
60 FDUtils::SetNonBlocking(interrupt_fds_[1]); | 63 FDUtils::SetNonBlocking(interrupt_fds_[1]); |
(...skipping 21 matching lines...) Expand all Loading... |
82 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, | 85 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, |
83 new_socket_map_bytes)); | 86 new_socket_map_bytes)); |
84 ASSERT(socket_map_ != NULL); | 87 ASSERT(socket_map_ != NULL); |
85 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); | 88 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); |
86 memset(socket_map_ + socket_map_size_, | 89 memset(socket_map_ + socket_map_size_, |
87 0, | 90 0, |
88 new_socket_map_bytes - socket_map_bytes); | 91 new_socket_map_bytes - socket_map_bytes); |
89 socket_map_size_ = new_socket_map_size; | 92 socket_map_size_ = new_socket_map_size; |
90 } | 93 } |
91 | 94 |
92 return socket_map_ + fd; | 95 SocketData* sd = socket_map_ + fd; |
| 96 sd->set_fd(fd); // For now just make sure the fd is set. |
| 97 return sd; |
93 } | 98 } |
94 | 99 |
95 | 100 |
96 void EventHandlerImplementation::SetPort(intptr_t fd, | |
97 Dart_Port dart_port, | |
98 intptr_t mask) { | |
99 SocketData* sd = GetSocketData(fd); | |
100 | |
101 // Only change the port map entries count if SetPort changes the | |
102 // port map state. | |
103 if (dart_port == 0 && sd->port() != 0) { | |
104 socket_map_entries_--; | |
105 } else if (dart_port != 0 && sd->port() == 0) { | |
106 socket_map_entries_++; | |
107 } | |
108 | |
109 sd->set_port(dart_port); | |
110 sd->set_mask(mask); | |
111 } | |
112 | |
113 | |
114 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id, | |
115 Dart_Port dart_port, | |
116 intptr_t data) { | |
117 WakeupHandler(id, dart_port, data); | |
118 } | |
119 | |
120 | |
121 void EventHandlerImplementation::CloseFd(intptr_t id) { | |
122 SetPort(id, 0, 0); | |
123 close(id); | |
124 } | |
125 | |
126 | |
127 void EventHandlerImplementation::UnregisterFdWakeup(intptr_t id) { | |
128 WakeupHandler(id, 0, 0); | |
129 } | |
130 | |
131 | |
132 void EventHandlerImplementation::UnregisterFd(intptr_t id) { | |
133 SetPort(id, 0, 0); | |
134 } | |
135 | |
136 | |
137 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 101 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
138 Dart_Port dart_port, | 102 Dart_Port dart_port, |
139 int64_t data) { | 103 int64_t data) { |
140 InterruptMessage msg; | 104 InterruptMessage msg; |
141 msg.id = id; | 105 msg.id = id; |
142 msg.dart_port = dart_port; | 106 msg.dart_port = dart_port; |
143 msg.data = data; | 107 msg.data = data; |
144 intptr_t result = | 108 intptr_t result = |
145 write(interrupt_fds_[1], &msg, kInterruptMessageSize); | 109 write(interrupt_fds_[1], &msg, kInterruptMessageSize); |
146 if (result != kInterruptMessageSize) { | 110 if (result != kInterruptMessageSize) { |
147 perror("Interrupt message failure"); | 111 perror("Interrupt message failure"); |
148 } | 112 } |
149 } | 113 } |
150 | 114 |
151 | 115 |
152 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | 116 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
153 struct pollfd* pollfds; | 117 struct pollfd* pollfds; |
154 | 118 |
155 intptr_t numPollfds = 1 + socket_map_entries_; | 119 // Calculate the number of file descriptors to poll on. |
| 120 intptr_t numPollfds = 1; |
| 121 for (int i = 0; i < socket_map_size_; i++) { |
| 122 SocketData* sd = &socket_map_[i]; |
| 123 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; |
| 124 } |
| 125 |
156 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | 126 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
157 numPollfds)); | 127 numPollfds)); |
158 pollfds[0].fd = interrupt_fds_[0]; | 128 pollfds[0].fd = interrupt_fds_[0]; |
159 pollfds[0].events |= POLLIN; | 129 pollfds[0].events |= POLLIN; |
160 | 130 |
161 // TODO(hpayer): optimize the following iteration over the hash map | 131 // TODO(hpayer): optimize the following iteration over the hash map |
162 int j = 1; | 132 int j = 1; |
163 for (int i = 0; i < socket_map_size_; i++) { | 133 for (int i = 0; i < socket_map_size_; i++) { |
164 SocketData* sd = &socket_map_[i]; | 134 SocketData* sd = &socket_map_[i]; |
165 if (sd->port() != 0) { | 135 intptr_t events = sd->GetPollEvents(); |
| 136 if (sd->port() > 0 && events != 0) { |
166 // Fd is added to the poll set. | 137 // Fd is added to the poll set. |
167 pollfds[j].fd = i; | 138 pollfds[j].fd = sd->fd(); |
168 sd->FillPollEvents(&pollfds[j]); | 139 pollfds[j].events = events; |
169 j++; | 140 j++; |
170 } | 141 } |
171 } | 142 } |
172 *pollfds_size = numPollfds; | 143 ASSERT(numPollfds == j); |
| 144 *pollfds_size = j; |
173 return pollfds; | 145 return pollfds; |
174 } | 146 } |
175 | 147 |
176 | 148 |
177 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 149 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
178 int total_read = 0; | 150 int total_read = 0; |
179 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize); | 151 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize); |
180 if (bytes_read < 0) { | 152 if (bytes_read < 0) { |
181 return false; | 153 return false; |
182 } | 154 } |
183 total_read = bytes_read; | 155 total_read = bytes_read; |
184 while (total_read < kInterruptMessageSize) { | 156 while (total_read < kInterruptMessageSize) { |
185 bytes_read = read(interrupt_fds_[0], | 157 bytes_read = read(interrupt_fds_[0], |
186 msg + total_read, | 158 msg + total_read, |
187 kInterruptMessageSize - total_read); | 159 kInterruptMessageSize - total_read); |
188 if (bytes_read > 0) { | 160 if (bytes_read > 0) { |
189 total_read = total_read + bytes_read; | 161 total_read = total_read + bytes_read; |
190 } | 162 } |
191 } | 163 } |
192 return (total_read == kInterruptMessageSize) ? true : false; | 164 return (total_read == kInterruptMessageSize) ? true : false; |
193 } | 165 } |
194 | 166 |
195 void EventHandlerImplementation::HandleInterruptFd() { | 167 void EventHandlerImplementation::HandleInterruptFd() { |
196 InterruptMessage msg; | 168 InterruptMessage msg; |
197 while (GetInterruptMessage(&msg)) { | 169 while (GetInterruptMessage(&msg)) { |
198 if (msg.id == kTimerId) { | 170 if (msg.id == kTimerId) { |
199 timeout_ = msg.data; | 171 timeout_ = msg.data; |
200 timeout_port_ = msg.dart_port; | 172 timeout_port_ = msg.dart_port; |
201 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | |
202 /* | |
203 * A close event happened in dart, we have to explicitly unregister | |
204 * the fd and close the fd. | |
205 */ | |
206 CloseFd(msg.id); | |
207 } else { | 173 } else { |
208 SetPort(msg.id, msg.dart_port, msg.data); | 174 SocketData* sd = GetSocketData(msg.id); |
| 175 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { |
| 176 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
| 177 // Close the socket for reading. |
| 178 sd->ShutdownRead(); |
| 179 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
| 180 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
| 181 // Close the socket for writing. |
| 182 sd->ShutdownWrite(); |
| 183 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
| 184 ASSERT(msg.data == (1 << kCloseCommand)); |
| 185 // Close the socket and free system resources. |
| 186 sd->Close(); |
| 187 } else { |
| 188 // Setup events to wait for. |
| 189 sd->SetPortAndMask(msg.dart_port, msg.data); |
| 190 } |
209 } | 191 } |
210 } | 192 } |
211 } | 193 } |
212 | 194 |
| 195 #ifdef DEBUG_POLL |
| 196 static void PrintEventMask(struct pollfd* pollfd) { |
| 197 printf("%d ", pollfd->fd); |
| 198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN "); |
| 199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI "); |
| 200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT "); |
| 201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR "); |
| 202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP "); |
| 203 if ((pollfd->revents & POLLRDHUP) != 0) printf("POLLRDHUP "); |
| 204 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL "); |
| 205 int all_events = POLLIN | POLLPRI | POLLOUT | |
| 206 POLLERR | POLLHUP | POLLRDHUP | POLLNVAL; |
| 207 if ((pollfd->revents & ~all_events) != 0) { |
| 208 printf("(and %08x) ", pollfd->revents & ~all_events); |
| 209 } |
| 210 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd)); |
| 211 |
| 212 printf("\n"); |
| 213 } |
| 214 #endif |
213 | 215 |
214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { | 216 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
| 217 #ifdef DEBUG_POLL |
| 218 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); |
| 219 #endif |
215 intptr_t event_mask = 0; | 220 intptr_t event_mask = 0; |
216 SocketData* sd = GetSocketData(pollfd->fd); | 221 SocketData* sd = GetSocketData(pollfd->fd); |
217 if (sd->IsListeningSocket()) { | 222 if (sd->IsListeningSocket()) { |
218 // For listening sockets the POLLIN event indicate that there are | 223 // For listening sockets the POLLIN event indicate that there are |
219 // connections ready for accept unless accompanied with one of the | 224 // connections ready for accept unless accompanied with one of the |
220 // other flags. | 225 // other flags. |
221 if ((pollfd->revents & POLLIN) != 0) { | 226 if ((pollfd->revents & POLLIN) != 0) { |
222 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); | 227 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); |
223 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); | 228 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); |
224 if (event_mask == 0) event_mask |= (1 << kInEvent); | 229 if (event_mask == 0) event_mask |= (1 << kInEvent); |
225 } | 230 } |
226 } else { | 231 } else { |
| 232 if ((pollfd->revents & POLLNVAL) != 0) { |
| 233 return 0; |
| 234 } |
| 235 |
227 // Prioritize data events over close and error events. | 236 // Prioritize data events over close and error events. |
228 if ((pollfd->revents & POLLIN) != 0) { | 237 if ((pollfd->revents & POLLIN) != 0) { |
229 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { | 238 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { |
230 event_mask = (1 << kInEvent); | 239 event_mask = (1 << kInEvent); |
231 } else if (((pollfd->revents & POLLHUP) != 0) || | 240 } else if (((pollfd->revents & POLLHUP) != 0)) { |
232 ((pollfd->revents & POLLRDHUP) != 0)) { | |
233 event_mask = (1 << kCloseEvent); | 241 event_mask = (1 << kCloseEvent); |
| 242 sd->MarkClosedRead(); |
234 } else if ((pollfd->revents & POLLERR) != 0) { | 243 } else if ((pollfd->revents & POLLERR) != 0) { |
235 event_mask = (1 << kErrorEvent); | 244 event_mask = (1 << kErrorEvent); |
| 245 } else { |
| 246 // If POLLIN is set with no available data and no POLLHUP use |
| 247 // recv to peek for whether the other end of the socket |
| 248 // actually closed. |
| 249 char buffer; |
| 250 ssize_t bytesPeeked = recv(sd->fd(), &buffer, 1, MSG_PEEK); |
| 251 if (bytesPeeked == 0) { |
| 252 event_mask = (1 << kCloseEvent); |
| 253 sd->MarkClosedRead(); |
| 254 } else if (errno != EAGAIN) { |
| 255 fprintf(stderr, "Error recv: %s\n", strerror(errno)); |
| 256 } |
236 } | 257 } |
237 } | 258 } |
238 | 259 |
| 260 // On pipes POLLHUP is reported without POLLIN. |
| 261 if (((pollfd->revents & POLLIN) == 0) && |
| 262 ((pollfd->revents & POLLHUP) != 0)) { |
| 263 event_mask = (1 << kCloseEvent); |
| 264 sd->MarkClosedRead(); |
| 265 } |
| 266 |
239 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent); | 267 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent); |
240 } | 268 } |
241 | 269 |
242 return event_mask; | 270 return event_mask; |
243 } | 271 } |
244 | 272 |
245 | 273 |
246 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, | 274 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, |
247 int pollfds_size, | 275 int pollfds_size, |
248 int result_size) { | 276 int result_size) { |
249 if ((pollfds[0].revents & POLLIN) != 0) { | 277 if ((pollfds[0].revents & POLLIN) != 0) { |
250 result_size -= 1; | 278 result_size -= 1; |
251 } | 279 } |
252 if (result_size > 0) { | 280 if (result_size > 0) { |
253 for (int i = 1; i < pollfds_size; i++) { | 281 for (int i = 1; i < pollfds_size; i++) { |
254 /* | 282 /* |
255 * The fd is unregistered. It gets re-registered when the request | 283 * The fd is unregistered. It gets re-registered when the request |
256 * was handled by dart. | 284 * was handled by dart. |
257 */ | 285 */ |
258 intptr_t event_mask = GetPollEvents(&pollfds[i]); | 286 intptr_t event_mask = GetPollEvents(&pollfds[i]); |
259 if (event_mask != 0) { | 287 if (event_mask != 0) { |
260 intptr_t fd = pollfds[i].fd; | 288 intptr_t fd = pollfds[i].fd; |
261 Dart_Port port = GetSocketData(fd)->port(); | 289 SocketData* sd = GetSocketData(fd); |
| 290 Dart_Port port = sd->port(); |
262 ASSERT(port != 0); | 291 ASSERT(port != 0); |
263 UnregisterFd(fd); | 292 sd->Unregister(); |
264 Dart_PostIntArray(port, 1, &event_mask); | 293 Dart_PostIntArray(port, 1, &event_mask); |
265 } | 294 } |
266 } | 295 } |
267 } | 296 } |
268 HandleInterruptFd(); | 297 HandleInterruptFd(); |
269 } | 298 } |
270 | 299 |
271 | 300 |
272 intptr_t EventHandlerImplementation::GetTimeout() { | 301 intptr_t EventHandlerImplementation::GetTimeout() { |
273 if (timeout_ == kInfinityTimeout) { | 302 if (timeout_ == kInfinityTimeout) { |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
319 this); | 348 this); |
320 if (result != 0) { | 349 if (result != 0) { |
321 FATAL("Create start event handler thread"); | 350 FATAL("Create start event handler thread"); |
322 } | 351 } |
323 } | 352 } |
324 | 353 |
325 | 354 |
326 void EventHandlerImplementation::SendData(intptr_t id, | 355 void EventHandlerImplementation::SendData(intptr_t id, |
327 Dart_Port dart_port, | 356 Dart_Port dart_port, |
328 intptr_t data) { | 357 intptr_t data) { |
329 RegisterFdWakeup(id, dart_port, data); | 358 WakeupHandler(id, dart_port, data); |
330 } | 359 } |
OLD | NEW |