Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(82)

Side by Side Diff: runtime/bin/eventhandler_macos.cc

Issue 8437090: Change the handling of closing sockets (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Windows port and frog patch Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include <errno.h> 5 #include <errno.h>
6 #include <poll.h> 6 #include <poll.h>
7 #include <pthread.h> 7 #include <pthread.h>
8 #include <stdio.h> 8 #include <stdio.h>
9 #include <string.h> 9 #include <string.h>
10 #include <sys/time.h> 10 #include <sys/time.h>
11 #include <unistd.h> 11 #include <unistd.h>
12 12
13 #include "bin/eventhandler.h" 13 #include "bin/eventhandler.h"
14 #include "bin/fdutils.h" 14 #include "bin/fdutils.h"
15 15
16 16
17 int64_t GetCurrentTimeMilliseconds() { 17 int64_t GetCurrentTimeMilliseconds() {
18 struct timeval tv; 18 struct timeval tv;
19 if (gettimeofday(&tv, NULL) < 0) { 19 if (gettimeofday(&tv, NULL) < 0) {
20 UNREACHABLE(); 20 UNREACHABLE();
21 return 0; 21 return 0;
22 } 22 }
23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; 23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000;
24 } 24 }
25 25
26 26
27 static const int kInitialPortMapSize = 128; 27 static const int kInitialPortMapSize = 16;
28 static const int kPortMapGrowingFactor = 2; 28 static const int kPortMapGrowingFactor = 2;
29 static const int kInterruptMessageSize = sizeof(InterruptMessage); 29 static const int kInterruptMessageSize = sizeof(InterruptMessage);
30 static const int kInfinityTimeout = -1; 30 static const int kInfinityTimeout = -1;
31 static const int kTimerId = -1; 31 static const int kTimerId = -1;
32 32
33 33
34 34 intptr_t SocketData::GetPollEvents() {
35 void SocketData::FillPollEvents(struct pollfd* pollfds) {
36 // Do not ask for POLLERR and POLLHUP explicitly as they are 35 // Do not ask for POLLERR and POLLHUP explicitly as they are
37 // triggered anyway. 36 // triggered anyway.
38 if ((_mask & (1 << kInEvent)) != 0) { 37 intptr_t events = 0;
39 pollfds->events |= POLLIN; 38 if (!IsClosedRead()) {
39 if ((mask_ & (1 << kInEvent)) != 0) {
40 events |= POLLIN;
41 }
40 } 42 }
41 if ((_mask & (1 << kOutEvent)) != 0) { 43 if (!IsClosedWrite()) {
42 pollfds->events |= POLLOUT; 44 if ((mask_ & (1 << kOutEvent)) != 0) {
45 events |= POLLOUT;
46 }
43 } 47 }
48 return events;
44 } 49 }
45 50
46 51
47 EventHandlerImplementation::EventHandlerImplementation() { 52 EventHandlerImplementation::EventHandlerImplementation() {
48 intptr_t result; 53 intptr_t result;
49 socket_map_entries_ = 0;
50 socket_map_size_ = kInitialPortMapSize; 54 socket_map_size_ = kInitialPortMapSize;
51 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, 55 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_,
52 sizeof(SocketData))); 56 sizeof(SocketData)));
53 ASSERT(socket_map_ != NULL); 57 ASSERT(socket_map_ != NULL);
54 result = pipe(interrupt_fds_); 58 result = pipe(interrupt_fds_);
55 if (result != 0) { 59 if (result != 0) {
56 FATAL("Pipe creation failed"); 60 FATAL("Pipe creation failed");
57 } 61 }
58 FDUtils::SetNonBlocking(interrupt_fds_[0]); 62 FDUtils::SetNonBlocking(interrupt_fds_[0]);
59 FDUtils::SetNonBlocking(interrupt_fds_[1]); 63 FDUtils::SetNonBlocking(interrupt_fds_[1]);
(...skipping 21 matching lines...) Expand all
81 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, 85 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_,
82 new_socket_map_bytes)); 86 new_socket_map_bytes));
83 ASSERT(socket_map_ != NULL); 87 ASSERT(socket_map_ != NULL);
84 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); 88 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData);
85 memset(socket_map_ + socket_map_size_, 89 memset(socket_map_ + socket_map_size_,
86 0, 90 0,
87 new_socket_map_bytes - socket_map_bytes); 91 new_socket_map_bytes - socket_map_bytes);
88 socket_map_size_ = new_socket_map_size; 92 socket_map_size_ = new_socket_map_size;
89 } 93 }
90 94
91 return socket_map_ + fd; 95 SocketData* sd = socket_map_ + fd;
96 sd->set_fd(fd); // For now just make sure the fd is set.
97 return sd;
92 } 98 }
93 99
94 100
95 void EventHandlerImplementation::SetPort(intptr_t fd,
96 Dart_Port dart_port,
97 intptr_t mask) {
98 SocketData* sd = GetSocketData(fd);
99
100 // Only change the port map entries count if SetPort changes the
101 // port map state.
102 if (dart_port == 0 && sd->port() != 0) {
103 socket_map_entries_--;
104 } else if (dart_port != 0 && sd->port() == 0) {
105 socket_map_entries_++;
106 }
107
108 sd->set_port(dart_port);
109 sd->set_mask(mask);
110 }
111
112
113 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id,
114 Dart_Port dart_port,
115 intptr_t data) {
116 WakeupHandler(id, dart_port, data);
117 }
118
119
120 void EventHandlerImplementation::CloseFd(intptr_t id) {
121 SetPort(id, 0, 0);
122 close(id);
123 }
124
125
126 void EventHandlerImplementation::UnregisterFdWakeup(intptr_t id) {
127 WakeupHandler(id, 0, 0);
128 }
129
130
131 void EventHandlerImplementation::UnregisterFd(intptr_t id) {
132 SetPort(id, 0, 0);
133 }
134
135
136 void EventHandlerImplementation::WakeupHandler(intptr_t id, 101 void EventHandlerImplementation::WakeupHandler(intptr_t id,
137 Dart_Port dart_port, 102 Dart_Port dart_port,
138 int64_t data) { 103 int64_t data) {
139 InterruptMessage msg; 104 InterruptMessage msg;
140 msg.id = id; 105 msg.id = id;
141 msg.dart_port = dart_port; 106 msg.dart_port = dart_port;
142 msg.data = data; 107 msg.data = data;
143 intptr_t result = 108 intptr_t result =
144 write(interrupt_fds_[1], &msg, kInterruptMessageSize); 109 write(interrupt_fds_[1], &msg, kInterruptMessageSize);
145 if (result != kInterruptMessageSize) { 110 if (result != kInterruptMessageSize) {
146 perror("Interrupt message failure"); 111 perror("Interrupt message failure");
147 } 112 }
148 } 113 }
149 114
150 115
151 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { 116 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) {
152 struct pollfd* pollfds; 117 struct pollfd* pollfds;
153 118
154 intptr_t numPollfds = 1 + socket_map_entries_; 119 // Calculate the number of file descriptors to poll on.
120 intptr_t numPollfds = 1;
121 for (int i = 0; i < socket_map_size_; i++) {
122 SocketData* sd = &socket_map_[i];
123 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++;
124 }
125
155 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), 126 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd),
156 numPollfds)); 127 numPollfds));
157 pollfds[0].fd = interrupt_fds_[0]; 128 pollfds[0].fd = interrupt_fds_[0];
158 pollfds[0].events |= POLLIN; 129 pollfds[0].events |= POLLIN;
159 130
160 // TODO(hpayer): optimize the following iteration over the hash map 131 // TODO(hpayer): optimize the following iteration over the hash map
161 int j = 1; 132 int j = 1;
162 for (int i = 0; i < socket_map_size_; i++) { 133 for (int i = 0; i < socket_map_size_; i++) {
163 SocketData* sd = &socket_map_[i]; 134 SocketData* sd = &socket_map_[i];
164 if (sd->port() != 0) { 135 intptr_t events = sd->GetPollEvents();
136 if (sd->port() > 0 && events != 0) {
165 // Fd is added to the poll set. 137 // Fd is added to the poll set.
166 pollfds[j].fd = i; 138 pollfds[j].fd = sd->fd();
167 sd->FillPollEvents(&pollfds[j]); 139 pollfds[j].events = events;
168 j++; 140 j++;
169 } 141 }
170 } 142 }
171 *pollfds_size = numPollfds; 143 ASSERT(numPollfds == j);
144 *pollfds_size = j;
172 return pollfds; 145 return pollfds;
173 } 146 }
174 147
175 148
176 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { 149 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) {
177 int total_read = 0; 150 int total_read = 0;
178 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize); 151 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize);
179 if (bytes_read < 0) { 152 if (bytes_read < 0) {
180 return false; 153 return false;
181 } 154 }
182 total_read = bytes_read; 155 total_read = bytes_read;
183 while (total_read < kInterruptMessageSize) { 156 while (total_read < kInterruptMessageSize) {
184 bytes_read = read(interrupt_fds_[0], 157 bytes_read = read(interrupt_fds_[0],
185 msg + total_read, 158 msg + total_read,
186 kInterruptMessageSize - total_read); 159 kInterruptMessageSize - total_read);
187 if (bytes_read > 0) { 160 if (bytes_read > 0) {
188 total_read = total_read + bytes_read; 161 total_read = total_read + bytes_read;
189 } 162 }
190 } 163 }
191 return (total_read == kInterruptMessageSize) ? true : false; 164 return (total_read == kInterruptMessageSize) ? true : false;
192 } 165 }
193 166
194 void EventHandlerImplementation::HandleInterruptFd() { 167 void EventHandlerImplementation::HandleInterruptFd() {
195 InterruptMessage msg; 168 InterruptMessage msg;
196 while (GetInterruptMessage(&msg)) { 169 while (GetInterruptMessage(&msg)) {
197 if (msg.id == kTimerId) { 170 if (msg.id == kTimerId) {
198 timeout_ = msg.data; 171 timeout_ = msg.data;
199 timeout_port_ = msg.dart_port; 172 timeout_port_ = msg.dart_port;
200 } else if ((msg.data & (1 << kCloseCommand)) != 0) {
201 /*
202 * A close event happened in dart, we have to explicitly unregister
203 * the fd and close the fd.
204 */
205 CloseFd(msg.id);
206 } else { 173 } else {
207 SetPort(msg.id, msg.dart_port, msg.data); 174 SocketData* sd = GetSocketData(msg.id);
175 if ((msg.data & (1 << kShutdownReadCommand)) != 0) {
176 ASSERT(msg.data == (1 << kShutdownReadCommand));
177 // Close the socket for reading.
178 sd->ShutdownRead();
179 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) {
180 ASSERT(msg.data == (1 << kShutdownWriteCommand));
181 // Close the socket for writing.
182 sd->ShutdownWrite();
183 } else if ((msg.data & (1 << kCloseCommand)) != 0) {
184 ASSERT(msg.data == (1 << kCloseCommand));
185 // Close the socket and free system resources.
186 sd->Close();
187 } else {
188 // Setup events to wait for.
189 sd->SetPortAndMask(msg.dart_port, msg.data);
190 }
208 } 191 }
209 } 192 }
210 } 193 }
211 194
195 #ifdef DEBUG_POLL
196 static void PrintEventMask(struct pollfd* pollfd) {
197 printf("%d ", pollfd->fd);
198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN ");
199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI ");
200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT ");
201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR ");
202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP ");
203 if ((pollfd->revents & POLLRDHUP) != 0) printf("POLLRDHUP ");
204 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL ");
205 int all_events = POLLIN | POLLPRI | POLLOUT |
206 POLLERR | POLLHUP | POLLRDHUP | POLLNVAL;
207 if ((pollfd->revents & ~all_events) != 0) {
208 printf("(and %08x) ", pollfd->revents & ~all_events);
209 }
210 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd));
211
212 printf("\n");
213 }
214 #endif
212 215
213 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { 216 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) {
217 #ifdef DEBUG_POLL
218 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd);
219 #endif
214 intptr_t event_mask = 0; 220 intptr_t event_mask = 0;
215 SocketData* sd = GetSocketData(pollfd->fd); 221 SocketData* sd = GetSocketData(pollfd->fd);
216 if (sd->IsListeningSocket()) { 222 if (sd->IsListeningSocket()) {
217 // For listening sockets the POLLIN event indicate that there are 223 // For listening sockets the POLLIN event indicate that there are
218 // connections ready for accept unless accompanied with one of the 224 // connections ready for accept unless accompanied with one of the
219 // other flags. 225 // other flags.
220 if ((pollfd->revents & POLLIN) != 0) { 226 if ((pollfd->revents & POLLIN) != 0) {
221 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); 227 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent);
222 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); 228 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent);
223 if (event_mask == 0) event_mask |= (1 << kInEvent); 229 if (event_mask == 0) event_mask |= (1 << kInEvent);
224 } 230 }
225 } else { 231 } else {
232 if ((pollfd->revents & POLLNVAL) != 0) {
233 return 0;
234 }
235
226 // Prioritize data events over close and error events. 236 // Prioritize data events over close and error events.
227 if ((pollfd->revents & POLLIN) != 0) { 237 if ((pollfd->revents & POLLIN) != 0) {
228 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { 238 if (FDUtils::AvailableBytes(pollfd->fd) != 0) {
229 event_mask = (1 << kInEvent); 239 event_mask = (1 << kInEvent);
230 } else if ((pollfd->revents & POLLHUP) != 0) { 240 } else if (((pollfd->revents & POLLHUP) != 0)) {
231 event_mask = (1 << kCloseEvent); 241 event_mask = (1 << kCloseEvent);
242 sd->MarkClosedRead();
232 } else if ((pollfd->revents & POLLERR) != 0) { 243 } else if ((pollfd->revents & POLLERR) != 0) {
233 event_mask = (1 << kErrorEvent); 244 event_mask = (1 << kErrorEvent);
245 } else {
246 // If POLLIN is set with no available data and no POLLHUP use
247 // recv to peek for whether the other end of the socket
248 // actually closed.
249 char buffer;
250 ssize_t bytesPeeked = recv(sd->fd(), &buffer, 1, MSG_PEEK);
251 if (bytesPeeked == 0) {
252 event_mask = (1 << kCloseEvent);
253 sd->MarkClosedRead();
254 } else if (errno != EAGAIN) {
255 fprintf(stderr, "Error recv: %s\n", strerror(errno));
256 }
234 } 257 }
235 } 258 }
236 259
260 // On pipes POLLHUP is reported without POLLIN.
261 if (((pollfd->revents & POLLIN) == 0) &&
262 ((pollfd->revents & POLLHUP) != 0)) {
263 event_mask = (1 << kCloseEvent);
264 sd->MarkClosedRead();
265 }
266
237 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent); 267 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent);
238 } 268 }
239 269
240 return event_mask; 270 return event_mask;
241 } 271 }
242 272
243 273
244 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, 274 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds,
245 int pollfds_size, 275 int pollfds_size,
246 int result_size) { 276 int result_size) {
247 if ((pollfds[0].revents & POLLIN) != 0) { 277 if ((pollfds[0].revents & POLLIN) != 0) {
248 result_size -= 1; 278 result_size -= 1;
249 } 279 }
250 if (result_size > 0) { 280 if (result_size > 0) {
251 for (int i = 1; i < pollfds_size; i++) { 281 for (int i = 1; i < pollfds_size; i++) {
252 /* 282 /*
253 * The fd is unregistered. It gets re-registered when the request 283 * The fd is unregistered. It gets re-registered when the request
254 * was handled by dart. 284 * was handled by dart.
255 */ 285 */
256 intptr_t event_mask = GetPollEvents(&pollfds[i]); 286 intptr_t event_mask = GetPollEvents(&pollfds[i]);
257 if (event_mask != 0) { 287 if (event_mask != 0) {
258 intptr_t fd = pollfds[i].fd; 288 intptr_t fd = pollfds[i].fd;
259 Dart_Port port = GetSocketData(fd)->port(); 289 SocketData* sd = GetSocketData(fd);
290 Dart_Port port = sd->port();
260 ASSERT(port != 0); 291 ASSERT(port != 0);
261 UnregisterFd(fd); 292 sd->Unregister();
262 Dart_PostIntArray(port, 1, &event_mask); 293 Dart_PostIntArray(port, 1, &event_mask);
263 } 294 }
264 } 295 }
265 } 296 }
266 HandleInterruptFd(); 297 HandleInterruptFd();
267 } 298 }
268 299
269 300
270 intptr_t EventHandlerImplementation::GetTimeout() { 301 intptr_t EventHandlerImplementation::GetTimeout() {
271 if (timeout_ == kInfinityTimeout) { 302 if (timeout_ == kInfinityTimeout) {
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
317 this); 348 this);
318 if (result != 0) { 349 if (result != 0) {
319 FATAL("Create start event handler thread"); 350 FATAL("Create start event handler thread");
320 } 351 }
321 } 352 }
322 353
323 354
324 void EventHandlerImplementation::SendData(intptr_t id, 355 void EventHandlerImplementation::SendData(intptr_t id,
325 Dart_Port dart_port, 356 Dart_Port dart_port,
326 intptr_t data) { 357 intptr_t data) {
327 RegisterFdWakeup(id, dart_port, data); 358 WakeupHandler(id, dart_port, data);
328 } 359 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698