OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include <errno.h> | 5 #include <errno.h> |
6 #include <poll.h> | 6 #include <poll.h> |
7 #include <pthread.h> | 7 #include <pthread.h> |
8 #include <stdio.h> | 8 #include <stdio.h> |
9 #include <string.h> | 9 #include <string.h> |
10 #include <sys/time.h> | 10 #include <sys/time.h> |
(...skipping 13 matching lines...) Expand all Loading... |
24 } | 24 } |
25 | 25 |
26 | 26 |
27 static const int kInitialPortMapSize = 128; | 27 static const int kInitialPortMapSize = 128; |
28 static const int kPortMapGrowingFactor = 2; | 28 static const int kPortMapGrowingFactor = 2; |
29 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
30 static const int kInfinityTimeout = -1; | 30 static const int kInfinityTimeout = -1; |
31 static const int kTimerId = -1; | 31 static const int kTimerId = -1; |
32 | 32 |
33 | 33 |
| 34 |
| 35 void SocketData::FillPollEvents(struct pollfd* pollfds) { |
| 36 // Do not ask for POLLERR and POLLHUP explicitly as they are |
| 37 // triggered anyway. |
| 38 if ((_mask & (1 << kInEvent)) != 0) { |
| 39 pollfds->events |= POLLIN; |
| 40 } |
| 41 if ((_mask & (1 << kOutEvent)) != 0) { |
| 42 pollfds->events |= POLLOUT; |
| 43 } |
| 44 } |
| 45 |
| 46 |
34 EventHandlerImplementation::EventHandlerImplementation() { | 47 EventHandlerImplementation::EventHandlerImplementation() { |
35 intptr_t result; | 48 intptr_t result; |
36 port_map_entries_ = 0; | 49 port_map_entries_ = 0; |
37 port_map_size_ = kInitialPortMapSize; | 50 port_map_size_ = kInitialPortMapSize; |
38 port_map_ = reinterpret_cast<PortData*>(calloc(port_map_size_, | 51 port_map_ = reinterpret_cast<SocketData*>(calloc(port_map_size_, |
39 sizeof(PortData))); | 52 sizeof(SocketData))); |
40 ASSERT(port_map_ != NULL); | 53 ASSERT(port_map_ != NULL); |
41 result = pipe(interrupt_fds_); | 54 result = pipe(interrupt_fds_); |
42 if (result != 0) { | 55 if (result != 0) { |
43 FATAL("Pipe creation failed"); | 56 FATAL("Pipe creation failed"); |
44 } | 57 } |
45 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 58 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
46 FDUtils::SetNonBlocking(interrupt_fds_[1]); | 59 FDUtils::SetNonBlocking(interrupt_fds_[1]); |
47 timeout_ = kInfinityTimeout; | 60 timeout_ = kInfinityTimeout; |
48 timeout_port_ = 0; | 61 timeout_port_ = 0; |
49 } | 62 } |
50 | 63 |
51 | 64 |
52 EventHandlerImplementation::~EventHandlerImplementation() { | 65 EventHandlerImplementation::~EventHandlerImplementation() { |
53 free(port_map_); | 66 free(port_map_); |
54 close(interrupt_fds_[0]); | 67 close(interrupt_fds_[0]); |
55 close(interrupt_fds_[1]); | 68 close(interrupt_fds_[1]); |
56 } | 69 } |
57 | 70 |
58 | 71 |
59 // TODO(hpayer): Use hash table instead of array. | 72 // TODO(hpayer): Use hash table instead of array. |
60 void EventHandlerImplementation::SetPort(intptr_t fd, | 73 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
61 Dart_Port dart_port, | |
62 intptr_t mask) { | |
63 ASSERT(fd >= 0); | 74 ASSERT(fd >= 0); |
64 if (fd >= port_map_size_) { | 75 if (fd >= port_map_size_) { |
65 intptr_t new_port_map_size = port_map_size_; | 76 intptr_t new_port_map_size = port_map_size_; |
66 do { | 77 do { |
67 new_port_map_size = new_port_map_size * kPortMapGrowingFactor; | 78 new_port_map_size = new_port_map_size * kPortMapGrowingFactor; |
68 } while (fd >= new_port_map_size); | 79 } while (fd >= new_port_map_size); |
69 size_t new_port_map_bytes = new_port_map_size * sizeof(PortData); | 80 size_t new_port_map_bytes = new_port_map_size * sizeof(SocketData); |
70 port_map_ = reinterpret_cast<PortData*>(realloc(port_map_, | 81 port_map_ = reinterpret_cast<SocketData*>(realloc(port_map_, |
71 new_port_map_bytes)); | 82 new_port_map_bytes)); |
72 ASSERT(port_map_ != NULL); | 83 ASSERT(port_map_ != NULL); |
73 size_t port_map_bytes = port_map_size_ * sizeof(PortData); | 84 size_t port_map_bytes = port_map_size_ * sizeof(SocketData); |
74 memset(port_map_ + port_map_size_, | 85 memset(port_map_ + port_map_size_, |
75 0, | 86 0, |
76 new_port_map_bytes - port_map_bytes); | 87 new_port_map_bytes - port_map_bytes); |
77 port_map_size_ = new_port_map_size; | 88 port_map_size_ = new_port_map_size; |
78 } | 89 } |
79 | 90 |
80 /* | 91 return port_map_ + fd; |
81 * Only change the port map entries count if SetPort changes | |
82 * the port map state. | |
83 */ | |
84 if (dart_port == 0 && PortFor(fd) != 0) { | |
85 port_map_entries_--; | |
86 } else if (dart_port != 0 && PortFor(fd) == 0) { | |
87 port_map_entries_++; | |
88 } | |
89 port_map_[fd].dart_port = dart_port; | |
90 port_map_[fd].mask = mask; | |
91 } | 92 } |
92 | 93 |
93 | 94 |
94 Dart_Port EventHandlerImplementation::PortFor(intptr_t fd) { | 95 void EventHandlerImplementation::SetPort(intptr_t fd, |
95 return port_map_[fd].dart_port; | 96 Dart_Port dart_port, |
| 97 intptr_t mask) { |
| 98 SocketData* sd = GetSocketData(fd); |
| 99 |
| 100 // Only change the port map entries count if SetPort changes the |
| 101 // port map state. |
| 102 if (dart_port == 0 && sd->port() != 0) { |
| 103 port_map_entries_--; |
| 104 } else if (dart_port != 0 && sd->port() == 0) { |
| 105 port_map_entries_++; |
| 106 } |
| 107 |
| 108 sd->set_port(dart_port); |
| 109 sd->set_mask(mask); |
96 } | 110 } |
97 | 111 |
98 | 112 |
99 bool EventHandlerImplementation::IsListeningSocket(intptr_t fd) { | |
100 return (port_map_[fd].mask & (1 << kListeningSocket)) != 0; | |
101 } | |
102 | |
103 | |
104 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id, | 113 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id, |
105 Dart_Port dart_port, | 114 Dart_Port dart_port, |
106 intptr_t data) { | 115 intptr_t data) { |
107 WakeupHandler(id, dart_port, data); | 116 WakeupHandler(id, dart_port, data); |
108 } | 117 } |
109 | 118 |
110 | 119 |
111 void EventHandlerImplementation::CloseFd(intptr_t id) { | 120 void EventHandlerImplementation::CloseFd(intptr_t id) { |
112 SetPort(id, 0, 0); | 121 SetPort(id, 0, 0); |
113 close(id); | 122 close(id); |
(...skipping 18 matching lines...) Expand all Loading... |
132 msg.dart_port = dart_port; | 141 msg.dart_port = dart_port; |
133 msg.data = data; | 142 msg.data = data; |
134 intptr_t result = | 143 intptr_t result = |
135 write(interrupt_fds_[1], &msg, kInterruptMessageSize); | 144 write(interrupt_fds_[1], &msg, kInterruptMessageSize); |
136 if (result != kInterruptMessageSize) { | 145 if (result != kInterruptMessageSize) { |
137 perror("Interrupt message failure"); | 146 perror("Interrupt message failure"); |
138 } | 147 } |
139 } | 148 } |
140 | 149 |
141 | 150 |
142 void EventHandlerImplementation::SetPollEvents(struct pollfd* pollfds, | |
143 intptr_t mask) { | |
144 // Do not ask for POLLERR and POLLHUP explicitly as they are | |
145 // triggered anyway. | |
146 if ((mask & (1 << kInEvent)) != 0) { | |
147 pollfds->events |= POLLIN; | |
148 } | |
149 if ((mask & (1 << kOutEvent)) != 0) { | |
150 pollfds->events |= POLLOUT; | |
151 } | |
152 } | |
153 | |
154 | |
155 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | 151 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
156 struct pollfd* pollfds; | 152 struct pollfd* pollfds; |
157 | 153 |
158 intptr_t numPollfds = 1 + port_map_entries_; | 154 intptr_t numPollfds = 1 + port_map_entries_; |
159 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | 155 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
160 numPollfds)); | 156 numPollfds)); |
161 pollfds[0].fd = interrupt_fds_[0]; | 157 pollfds[0].fd = interrupt_fds_[0]; |
162 pollfds[0].events |= POLLIN; | 158 pollfds[0].events |= POLLIN; |
163 | 159 |
164 // TODO(hpayer): optimize the following iteration over the hash map | 160 // TODO(hpayer): optimize the following iteration over the hash map |
165 int j = 1; | 161 int j = 1; |
166 for (int i = 0; i < port_map_size_; i++) { | 162 for (int i = 0; i < port_map_size_; i++) { |
167 if (port_map_[i].dart_port != 0) { | 163 SocketData* sd = &port_map_[i]; |
| 164 if (sd->port() != 0) { |
168 // Fd is added to the poll set. | 165 // Fd is added to the poll set. |
169 pollfds[j].fd = i; | 166 pollfds[j].fd = i; |
170 SetPollEvents(&pollfds[j], port_map_[i].mask); | 167 sd->FillPollEvents(&pollfds[j]); |
171 j++; | 168 j++; |
172 } | 169 } |
173 } | 170 } |
174 *pollfds_size = numPollfds; | 171 *pollfds_size = numPollfds; |
175 return pollfds; | 172 return pollfds; |
176 } | 173 } |
177 | 174 |
178 | 175 |
179 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 176 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
180 int total_read = 0; | 177 int total_read = 0; |
(...skipping 27 matching lines...) Expand all Loading... |
208 CloseFd(msg.id); | 205 CloseFd(msg.id); |
209 } else { | 206 } else { |
210 SetPort(msg.id, msg.dart_port, msg.data); | 207 SetPort(msg.id, msg.dart_port, msg.data); |
211 } | 208 } |
212 } | 209 } |
213 } | 210 } |
214 | 211 |
215 | 212 |
216 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { | 213 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
217 intptr_t event_mask = 0; | 214 intptr_t event_mask = 0; |
218 if (IsListeningSocket(pollfd->fd)) { | 215 SocketData* sd = GetSocketData(pollfd->fd); |
| 216 if (sd->IsListeningSocket()) { |
219 // For listening sockets the POLLIN event indicate that there are | 217 // For listening sockets the POLLIN event indicate that there are |
220 // connections ready for accept unless accompanied with one of the | 218 // connections ready for accept unless accompanied with one of the |
221 // other flags. | 219 // other flags. |
222 if ((pollfd->revents & POLLIN) != 0) { | 220 if ((pollfd->revents & POLLIN) != 0) { |
223 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); | 221 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); |
224 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); | 222 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); |
225 if (event_mask == 0) event_mask |= (1 << kInEvent); | 223 if (event_mask == 0) event_mask |= (1 << kInEvent); |
226 } | 224 } |
227 } else { | 225 } else { |
228 // Prioritize data events over close and error events. | 226 // Prioritize data events over close and error events. |
(...skipping 22 matching lines...) Expand all Loading... |
251 } | 249 } |
252 if (result_size > 0) { | 250 if (result_size > 0) { |
253 for (int i = 1; i < pollfds_size; i++) { | 251 for (int i = 1; i < pollfds_size; i++) { |
254 /* | 252 /* |
255 * The fd is unregistered. It gets re-registered when the request | 253 * The fd is unregistered. It gets re-registered when the request |
256 * was handled by dart. | 254 * was handled by dart. |
257 */ | 255 */ |
258 intptr_t event_mask = GetPollEvents(&pollfds[i]); | 256 intptr_t event_mask = GetPollEvents(&pollfds[i]); |
259 if (event_mask != 0) { | 257 if (event_mask != 0) { |
260 intptr_t fd = pollfds[i].fd; | 258 intptr_t fd = pollfds[i].fd; |
261 Dart_Port port = PortFor(fd); | 259 Dart_Port port = GetSocketData(fd)->port(); |
262 ASSERT(port != 0); | 260 ASSERT(port != 0); |
263 UnregisterFd(fd); | 261 UnregisterFd(fd); |
264 Dart_PostIntArray(port, 1, &event_mask); | 262 Dart_PostIntArray(port, 1, &event_mask); |
265 } | 263 } |
266 } | 264 } |
267 } | 265 } |
268 HandleInterruptFd(); | 266 HandleInterruptFd(); |
269 } | 267 } |
270 | 268 |
271 | 269 |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
321 FATAL("Create start event handler thread"); | 319 FATAL("Create start event handler thread"); |
322 } | 320 } |
323 } | 321 } |
324 | 322 |
325 | 323 |
326 void EventHandlerImplementation::SendData(intptr_t id, | 324 void EventHandlerImplementation::SendData(intptr_t id, |
327 Dart_Port dart_port, | 325 Dart_Port dart_port, |
328 intptr_t data) { | 326 intptr_t data) { |
329 RegisterFdWakeup(id, dart_port, data); | 327 RegisterFdWakeup(id, dart_port, data); |
330 } | 328 } |
OLD | NEW |