Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: runtime/bin/eventhandler_win.cc

Issue 2760293002: [dart:io] Adds a finalizer to _NativeSocket to avoid socket leaks (Closed)
Patch Set: Address comments Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #if !defined(DART_IO_DISABLED) 5 #if !defined(DART_IO_DISABLED)
6 6
7 #include "platform/globals.h" 7 #include "platform/globals.h"
8 #if defined(HOST_OS_WINDOWS) 8 #if defined(HOST_OS_WINDOWS)
9 9
10 #include "bin/eventhandler.h" 10 #include "bin/eventhandler.h"
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 data_length_ = num_bytes; 108 data_length_ = num_bytes;
109 return num_bytes; 109 return num_bytes;
110 } 110 }
111 111
112 112
113 int OverlappedBuffer::GetRemainingLength() { 113 int OverlappedBuffer::GetRemainingLength() {
114 ASSERT(operation_ == kRead || operation_ == kRecvFrom); 114 ASSERT(operation_ == kRead || operation_ == kRecvFrom);
115 return data_length_ - index_; 115 return data_length_ - index_;
116 } 116 }
117 117
118
118 Handle::Handle(intptr_t handle) 119 Handle::Handle(intptr_t handle)
119 : DescriptorInfoBase(handle), 120 : ReferenceCounted(),
121 DescriptorInfoBase(handle),
120 handle_(reinterpret_cast<HANDLE>(handle)), 122 handle_(reinterpret_cast<HANDLE>(handle)),
121 completion_port_(INVALID_HANDLE_VALUE), 123 completion_port_(INVALID_HANDLE_VALUE),
122 event_handler_(NULL), 124 event_handler_(NULL),
123 data_ready_(NULL), 125 data_ready_(NULL),
124 pending_read_(NULL), 126 pending_read_(NULL),
125 pending_write_(NULL), 127 pending_write_(NULL),
126 last_error_(NOERROR), 128 last_error_(NOERROR),
127 flags_(0), 129 flags_(0),
128 read_thread_id_(Thread::kInvalidThreadId), 130 read_thread_id_(Thread::kInvalidThreadId),
129 read_thread_handle_(NULL), 131 read_thread_handle_(NULL),
130 read_thread_starting_(false), 132 read_thread_starting_(false),
131 read_thread_finished_(false), 133 read_thread_finished_(false),
132 monitor_(new Monitor()) {} 134 monitor_(new Monitor()) {}
133 135
134 136
135 Handle::~Handle() { 137 Handle::~Handle() {
136 delete monitor_; 138 delete monitor_;
137 } 139 }
138 140
139 141
140 bool Handle::CreateCompletionPort(HANDLE completion_port) { 142 bool Handle::CreateCompletionPort(HANDLE completion_port) {
143 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
144 // A reference to the Handle is Retained by the IO completion port.
145 // It is Released by DeleteIfClosed.
146 Retain();
141 completion_port_ = CreateIoCompletionPort( 147 completion_port_ = CreateIoCompletionPort(
142 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0); 148 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0);
143 return (completion_port_ != NULL); 149 return (completion_port_ != NULL);
144 } 150 }
145 151
146 152
147 void Handle::Close() { 153 void Handle::Close() {
148 MonitorLocker ml(monitor_); 154 MonitorLocker ml(monitor_);
149 if (!SupportsOverlappedIO()) { 155 if (!SupportsOverlappedIO()) {
150 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending 156 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending
(...skipping 234 matching lines...) Expand 10 before | Expand all | Expand 10 after
385 } else { 391 } else {
386 HandleError(this); 392 HandleError(this);
387 } 393 }
388 SetLastError(error); 394 SetLastError(error);
389 } 395 }
390 396
391 397
392 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { 398 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
393 MonitorLocker ml(monitor_); 399 MonitorLocker ml(monitor_);
394 event_handler_ = event_handler; 400 event_handler_ = event_handler;
395 if (SupportsOverlappedIO() && (completion_port_ == INVALID_HANDLE_VALUE)) { 401 if (completion_port_ == INVALID_HANDLE_VALUE) {
396 CreateCompletionPort(event_handler_->completion_port()); 402 if (SupportsOverlappedIO()) {
403 CreateCompletionPort(event_handler_->completion_port());
404 } else {
405 // We need to retain the Handle even if overlapped IO is not supported.
406 // It is Released by DeleteIfClosed after ReadSyncCompleteAsync
407 // manually puts an event on the IO completion port.
408 Retain();
409 completion_port_ = event_handler_->completion_port();
410 }
397 } 411 }
398 } 412 }
399 413
400 414
401 bool FileHandle::IsClosed() { 415 bool FileHandle::IsClosed() {
402 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); 416 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
403 } 417 }
404 418
405 419
406 void DirectoryWatchHandle::EnsureInitialized( 420 void DirectoryWatchHandle::EnsureInitialized(
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
539 closesocket(buffer->client()); 553 closesocket(buffer->client());
540 } 554 }
541 555
542 pending_accept_count_--; 556 pending_accept_count_--;
543 OverlappedBuffer::DisposeBuffer(buffer); 557 OverlappedBuffer::DisposeBuffer(buffer);
544 } 558 }
545 559
546 560
547 static void DeleteIfClosed(Handle* handle) { 561 static void DeleteIfClosed(Handle* handle) {
548 if (handle->IsClosed()) { 562 if (handle->IsClosed()) {
563 handle->set_completion_port(INVALID_HANDLE_VALUE);
564 handle->set_event_handler(NULL);
549 handle->NotifyAllDartPorts(1 << kDestroyedEvent); 565 handle->NotifyAllDartPorts(1 << kDestroyedEvent);
550 handle->RemoveAllPorts(); 566 handle->RemoveAllPorts();
551 delete handle; 567 // Once the Handle is closed, no further events on the IO completion port
568 // will mention it. Thus, we can drop the reference here.
569 handle->Release();
552 } 570 }
553 } 571 }
554 572
555 573
556 void ListenSocket::DoClose() { 574 void ListenSocket::DoClose() {
557 closesocket(socket()); 575 closesocket(socket());
558 handle_ = INVALID_HANDLE_VALUE; 576 handle_ = INVALID_HANDLE_VALUE;
559 while (CanAccept()) { 577 while (CanAccept()) {
560 // Get rid of connections already accepted. 578 // Get rid of connections already accepted.
561 ClientSocket* client = Accept(); 579 ClientSocket* client = Accept();
562 if (client != NULL) { 580 if (client != NULL) {
563 client->Close(); 581 client->Close();
582 // Release the reference from the list.
583 // When an accept completes, we make a new ClientSocket (1 reference),
584 // and add it to the IO completion port (1 more reference). If an
585 // accepted connection is never requested by the Dart code, then
586 // this list owns a reference (first Release), and the IO completion
587 // port owns a reference, (second Release in DeleteIfClosed).
588 client->Release();
564 DeleteIfClosed(client); 589 DeleteIfClosed(client);
565 } else { 590 } else {
566 break; 591 break;
567 } 592 }
568 } 593 }
594 // To finish resetting the state of the ListenSocket back to what it was
595 // before EnsureInitialized was called, we have to reset the AcceptEx_
596 // function pointer.
597 AcceptEx_ = NULL;
569 } 598 }
570 599
571 600
572 bool ListenSocket::CanAccept() { 601 bool ListenSocket::CanAccept() {
573 MonitorLocker ml(monitor_); 602 MonitorLocker ml(monitor_);
574 return accepted_head_ != NULL; 603 return accepted_head_ != NULL;
575 } 604 }
576 605
577 606
578 ClientSocket* ListenSocket::Accept() { 607 ClientSocket* ListenSocket::Accept() {
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after
785 // Note that we return '0', unless a thread have already completed a write. 814 // Note that we return '0', unless a thread have already completed a write.
786 if (thread_wrote_ > 0) { 815 if (thread_wrote_ > 0) {
787 if (num_bytes > thread_wrote_) { 816 if (num_bytes > thread_wrote_) {
788 num_bytes = thread_wrote_; 817 num_bytes = thread_wrote_;
789 } 818 }
790 thread_wrote_ -= num_bytes; 819 thread_wrote_ -= num_bytes;
791 return num_bytes; 820 return num_bytes;
792 } 821 }
793 if (!write_thread_exists_) { 822 if (!write_thread_exists_) {
794 write_thread_exists_ = true; 823 write_thread_exists_ = true;
824 // The write thread gets a reference to the Handle, which it places in
825 // the events it puts on the IO completion port. The reference is
826 // Released by DeleteIfClosed.
827 Retain();
795 int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this)); 828 int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this));
796 if (result != 0) { 829 if (result != 0) {
797 FATAL1("Failed to start write file thread %d", result); 830 FATAL1("Failed to start write file thread %d", result);
798 } 831 }
799 while (!write_thread_running_) { 832 while (!write_thread_running_) {
800 // Wait until we the thread is running. 833 // Wait until we the thread is running.
801 ml.Wait(Monitor::kNoTimeout); 834 ml.Wait(Monitor::kNoTimeout);
802 } 835 }
803 } 836 }
804 // Only queue up to INT_MAX bytes. 837 // Only queue up to INT_MAX bytes.
(...skipping 16 matching lines...) Expand all
821 } 854 }
822 // Join the thread. 855 // Join the thread.
823 DWORD res = WaitForSingleObject(thread_handle_, INFINITE); 856 DWORD res = WaitForSingleObject(thread_handle_, INFINITE);
824 CloseHandle(thread_handle_); 857 CloseHandle(thread_handle_);
825 ASSERT(res == WAIT_OBJECT_0); 858 ASSERT(res == WAIT_OBJECT_0);
826 } 859 }
827 Handle::DoClose(); 860 Handle::DoClose();
828 } 861 }
829 862
830 863
864 #if defined(DEBUG)
865 intptr_t ClientSocket::disconnecting_ = 0;
866 #endif
867
868
831 bool ClientSocket::LoadDisconnectEx() { 869 bool ClientSocket::LoadDisconnectEx() {
832 // Load the DisconnectEx function into memory using WSAIoctl. 870 // Load the DisconnectEx function into memory using WSAIoctl.
833 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; 871 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
834 DWORD bytes; 872 DWORD bytes;
835 int status = 873 int status =
836 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, 874 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
837 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_, 875 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_,
838 sizeof(DisconnectEx_), &bytes, NULL, NULL); 876 sizeof(DisconnectEx_), &bytes, NULL, NULL);
839 return (status != SOCKET_ERROR); 877 return (status != SOCKET_ERROR);
840 } 878 }
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
907 945
908 void ClientSocket::IssueDisconnect() { 946 void ClientSocket::IssueDisconnect() {
909 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); 947 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer();
910 BOOL ok = 948 BOOL ok =
911 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); 949 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
912 // DisconnectEx works like other OverlappedIO APIs, where we can get either an 950 // DisconnectEx works like other OverlappedIO APIs, where we can get either an
913 // immediate success or delayed operation by WSA_IO_PENDING being set. 951 // immediate success or delayed operation by WSA_IO_PENDING being set.
914 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) { 952 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) {
915 DisconnectComplete(buffer); 953 DisconnectComplete(buffer);
916 } 954 }
955 // When the Dart side receives this event, it may decide to close its Dart
956 // ports. When all ports are closed, the VM will shut down. The EventHandler
957 // will then shut down. If the EventHandler shuts down before this
958 // asynchronous disconnect finishes, this ClientSocket will be leaked.
959 // TODO(dart:io): Retain a list of client sockets that are in the process of
960 // disconnecting. Disconnect them forcefully, and clean up their resources
961 // when the EventHandler shuts down.
917 NotifyAllDartPorts(1 << kDestroyedEvent); 962 NotifyAllDartPorts(1 << kDestroyedEvent);
918 RemoveAllPorts(); 963 RemoveAllPorts();
964 #if defined(DEBUG)
965 disconnecting_++;
966 #endif
919 } 967 }
920 968
921 969
922 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { 970 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
923 OverlappedBuffer::DisposeBuffer(buffer); 971 OverlappedBuffer::DisposeBuffer(buffer);
924 closesocket(socket()); 972 closesocket(socket());
925 if (data_ready_ != NULL) { 973 if (data_ready_ != NULL) {
926 OverlappedBuffer::DisposeBuffer(data_ready_); 974 OverlappedBuffer::DisposeBuffer(data_ready_);
927 } 975 }
928 mark_closed(); 976 mark_closed();
977 #if defined(DEBUG)
978 disconnecting_--;
979 #endif
929 } 980 }
930 981
931 982
932 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { 983 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
933 OverlappedBuffer::DisposeBuffer(buffer); 984 OverlappedBuffer::DisposeBuffer(buffer);
934 // Update socket to support full socket API, after ConnectEx completed. 985 // Update socket to support full socket API, after ConnectEx completed.
935 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 986 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
936 // If the port is set, we already listen for this socket in Dart. 987 // If the port is set, we already listen for this socket in Dart.
937 // Handle the cases here. 988 // Handle the cases here.
938 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) { 989 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) {
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
1030 1081
1031 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { 1082 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
1032 ASSERT(this != NULL); 1083 ASSERT(this != NULL);
1033 if (msg->id == kTimerId) { 1084 if (msg->id == kTimerId) {
1034 // Change of timeout request. Just set the new timeout and port as the 1085 // Change of timeout request. Just set the new timeout and port as the
1035 // completion thread will use the new timeout value for its next wait. 1086 // completion thread will use the new timeout value for its next wait.
1036 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); 1087 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1037 } else if (msg->id == kShutdownId) { 1088 } else if (msg->id == kShutdownId) {
1038 shutdown_ = true; 1089 shutdown_ = true;
1039 } else { 1090 } else {
1040 Handle* handle = reinterpret_cast<Handle*>(msg->id); 1091 Socket* socket = reinterpret_cast<Socket*>(msg->id);
1092 RefCntReleaseScope<Socket> rs(socket);
1093 if (socket->fd() == -1) {
1094 return;
1095 }
1096 Handle* handle = reinterpret_cast<Handle*>(socket->fd());
1041 ASSERT(handle != NULL); 1097 ASSERT(handle != NULL);
1042 1098
1043 if (handle->is_listen_socket()) { 1099 if (handle->is_listen_socket()) {
1044 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle); 1100 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle);
1045 listen_socket->EnsureInitialized(this); 1101 listen_socket->EnsureInitialized(this);
1046 1102
1047 MonitorLocker ml(listen_socket->monitor_); 1103 MonitorLocker ml(listen_socket->monitor_);
1048 1104
1049 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { 1105 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1050 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); 1106 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1051 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { 1107 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1052 // `events` can only have kInEvent/kOutEvent flags set. 1108 // `events` can only have kInEvent/kOutEvent flags set.
1053 intptr_t events = msg->data & EVENT_MASK; 1109 intptr_t events = msg->data & EVENT_MASK;
1054 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); 1110 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1055 listen_socket->SetPortAndMask(msg->dart_port, events); 1111 listen_socket->SetPortAndMask(msg->dart_port, events);
1056 TryDispatchingPendingAccepts(listen_socket); 1112 TryDispatchingPendingAccepts(listen_socket);
1057 } else if (IS_COMMAND(msg->data, kCloseCommand)) { 1113 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1058 listen_socket->RemovePort(msg->dart_port); 1114 listen_socket->RemovePort(msg->dart_port);
1059 1115
1060 // We only close the socket file descriptor from the operating 1116 // We only close the socket file descriptor from the operating
1061 // system if there are no other dart socket objects which 1117 // system if there are no other dart socket objects which
1062 // are listening on the same (address, port) combination. 1118 // are listening on the same (address, port) combination.
1063 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); 1119 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
1064 MutexLocker locker(registry->mutex()); 1120 MutexLocker locker(registry->mutex());
1065 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { 1121 if (registry->CloseSafe(socket)) {
1066 ASSERT(listen_socket->Mask() == 0); 1122 ASSERT(listen_socket->Mask() == 0);
1067 listen_socket->Close(); 1123 listen_socket->Close();
1124 socket->SetClosedFd();
1068 } 1125 }
1069 1126
1070 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); 1127 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
1071 } else { 1128 } else {
1072 UNREACHABLE(); 1129 UNREACHABLE();
1073 } 1130 }
1074 } else { 1131 } else {
1075 handle->EnsureInitialized(this); 1132 handle->EnsureInitialized(this);
1076 MonitorLocker ml(handle->monitor_); 1133 MonitorLocker ml(handle->monitor_);
1077 1134
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
1125 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); 1182 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1126 client_socket->Shutdown(SD_RECEIVE); 1183 client_socket->Shutdown(SD_RECEIVE);
1127 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { 1184 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
1128 ASSERT(handle->is_client_socket()); 1185 ASSERT(handle->is_client_socket());
1129 1186
1130 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); 1187 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1131 client_socket->Shutdown(SD_SEND); 1188 client_socket->Shutdown(SD_SEND);
1132 } else if (IS_COMMAND(msg->data, kCloseCommand)) { 1189 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1133 handle->SetPortAndMask(msg->dart_port, 0); 1190 handle->SetPortAndMask(msg->dart_port, 0);
1134 handle->Close(); 1191 handle->Close();
1192 socket->SetClosedFd();
1135 } else { 1193 } else {
1136 UNREACHABLE(); 1194 UNREACHABLE();
1137 } 1195 }
1138 } 1196 }
1139 1197
1140 DeleteIfClosed(handle); 1198 DeleteIfClosed(handle);
1141 } 1199 }
1142 } 1200 }
1143 1201
1144 1202
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after
1306 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); 1364 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key);
1307 HandleConnect(client_socket, bytes, buffer); 1365 HandleConnect(client_socket, bytes, buffer);
1308 break; 1366 break;
1309 } 1367 }
1310 default: 1368 default:
1311 UNREACHABLE(); 1369 UNREACHABLE();
1312 } 1370 }
1313 } 1371 }
1314 1372
1315 1373
1374 void EventHandlerImplementation::HandleCompletionOrInterrupt(
1375 BOOL ok,
1376 DWORD bytes,
1377 ULONG_PTR key,
1378 OVERLAPPED* overlapped) {
1379 if (!ok) {
1380 // Treat ERROR_CONNECTION_ABORTED as connection closed.
1381 // The error ERROR_OPERATION_ABORTED is set for pending
1382 // accept requests for a listen socket which is closed.
1383 // ERROR_NETNAME_DELETED occurs when the client closes
1384 // the socket it is reading from.
1385 DWORD last_error = GetLastError();
1386 if ((last_error == ERROR_CONNECTION_ABORTED) ||
1387 (last_error == ERROR_OPERATION_ABORTED) ||
1388 (last_error == ERROR_NETNAME_DELETED) ||
1389 (last_error == ERROR_BROKEN_PIPE)) {
1390 ASSERT(bytes == 0);
1391 HandleIOCompletion(bytes, key, overlapped);
1392 } else if (last_error == ERROR_MORE_DATA) {
1393 // Don't ASSERT no bytes in this case. This can happen if the receive
1394 // buffer for datagram sockets is too small to contain a full datagram,
1395 // and in this case bytes hold the bytes that was read.
1396 HandleIOCompletion(-1, key, overlapped);
1397 } else {
1398 ASSERT(bytes == 0);
1399 HandleIOCompletion(-1, key, overlapped);
1400 }
1401 } else if (key == NULL) {
1402 // A key of NULL signals an interrupt message.
1403 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
1404 HandleInterrupt(msg);
1405 delete msg;
1406 } else {
1407 HandleIOCompletion(bytes, key, overlapped);
1408 }
1409 }
1410
1411
1316 EventHandlerImplementation::EventHandlerImplementation() { 1412 EventHandlerImplementation::EventHandlerImplementation() {
1317 startup_monitor_ = new Monitor(); 1413 startup_monitor_ = new Monitor();
1318 handler_thread_id_ = Thread::kInvalidThreadId; 1414 handler_thread_id_ = Thread::kInvalidThreadId;
1319 handler_thread_handle_ = NULL; 1415 handler_thread_handle_ = NULL;
1320 completion_port_ = 1416 completion_port_ =
1321 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); 1417 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
1322 if (completion_port_ == NULL) { 1418 if (completion_port_ == NULL) {
1323 FATAL("Completion port creation failed"); 1419 FATAL("Completion port creation failed");
1324 } 1420 }
1325 shutdown_ = false; 1421 shutdown_ = false;
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
1367 ASSERT(handler_impl != NULL); 1463 ASSERT(handler_impl != NULL);
1368 1464
1369 { 1465 {
1370 MonitorLocker ml(handler_impl->startup_monitor_); 1466 MonitorLocker ml(handler_impl->startup_monitor_);
1371 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); 1467 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1372 handler_impl->handler_thread_handle_ = 1468 handler_impl->handler_thread_handle_ =
1373 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_); 1469 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_);
1374 ml.Notify(); 1470 ml.Notify();
1375 } 1471 }
1376 1472
1473 DWORD bytes;
1474 ULONG_PTR key;
1475 OVERLAPPED* overlapped;
1476 BOOL ok;
1377 while (!handler_impl->shutdown_) { 1477 while (!handler_impl->shutdown_) {
1378 DWORD bytes;
1379 ULONG_PTR key;
1380 OVERLAPPED* overlapped;
1381 int64_t millis = handler_impl->GetTimeout(); 1478 int64_t millis = handler_impl->GetTimeout();
1382 ASSERT(millis == kInfinityTimeout || millis >= 0); 1479 ASSERT(millis == kInfinityTimeout || millis >= 0);
1383 if (millis > kMaxInt32) { 1480 if (millis > kMaxInt32) {
1384 millis = kMaxInt32; 1481 millis = kMaxInt32;
1385 } 1482 }
1386 ASSERT(sizeof(int32_t) == sizeof(DWORD)); 1483 ASSERT(sizeof(int32_t) == sizeof(DWORD));
1387 BOOL ok = 1484 DWORD timeout = static_cast<DWORD>(millis);
1388 GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, &key, 1485 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1389 &overlapped, static_cast<DWORD>(millis)); 1486 &key, &overlapped, timeout);
1390 1487
1391 if (!ok && (overlapped == NULL)) { 1488 if (!ok && (overlapped == NULL)) {
1392 if (GetLastError() == ERROR_ABANDONED_WAIT_0) { 1489 if (GetLastError() == ERROR_ABANDONED_WAIT_0) {
1393 // The completion port should never be closed. 1490 // The completion port should never be closed.
1394 Log::Print("Completion port closed\n"); 1491 Log::Print("Completion port closed\n");
1395 UNREACHABLE(); 1492 UNREACHABLE();
1396 } else { 1493 } else {
1397 // Timeout is signalled by false result and NULL in overlapped. 1494 // Timeout is signalled by false result and NULL in overlapped.
1398 handler_impl->HandleTimeout(); 1495 handler_impl->HandleTimeout();
1399 } 1496 }
1400 } else if (!ok) {
1401 // Treat ERROR_CONNECTION_ABORTED as connection closed.
1402 // The error ERROR_OPERATION_ABORTED is set for pending
1403 // accept requests for a listen socket which is closed.
1404 // ERROR_NETNAME_DELETED occurs when the client closes
1405 // the socket it is reading from.
1406 DWORD last_error = GetLastError();
1407 if ((last_error == ERROR_CONNECTION_ABORTED) ||
1408 (last_error == ERROR_OPERATION_ABORTED) ||
1409 (last_error == ERROR_NETNAME_DELETED) ||
1410 (last_error == ERROR_BROKEN_PIPE)) {
1411 ASSERT(bytes == 0);
1412 handler_impl->HandleIOCompletion(bytes, key, overlapped);
1413 } else if (last_error == ERROR_MORE_DATA) {
1414 // Don't ASSERT no bytes in this case. This can happen if the receive
1415 // buffer for datagram sockets is to small to contain a full datagram,
1416 // and in this case bytes hold the bytes that was read.
1417 handler_impl->HandleIOCompletion(-1, key, overlapped);
1418 } else {
1419 ASSERT(bytes == 0);
1420 handler_impl->HandleIOCompletion(-1, key, overlapped);
1421 }
1422 } else if (key == NULL) {
1423 // A key of NULL signals an interrupt message.
1424 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
1425 handler_impl->HandleInterrupt(msg);
1426 delete msg;
1427 } else { 1497 } else {
1428 handler_impl->HandleIOCompletion(bytes, key, overlapped); 1498 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1429 } 1499 }
1430 } 1500 }
1501
1502 // In a Debug build, drain the IO completion port to make sure we aren't
1503 // leaking any (non-disconnecting) Handles. In a Release build, we don't care
1504 // because the VM is going down, and the asserts below are Debug-only.
1505 #if defined(DEBUG)
1506 while (true) {
1507 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1508 &key, &overlapped, 0);
1509 if (!ok && (overlapped == NULL)) {
1510 // There was an error or nothing is ready. Assume the port is drained.
1511 break;
1512 }
1513 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1514 }
1515 #endif
1516
1517 // The eventhandler thread is going down so there should be no more live
1518 // Handles or Sockets.
1519 // TODO(dart:io): It would be nice to be able to assert here that:
1520 // ReferenceCounted<Handle>::instances() == 0;
1521 // However, we cannot at the moment. See the TODO on:
1522 // ClientSocket::IssueDisconnect()
1523 DEBUG_ASSERT(ReferenceCounted<Handle>::instances() ==
1524 ClientSocket::disconnecting());
1525 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
1431 handler->NotifyShutdownDone(); 1526 handler->NotifyShutdownDone();
1432 } 1527 }
1433 1528
1434 1529
1435 void EventHandlerImplementation::Start(EventHandler* handler) { 1530 void EventHandlerImplementation::Start(EventHandler* handler) {
1436 int result = 1531 int result =
1437 Thread::Start(EventHandlerEntry, reinterpret_cast<uword>(handler)); 1532 Thread::Start(EventHandlerEntry, reinterpret_cast<uword>(handler));
1438 if (result != 0) { 1533 if (result != 0) {
1439 FATAL1("Failed to start event handler thread %d", result); 1534 FATAL1("Failed to start event handler thread %d", result);
1440 } 1535 }
(...skipping 15 matching lines...) Expand all
1456 void EventHandlerImplementation::Shutdown() { 1551 void EventHandlerImplementation::Shutdown() {
1457 SendData(kShutdownId, 0, 0); 1552 SendData(kShutdownId, 0, 0);
1458 } 1553 }
1459 1554
1460 } // namespace bin 1555 } // namespace bin
1461 } // namespace dart 1556 } // namespace dart
1462 1557
1463 #endif // defined(HOST_OS_WINDOWS) 1558 #endif // defined(HOST_OS_WINDOWS)
1464 1559
1465 #endif // !defined(DART_IO_DISABLED) 1560 #endif // !defined(DART_IO_DISABLED)
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698