OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #if !defined(DART_IO_DISABLED) | 5 #if !defined(DART_IO_DISABLED) |
6 | 6 |
7 #include "platform/globals.h" | 7 #include "platform/globals.h" |
8 #if defined(HOST_OS_WINDOWS) | 8 #if defined(HOST_OS_WINDOWS) |
9 | 9 |
10 #include "bin/eventhandler.h" | 10 #include "bin/eventhandler.h" |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
108 data_length_ = num_bytes; | 108 data_length_ = num_bytes; |
109 return num_bytes; | 109 return num_bytes; |
110 } | 110 } |
111 | 111 |
112 | 112 |
113 int OverlappedBuffer::GetRemainingLength() { | 113 int OverlappedBuffer::GetRemainingLength() { |
114 ASSERT(operation_ == kRead || operation_ == kRecvFrom); | 114 ASSERT(operation_ == kRead || operation_ == kRecvFrom); |
115 return data_length_ - index_; | 115 return data_length_ - index_; |
116 } | 116 } |
117 | 117 |
118 | |
118 Handle::Handle(intptr_t handle) | 119 Handle::Handle(intptr_t handle) |
119 : DescriptorInfoBase(handle), | 120 : ReferenceCounted(), |
121 DescriptorInfoBase(handle), | |
120 handle_(reinterpret_cast<HANDLE>(handle)), | 122 handle_(reinterpret_cast<HANDLE>(handle)), |
121 completion_port_(INVALID_HANDLE_VALUE), | 123 completion_port_(INVALID_HANDLE_VALUE), |
122 event_handler_(NULL), | 124 event_handler_(NULL), |
123 data_ready_(NULL), | 125 data_ready_(NULL), |
124 pending_read_(NULL), | 126 pending_read_(NULL), |
125 pending_write_(NULL), | 127 pending_write_(NULL), |
126 last_error_(NOERROR), | 128 last_error_(NOERROR), |
127 flags_(0), | 129 flags_(0), |
128 read_thread_id_(Thread::kInvalidThreadId), | 130 read_thread_id_(Thread::kInvalidThreadId), |
129 read_thread_handle_(NULL), | 131 read_thread_handle_(NULL), |
130 read_thread_starting_(false), | 132 read_thread_starting_(false), |
131 read_thread_finished_(false), | 133 read_thread_finished_(false), |
132 monitor_(new Monitor()) {} | 134 monitor_(new Monitor()) {} |
133 | 135 |
134 | 136 |
135 Handle::~Handle() { | 137 Handle::~Handle() { |
136 delete monitor_; | 138 delete monitor_; |
137 } | 139 } |
138 | 140 |
139 | 141 |
140 bool Handle::CreateCompletionPort(HANDLE completion_port) { | 142 bool Handle::CreateCompletionPort(HANDLE completion_port) { |
143 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); | |
144 // A reference to the Handle is Retained by the IO completion port. | |
145 // It is Released by DeleteIfClosed. | |
146 Retain(); | |
141 completion_port_ = CreateIoCompletionPort( | 147 completion_port_ = CreateIoCompletionPort( |
142 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0); | 148 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0); |
143 return (completion_port_ != NULL); | 149 return (completion_port_ != NULL); |
144 } | 150 } |
145 | 151 |
146 | 152 |
147 void Handle::Close() { | 153 void Handle::Close() { |
148 MonitorLocker ml(monitor_); | 154 MonitorLocker ml(monitor_); |
149 if (!SupportsOverlappedIO()) { | 155 if (!SupportsOverlappedIO()) { |
150 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending | 156 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending |
(...skipping 234 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
385 } else { | 391 } else { |
386 HandleError(this); | 392 HandleError(this); |
387 } | 393 } |
388 SetLastError(error); | 394 SetLastError(error); |
389 } | 395 } |
390 | 396 |
391 | 397 |
392 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { | 398 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
393 MonitorLocker ml(monitor_); | 399 MonitorLocker ml(monitor_); |
394 event_handler_ = event_handler; | 400 event_handler_ = event_handler; |
395 if (SupportsOverlappedIO() && (completion_port_ == INVALID_HANDLE_VALUE)) { | 401 if (completion_port_ == INVALID_HANDLE_VALUE) { |
396 CreateCompletionPort(event_handler_->completion_port()); | 402 if (SupportsOverlappedIO()) { |
403 CreateCompletionPort(event_handler_->completion_port()); | |
404 } else { | |
405 // We need to retain the Handle even if overlapped IO is not supported. | |
406 // It is Released by DeleteIfClosed after ReadSyncCompleteAsync | |
407 // manually puts an event on the IO completion port. | |
408 Retain(); | |
409 completion_port_ = event_handler_->completion_port(); | |
410 } | |
397 } | 411 } |
398 } | 412 } |
399 | 413 |
400 | 414 |
401 bool FileHandle::IsClosed() { | 415 bool FileHandle::IsClosed() { |
402 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); | 416 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
403 } | 417 } |
404 | 418 |
405 | 419 |
406 void DirectoryWatchHandle::EnsureInitialized( | 420 void DirectoryWatchHandle::EnsureInitialized( |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
539 closesocket(buffer->client()); | 553 closesocket(buffer->client()); |
540 } | 554 } |
541 | 555 |
542 pending_accept_count_--; | 556 pending_accept_count_--; |
543 OverlappedBuffer::DisposeBuffer(buffer); | 557 OverlappedBuffer::DisposeBuffer(buffer); |
544 } | 558 } |
545 | 559 |
546 | 560 |
547 static void DeleteIfClosed(Handle* handle) { | 561 static void DeleteIfClosed(Handle* handle) { |
548 if (handle->IsClosed()) { | 562 if (handle->IsClosed()) { |
563 handle->set_completion_port(INVALID_HANDLE_VALUE); | |
564 handle->set_event_handler(NULL); | |
549 handle->NotifyAllDartPorts(1 << kDestroyedEvent); | 565 handle->NotifyAllDartPorts(1 << kDestroyedEvent); |
550 handle->RemoveAllPorts(); | 566 handle->RemoveAllPorts(); |
551 delete handle; | 567 // Once the Handle is closed, no further events on the IO completion port |
568 // will mention it. Thus, we can drop the reference here. | |
569 handle->Release(); | |
552 } | 570 } |
553 } | 571 } |
554 | 572 |
555 | 573 |
556 void ListenSocket::DoClose() { | 574 void ListenSocket::DoClose() { |
557 closesocket(socket()); | 575 closesocket(socket()); |
558 handle_ = INVALID_HANDLE_VALUE; | 576 handle_ = INVALID_HANDLE_VALUE; |
559 while (CanAccept()) { | 577 while (CanAccept()) { |
560 // Get rid of connections already accepted. | 578 // Get rid of connections already accepted. |
561 ClientSocket* client = Accept(); | 579 ClientSocket* client = Accept(); |
562 if (client != NULL) { | 580 if (client != NULL) { |
563 client->Close(); | 581 client->Close(); |
582 // Release the reference from the list. | |
583 // When an accept completes, we make a new ClientSocket (1 reference), | |
584 // and add it to the IO completion port (1 more reference). If an | |
585 // accepted connection is never requested by the Dart code, then | |
586 // this list owns a reference (first Release), and the IO completion | |
587 // port owns a reference, (second Release in DeleteIfClosed). | |
588 client->Release(); | |
564 DeleteIfClosed(client); | 589 DeleteIfClosed(client); |
565 } else { | 590 } else { |
566 break; | 591 break; |
567 } | 592 } |
568 } | 593 } |
594 AcceptEx_ = NULL; | |
Cutch
2017/03/27 17:17:50
Maybe a note explaining why you're NULLing the fie
zra
2017/03/28 14:40:19
Done.
| |
569 } | 595 } |
570 | 596 |
571 | 597 |
572 bool ListenSocket::CanAccept() { | 598 bool ListenSocket::CanAccept() { |
573 MonitorLocker ml(monitor_); | 599 MonitorLocker ml(monitor_); |
574 return accepted_head_ != NULL; | 600 return accepted_head_ != NULL; |
575 } | 601 } |
576 | 602 |
577 | 603 |
578 ClientSocket* ListenSocket::Accept() { | 604 ClientSocket* ListenSocket::Accept() { |
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
785 // Note that we return '0', unless a thread have already completed a write. | 811 // Note that we return '0', unless a thread have already completed a write. |
786 if (thread_wrote_ > 0) { | 812 if (thread_wrote_ > 0) { |
787 if (num_bytes > thread_wrote_) { | 813 if (num_bytes > thread_wrote_) { |
788 num_bytes = thread_wrote_; | 814 num_bytes = thread_wrote_; |
789 } | 815 } |
790 thread_wrote_ -= num_bytes; | 816 thread_wrote_ -= num_bytes; |
791 return num_bytes; | 817 return num_bytes; |
792 } | 818 } |
793 if (!write_thread_exists_) { | 819 if (!write_thread_exists_) { |
794 write_thread_exists_ = true; | 820 write_thread_exists_ = true; |
821 // The write thread gets a reference to the Handle, which it places in | |
822 // the events it puts on the IO completion port. The reference is | |
823 // Released by DeleteIfClosed. | |
824 Retain(); | |
795 int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this)); | 825 int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this)); |
796 if (result != 0) { | 826 if (result != 0) { |
797 FATAL1("Failed to start write file thread %d", result); | 827 FATAL1("Failed to start write file thread %d", result); |
798 } | 828 } |
799 while (!write_thread_running_) { | 829 while (!write_thread_running_) { |
800 // Wait until we the thread is running. | 830 // Wait until we the thread is running. |
801 ml.Wait(Monitor::kNoTimeout); | 831 ml.Wait(Monitor::kNoTimeout); |
802 } | 832 } |
803 } | 833 } |
804 // Only queue up to INT_MAX bytes. | 834 // Only queue up to INT_MAX bytes. |
(...skipping 16 matching lines...) Expand all Loading... | |
821 } | 851 } |
822 // Join the thread. | 852 // Join the thread. |
823 DWORD res = WaitForSingleObject(thread_handle_, INFINITE); | 853 DWORD res = WaitForSingleObject(thread_handle_, INFINITE); |
824 CloseHandle(thread_handle_); | 854 CloseHandle(thread_handle_); |
825 ASSERT(res == WAIT_OBJECT_0); | 855 ASSERT(res == WAIT_OBJECT_0); |
826 } | 856 } |
827 Handle::DoClose(); | 857 Handle::DoClose(); |
828 } | 858 } |
829 | 859 |
830 | 860 |
861 #if defined(DEBUG) | |
862 intptr_t ClientSocket::disconnecting_ = 0; | |
863 #endif | |
864 | |
865 | |
831 bool ClientSocket::LoadDisconnectEx() { | 866 bool ClientSocket::LoadDisconnectEx() { |
832 // Load the DisconnectEx function into memory using WSAIoctl. | 867 // Load the DisconnectEx function into memory using WSAIoctl. |
833 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; | 868 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; |
834 DWORD bytes; | 869 DWORD bytes; |
835 int status = | 870 int status = |
836 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, | 871 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, |
837 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_, | 872 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_, |
838 sizeof(DisconnectEx_), &bytes, NULL, NULL); | 873 sizeof(DisconnectEx_), &bytes, NULL, NULL); |
839 return (status != SOCKET_ERROR); | 874 return (status != SOCKET_ERROR); |
840 } | 875 } |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
907 | 942 |
908 void ClientSocket::IssueDisconnect() { | 943 void ClientSocket::IssueDisconnect() { |
909 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); | 944 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); |
910 BOOL ok = | 945 BOOL ok = |
911 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); | 946 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); |
912 // DisconnectEx works like other OverlappedIO APIs, where we can get either an | 947 // DisconnectEx works like other OverlappedIO APIs, where we can get either an |
913 // immediate success or delayed operation by WSA_IO_PENDING being set. | 948 // immediate success or delayed operation by WSA_IO_PENDING being set. |
914 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) { | 949 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) { |
915 DisconnectComplete(buffer); | 950 DisconnectComplete(buffer); |
916 } | 951 } |
952 // When the Dart side receives this event, it may decide to close its Dart | |
953 // ports. When all ports are closed, the VM will shut down. The EventHandler | |
954 // will then shut down. If the EventHandler shuts down before this | |
955 // asynchronous disconnect finishes, this ClientSocket will be leaked. | |
956 // TODO(dart:io): Retain a list of client sockets that are in the process of | |
957 // disconnecting. Disconnect them forcefully, and clean up their resources | |
958 // when the EventHandler shuts down. | |
917 NotifyAllDartPorts(1 << kDestroyedEvent); | 959 NotifyAllDartPorts(1 << kDestroyedEvent); |
918 RemoveAllPorts(); | 960 RemoveAllPorts(); |
961 #if defined(DEBUG) | |
962 disconnecting_++; | |
963 #endif | |
919 } | 964 } |
920 | 965 |
921 | 966 |
922 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { | 967 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { |
923 OverlappedBuffer::DisposeBuffer(buffer); | 968 OverlappedBuffer::DisposeBuffer(buffer); |
924 closesocket(socket()); | 969 closesocket(socket()); |
925 if (data_ready_ != NULL) { | 970 if (data_ready_ != NULL) { |
926 OverlappedBuffer::DisposeBuffer(data_ready_); | 971 OverlappedBuffer::DisposeBuffer(data_ready_); |
927 } | 972 } |
928 mark_closed(); | 973 mark_closed(); |
974 #if defined(DEBUG) | |
975 disconnecting_--; | |
976 #endif | |
929 } | 977 } |
930 | 978 |
931 | 979 |
932 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { | 980 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { |
933 OverlappedBuffer::DisposeBuffer(buffer); | 981 OverlappedBuffer::DisposeBuffer(buffer); |
934 // Update socket to support full socket API, after ConnectEx completed. | 982 // Update socket to support full socket API, after ConnectEx completed. |
935 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); | 983 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); |
936 // If the port is set, we already listen for this socket in Dart. | 984 // If the port is set, we already listen for this socket in Dart. |
937 // Handle the cases here. | 985 // Handle the cases here. |
938 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) { | 986 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) { |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1030 | 1078 |
1031 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { | 1079 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
1032 ASSERT(this != NULL); | 1080 ASSERT(this != NULL); |
1033 if (msg->id == kTimerId) { | 1081 if (msg->id == kTimerId) { |
1034 // Change of timeout request. Just set the new timeout and port as the | 1082 // Change of timeout request. Just set the new timeout and port as the |
1035 // completion thread will use the new timeout value for its next wait. | 1083 // completion thread will use the new timeout value for its next wait. |
1036 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); | 1084 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); |
1037 } else if (msg->id == kShutdownId) { | 1085 } else if (msg->id == kShutdownId) { |
1038 shutdown_ = true; | 1086 shutdown_ = true; |
1039 } else { | 1087 } else { |
1040 Handle* handle = reinterpret_cast<Handle*>(msg->id); | 1088 Socket* socket = reinterpret_cast<Socket*>(msg->id); |
1089 RefCntReleaseScope<Socket> rs(socket); | |
1090 if (socket->fd() == -1) { | |
1091 return; | |
1092 } | |
1093 Handle* handle = reinterpret_cast<Handle*>(socket->fd()); | |
1041 ASSERT(handle != NULL); | 1094 ASSERT(handle != NULL); |
1042 | 1095 |
1043 if (handle->is_listen_socket()) { | 1096 if (handle->is_listen_socket()) { |
1044 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle); | 1097 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle); |
1045 listen_socket->EnsureInitialized(this); | 1098 listen_socket->EnsureInitialized(this); |
1046 | 1099 |
1047 MonitorLocker ml(listen_socket->monitor_); | 1100 MonitorLocker ml(listen_socket->monitor_); |
1048 | 1101 |
1049 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { | 1102 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
1050 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); | 1103 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
1051 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { | 1104 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
1052 // `events` can only have kInEvent/kOutEvent flags set. | 1105 // `events` can only have kInEvent/kOutEvent flags set. |
1053 intptr_t events = msg->data & EVENT_MASK; | 1106 intptr_t events = msg->data & EVENT_MASK; |
1054 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | 1107 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
1055 listen_socket->SetPortAndMask(msg->dart_port, events); | 1108 listen_socket->SetPortAndMask(msg->dart_port, events); |
1056 TryDispatchingPendingAccepts(listen_socket); | 1109 TryDispatchingPendingAccepts(listen_socket); |
1057 } else if (IS_COMMAND(msg->data, kCloseCommand)) { | 1110 } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
1058 listen_socket->RemovePort(msg->dart_port); | 1111 listen_socket->RemovePort(msg->dart_port); |
1059 | 1112 |
1060 // We only close the socket file descriptor from the operating | 1113 // We only close the socket file descriptor from the operating |
1061 // system if there are no other dart socket objects which | 1114 // system if there are no other dart socket objects which |
1062 // are listening on the same (address, port) combination. | 1115 // are listening on the same (address, port) combination. |
1063 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); | 1116 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); |
1064 MutexLocker locker(registry->mutex()); | 1117 MutexLocker locker(registry->mutex()); |
1065 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { | 1118 if (registry->CloseSafe(socket)) { |
1066 ASSERT(listen_socket->Mask() == 0); | 1119 ASSERT(listen_socket->Mask() == 0); |
1067 listen_socket->Close(); | 1120 listen_socket->Close(); |
1121 socket->SetClosedFd(); | |
1068 } | 1122 } |
1069 | 1123 |
1070 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); | 1124 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); |
1071 } else { | 1125 } else { |
1072 UNREACHABLE(); | 1126 UNREACHABLE(); |
1073 } | 1127 } |
1074 } else { | 1128 } else { |
1075 handle->EnsureInitialized(this); | 1129 handle->EnsureInitialized(this); |
1076 MonitorLocker ml(handle->monitor_); | 1130 MonitorLocker ml(handle->monitor_); |
1077 | 1131 |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1125 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); | 1179 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); |
1126 client_socket->Shutdown(SD_RECEIVE); | 1180 client_socket->Shutdown(SD_RECEIVE); |
1127 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { | 1181 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { |
1128 ASSERT(handle->is_client_socket()); | 1182 ASSERT(handle->is_client_socket()); |
1129 | 1183 |
1130 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); | 1184 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); |
1131 client_socket->Shutdown(SD_SEND); | 1185 client_socket->Shutdown(SD_SEND); |
1132 } else if (IS_COMMAND(msg->data, kCloseCommand)) { | 1186 } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
1133 handle->SetPortAndMask(msg->dart_port, 0); | 1187 handle->SetPortAndMask(msg->dart_port, 0); |
1134 handle->Close(); | 1188 handle->Close(); |
1189 socket->SetClosedFd(); | |
1135 } else { | 1190 } else { |
1136 UNREACHABLE(); | 1191 UNREACHABLE(); |
1137 } | 1192 } |
1138 } | 1193 } |
1139 | 1194 |
1140 DeleteIfClosed(handle); | 1195 DeleteIfClosed(handle); |
1141 } | 1196 } |
1142 } | 1197 } |
1143 | 1198 |
1144 | 1199 |
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1306 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); | 1361 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); |
1307 HandleConnect(client_socket, bytes, buffer); | 1362 HandleConnect(client_socket, bytes, buffer); |
1308 break; | 1363 break; |
1309 } | 1364 } |
1310 default: | 1365 default: |
1311 UNREACHABLE(); | 1366 UNREACHABLE(); |
1312 } | 1367 } |
1313 } | 1368 } |
1314 | 1369 |
1315 | 1370 |
1371 void EventHandlerImplementation::HandleCompletionOrInterrupt( | |
1372 BOOL ok, | |
1373 DWORD bytes, | |
1374 ULONG_PTR key, | |
1375 OVERLAPPED* overlapped) { | |
1376 if (!ok) { | |
1377 // Treat ERROR_CONNECTION_ABORTED as connection closed. | |
1378 // The error ERROR_OPERATION_ABORTED is set for pending | |
1379 // accept requests for a listen socket which is closed. | |
1380 // ERROR_NETNAME_DELETED occurs when the client closes | |
1381 // the socket it is reading from. | |
1382 DWORD last_error = GetLastError(); | |
1383 if ((last_error == ERROR_CONNECTION_ABORTED) || | |
1384 (last_error == ERROR_OPERATION_ABORTED) || | |
1385 (last_error == ERROR_NETNAME_DELETED) || | |
1386 (last_error == ERROR_BROKEN_PIPE)) { | |
1387 ASSERT(bytes == 0); | |
1388 HandleIOCompletion(bytes, key, overlapped); | |
1389 } else if (last_error == ERROR_MORE_DATA) { | |
1390 // Don't ASSERT no bytes in this case. This can happen if the receive | |
1391 // buffer for datagram sockets is too small to contain a full datagram, | |
1392 // and in this case bytes hold the bytes that was read. | |
1393 HandleIOCompletion(-1, key, overlapped); | |
1394 } else { | |
1395 ASSERT(bytes == 0); | |
1396 HandleIOCompletion(-1, key, overlapped); | |
1397 } | |
1398 } else if (key == NULL) { | |
1399 // A key of NULL signals an interrupt message. | |
1400 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped); | |
1401 HandleInterrupt(msg); | |
1402 delete msg; | |
1403 } else { | |
1404 HandleIOCompletion(bytes, key, overlapped); | |
1405 } | |
1406 } | |
1407 | |
1408 | |
1316 EventHandlerImplementation::EventHandlerImplementation() { | 1409 EventHandlerImplementation::EventHandlerImplementation() { |
1317 startup_monitor_ = new Monitor(); | 1410 startup_monitor_ = new Monitor(); |
1318 handler_thread_id_ = Thread::kInvalidThreadId; | 1411 handler_thread_id_ = Thread::kInvalidThreadId; |
1319 handler_thread_handle_ = NULL; | 1412 handler_thread_handle_ = NULL; |
1320 completion_port_ = | 1413 completion_port_ = |
1321 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); | 1414 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); |
1322 if (completion_port_ == NULL) { | 1415 if (completion_port_ == NULL) { |
1323 FATAL("Completion port creation failed"); | 1416 FATAL("Completion port creation failed"); |
1324 } | 1417 } |
1325 shutdown_ = false; | 1418 shutdown_ = false; |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1367 ASSERT(handler_impl != NULL); | 1460 ASSERT(handler_impl != NULL); |
1368 | 1461 |
1369 { | 1462 { |
1370 MonitorLocker ml(handler_impl->startup_monitor_); | 1463 MonitorLocker ml(handler_impl->startup_monitor_); |
1371 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); | 1464 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); |
1372 handler_impl->handler_thread_handle_ = | 1465 handler_impl->handler_thread_handle_ = |
1373 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_); | 1466 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_); |
1374 ml.Notify(); | 1467 ml.Notify(); |
1375 } | 1468 } |
1376 | 1469 |
1470 DWORD bytes; | |
1471 ULONG_PTR key; | |
1472 OVERLAPPED* overlapped; | |
1473 BOOL ok; | |
1377 while (!handler_impl->shutdown_) { | 1474 while (!handler_impl->shutdown_) { |
1378 DWORD bytes; | |
1379 ULONG_PTR key; | |
1380 OVERLAPPED* overlapped; | |
1381 int64_t millis = handler_impl->GetTimeout(); | 1475 int64_t millis = handler_impl->GetTimeout(); |
1382 ASSERT(millis == kInfinityTimeout || millis >= 0); | 1476 ASSERT(millis == kInfinityTimeout || millis >= 0); |
1383 if (millis > kMaxInt32) { | 1477 if (millis > kMaxInt32) { |
1384 millis = kMaxInt32; | 1478 millis = kMaxInt32; |
1385 } | 1479 } |
1386 ASSERT(sizeof(int32_t) == sizeof(DWORD)); | 1480 ASSERT(sizeof(int32_t) == sizeof(DWORD)); |
1387 BOOL ok = | 1481 DWORD timeout = static_cast<DWORD>(millis); |
1388 GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, &key, | 1482 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, |
1389 &overlapped, static_cast<DWORD>(millis)); | 1483 &key, &overlapped, timeout); |
1390 | 1484 |
1391 if (!ok && (overlapped == NULL)) { | 1485 if (!ok && (overlapped == NULL)) { |
1392 if (GetLastError() == ERROR_ABANDONED_WAIT_0) { | 1486 if (GetLastError() == ERROR_ABANDONED_WAIT_0) { |
1393 // The completion port should never be closed. | 1487 // The completion port should never be closed. |
1394 Log::Print("Completion port closed\n"); | 1488 Log::Print("Completion port closed\n"); |
1395 UNREACHABLE(); | 1489 UNREACHABLE(); |
1396 } else { | 1490 } else { |
1397 // Timeout is signalled by false result and NULL in overlapped. | 1491 // Timeout is signalled by false result and NULL in overlapped. |
1398 handler_impl->HandleTimeout(); | 1492 handler_impl->HandleTimeout(); |
1399 } | 1493 } |
1400 } else if (!ok) { | |
1401 // Treat ERROR_CONNECTION_ABORTED as connection closed. | |
1402 // The error ERROR_OPERATION_ABORTED is set for pending | |
1403 // accept requests for a listen socket which is closed. | |
1404 // ERROR_NETNAME_DELETED occurs when the client closes | |
1405 // the socket it is reading from. | |
1406 DWORD last_error = GetLastError(); | |
1407 if ((last_error == ERROR_CONNECTION_ABORTED) || | |
1408 (last_error == ERROR_OPERATION_ABORTED) || | |
1409 (last_error == ERROR_NETNAME_DELETED) || | |
1410 (last_error == ERROR_BROKEN_PIPE)) { | |
1411 ASSERT(bytes == 0); | |
1412 handler_impl->HandleIOCompletion(bytes, key, overlapped); | |
1413 } else if (last_error == ERROR_MORE_DATA) { | |
1414 // Don't ASSERT no bytes in this case. This can happen if the receive | |
1415 // buffer for datagram sockets is to small to contain a full datagram, | |
1416 // and in this case bytes hold the bytes that was read. | |
1417 handler_impl->HandleIOCompletion(-1, key, overlapped); | |
1418 } else { | |
1419 ASSERT(bytes == 0); | |
1420 handler_impl->HandleIOCompletion(-1, key, overlapped); | |
1421 } | |
1422 } else if (key == NULL) { | |
1423 // A key of NULL signals an interrupt message. | |
1424 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped); | |
1425 handler_impl->HandleInterrupt(msg); | |
1426 delete msg; | |
1427 } else { | 1494 } else { |
1428 handler_impl->HandleIOCompletion(bytes, key, overlapped); | 1495 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); |
1429 } | 1496 } |
1430 } | 1497 } |
1498 | |
1499 // In a Debug build, drain the IO completion port to make sure we aren't | |
1500 // leaking any (non-disconnecting) Handles. In a Release build, we don't care | |
1501 // because the VM is going down, and the asserts below are Debug-only. | |
1502 #if defined(DEBUG) | |
1503 while (true) { | |
1504 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, | |
1505 &key, &overlapped, 0); | |
1506 if (!ok && (overlapped == NULL)) { | |
1507 // There was an error or nothing is ready. Assume the port is drained. | |
1508 break; | |
1509 } | |
1510 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); | |
1511 } | |
1512 #endif | |
1513 | |
1514 // The eventhandler thread is going down so there should be no more live | |
1515 // Handles or Sockets. | |
1516 // TODO(dart:io): It would be nice to be able to assert here that: | |
1517 // ReferenceCounted<Handle>::instances() == 0; | |
1518 // However, we cannot at the moment. See the TODO on: | |
1519 // ClientSocket::IssueDisconnect() | |
1520 DEBUG_ASSERT(ReferenceCounted<Handle>::instances() == | |
1521 ClientSocket::disconnecting()); | |
1522 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); | |
1431 handler->NotifyShutdownDone(); | 1523 handler->NotifyShutdownDone(); |
1432 } | 1524 } |
1433 | 1525 |
1434 | 1526 |
1435 void EventHandlerImplementation::Start(EventHandler* handler) { | 1527 void EventHandlerImplementation::Start(EventHandler* handler) { |
1436 int result = | 1528 int result = |
1437 Thread::Start(EventHandlerEntry, reinterpret_cast<uword>(handler)); | 1529 Thread::Start(EventHandlerEntry, reinterpret_cast<uword>(handler)); |
1438 if (result != 0) { | 1530 if (result != 0) { |
1439 FATAL1("Failed to start event handler thread %d", result); | 1531 FATAL1("Failed to start event handler thread %d", result); |
1440 } | 1532 } |
(...skipping 15 matching lines...) Expand all Loading... | |
1456 void EventHandlerImplementation::Shutdown() { | 1548 void EventHandlerImplementation::Shutdown() { |
1457 SendData(kShutdownId, 0, 0); | 1549 SendData(kShutdownId, 0, 0); |
1458 } | 1550 } |
1459 | 1551 |
1460 } // namespace bin | 1552 } // namespace bin |
1461 } // namespace dart | 1553 } // namespace dart |
1462 | 1554 |
1463 #endif // defined(HOST_OS_WINDOWS) | 1555 #endif // defined(HOST_OS_WINDOWS) |
1464 | 1556 |
1465 #endif // !defined(DART_IO_DISABLED) | 1557 #endif // !defined(DART_IO_DISABLED) |
OLD | NEW |