OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include <errno.h> | 5 #include <errno.h> |
6 #include <poll.h> | 6 #include <poll.h> |
7 #include <pthread.h> | 7 #include <pthread.h> |
8 #include <stdio.h> | 8 #include <stdio.h> |
9 #include <string.h> | 9 #include <string.h> |
10 #include <sys/time.h> | 10 #include <sys/time.h> |
11 #include <unistd.h> | 11 #include <unistd.h> |
12 | 12 |
13 #include "bin/eventhandler.h" | 13 #include "bin/eventhandler.h" |
14 #include "bin/fdutils.h" | 14 #include "bin/fdutils.h" |
15 | 15 |
16 | 16 |
17 int64_t GetCurrentTimeMilliseconds() { | 17 int64_t GetCurrentTimeMilliseconds() { |
18 struct timeval tv; | 18 struct timeval tv; |
19 if (gettimeofday(&tv, NULL) < 0) { | 19 if (gettimeofday(&tv, NULL) < 0) { |
20 UNREACHABLE(); | 20 UNREACHABLE(); |
21 return 0; | 21 return 0; |
22 } | 22 } |
23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
24 } | 24 } |
25 | 25 |
26 | 26 |
27 static const int kInitialPortMapSize = 128; | 27 static const int kInitialPortMapSize = 16; |
28 static const int kPortMapGrowingFactor = 2; | 28 static const int kPortMapGrowingFactor = 2; |
29 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
30 static const int kInfinityTimeout = -1; | 30 static const int kInfinityTimeout = -1; |
31 static const int kTimerId = -1; | 31 static const int kTimerId = -1; |
32 | 32 |
33 | 33 |
34 | 34 intptr_t SocketData::GetPollEvents() { |
35 void SocketData::FillPollEvents(struct pollfd* pollfds) { | |
36 // Do not ask for POLLERR and POLLHUP explicitly as they are | 35 // Do not ask for POLLERR and POLLHUP explicitly as they are |
37 // triggered anyway. | 36 // triggered anyway. |
38 if ((_mask & (1 << kInEvent)) != 0) { | 37 intptr_t events = 0; |
39 pollfds->events |= POLLIN; | 38 if (!IsClosedRead()) { |
| 39 if ((mask_ & (1 << kInEvent)) != 0) { |
| 40 events |= POLLIN; |
| 41 } |
40 } | 42 } |
41 if ((_mask & (1 << kOutEvent)) != 0) { | 43 if (!IsClosedWrite()) { |
42 pollfds->events |= POLLOUT; | 44 if ((mask_ & (1 << kOutEvent)) != 0) { |
| 45 events |= POLLOUT; |
| 46 } |
43 } | 47 } |
| 48 return events; |
44 } | 49 } |
45 | 50 |
46 | 51 |
47 EventHandlerImplementation::EventHandlerImplementation() { | 52 EventHandlerImplementation::EventHandlerImplementation() { |
48 intptr_t result; | 53 intptr_t result; |
49 socket_map_entries_ = 0; | |
50 socket_map_size_ = kInitialPortMapSize; | 54 socket_map_size_ = kInitialPortMapSize; |
51 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, | 55 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, |
52 sizeof(SocketData))); | 56 sizeof(SocketData))); |
53 ASSERT(socket_map_ != NULL); | 57 ASSERT(socket_map_ != NULL); |
54 result = pipe(interrupt_fds_); | 58 result = pipe(interrupt_fds_); |
55 if (result != 0) { | 59 if (result != 0) { |
56 FATAL("Pipe creation failed"); | 60 FATAL("Pipe creation failed"); |
57 } | 61 } |
58 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 62 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
59 FDUtils::SetNonBlocking(interrupt_fds_[1]); | 63 FDUtils::SetNonBlocking(interrupt_fds_[1]); |
(...skipping 21 matching lines...) Expand all Loading... |
81 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, | 85 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, |
82 new_socket_map_bytes)); | 86 new_socket_map_bytes)); |
83 ASSERT(socket_map_ != NULL); | 87 ASSERT(socket_map_ != NULL); |
84 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); | 88 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); |
85 memset(socket_map_ + socket_map_size_, | 89 memset(socket_map_ + socket_map_size_, |
86 0, | 90 0, |
87 new_socket_map_bytes - socket_map_bytes); | 91 new_socket_map_bytes - socket_map_bytes); |
88 socket_map_size_ = new_socket_map_size; | 92 socket_map_size_ = new_socket_map_size; |
89 } | 93 } |
90 | 94 |
91 return socket_map_ + fd; | 95 SocketData* sd = socket_map_ + fd; |
| 96 sd->set_fd(fd); // For now just make sure the fd is set. |
| 97 return sd; |
92 } | 98 } |
93 | 99 |
94 | 100 |
95 void EventHandlerImplementation::SetPort(intptr_t fd, | |
96 Dart_Port dart_port, | |
97 intptr_t mask) { | |
98 SocketData* sd = GetSocketData(fd); | |
99 | |
100 // Only change the port map entries count if SetPort changes the | |
101 // port map state. | |
102 if (dart_port == 0 && sd->port() != 0) { | |
103 socket_map_entries_--; | |
104 } else if (dart_port != 0 && sd->port() == 0) { | |
105 socket_map_entries_++; | |
106 } | |
107 | |
108 sd->set_port(dart_port); | |
109 sd->set_mask(mask); | |
110 } | |
111 | |
112 | |
113 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id, | |
114 Dart_Port dart_port, | |
115 intptr_t data) { | |
116 WakeupHandler(id, dart_port, data); | |
117 } | |
118 | |
119 | |
120 void EventHandlerImplementation::CloseFd(intptr_t id) { | |
121 SetPort(id, 0, 0); | |
122 close(id); | |
123 } | |
124 | |
125 | |
126 void EventHandlerImplementation::UnregisterFdWakeup(intptr_t id) { | |
127 WakeupHandler(id, 0, 0); | |
128 } | |
129 | |
130 | |
131 void EventHandlerImplementation::UnregisterFd(intptr_t id) { | |
132 SetPort(id, 0, 0); | |
133 } | |
134 | |
135 | |
136 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 101 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
137 Dart_Port dart_port, | 102 Dart_Port dart_port, |
138 int64_t data) { | 103 int64_t data) { |
139 InterruptMessage msg; | 104 InterruptMessage msg; |
140 msg.id = id; | 105 msg.id = id; |
141 msg.dart_port = dart_port; | 106 msg.dart_port = dart_port; |
142 msg.data = data; | 107 msg.data = data; |
143 intptr_t result = | 108 intptr_t result = |
144 write(interrupt_fds_[1], &msg, kInterruptMessageSize); | 109 write(interrupt_fds_[1], &msg, kInterruptMessageSize); |
145 if (result != kInterruptMessageSize) { | 110 if (result != kInterruptMessageSize) { |
146 perror("Interrupt message failure"); | 111 perror("Interrupt message failure"); |
147 } | 112 } |
148 } | 113 } |
149 | 114 |
150 | 115 |
151 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | 116 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
152 struct pollfd* pollfds; | 117 struct pollfd* pollfds; |
153 | 118 |
154 intptr_t numPollfds = 1 + socket_map_entries_; | 119 // Calculate the number of file descriptors to poll on. |
| 120 intptr_t numPollfds = 1; |
| 121 for (int i = 0; i < socket_map_size_; i++) { |
| 122 SocketData* sd = &socket_map_[i]; |
| 123 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; |
| 124 } |
| 125 |
155 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | 126 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
156 numPollfds)); | 127 numPollfds)); |
157 pollfds[0].fd = interrupt_fds_[0]; | 128 pollfds[0].fd = interrupt_fds_[0]; |
158 pollfds[0].events |= POLLIN; | 129 pollfds[0].events |= POLLIN; |
159 | 130 |
160 // TODO(hpayer): optimize the following iteration over the hash map | 131 // TODO(hpayer): optimize the following iteration over the hash map |
161 int j = 1; | 132 int j = 1; |
162 for (int i = 0; i < socket_map_size_; i++) { | 133 for (int i = 0; i < socket_map_size_; i++) { |
163 SocketData* sd = &socket_map_[i]; | 134 SocketData* sd = &socket_map_[i]; |
164 if (sd->port() != 0) { | 135 intptr_t events = sd->GetPollEvents(); |
| 136 if (sd->port() > 0 && events != 0) { |
165 // Fd is added to the poll set. | 137 // Fd is added to the poll set. |
166 pollfds[j].fd = i; | 138 pollfds[j].fd = sd->fd(); |
167 sd->FillPollEvents(&pollfds[j]); | 139 pollfds[j].events = events; |
168 j++; | 140 j++; |
169 } | 141 } |
170 } | 142 } |
171 *pollfds_size = numPollfds; | 143 ASSERT(numPollfds == j); |
| 144 *pollfds_size = j; |
172 return pollfds; | 145 return pollfds; |
173 } | 146 } |
174 | 147 |
175 | 148 |
176 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 149 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
177 int total_read = 0; | 150 int total_read = 0; |
178 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize); | 151 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize); |
179 if (bytes_read < 0) { | 152 if (bytes_read < 0) { |
180 return false; | 153 return false; |
181 } | 154 } |
182 total_read = bytes_read; | 155 total_read = bytes_read; |
183 while (total_read < kInterruptMessageSize) { | 156 while (total_read < kInterruptMessageSize) { |
184 bytes_read = read(interrupt_fds_[0], | 157 bytes_read = read(interrupt_fds_[0], |
185 msg + total_read, | 158 msg + total_read, |
186 kInterruptMessageSize - total_read); | 159 kInterruptMessageSize - total_read); |
187 if (bytes_read > 0) { | 160 if (bytes_read > 0) { |
188 total_read = total_read + bytes_read; | 161 total_read = total_read + bytes_read; |
189 } | 162 } |
190 } | 163 } |
191 return (total_read == kInterruptMessageSize) ? true : false; | 164 return (total_read == kInterruptMessageSize) ? true : false; |
192 } | 165 } |
193 | 166 |
194 void EventHandlerImplementation::HandleInterruptFd() { | 167 void EventHandlerImplementation::HandleInterruptFd() { |
195 InterruptMessage msg; | 168 InterruptMessage msg; |
196 while (GetInterruptMessage(&msg)) { | 169 while (GetInterruptMessage(&msg)) { |
197 if (msg.id == kTimerId) { | 170 if (msg.id == kTimerId) { |
198 timeout_ = msg.data; | 171 timeout_ = msg.data; |
199 timeout_port_ = msg.dart_port; | 172 timeout_port_ = msg.dart_port; |
200 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | |
201 /* | |
202 * A close event happened in dart, we have to explicitly unregister | |
203 * the fd and close the fd. | |
204 */ | |
205 CloseFd(msg.id); | |
206 } else { | 173 } else { |
207 SetPort(msg.id, msg.dart_port, msg.data); | 174 SocketData* sd = GetSocketData(msg.id); |
| 175 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { |
| 176 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
| 177 // Close the socket for reading. |
| 178 sd->ShutdownRead(); |
| 179 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
| 180 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
| 181 // Close the socket for writing. |
| 182 sd->ShutdownWrite(); |
| 183 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
| 184 ASSERT(msg.data == (1 << kCloseCommand)); |
| 185 // Close the socket and free system resources. |
| 186 sd->Close(); |
| 187 } else { |
| 188 // Setup events to wait for. |
| 189 sd->SetPortAndMask(msg.dart_port, msg.data); |
| 190 } |
208 } | 191 } |
209 } | 192 } |
210 } | 193 } |
211 | 194 |
| 195 #ifdef DEBUG_POLL |
| 196 static void PrintEventMask(struct pollfd* pollfd) { |
| 197 printf("%d ", pollfd->fd); |
| 198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN "); |
| 199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI "); |
| 200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT "); |
| 201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR "); |
| 202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP "); |
| 203 if ((pollfd->revents & POLLRDHUP) != 0) printf("POLLRDHUP "); |
| 204 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL "); |
| 205 int all_events = POLLIN | POLLPRI | POLLOUT | |
| 206 POLLERR | POLLHUP | POLLRDHUP | POLLNVAL; |
| 207 if ((pollfd->revents & ~all_events) != 0) { |
| 208 printf("(and %08x) ", pollfd->revents & ~all_events); |
| 209 } |
| 210 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd)); |
| 211 |
| 212 printf("\n"); |
| 213 } |
| 214 #endif |
212 | 215 |
213 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { | 216 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
| 217 #ifdef DEBUG_POLL |
| 218 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); |
| 219 #endif |
214 intptr_t event_mask = 0; | 220 intptr_t event_mask = 0; |
215 SocketData* sd = GetSocketData(pollfd->fd); | 221 SocketData* sd = GetSocketData(pollfd->fd); |
216 if (sd->IsListeningSocket()) { | 222 if (sd->IsListeningSocket()) { |
217 // For listening sockets the POLLIN event indicate that there are | 223 // For listening sockets the POLLIN event indicate that there are |
218 // connections ready for accept unless accompanied with one of the | 224 // connections ready for accept unless accompanied with one of the |
219 // other flags. | 225 // other flags. |
220 if ((pollfd->revents & POLLIN) != 0) { | 226 if ((pollfd->revents & POLLIN) != 0) { |
221 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); | 227 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); |
222 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); | 228 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); |
223 if (event_mask == 0) event_mask |= (1 << kInEvent); | 229 if (event_mask == 0) event_mask |= (1 << kInEvent); |
224 } | 230 } |
225 } else { | 231 } else { |
| 232 if ((pollfd->revents & POLLNVAL) != 0) { |
| 233 return 0; |
| 234 } |
| 235 |
226 // Prioritize data events over close and error events. | 236 // Prioritize data events over close and error events. |
227 if ((pollfd->revents & POLLIN) != 0) { | 237 if ((pollfd->revents & POLLIN) != 0) { |
228 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { | 238 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { |
229 event_mask = (1 << kInEvent); | 239 event_mask = (1 << kInEvent); |
230 } else if ((pollfd->revents & POLLHUP) != 0) { | 240 } else if (((pollfd->revents & POLLHUP) != 0)) { |
231 event_mask = (1 << kCloseEvent); | 241 event_mask = (1 << kCloseEvent); |
| 242 sd->MarkClosedRead(); |
232 } else if ((pollfd->revents & POLLERR) != 0) { | 243 } else if ((pollfd->revents & POLLERR) != 0) { |
233 event_mask = (1 << kErrorEvent); | 244 event_mask = (1 << kErrorEvent); |
| 245 } else { |
| 246 // If POLLIN is set with no available data and no POLLHUP use |
| 247 // recv to peek for whether the other end of the socket |
| 248 // actually closed. |
| 249 char buffer; |
| 250 ssize_t bytesPeeked = recv(sd->fd(), &buffer, 1, MSG_PEEK); |
| 251 if (bytesPeeked == 0) { |
| 252 event_mask = (1 << kCloseEvent); |
| 253 sd->MarkClosedRead(); |
| 254 } else if (errno != EAGAIN) { |
| 255 fprintf(stderr, "Error recv: %s\n", strerror(errno)); |
| 256 } |
234 } | 257 } |
235 } | 258 } |
236 | 259 |
| 260 // On pipes POLLHUP is reported without POLLIN. |
| 261 if (((pollfd->revents & POLLIN) == 0) && |
| 262 ((pollfd->revents & POLLHUP) != 0)) { |
| 263 event_mask = (1 << kCloseEvent); |
| 264 sd->MarkClosedRead(); |
| 265 } |
| 266 |
237 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent); | 267 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent); |
238 } | 268 } |
239 | 269 |
240 return event_mask; | 270 return event_mask; |
241 } | 271 } |
242 | 272 |
243 | 273 |
244 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, | 274 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, |
245 int pollfds_size, | 275 int pollfds_size, |
246 int result_size) { | 276 int result_size) { |
247 if ((pollfds[0].revents & POLLIN) != 0) { | 277 if ((pollfds[0].revents & POLLIN) != 0) { |
248 result_size -= 1; | 278 result_size -= 1; |
249 } | 279 } |
250 if (result_size > 0) { | 280 if (result_size > 0) { |
251 for (int i = 1; i < pollfds_size; i++) { | 281 for (int i = 1; i < pollfds_size; i++) { |
252 /* | 282 /* |
253 * The fd is unregistered. It gets re-registered when the request | 283 * The fd is unregistered. It gets re-registered when the request |
254 * was handled by dart. | 284 * was handled by dart. |
255 */ | 285 */ |
256 intptr_t event_mask = GetPollEvents(&pollfds[i]); | 286 intptr_t event_mask = GetPollEvents(&pollfds[i]); |
257 if (event_mask != 0) { | 287 if (event_mask != 0) { |
258 intptr_t fd = pollfds[i].fd; | 288 intptr_t fd = pollfds[i].fd; |
259 Dart_Port port = GetSocketData(fd)->port(); | 289 SocketData* sd = GetSocketData(fd); |
| 290 Dart_Port port = sd->port(); |
260 ASSERT(port != 0); | 291 ASSERT(port != 0); |
261 UnregisterFd(fd); | 292 sd->Unregister(); |
262 Dart_PostIntArray(port, 1, &event_mask); | 293 Dart_PostIntArray(port, 1, &event_mask); |
263 } | 294 } |
264 } | 295 } |
265 } | 296 } |
266 HandleInterruptFd(); | 297 HandleInterruptFd(); |
267 } | 298 } |
268 | 299 |
269 | 300 |
270 intptr_t EventHandlerImplementation::GetTimeout() { | 301 intptr_t EventHandlerImplementation::GetTimeout() { |
271 if (timeout_ == kInfinityTimeout) { | 302 if (timeout_ == kInfinityTimeout) { |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
317 this); | 348 this); |
318 if (result != 0) { | 349 if (result != 0) { |
319 FATAL("Create start event handler thread"); | 350 FATAL("Create start event handler thread"); |
320 } | 351 } |
321 } | 352 } |
322 | 353 |
323 | 354 |
324 void EventHandlerImplementation::SendData(intptr_t id, | 355 void EventHandlerImplementation::SendData(intptr_t id, |
325 Dart_Port dart_port, | 356 Dart_Port dart_port, |
326 intptr_t data) { | 357 intptr_t data) { |
327 RegisterFdWakeup(id, dart_port, data); | 358 WakeupHandler(id, dart_port, data); |
328 } | 359 } |
OLD | NEW |