Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(44)

Side by Side Diff: runtime/bin/eventhandler_linux.cc

Issue 8437090: Change the handling of closing sockets (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed review comments by ager@ Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/eventhandler_linux.h ('k') | runtime/bin/eventhandler_macos.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include <errno.h> 5 #include <errno.h>
6 #include <poll.h> 6 #include <poll.h>
7 #include <pthread.h> 7 #include <pthread.h>
8 #include <stdio.h> 8 #include <stdio.h>
9 #include <string.h> 9 #include <string.h>
10 #include <sys/time.h> 10 #include <sys/time.h>
11 #include <unistd.h> 11 #include <unistd.h>
12 12
13 #include "bin/eventhandler.h" 13 #include "bin/eventhandler.h"
14 #include "bin/fdutils.h" 14 #include "bin/fdutils.h"
15 15
16 16
17 int64_t GetCurrentTimeMilliseconds() { 17 int64_t GetCurrentTimeMilliseconds() {
18 struct timeval tv; 18 struct timeval tv;
19 if (gettimeofday(&tv, NULL) < 0) { 19 if (gettimeofday(&tv, NULL) < 0) {
20 UNREACHABLE(); 20 UNREACHABLE();
21 return 0; 21 return 0;
22 } 22 }
23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; 23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000;
24 } 24 }
25 25
26 26
27 static const int kInitialPortMapSize = 128; 27 static const int kInitialPortMapSize = 16;
28 static const int kPortMapGrowingFactor = 2; 28 static const int kPortMapGrowingFactor = 2;
29 static const int kInterruptMessageSize = sizeof(InterruptMessage); 29 static const int kInterruptMessageSize = sizeof(InterruptMessage);
30 static const int kInfinityTimeout = -1; 30 static const int kInfinityTimeout = -1;
31 static const int kTimerId = -1; 31 static const int kTimerId = -1;
32 32
33 33
34 34 intptr_t SocketData::GetPollEvents() {
35 void SocketData::FillPollEvents(struct pollfd* pollfds) {
36 // Do not ask for POLLERR and POLLHUP explicitly as they are 35 // Do not ask for POLLERR and POLLHUP explicitly as they are
37 // triggered anyway. 36 // triggered anyway.
38 if ((_mask & (1 << kInEvent)) != 0) { 37 intptr_t events = 0;
39 pollfds->events |= POLLIN; 38 if (!IsClosedRead()) {
39 if ((mask_ & (1 << kInEvent)) != 0) {
40 events |= POLLIN;
41 }
40 } 42 }
41 if ((_mask & (1 << kOutEvent)) != 0) { 43 if (!IsClosedWrite()) {
42 pollfds->events |= POLLOUT; 44 if ((mask_ & (1 << kOutEvent)) != 0) {
45 events |= POLLOUT;
46 }
43 } 47 }
44 pollfds->events |= POLLRDHUP; 48 return events;
45 } 49 }
46 50
47 51
48 EventHandlerImplementation::EventHandlerImplementation() { 52 EventHandlerImplementation::EventHandlerImplementation() {
49 intptr_t result; 53 intptr_t result;
50 socket_map_entries_ = 0;
51 socket_map_size_ = kInitialPortMapSize; 54 socket_map_size_ = kInitialPortMapSize;
52 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, 55 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_,
53 sizeof(SocketData))); 56 sizeof(SocketData)));
54 ASSERT(socket_map_ != NULL); 57 ASSERT(socket_map_ != NULL);
55 result = pipe(interrupt_fds_); 58 result = pipe(interrupt_fds_);
56 if (result != 0) { 59 if (result != 0) {
57 FATAL("Pipe creation failed"); 60 FATAL("Pipe creation failed");
58 } 61 }
59 FDUtils::SetNonBlocking(interrupt_fds_[0]); 62 FDUtils::SetNonBlocking(interrupt_fds_[0]);
60 FDUtils::SetNonBlocking(interrupt_fds_[1]); 63 FDUtils::SetNonBlocking(interrupt_fds_[1]);
(...skipping 21 matching lines...) Expand all
82 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, 85 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_,
83 new_socket_map_bytes)); 86 new_socket_map_bytes));
84 ASSERT(socket_map_ != NULL); 87 ASSERT(socket_map_ != NULL);
85 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); 88 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData);
86 memset(socket_map_ + socket_map_size_, 89 memset(socket_map_ + socket_map_size_,
87 0, 90 0,
88 new_socket_map_bytes - socket_map_bytes); 91 new_socket_map_bytes - socket_map_bytes);
89 socket_map_size_ = new_socket_map_size; 92 socket_map_size_ = new_socket_map_size;
90 } 93 }
91 94
92 return socket_map_ + fd; 95 SocketData* sd = socket_map_ + fd;
96 sd->set_fd(fd); // For now just make sure the fd is set.
97 return sd;
93 } 98 }
94 99
95 100
96 void EventHandlerImplementation::SetPort(intptr_t fd,
97 Dart_Port dart_port,
98 intptr_t mask) {
99 SocketData* sd = GetSocketData(fd);
100
101 // Only change the port map entries count if SetPort changes the
102 // port map state.
103 if (dart_port == 0 && sd->port() != 0) {
104 socket_map_entries_--;
105 } else if (dart_port != 0 && sd->port() == 0) {
106 socket_map_entries_++;
107 }
108
109 sd->set_port(dart_port);
110 sd->set_mask(mask);
111 }
112
113
114 void EventHandlerImplementation::RegisterFdWakeup(intptr_t id,
115 Dart_Port dart_port,
116 intptr_t data) {
117 WakeupHandler(id, dart_port, data);
118 }
119
120
121 void EventHandlerImplementation::CloseFd(intptr_t id) {
122 SetPort(id, 0, 0);
123 close(id);
124 }
125
126
127 void EventHandlerImplementation::UnregisterFdWakeup(intptr_t id) {
128 WakeupHandler(id, 0, 0);
129 }
130
131
132 void EventHandlerImplementation::UnregisterFd(intptr_t id) {
133 SetPort(id, 0, 0);
134 }
135
136
137 void EventHandlerImplementation::WakeupHandler(intptr_t id, 101 void EventHandlerImplementation::WakeupHandler(intptr_t id,
138 Dart_Port dart_port, 102 Dart_Port dart_port,
139 int64_t data) { 103 int64_t data) {
140 InterruptMessage msg; 104 InterruptMessage msg;
141 msg.id = id; 105 msg.id = id;
142 msg.dart_port = dart_port; 106 msg.dart_port = dart_port;
143 msg.data = data; 107 msg.data = data;
144 intptr_t result = 108 intptr_t result =
145 write(interrupt_fds_[1], &msg, kInterruptMessageSize); 109 write(interrupt_fds_[1], &msg, kInterruptMessageSize);
146 if (result != kInterruptMessageSize) { 110 if (result != kInterruptMessageSize) {
147 perror("Interrupt message failure"); 111 perror("Interrupt message failure");
148 } 112 }
149 } 113 }
150 114
151 115
152 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { 116 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) {
153 struct pollfd* pollfds; 117 struct pollfd* pollfds;
154 118
155 intptr_t numPollfds = 1 + socket_map_entries_; 119 // Calculate the number of file descriptors to poll on.
120 intptr_t numPollfds = 1;
121 for (int i = 0; i < socket_map_size_; i++) {
122 SocketData* sd = &socket_map_[i];
123 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++;
124 }
125
156 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), 126 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd),
157 numPollfds)); 127 numPollfds));
158 pollfds[0].fd = interrupt_fds_[0]; 128 pollfds[0].fd = interrupt_fds_[0];
159 pollfds[0].events |= POLLIN; 129 pollfds[0].events |= POLLIN;
160 130
161 // TODO(hpayer): optimize the following iteration over the hash map 131 // TODO(hpayer): optimize the following iteration over the hash map
162 int j = 1; 132 int j = 1;
163 for (int i = 0; i < socket_map_size_; i++) { 133 for (int i = 0; i < socket_map_size_; i++) {
164 SocketData* sd = &socket_map_[i]; 134 SocketData* sd = &socket_map_[i];
165 if (sd->port() != 0) { 135 intptr_t events = sd->GetPollEvents();
136 if (sd->port() > 0 && events != 0) {
166 // Fd is added to the poll set. 137 // Fd is added to the poll set.
167 pollfds[j].fd = i; 138 pollfds[j].fd = sd->fd();
168 sd->FillPollEvents(&pollfds[j]); 139 pollfds[j].events = events;
169 j++; 140 j++;
170 } 141 }
171 } 142 }
172 *pollfds_size = numPollfds; 143 ASSERT(numPollfds == j);
144 *pollfds_size = j;
173 return pollfds; 145 return pollfds;
174 } 146 }
175 147
176 148
177 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { 149 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) {
178 int total_read = 0; 150 int total_read = 0;
179 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize); 151 int bytes_read = read(interrupt_fds_[0], msg, kInterruptMessageSize);
180 if (bytes_read < 0) { 152 if (bytes_read < 0) {
181 return false; 153 return false;
182 } 154 }
183 total_read = bytes_read; 155 total_read = bytes_read;
184 while (total_read < kInterruptMessageSize) { 156 while (total_read < kInterruptMessageSize) {
185 bytes_read = read(interrupt_fds_[0], 157 bytes_read = read(interrupt_fds_[0],
186 msg + total_read, 158 msg + total_read,
187 kInterruptMessageSize - total_read); 159 kInterruptMessageSize - total_read);
188 if (bytes_read > 0) { 160 if (bytes_read > 0) {
189 total_read = total_read + bytes_read; 161 total_read = total_read + bytes_read;
190 } 162 }
191 } 163 }
192 return (total_read == kInterruptMessageSize) ? true : false; 164 return (total_read == kInterruptMessageSize) ? true : false;
193 } 165 }
194 166
195 void EventHandlerImplementation::HandleInterruptFd() { 167 void EventHandlerImplementation::HandleInterruptFd() {
196 InterruptMessage msg; 168 InterruptMessage msg;
197 while (GetInterruptMessage(&msg)) { 169 while (GetInterruptMessage(&msg)) {
198 if (msg.id == kTimerId) { 170 if (msg.id == kTimerId) {
199 timeout_ = msg.data; 171 timeout_ = msg.data;
200 timeout_port_ = msg.dart_port; 172 timeout_port_ = msg.dart_port;
201 } else if ((msg.data & (1 << kCloseCommand)) != 0) {
202 /*
203 * A close event happened in dart, we have to explicitly unregister
204 * the fd and close the fd.
205 */
206 CloseFd(msg.id);
207 } else { 173 } else {
208 SetPort(msg.id, msg.dart_port, msg.data); 174 SocketData* sd = GetSocketData(msg.id);
175 if ((msg.data & (1 << kShutdownReadCommand)) != 0) {
176 ASSERT(msg.data == (1 << kShutdownReadCommand));
177 // Close the socket for reading.
178 sd->ShutdownRead();
179 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) {
180 ASSERT(msg.data == (1 << kShutdownWriteCommand));
181 // Close the socket for writing.
182 sd->ShutdownWrite();
183 } else if ((msg.data & (1 << kCloseCommand)) != 0) {
184 ASSERT(msg.data == (1 << kCloseCommand));
185 // Close the socket and free system resources.
186 sd->Close();
187 } else {
188 // Setup events to wait for.
189 sd->SetPortAndMask(msg.dart_port, msg.data);
190 }
209 } 191 }
210 } 192 }
211 } 193 }
212 194
195 #ifdef DEBUG_POLL
196 static void PrintEventMask(struct pollfd* pollfd) {
197 printf("%d ", pollfd->fd);
198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN ");
199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI ");
200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT ");
201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR ");
202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP ");
203 if ((pollfd->revents & POLLRDHUP) != 0) printf("POLLRDHUP ");
204 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL ");
205 int all_events = POLLIN | POLLPRI | POLLOUT |
206 POLLERR | POLLHUP | POLLRDHUP | POLLNVAL;
207 if ((pollfd->revents & ~all_events) != 0) {
208 printf("(and %08x) ", pollfd->revents & ~all_events);
209 }
210 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd));
211
212 printf("\n");
213 }
214 #endif
213 215
214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { 216 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) {
217 #ifdef DEBUG_POLL
218 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd);
219 #endif
215 intptr_t event_mask = 0; 220 intptr_t event_mask = 0;
216 SocketData* sd = GetSocketData(pollfd->fd); 221 SocketData* sd = GetSocketData(pollfd->fd);
217 if (sd->IsListeningSocket()) { 222 if (sd->IsListeningSocket()) {
218 // For listening sockets the POLLIN event indicate that there are 223 // For listening sockets the POLLIN event indicate that there are
219 // connections ready for accept unless accompanied with one of the 224 // connections ready for accept unless accompanied with one of the
220 // other flags. 225 // other flags.
221 if ((pollfd->revents & POLLIN) != 0) { 226 if ((pollfd->revents & POLLIN) != 0) {
222 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); 227 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent);
223 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); 228 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent);
224 if (event_mask == 0) event_mask |= (1 << kInEvent); 229 if (event_mask == 0) event_mask |= (1 << kInEvent);
225 } 230 }
226 } else { 231 } else {
232 if ((pollfd->revents & POLLNVAL) != 0) {
233 return 0;
234 }
235
227 // Prioritize data events over close and error events. 236 // Prioritize data events over close and error events.
228 if ((pollfd->revents & POLLIN) != 0) { 237 if ((pollfd->revents & POLLIN) != 0) {
229 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { 238 if (FDUtils::AvailableBytes(pollfd->fd) != 0) {
230 event_mask = (1 << kInEvent); 239 event_mask = (1 << kInEvent);
231 } else if (((pollfd->revents & POLLHUP) != 0) || 240 } else if (((pollfd->revents & POLLHUP) != 0)) {
232 ((pollfd->revents & POLLRDHUP) != 0)) {
233 event_mask = (1 << kCloseEvent); 241 event_mask = (1 << kCloseEvent);
242 sd->MarkClosedRead();
234 } else if ((pollfd->revents & POLLERR) != 0) { 243 } else if ((pollfd->revents & POLLERR) != 0) {
235 event_mask = (1 << kErrorEvent); 244 event_mask = (1 << kErrorEvent);
245 } else {
246 // If POLLIN is set with no available data and no POLLHUP use
247 // recv to peek for whether the other end of the socket
248 // actually closed.
249 char buffer;
250 ssize_t bytesPeeked = recv(sd->fd(), &buffer, 1, MSG_PEEK);
251 if (bytesPeeked == 0) {
252 event_mask = (1 << kCloseEvent);
253 sd->MarkClosedRead();
254 } else if (errno != EAGAIN) {
255 fprintf(stderr, "Error recv: %s\n", strerror(errno));
256 }
236 } 257 }
237 } 258 }
238 259
260 // On pipes POLLHUP is reported without POLLIN.
261 if (((pollfd->revents & POLLIN) == 0) &&
262 ((pollfd->revents & POLLHUP) != 0)) {
263 event_mask = (1 << kCloseEvent);
264 sd->MarkClosedRead();
265 }
266
239 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent); 267 if ((pollfd->revents & POLLOUT) != 0) event_mask |= (1 << kOutEvent);
240 } 268 }
241 269
242 return event_mask; 270 return event_mask;
243 } 271 }
244 272
245 273
246 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, 274 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds,
247 int pollfds_size, 275 int pollfds_size,
248 int result_size) { 276 int result_size) {
249 if ((pollfds[0].revents & POLLIN) != 0) { 277 if ((pollfds[0].revents & POLLIN) != 0) {
250 result_size -= 1; 278 result_size -= 1;
251 } 279 }
252 if (result_size > 0) { 280 if (result_size > 0) {
253 for (int i = 1; i < pollfds_size; i++) { 281 for (int i = 1; i < pollfds_size; i++) {
254 /* 282 /*
255 * The fd is unregistered. It gets re-registered when the request 283 * The fd is unregistered. It gets re-registered when the request
256 * was handled by dart. 284 * was handled by dart.
257 */ 285 */
258 intptr_t event_mask = GetPollEvents(&pollfds[i]); 286 intptr_t event_mask = GetPollEvents(&pollfds[i]);
259 if (event_mask != 0) { 287 if (event_mask != 0) {
260 intptr_t fd = pollfds[i].fd; 288 intptr_t fd = pollfds[i].fd;
261 Dart_Port port = GetSocketData(fd)->port(); 289 SocketData* sd = GetSocketData(fd);
290 Dart_Port port = sd->port();
262 ASSERT(port != 0); 291 ASSERT(port != 0);
263 UnregisterFd(fd); 292 sd->Unregister();
264 Dart_PostIntArray(port, 1, &event_mask); 293 Dart_PostIntArray(port, 1, &event_mask);
265 } 294 }
266 } 295 }
267 } 296 }
268 HandleInterruptFd(); 297 HandleInterruptFd();
269 } 298 }
270 299
271 300
272 intptr_t EventHandlerImplementation::GetTimeout() { 301 intptr_t EventHandlerImplementation::GetTimeout() {
273 if (timeout_ == kInfinityTimeout) { 302 if (timeout_ == kInfinityTimeout) {
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
319 this); 348 this);
320 if (result != 0) { 349 if (result != 0) {
321 FATAL("Create start event handler thread"); 350 FATAL("Create start event handler thread");
322 } 351 }
323 } 352 }
324 353
325 354
326 void EventHandlerImplementation::SendData(intptr_t id, 355 void EventHandlerImplementation::SendData(intptr_t id,
327 Dart_Port dart_port, 356 Dart_Port dart_port,
328 intptr_t data) { 357 intptr_t data) {
329 RegisterFdWakeup(id, dart_port, data); 358 WakeupHandler(id, dart_port, data);
330 } 359 }
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_linux.h ('k') | runtime/bin/eventhandler_macos.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698