| OLD | NEW |
| 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #include <errno.h> | 5 #include <errno.h> |
| 6 #include <poll.h> | 6 #include <poll.h> |
| 7 #include <pthread.h> | 7 #include <pthread.h> |
| 8 #include <stdio.h> | 8 #include <stdio.h> |
| 9 #include <string.h> | 9 #include <string.h> |
| 10 #include <sys/time.h> | 10 #include <sys/time.h> |
| (...skipping 13 matching lines...) Expand all Loading... |
| 24 } | 24 } |
| 25 | 25 |
| 26 | 26 |
| 27 static const int kInitialPortMapSize = 128; | 27 static const int kInitialPortMapSize = 128; |
| 28 static const int kPortMapGrowingFactor = 2; | 28 static const int kPortMapGrowingFactor = 2; |
| 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
| 30 static const int kInfinityTimeout = -1; | 30 static const int kInfinityTimeout = -1; |
| 31 static const int kTimerId = -1; | 31 static const int kTimerId = -1; |
| 32 | 32 |
| 33 | 33 |
| 34 |
| 35 void SocketData::FillPollEvents(struct pollfd* pollfds) { |
| 36 // Do not ask for POLLERR and POLLHUP explicitly as they are |
| 37 // triggered anyway. |
| 38 if ((_mask & (1 << kInEvent)) != 0) { |
| 39 pollfds->events |= POLLIN; |
| 40 } |
| 41 if ((_mask & (1 << kOutEvent)) != 0) { |
| 42 pollfds->events |= POLLOUT; |
| 43 } |
| 44 pollfds->events |= POLLRDHUP; |
| 45 } |
| 46 |
| 47 |
| 34 EventHandlerImplementation::EventHandlerImplementation() { | 48 EventHandlerImplementation::EventHandlerImplementation() { |
| 35 intptr_t result; | 49 intptr_t result; |
| 36 port_map_entries_ = 0; | 50 socket_map_entries_ = 0; |
| 37 port_map_size_ = kInitialPortMapSize; | 51 socket_map_size_ = kInitialPortMapSize; |
| 38 port_map_ = reinterpret_cast<PortData*>(calloc(port_map_size_, | 52 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, |
| 39 sizeof(PortData))); | 53 sizeof(SocketData))); |
| 40 ASSERT(port_map_ != NULL); | 54 ASSERT(socket_map_ != NULL); |
| 41 result = pipe(interrupt_fds_); | 55 result = pipe(interrupt_fds_); |
| 42 if (result != 0) { | 56 if (result != 0) { |
| 43 FATAL("Pipe creation failed"); | 57 FATAL("Pipe creation failed"); |
| 44 } | 58 } |
| 45 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 59 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
| 46 FDUtils::SetNonBlocking(interrupt_fds_[1]); | 60 FDUtils::SetNonBlocking(interrupt_fds_[1]); |
| 47 timeout_ = kInfinityTimeout; | 61 timeout_ = kInfinityTimeout; |
| 48 timeout_port_ = 0; | 62 timeout_port_ = 0; |
| 49 } | 63 } |
| 50 | 64 |
| 51 | 65 |
| 52 EventHandlerImplementation::~EventHandlerImplementation() { | 66 EventHandlerImplementation::~EventHandlerImplementation() { |
| 53 free(port_map_); | 67 free(socket_map_); |
| 54 close(interrupt_fds_[0]); | 68 close(interrupt_fds_[0]); |
| 55 close(interrupt_fds_[1]); | 69 close(interrupt_fds_[1]); |
| 56 } | 70 } |
| 57 | 71 |
| 58 | 72 |
| 59 // TODO(hpayer): Use hash table instead of array. | 73 // TODO(hpayer): Use hash table instead of array. |
| 74 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
| 75 ASSERT(fd >= 0); |
| 76 if (fd >= socket_map_size_) { |
| 77 intptr_t new_socket_map_size = socket_map_size_; |
| 78 do { |
| 79 new_socket_map_size = new_socket_map_size * kPortMapGrowingFactor; |
| 80 } while (fd >= new_socket_map_size); |
| 81 size_t new_socket_map_bytes = new_socket_map_size * sizeof(SocketData); |
| 82 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, |
| 83 new_socket_map_bytes)); |
| 84 ASSERT(socket_map_ != NULL); |
| 85 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); |
| 86 memset(socket_map_ + socket_map_size_, |
| 87 0, |
| 88 new_socket_map_bytes - socket_map_bytes); |
| 89 socket_map_size_ = new_socket_map_size; |
| 90 } |
| 91 |
| 92 return socket_map_ + fd; |
| 93 } |
| 94 |
| 95 |
| 60 void EventHandlerImplementation::SetPort(intptr_t fd, | 96 void EventHandlerImplementation::SetPort(intptr_t fd, |
| 61 Dart_Port dart_port, | 97 Dart_Port dart_port, |
| 62 intptr_t mask) { | 98 intptr_t mask) { |
| 63 ASSERT(fd >= 0); | 99 SocketData* sd = GetSocketData(fd); |
| 64 if (fd >= port_map_size_) { | 100 |
| 65 intptr_t new_port_map_size = port_map_size_; | 101 // Only change the port map entries count if SetPort changes the |
| 66 do { | 102 // port map state. |
| 67 new_port_map_size = new_port_map_size * kPortMapGrowingFactor; | 103 if (dart_port == 0 && sd->port() != 0) { |
| 68 } while (fd >= new_port_map_size); | 104 socket_map_entries_--; |
| 69 size_t new_port_map_bytes = new_port_map_size * sizeof(PortData); | 105 } else if (dart_port != 0 && sd->port() == 0) { |
| 70 port_map_ = reinterpret_cast<PortData*>(realloc(port_map_, | 106 socket_map_entries_++; |
| 71 new_port_map_bytes)); | |
| 72 ASSERT(port_map_ != NULL); | |
| 73 size_t port_map_bytes = port_map_size_ * sizeof(PortData); | |
| 74 memset(port_map_ + port_map_size_, | |
| 75 0, | |
| 76 new_port_map_bytes - port_map_bytes); | |
| 77 port_map_size_ = new_port_map_size; | |
| 78 } | 107 } |
| 79 | 108 |
| 80 /* | 109 sd->set_port(dart_port); |
| 81 * Only change the port map entries count if SetPort changes | 110 sd->set_mask(mask); |
| 82 * the port map state. | |
| 83 */ | |
| 84 if (dart_port == 0 && PortFor(fd) != 0) { | |
| 85 port_map_entries_--; | |
| 86 } else if (dart_port != 0 && PortFor(fd) == 0) { | |
| 87 port_map_entries_++; | |
| 88 } | |
| 89 port_map_[fd].dart_port = dart_port; | |
| 90 port_map_[fd].mask = mask; | |
| 91 } | 111 } |
| 92 | 112 |
| 93 | 113 |
| 94 Dart_Port EventHandlerImplementation::PortFor(intptr_t fd) { | |
| 95 return port_map_[fd].dart_port; | |
| 96 } | |
| 97 | |
| 98 | |
| 99 bool EventHandlerImplementation::IsListeningSocket(intptr_t fd) { | |
| 100 return (port_map_[fd].mask & (1 << kListeningSocket)) != 0; | |
| 101 } | |
| 102 | |
| 103 | |
| 104 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id, | 114 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id, |
| 105 Dart_Port dart_port, | 115 Dart_Port dart_port, |
| 106 intptr_t data) { | 116 intptr_t data) { |
| 107 WakeupHandler(id, dart_port, data); | 117 WakeupHandler(id, dart_port, data); |
| 108 } | 118 } |
| 109 | 119 |
| 110 | 120 |
| 111 void EventHandlerImplementation::CloseFd(intptr_t id) { | 121 void EventHandlerImplementation::CloseFd(intptr_t id) { |
| 112 SetPort(id, 0, 0); | 122 SetPort(id, 0, 0); |
| 113 close(id); | 123 close(id); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 132 msg.dart_port = dart_port; | 142 msg.dart_port = dart_port; |
| 133 msg.data = data; | 143 msg.data = data; |
| 134 intptr_t result = | 144 intptr_t result = |
| 135 write(interrupt_fds_[1], &msg, kInterruptMessageSize); | 145 write(interrupt_fds_[1], &msg, kInterruptMessageSize); |
| 136 if (result != kInterruptMessageSize) { | 146 if (result != kInterruptMessageSize) { |
| 137 perror("Interrupt message failure"); | 147 perror("Interrupt message failure"); |
| 138 } | 148 } |
| 139 } | 149 } |
| 140 | 150 |
| 141 | 151 |
| 142 void EventHandlerImplementation::SetPollEvents(struct pollfd* pollfds, | |
| 143 intptr_t mask) { | |
| 144 // Do not ask for POLLERR and POLLHUP explicitly as they are | |
| 145 // triggered anyway. | |
| 146 if ((mask & (1 << kInEvent)) != 0) { | |
| 147 pollfds->events |= POLLIN; | |
| 148 } | |
| 149 if ((mask & (1 << kOutEvent)) != 0) { | |
| 150 pollfds->events |= POLLOUT; | |
| 151 } | |
| 152 pollfds->events |= POLLRDHUP; | |
| 153 } | |
| 154 | |
| 155 | |
| 156 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | 152 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
| 157 struct pollfd* pollfds; | 153 struct pollfd* pollfds; |
| 158 | 154 |
| 159 intptr_t numPollfds = 1 + port_map_entries_; | 155 intptr_t numPollfds = 1 + socket_map_entries_; |
| 160 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | 156 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
| 161 numPollfds)); | 157 numPollfds)); |
| 162 pollfds[0].fd = interrupt_fds_[0]; | 158 pollfds[0].fd = interrupt_fds_[0]; |
| 163 pollfds[0].events |= POLLIN; | 159 pollfds[0].events |= POLLIN; |
| 164 | 160 |
| 165 // TODO(hpayer): optimize the following iteration over the hash map | 161 // TODO(hpayer): optimize the following iteration over the hash map |
| 166 int j = 1; | 162 int j = 1; |
| 167 for (int i = 0; i < port_map_size_; i++) { | 163 for (int i = 0; i < socket_map_size_; i++) { |
| 168 if (port_map_[i].dart_port != 0) { | 164 SocketData* sd = &socket_map_[i]; |
| 165 if (sd->port() != 0) { |
| 169 // Fd is added to the poll set. | 166 // Fd is added to the poll set. |
| 170 pollfds[j].fd = i; | 167 pollfds[j].fd = i; |
| 171 SetPollEvents(&pollfds[j], port_map_[i].mask); | 168 sd->FillPollEvents(&pollfds[j]); |
| 172 j++; | 169 j++; |
| 173 } | 170 } |
| 174 } | 171 } |
| 175 *pollfds_size = numPollfds; | 172 *pollfds_size = numPollfds; |
| 176 return pollfds; | 173 return pollfds; |
| 177 } | 174 } |
| 178 | 175 |
| 179 | 176 |
| 180 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 177 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
| 181 int total_read = 0; | 178 int total_read = 0; |
| (...skipping 27 matching lines...) Expand all Loading... |
| 209 CloseFd(msg.id); | 206 CloseFd(msg.id); |
| 210 } else { | 207 } else { |
| 211 SetPort(msg.id, msg.dart_port, msg.data); | 208 SetPort(msg.id, msg.dart_port, msg.data); |
| 212 } | 209 } |
| 213 } | 210 } |
| 214 } | 211 } |
| 215 | 212 |
| 216 | 213 |
| 217 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { | 214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
| 218 intptr_t event_mask = 0; | 215 intptr_t event_mask = 0; |
| 219 if (IsListeningSocket(pollfd->fd)) { | 216 SocketData* sd = GetSocketData(pollfd->fd); |
| 217 if (sd->IsListeningSocket()) { |
| 220 // For listening sockets the POLLIN event indicate that there are | 218 // For listening sockets the POLLIN event indicate that there are |
| 221 // connections ready for accept unless accompanied with one of the | 219 // connections ready for accept unless accompanied with one of the |
| 222 // other flags. | 220 // other flags. |
| 223 if ((pollfd->revents & POLLIN) != 0) { | 221 if ((pollfd->revents & POLLIN) != 0) { |
| 224 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); | 222 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); |
| 225 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); | 223 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); |
| 226 if (event_mask == 0) event_mask |= (1 << kInEvent); | 224 if (event_mask == 0) event_mask |= (1 << kInEvent); |
| 227 } | 225 } |
| 228 } else { | 226 } else { |
| 229 // Prioritize data events over close and error events. | 227 // Prioritize data events over close and error events. |
| (...skipping 23 matching lines...) Expand all Loading... |
| 253 } | 251 } |
| 254 if (result_size > 0) { | 252 if (result_size > 0) { |
| 255 for (int i = 1; i < pollfds_size; i++) { | 253 for (int i = 1; i < pollfds_size; i++) { |
| 256 /* | 254 /* |
| 257 * The fd is unregistered. It gets re-registered when the request | 255 * The fd is unregistered. It gets re-registered when the request |
| 258 * was handled by dart. | 256 * was handled by dart. |
| 259 */ | 257 */ |
| 260 intptr_t event_mask = GetPollEvents(&pollfds[i]); | 258 intptr_t event_mask = GetPollEvents(&pollfds[i]); |
| 261 if (event_mask != 0) { | 259 if (event_mask != 0) { |
| 262 intptr_t fd = pollfds[i].fd; | 260 intptr_t fd = pollfds[i].fd; |
| 263 Dart_Port port = PortFor(fd); | 261 Dart_Port port = GetSocketData(fd)->port(); |
| 264 ASSERT(port != 0); | 262 ASSERT(port != 0); |
| 265 UnregisterFd(fd); | 263 UnregisterFd(fd); |
| 266 Dart_PostIntArray(port, 1, &event_mask); | 264 Dart_PostIntArray(port, 1, &event_mask); |
| 267 } | 265 } |
| 268 } | 266 } |
| 269 } | 267 } |
| 270 HandleInterruptFd(); | 268 HandleInterruptFd(); |
| 271 } | 269 } |
| 272 | 270 |
| 273 | 271 |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 323 FATAL("Create start event handler thread"); | 321 FATAL("Create start event handler thread"); |
| 324 } | 322 } |
| 325 } | 323 } |
| 326 | 324 |
| 327 | 325 |
| 328 void EventHandlerImplementation::SendData(intptr_t id, | 326 void EventHandlerImplementation::SendData(intptr_t id, |
| 329 Dart_Port dart_port, | 327 Dart_Port dart_port, |
| 330 intptr_t data) { | 328 intptr_t data) { |
| 331 RegisterFdWakeup(id, dart_port, data); | 329 RegisterFdWakeup(id, dart_port, data); |
| 332 } | 330 } |
| OLD | NEW |