Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(226)

Side by Side Diff: runtime/bin/eventhandler_macos.cc

Issue 198743002: Make the event-handler handle backpreasure. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Doc fix Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/eventhandler_macos.h ('k') | runtime/bin/eventhandler_win.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_MACOS) 6 #if defined(TARGET_OS_MACOS)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 9
10 #include <errno.h> // NOLINT 10 #include <errno.h> // NOLINT
(...skipping 29 matching lines...) Expand all
40 40
41 bool SocketData::HasWriteEvent() { 41 bool SocketData::HasWriteEvent() {
42 return (mask_ & (1 << kOutEvent)) != 0; 42 return (mask_ & (1 << kOutEvent)) != 0;
43 } 43 }
44 44
45 45
46 // Unregister the file descriptor for a SocketData structure with kqueue. 46 // Unregister the file descriptor for a SocketData structure with kqueue.
47 static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) { 47 static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
48 if (!sd->tracked_by_kqueue()) return; 48 if (!sd->tracked_by_kqueue()) return;
49 static const intptr_t kMaxChanges = 2; 49 static const intptr_t kMaxChanges = 2;
50 intptr_t changes = 0;
51 struct kevent events[kMaxChanges]; 50 struct kevent events[kMaxChanges];
52 if (sd->HasReadEvent()) { 51 EV_SET(events, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
53 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); 52 VOID_TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
54 ++changes; 53 EV_SET(events, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
55 } 54 VOID_TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
56 if (sd->HasWriteEvent()) {
57 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
58 ++changes;
59 }
60 ASSERT(changes > 0);
61 ASSERT(changes <= kMaxChanges);
62 int status =
63 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
64 if (status == -1) {
65 const int kBufferSize = 1024;
66 char error_message[kBufferSize];
67 strerror_r(errno, error_message, kBufferSize);
68 FATAL1("Failed deleting events from kqueue: %s\n", error_message);
69 }
70 sd->set_tracked_by_kqueue(false); 55 sd->set_tracked_by_kqueue(false);
71 } 56 }
72 57
73 58
74 // Update the kqueue registration for SocketData structure to reflect 59 // Update the kqueue registration for SocketData structure to reflect
75 // the events currently of interest. 60 // the events currently of interest.
76 static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) { 61 static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) {
62 ASSERT(!sd->tracked_by_kqueue());
77 static const intptr_t kMaxChanges = 2; 63 static const intptr_t kMaxChanges = 2;
78 intptr_t changes = 0; 64 intptr_t changes = 0;
79 struct kevent events[kMaxChanges]; 65 struct kevent events[kMaxChanges];
80 // Register or unregister READ filter if needed. 66 // Register or unregister READ filter if needed.
81 if (sd->HasReadEvent()) { 67 if (sd->HasReadEvent()) {
82 EV_SET(events + changes, 68 EV_SET(events + changes,
83 sd->fd(), 69 sd->fd(),
84 EVFILT_READ, 70 EVFILT_READ,
85 EV_ADD | EV_CLEAR, 71 EV_ADD | EV_CLEAR,
86 0, 72 0,
87 0, 73 0,
88 sd); 74 sd);
89 ++changes; 75 ++changes;
90 } 76 }
91 // Register or unregister WRITE filter if needed. 77 // Register or unregister WRITE filter if needed.
92 if (sd->HasWriteEvent()) { 78 if (sd->HasWriteEvent()) {
93 EV_SET(events + changes, 79 EV_SET(events + changes,
94 sd->fd(), 80 sd->fd(),
95 EVFILT_WRITE, 81 EVFILT_WRITE,
96 EV_ADD | EV_CLEAR, 82 EV_ADD | EV_CLEAR,
97 0, 83 0,
98 0, 84 0,
99 sd); 85 sd);
100 ++changes; 86 ++changes;
101 } 87 }
102 ASSERT(changes > 0); 88 ASSERT(changes > 0);
103 ASSERT(changes <= kMaxChanges); 89 ASSERT(changes <= kMaxChanges);
104 int status = 90 int status =
105 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); 91 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
106 if (status == -1) { 92 if (status == -1) {
107 // kQueue does not accept the file descriptor. It could be due to 93 // kQueue does not accept the file descriptor. It could be due to
108 // already closed file descriptor, or unuspported devices, such 94 // already closed file descriptor, or unuspported devices, such
109 // as /dev/null. In such case, mark the file descriptor as closed, 95 // as /dev/null. In such case, mark the file descriptor as closed,
110 // so dart will handle it accordingly. 96 // so dart will handle it accordingly.
111 DartUtils::PostInt32(sd->port(), 1 << kCloseEvent); 97 DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
112 } else { 98 } else {
113 sd->set_tracked_by_kqueue(true); 99 sd->set_tracked_by_kqueue(true);
114 } 100 }
115 } 101 }
(...skipping 29 matching lines...) Expand all
145 } 131 }
146 132
147 133
148 EventHandlerImplementation::~EventHandlerImplementation() { 134 EventHandlerImplementation::~EventHandlerImplementation() {
149 VOID_TEMP_FAILURE_RETRY(close(kqueue_fd_)); 135 VOID_TEMP_FAILURE_RETRY(close(kqueue_fd_));
150 VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); 136 VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
151 VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); 137 VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
152 } 138 }
153 139
154 140
155 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd, 141 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
156 bool* is_new) {
157 ASSERT(fd >= 0); 142 ASSERT(fd >= 0);
158 HashMap::Entry* entry = socket_map_.Lookup( 143 HashMap::Entry* entry = socket_map_.Lookup(
159 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); 144 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
160 ASSERT(entry != NULL); 145 ASSERT(entry != NULL);
161 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); 146 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
162 if (sd == NULL) { 147 if (sd == NULL) {
163 // If there is no data in the hash map for this file descriptor a 148 // If there is no data in the hash map for this file descriptor a
164 // new SocketData for the file descriptor is inserted. 149 // new SocketData for the file descriptor is inserted.
165 sd = new SocketData(fd); 150 sd = new SocketData(fd);
166 entry->value = sd; 151 entry->value = sd;
167 *is_new = true;
168 } 152 }
169 ASSERT(fd == sd->fd()); 153 ASSERT(fd == sd->fd());
170 return sd; 154 return sd;
171 } 155 }
172 156
173 157
174 void EventHandlerImplementation::WakeupHandler(intptr_t id, 158 void EventHandlerImplementation::WakeupHandler(intptr_t id,
175 Dart_Port dart_port, 159 Dart_Port dart_port,
176 int64_t data) { 160 int64_t data) {
177 InterruptMessage msg; 161 InterruptMessage msg;
(...skipping 18 matching lines...) Expand all
196 const intptr_t MAX_MESSAGES = kInterruptMessageSize; 180 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
197 InterruptMessage msg[MAX_MESSAGES]; 181 InterruptMessage msg[MAX_MESSAGES];
198 ssize_t bytes = TEMP_FAILURE_RETRY( 182 ssize_t bytes = TEMP_FAILURE_RETRY(
199 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize)); 183 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
200 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) { 184 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
201 if (msg[i].id == kTimerId) { 185 if (msg[i].id == kTimerId) {
202 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data); 186 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
203 } else if (msg[i].id == kShutdownId) { 187 } else if (msg[i].id == kShutdownId) {
204 shutdown_ = true; 188 shutdown_ = true;
205 } else { 189 } else {
206 bool is_new = false; 190 SocketData* sd = GetSocketData(msg[i].id);
207 SocketData* sd = GetSocketData(msg[i].id, &is_new);
208 if (is_new) {
209 sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
210 }
211 if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) { 191 if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
212 ASSERT(msg[i].data == (1 << kShutdownReadCommand)); 192 ASSERT(msg[i].data == (1 << kShutdownReadCommand));
213 // Close the socket for reading. 193 // Close the socket for reading.
214 sd->ShutdownRead(); 194 shutdown(sd->fd(), SHUT_RD);
215 } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) { 195 } else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
216 ASSERT(msg[i].data == (1 << kShutdownWriteCommand)); 196 ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
217 // Close the socket for writing. 197 // Close the socket for writing.
218 sd->ShutdownWrite(); 198 shutdown(sd->fd(), SHUT_WR);
219 } else if ((msg[i].data & (1 << kCloseCommand)) != 0) { 199 } else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
220 ASSERT(msg[i].data == (1 << kCloseCommand)); 200 ASSERT(msg[i].data == (1 << kCloseCommand));
221 // Close the socket and free system resources. 201 // Close the socket and free system resources.
222 RemoveFromKqueue(kqueue_fd_, sd); 202 RemoveFromKqueue(kqueue_fd_, sd);
223 intptr_t fd = sd->fd(); 203 intptr_t fd = sd->fd();
224 sd->Close(); 204 sd->Close();
225 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); 205 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
226 delete sd; 206 delete sd;
227 DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent); 207 DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
228 } else { 208 } else if ((msg[i].data & (1 << kReturnTokenCommand)) != 0) {
229 if (is_new) { 209 if (sd->ReturnToken()) {
230 AddToKqueue(kqueue_fd_, sd); 210 AddToKqueue(kqueue_fd_, sd);
231 } 211 }
212 } else {
213 // Setup events to wait for.
214 ASSERT((msg[i].data > 0) && (msg[i].data < kIntptrMax));
215 ASSERT(sd->port() == 0);
216 sd->SetPortAndMask(msg[i].dart_port,
217 static_cast<intptr_t>(msg[i].data));
218 AddToKqueue(kqueue_fd_, sd);
232 } 219 }
233 } 220 }
234 } 221 }
235 } 222 }
236 223
237 #ifdef DEBUG_KQUEUE 224 #ifdef DEBUG_KQUEUE
238 static void PrintEventMask(intptr_t fd, struct kevent* event) { 225 static void PrintEventMask(intptr_t fd, struct kevent* event) {
239 Log::Print("%d ", static_cast<int>(fd)); 226 Log::Print("%d ", static_cast<int>(fd));
240 Log::Print("filter=0x%x:", event->filter); 227 Log::Print("filter=0x%x:", event->filter);
241 if (event->filter == EVFILT_READ) Log::Print("EVFILT_READ "); 228 if (event->filter == EVFILT_READ) Log::Print("EVFILT_READ ");
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
313 char error_message[kBufferSize]; 300 char error_message[kBufferSize];
314 strerror_r(events[i].data, error_message, kBufferSize); 301 strerror_r(events[i].data, error_message, kBufferSize);
315 FATAL1("kevent failed %s\n", error_message); 302 FATAL1("kevent failed %s\n", error_message);
316 } 303 }
317 if (events[i].udata == NULL) { 304 if (events[i].udata == NULL) {
318 interrupt_seen = true; 305 interrupt_seen = true;
319 } else { 306 } else {
320 SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata); 307 SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
321 intptr_t event_mask = GetEvents(events + i, sd); 308 intptr_t event_mask = GetEvents(events + i, sd);
322 if (event_mask != 0) { 309 if (event_mask != 0) {
310 if (sd->TakeToken()) {
311 // Took last token, remove from epoll.
312 RemoveFromKqueue(kqueue_fd_, sd);
313 }
323 Dart_Port port = sd->port(); 314 Dart_Port port = sd->port();
324 ASSERT(port != 0); 315 ASSERT(port != 0);
325 DartUtils::PostInt32(port, event_mask); 316 DartUtils::PostInt32(port, event_mask);
326 } 317 }
327 } 318 }
328 } 319 }
329 if (interrupt_seen) { 320 if (interrupt_seen) {
330 // Handle after socket events, so we avoid closing a socket before we handle 321 // Handle after socket events, so we avoid closing a socket before we handle
331 // the current events. 322 // the current events.
332 HandleInterruptFd(); 323 HandleInterruptFd();
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
401 int result = 392 int result =
402 dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry, 393 dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry,
403 reinterpret_cast<uword>(handler)); 394 reinterpret_cast<uword>(handler));
404 if (result != 0) { 395 if (result != 0) {
405 FATAL1("Failed to start event handler thread %d", result); 396 FATAL1("Failed to start event handler thread %d", result);
406 } 397 }
407 } 398 }
408 399
409 400
410 void EventHandlerImplementation::Shutdown() { 401 void EventHandlerImplementation::Shutdown() {
411 Notify(kShutdownId, 0, 0); 402 SendData(kShutdownId, 0, 0);
412 } 403 }
413 404
414 405
415 void EventHandlerImplementation::Notify(intptr_t id, 406 void EventHandlerImplementation::SendData(intptr_t id,
416 Dart_Port dart_port, 407 Dart_Port dart_port,
417 int64_t data) { 408 int64_t data) {
418 WakeupHandler(id, dart_port, data); 409 WakeupHandler(id, dart_port, data);
419 } 410 }
420 411
421 412
422 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { 413 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
423 // The hashmap does not support keys with value 0. 414 // The hashmap does not support keys with value 0.
424 return reinterpret_cast<void*>(fd + 1); 415 return reinterpret_cast<void*>(fd + 1);
425 } 416 }
426 417
427 418
428 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { 419 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
429 // The hashmap does not support keys with value 0. 420 // The hashmap does not support keys with value 0.
430 return dart::Utils::WordHash(fd + 1); 421 return dart::Utils::WordHash(fd + 1);
431 } 422 }
432 423
433 } // namespace bin 424 } // namespace bin
434 } // namespace dart 425 } // namespace dart
435 426
436 #endif // defined(TARGET_OS_MACOS) 427 #endif // defined(TARGET_OS_MACOS)
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_macos.h ('k') | runtime/bin/eventhandler_win.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698