| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #if !defined(DART_IO_DISABLED) | 5 #if !defined(DART_IO_DISABLED) |
| 6 | 6 |
| 7 #include "platform/globals.h" | 7 #include "platform/globals.h" |
| 8 #if defined(HOST_OS_WINDOWS) | 8 #if defined(HOST_OS_WINDOWS) |
| 9 | 9 |
| 10 #include "bin/eventhandler.h" | 10 #include "bin/eventhandler.h" |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 108 data_length_ = num_bytes; | 108 data_length_ = num_bytes; |
| 109 return num_bytes; | 109 return num_bytes; |
| 110 } | 110 } |
| 111 | 111 |
| 112 | 112 |
| 113 int OverlappedBuffer::GetRemainingLength() { | 113 int OverlappedBuffer::GetRemainingLength() { |
| 114 ASSERT(operation_ == kRead || operation_ == kRecvFrom); | 114 ASSERT(operation_ == kRead || operation_ == kRecvFrom); |
| 115 return data_length_ - index_; | 115 return data_length_ - index_; |
| 116 } | 116 } |
| 117 | 117 |
| 118 |
| 118 Handle::Handle(intptr_t handle) | 119 Handle::Handle(intptr_t handle) |
| 119 : DescriptorInfoBase(handle), | 120 : ReferenceCounted(), |
| 121 DescriptorInfoBase(handle), |
| 120 handle_(reinterpret_cast<HANDLE>(handle)), | 122 handle_(reinterpret_cast<HANDLE>(handle)), |
| 121 completion_port_(INVALID_HANDLE_VALUE), | 123 completion_port_(INVALID_HANDLE_VALUE), |
| 122 event_handler_(NULL), | 124 event_handler_(NULL), |
| 123 data_ready_(NULL), | 125 data_ready_(NULL), |
| 124 pending_read_(NULL), | 126 pending_read_(NULL), |
| 125 pending_write_(NULL), | 127 pending_write_(NULL), |
| 126 last_error_(NOERROR), | 128 last_error_(NOERROR), |
| 127 flags_(0), | 129 flags_(0), |
| 128 read_thread_id_(Thread::kInvalidThreadId), | 130 read_thread_id_(Thread::kInvalidThreadId), |
| 129 read_thread_handle_(NULL), | 131 read_thread_handle_(NULL), |
| 130 read_thread_starting_(false), | 132 read_thread_starting_(false), |
| 131 read_thread_finished_(false), | 133 read_thread_finished_(false), |
| 132 monitor_(new Monitor()) {} | 134 monitor_(new Monitor()) {} |
| 133 | 135 |
| 134 | 136 |
| 135 Handle::~Handle() { | 137 Handle::~Handle() { |
| 136 delete monitor_; | 138 delete monitor_; |
| 137 } | 139 } |
| 138 | 140 |
| 139 | 141 |
| 140 bool Handle::CreateCompletionPort(HANDLE completion_port) { | 142 bool Handle::CreateCompletionPort(HANDLE completion_port) { |
| 143 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
| 144 // A reference to the Handle is Retained by the IO completion port. |
| 145 // It is Released by DeleteIfClosed. |
| 146 Retain(); |
| 141 completion_port_ = CreateIoCompletionPort( | 147 completion_port_ = CreateIoCompletionPort( |
| 142 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0); | 148 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0); |
| 143 return (completion_port_ != NULL); | 149 return (completion_port_ != NULL); |
| 144 } | 150 } |
| 145 | 151 |
| 146 | 152 |
| 147 void Handle::Close() { | 153 void Handle::Close() { |
| 148 MonitorLocker ml(monitor_); | 154 MonitorLocker ml(monitor_); |
| 149 if (!SupportsOverlappedIO()) { | 155 if (!SupportsOverlappedIO()) { |
| 150 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending | 156 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending |
| (...skipping 234 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 385 } else { | 391 } else { |
| 386 HandleError(this); | 392 HandleError(this); |
| 387 } | 393 } |
| 388 SetLastError(error); | 394 SetLastError(error); |
| 389 } | 395 } |
| 390 | 396 |
| 391 | 397 |
| 392 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { | 398 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
| 393 MonitorLocker ml(monitor_); | 399 MonitorLocker ml(monitor_); |
| 394 event_handler_ = event_handler; | 400 event_handler_ = event_handler; |
| 395 if (SupportsOverlappedIO() && (completion_port_ == INVALID_HANDLE_VALUE)) { | 401 if (completion_port_ == INVALID_HANDLE_VALUE) { |
| 396 CreateCompletionPort(event_handler_->completion_port()); | 402 if (SupportsOverlappedIO()) { |
| 403 CreateCompletionPort(event_handler_->completion_port()); |
| 404 } else { |
| 405 // We need to retain the Handle even if overlapped IO is not supported. |
| 406 // It is Released by DeleteIfClosed after ReadSyncCompleteAsync |
| 407 // manually puts an event on the IO completion port. |
| 408 Retain(); |
| 409 completion_port_ = event_handler_->completion_port(); |
| 410 } |
| 397 } | 411 } |
| 398 } | 412 } |
| 399 | 413 |
| 400 | 414 |
| 401 bool FileHandle::IsClosed() { | 415 bool FileHandle::IsClosed() { |
| 402 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); | 416 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
| 403 } | 417 } |
| 404 | 418 |
| 405 | 419 |
| 406 void DirectoryWatchHandle::EnsureInitialized( | 420 void DirectoryWatchHandle::EnsureInitialized( |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 539 closesocket(buffer->client()); | 553 closesocket(buffer->client()); |
| 540 } | 554 } |
| 541 | 555 |
| 542 pending_accept_count_--; | 556 pending_accept_count_--; |
| 543 OverlappedBuffer::DisposeBuffer(buffer); | 557 OverlappedBuffer::DisposeBuffer(buffer); |
| 544 } | 558 } |
| 545 | 559 |
| 546 | 560 |
| 547 static void DeleteIfClosed(Handle* handle) { | 561 static void DeleteIfClosed(Handle* handle) { |
| 548 if (handle->IsClosed()) { | 562 if (handle->IsClosed()) { |
| 563 handle->set_completion_port(INVALID_HANDLE_VALUE); |
| 564 handle->set_event_handler(NULL); |
| 549 handle->NotifyAllDartPorts(1 << kDestroyedEvent); | 565 handle->NotifyAllDartPorts(1 << kDestroyedEvent); |
| 550 handle->RemoveAllPorts(); | 566 handle->RemoveAllPorts(); |
| 551 delete handle; | 567 // Once the Handle is closed, no further events on the IO completion port |
| 568 // will mention it. Thus, we can drop the reference here. |
| 569 handle->Release(); |
| 552 } | 570 } |
| 553 } | 571 } |
| 554 | 572 |
| 555 | 573 |
| 556 void ListenSocket::DoClose() { | 574 void ListenSocket::DoClose() { |
| 557 closesocket(socket()); | 575 closesocket(socket()); |
| 558 handle_ = INVALID_HANDLE_VALUE; | 576 handle_ = INVALID_HANDLE_VALUE; |
| 559 while (CanAccept()) { | 577 while (CanAccept()) { |
| 560 // Get rid of connections already accepted. | 578 // Get rid of connections already accepted. |
| 561 ClientSocket* client = Accept(); | 579 ClientSocket* client = Accept(); |
| 562 if (client != NULL) { | 580 if (client != NULL) { |
| 563 client->Close(); | 581 client->Close(); |
| 582 // Release the reference from the list. |
| 583 // When an accept completes, we make a new ClientSocket (1 reference), |
| 584 // and add it to the IO completion port (1 more reference). If an |
| 585 // accepted connection is never requested by the Dart code, then |
| 586 // this list owns a reference (first Release), and the IO completion |
| 587 // port owns a reference, (second Release in DeleteIfClosed). |
| 588 client->Release(); |
| 564 DeleteIfClosed(client); | 589 DeleteIfClosed(client); |
| 565 } else { | 590 } else { |
| 566 break; | 591 break; |
| 567 } | 592 } |
| 568 } | 593 } |
| 594 // To finish resetting the state of the ListenSocket back to what it was |
| 595 // before EnsureInitialized was called, we have to reset the AcceptEx_ |
| 596 // function pointer. |
| 597 AcceptEx_ = NULL; |
| 569 } | 598 } |
| 570 | 599 |
| 571 | 600 |
| 572 bool ListenSocket::CanAccept() { | 601 bool ListenSocket::CanAccept() { |
| 573 MonitorLocker ml(monitor_); | 602 MonitorLocker ml(monitor_); |
| 574 return accepted_head_ != NULL; | 603 return accepted_head_ != NULL; |
| 575 } | 604 } |
| 576 | 605 |
| 577 | 606 |
| 578 ClientSocket* ListenSocket::Accept() { | 607 ClientSocket* ListenSocket::Accept() { |
| (...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 785 // Note that we return '0', unless a thread have already completed a write. | 814 // Note that we return '0', unless a thread have already completed a write. |
| 786 if (thread_wrote_ > 0) { | 815 if (thread_wrote_ > 0) { |
| 787 if (num_bytes > thread_wrote_) { | 816 if (num_bytes > thread_wrote_) { |
| 788 num_bytes = thread_wrote_; | 817 num_bytes = thread_wrote_; |
| 789 } | 818 } |
| 790 thread_wrote_ -= num_bytes; | 819 thread_wrote_ -= num_bytes; |
| 791 return num_bytes; | 820 return num_bytes; |
| 792 } | 821 } |
| 793 if (!write_thread_exists_) { | 822 if (!write_thread_exists_) { |
| 794 write_thread_exists_ = true; | 823 write_thread_exists_ = true; |
| 824 // The write thread gets a reference to the Handle, which it places in |
| 825 // the events it puts on the IO completion port. The reference is |
| 826 // Released by DeleteIfClosed. |
| 827 Retain(); |
| 795 int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this)); | 828 int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this)); |
| 796 if (result != 0) { | 829 if (result != 0) { |
| 797 FATAL1("Failed to start write file thread %d", result); | 830 FATAL1("Failed to start write file thread %d", result); |
| 798 } | 831 } |
| 799 while (!write_thread_running_) { | 832 while (!write_thread_running_) { |
| 800 // Wait until we the thread is running. | 833 // Wait until we the thread is running. |
| 801 ml.Wait(Monitor::kNoTimeout); | 834 ml.Wait(Monitor::kNoTimeout); |
| 802 } | 835 } |
| 803 } | 836 } |
| 804 // Only queue up to INT_MAX bytes. | 837 // Only queue up to INT_MAX bytes. |
| (...skipping 16 matching lines...) Expand all Loading... |
| 821 } | 854 } |
| 822 // Join the thread. | 855 // Join the thread. |
| 823 DWORD res = WaitForSingleObject(thread_handle_, INFINITE); | 856 DWORD res = WaitForSingleObject(thread_handle_, INFINITE); |
| 824 CloseHandle(thread_handle_); | 857 CloseHandle(thread_handle_); |
| 825 ASSERT(res == WAIT_OBJECT_0); | 858 ASSERT(res == WAIT_OBJECT_0); |
| 826 } | 859 } |
| 827 Handle::DoClose(); | 860 Handle::DoClose(); |
| 828 } | 861 } |
| 829 | 862 |
| 830 | 863 |
| 864 #if defined(DEBUG) |
| 865 intptr_t ClientSocket::disconnecting_ = 0; |
| 866 #endif |
| 867 |
| 868 |
| 831 bool ClientSocket::LoadDisconnectEx() { | 869 bool ClientSocket::LoadDisconnectEx() { |
| 832 // Load the DisconnectEx function into memory using WSAIoctl. | 870 // Load the DisconnectEx function into memory using WSAIoctl. |
| 833 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; | 871 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; |
| 834 DWORD bytes; | 872 DWORD bytes; |
| 835 int status = | 873 int status = |
| 836 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, | 874 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, |
| 837 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_, | 875 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_, |
| 838 sizeof(DisconnectEx_), &bytes, NULL, NULL); | 876 sizeof(DisconnectEx_), &bytes, NULL, NULL); |
| 839 return (status != SOCKET_ERROR); | 877 return (status != SOCKET_ERROR); |
| 840 } | 878 } |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 907 | 945 |
| 908 void ClientSocket::IssueDisconnect() { | 946 void ClientSocket::IssueDisconnect() { |
| 909 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); | 947 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); |
| 910 BOOL ok = | 948 BOOL ok = |
| 911 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); | 949 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); |
| 912 // DisconnectEx works like other OverlappedIO APIs, where we can get either an | 950 // DisconnectEx works like other OverlappedIO APIs, where we can get either an |
| 913 // immediate success or delayed operation by WSA_IO_PENDING being set. | 951 // immediate success or delayed operation by WSA_IO_PENDING being set. |
| 914 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) { | 952 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) { |
| 915 DisconnectComplete(buffer); | 953 DisconnectComplete(buffer); |
| 916 } | 954 } |
| 955 // When the Dart side receives this event, it may decide to close its Dart |
| 956 // ports. When all ports are closed, the VM will shut down. The EventHandler |
| 957 // will then shut down. If the EventHandler shuts down before this |
| 958 // asynchronous disconnect finishes, this ClientSocket will be leaked. |
| 959 // TODO(dart:io): Retain a list of client sockets that are in the process of |
| 960 // disconnecting. Disconnect them forcefully, and clean up their resources |
| 961 // when the EventHandler shuts down. |
| 917 NotifyAllDartPorts(1 << kDestroyedEvent); | 962 NotifyAllDartPorts(1 << kDestroyedEvent); |
| 918 RemoveAllPorts(); | 963 RemoveAllPorts(); |
| 964 #if defined(DEBUG) |
| 965 disconnecting_++; |
| 966 #endif |
| 919 } | 967 } |
| 920 | 968 |
| 921 | 969 |
| 922 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { | 970 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { |
| 923 OverlappedBuffer::DisposeBuffer(buffer); | 971 OverlappedBuffer::DisposeBuffer(buffer); |
| 924 closesocket(socket()); | 972 closesocket(socket()); |
| 925 if (data_ready_ != NULL) { | 973 if (data_ready_ != NULL) { |
| 926 OverlappedBuffer::DisposeBuffer(data_ready_); | 974 OverlappedBuffer::DisposeBuffer(data_ready_); |
| 927 } | 975 } |
| 928 mark_closed(); | 976 mark_closed(); |
| 977 #if defined(DEBUG) |
| 978 disconnecting_--; |
| 979 #endif |
| 929 } | 980 } |
| 930 | 981 |
| 931 | 982 |
| 932 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { | 983 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { |
| 933 OverlappedBuffer::DisposeBuffer(buffer); | 984 OverlappedBuffer::DisposeBuffer(buffer); |
| 934 // Update socket to support full socket API, after ConnectEx completed. | 985 // Update socket to support full socket API, after ConnectEx completed. |
| 935 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); | 986 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); |
| 936 // If the port is set, we already listen for this socket in Dart. | 987 // If the port is set, we already listen for this socket in Dart. |
| 937 // Handle the cases here. | 988 // Handle the cases here. |
| 938 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) { | 989 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) { |
| (...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1030 | 1081 |
| 1031 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { | 1082 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
| 1032 ASSERT(this != NULL); | 1083 ASSERT(this != NULL); |
| 1033 if (msg->id == kTimerId) { | 1084 if (msg->id == kTimerId) { |
| 1034 // Change of timeout request. Just set the new timeout and port as the | 1085 // Change of timeout request. Just set the new timeout and port as the |
| 1035 // completion thread will use the new timeout value for its next wait. | 1086 // completion thread will use the new timeout value for its next wait. |
| 1036 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); | 1087 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); |
| 1037 } else if (msg->id == kShutdownId) { | 1088 } else if (msg->id == kShutdownId) { |
| 1038 shutdown_ = true; | 1089 shutdown_ = true; |
| 1039 } else { | 1090 } else { |
| 1040 Handle* handle = reinterpret_cast<Handle*>(msg->id); | 1091 Socket* socket = reinterpret_cast<Socket*>(msg->id); |
| 1092 RefCntReleaseScope<Socket> rs(socket); |
| 1093 if (socket->fd() == -1) { |
| 1094 return; |
| 1095 } |
| 1096 Handle* handle = reinterpret_cast<Handle*>(socket->fd()); |
| 1041 ASSERT(handle != NULL); | 1097 ASSERT(handle != NULL); |
| 1042 | 1098 |
| 1043 if (handle->is_listen_socket()) { | 1099 if (handle->is_listen_socket()) { |
| 1044 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle); | 1100 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle); |
| 1045 listen_socket->EnsureInitialized(this); | 1101 listen_socket->EnsureInitialized(this); |
| 1046 | 1102 |
| 1047 MonitorLocker ml(listen_socket->monitor_); | 1103 MonitorLocker ml(listen_socket->monitor_); |
| 1048 | 1104 |
| 1049 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { | 1105 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
| 1050 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); | 1106 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
| 1051 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { | 1107 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
| 1052 // `events` can only have kInEvent/kOutEvent flags set. | 1108 // `events` can only have kInEvent/kOutEvent flags set. |
| 1053 intptr_t events = msg->data & EVENT_MASK; | 1109 intptr_t events = msg->data & EVENT_MASK; |
| 1054 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | 1110 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
| 1055 listen_socket->SetPortAndMask(msg->dart_port, events); | 1111 listen_socket->SetPortAndMask(msg->dart_port, events); |
| 1056 TryDispatchingPendingAccepts(listen_socket); | 1112 TryDispatchingPendingAccepts(listen_socket); |
| 1057 } else if (IS_COMMAND(msg->data, kCloseCommand)) { | 1113 } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
| 1058 listen_socket->RemovePort(msg->dart_port); | 1114 listen_socket->RemovePort(msg->dart_port); |
| 1059 | 1115 |
| 1060 // We only close the socket file descriptor from the operating | 1116 // We only close the socket file descriptor from the operating |
| 1061 // system if there are no other dart socket objects which | 1117 // system if there are no other dart socket objects which |
| 1062 // are listening on the same (address, port) combination. | 1118 // are listening on the same (address, port) combination. |
| 1063 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); | 1119 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); |
| 1064 MutexLocker locker(registry->mutex()); | 1120 MutexLocker locker(registry->mutex()); |
| 1065 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { | 1121 if (registry->CloseSafe(socket)) { |
| 1066 ASSERT(listen_socket->Mask() == 0); | 1122 ASSERT(listen_socket->Mask() == 0); |
| 1067 listen_socket->Close(); | 1123 listen_socket->Close(); |
| 1124 socket->SetClosedFd(); |
| 1068 } | 1125 } |
| 1069 | 1126 |
| 1070 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); | 1127 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); |
| 1071 } else { | 1128 } else { |
| 1072 UNREACHABLE(); | 1129 UNREACHABLE(); |
| 1073 } | 1130 } |
| 1074 } else { | 1131 } else { |
| 1075 handle->EnsureInitialized(this); | 1132 handle->EnsureInitialized(this); |
| 1076 MonitorLocker ml(handle->monitor_); | 1133 MonitorLocker ml(handle->monitor_); |
| 1077 | 1134 |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1125 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); | 1182 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); |
| 1126 client_socket->Shutdown(SD_RECEIVE); | 1183 client_socket->Shutdown(SD_RECEIVE); |
| 1127 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { | 1184 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { |
| 1128 ASSERT(handle->is_client_socket()); | 1185 ASSERT(handle->is_client_socket()); |
| 1129 | 1186 |
| 1130 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); | 1187 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); |
| 1131 client_socket->Shutdown(SD_SEND); | 1188 client_socket->Shutdown(SD_SEND); |
| 1132 } else if (IS_COMMAND(msg->data, kCloseCommand)) { | 1189 } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
| 1133 handle->SetPortAndMask(msg->dart_port, 0); | 1190 handle->SetPortAndMask(msg->dart_port, 0); |
| 1134 handle->Close(); | 1191 handle->Close(); |
| 1192 socket->SetClosedFd(); |
| 1135 } else { | 1193 } else { |
| 1136 UNREACHABLE(); | 1194 UNREACHABLE(); |
| 1137 } | 1195 } |
| 1138 } | 1196 } |
| 1139 | 1197 |
| 1140 DeleteIfClosed(handle); | 1198 DeleteIfClosed(handle); |
| 1141 } | 1199 } |
| 1142 } | 1200 } |
| 1143 | 1201 |
| 1144 | 1202 |
| (...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1306 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); | 1364 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); |
| 1307 HandleConnect(client_socket, bytes, buffer); | 1365 HandleConnect(client_socket, bytes, buffer); |
| 1308 break; | 1366 break; |
| 1309 } | 1367 } |
| 1310 default: | 1368 default: |
| 1311 UNREACHABLE(); | 1369 UNREACHABLE(); |
| 1312 } | 1370 } |
| 1313 } | 1371 } |
| 1314 | 1372 |
| 1315 | 1373 |
| 1374 void EventHandlerImplementation::HandleCompletionOrInterrupt( |
| 1375 BOOL ok, |
| 1376 DWORD bytes, |
| 1377 ULONG_PTR key, |
| 1378 OVERLAPPED* overlapped) { |
| 1379 if (!ok) { |
| 1380 // Treat ERROR_CONNECTION_ABORTED as connection closed. |
| 1381 // The error ERROR_OPERATION_ABORTED is set for pending |
| 1382 // accept requests for a listen socket which is closed. |
| 1383 // ERROR_NETNAME_DELETED occurs when the client closes |
| 1384 // the socket it is reading from. |
| 1385 DWORD last_error = GetLastError(); |
| 1386 if ((last_error == ERROR_CONNECTION_ABORTED) || |
| 1387 (last_error == ERROR_OPERATION_ABORTED) || |
| 1388 (last_error == ERROR_NETNAME_DELETED) || |
| 1389 (last_error == ERROR_BROKEN_PIPE)) { |
| 1390 ASSERT(bytes == 0); |
| 1391 HandleIOCompletion(bytes, key, overlapped); |
| 1392 } else if (last_error == ERROR_MORE_DATA) { |
| 1393 // Don't ASSERT no bytes in this case. This can happen if the receive |
| 1394 // buffer for datagram sockets is too small to contain a full datagram, |
| 1395 // and in this case bytes hold the bytes that was read. |
| 1396 HandleIOCompletion(-1, key, overlapped); |
| 1397 } else { |
| 1398 ASSERT(bytes == 0); |
| 1399 HandleIOCompletion(-1, key, overlapped); |
| 1400 } |
| 1401 } else if (key == NULL) { |
| 1402 // A key of NULL signals an interrupt message. |
| 1403 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped); |
| 1404 HandleInterrupt(msg); |
| 1405 delete msg; |
| 1406 } else { |
| 1407 HandleIOCompletion(bytes, key, overlapped); |
| 1408 } |
| 1409 } |
| 1410 |
| 1411 |
| 1316 EventHandlerImplementation::EventHandlerImplementation() { | 1412 EventHandlerImplementation::EventHandlerImplementation() { |
| 1317 startup_monitor_ = new Monitor(); | 1413 startup_monitor_ = new Monitor(); |
| 1318 handler_thread_id_ = Thread::kInvalidThreadId; | 1414 handler_thread_id_ = Thread::kInvalidThreadId; |
| 1319 handler_thread_handle_ = NULL; | 1415 handler_thread_handle_ = NULL; |
| 1320 completion_port_ = | 1416 completion_port_ = |
| 1321 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); | 1417 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); |
| 1322 if (completion_port_ == NULL) { | 1418 if (completion_port_ == NULL) { |
| 1323 FATAL("Completion port creation failed"); | 1419 FATAL("Completion port creation failed"); |
| 1324 } | 1420 } |
| 1325 shutdown_ = false; | 1421 shutdown_ = false; |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1367 ASSERT(handler_impl != NULL); | 1463 ASSERT(handler_impl != NULL); |
| 1368 | 1464 |
| 1369 { | 1465 { |
| 1370 MonitorLocker ml(handler_impl->startup_monitor_); | 1466 MonitorLocker ml(handler_impl->startup_monitor_); |
| 1371 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); | 1467 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); |
| 1372 handler_impl->handler_thread_handle_ = | 1468 handler_impl->handler_thread_handle_ = |
| 1373 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_); | 1469 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_); |
| 1374 ml.Notify(); | 1470 ml.Notify(); |
| 1375 } | 1471 } |
| 1376 | 1472 |
| 1473 DWORD bytes; |
| 1474 ULONG_PTR key; |
| 1475 OVERLAPPED* overlapped; |
| 1476 BOOL ok; |
| 1377 while (!handler_impl->shutdown_) { | 1477 while (!handler_impl->shutdown_) { |
| 1378 DWORD bytes; | |
| 1379 ULONG_PTR key; | |
| 1380 OVERLAPPED* overlapped; | |
| 1381 int64_t millis = handler_impl->GetTimeout(); | 1478 int64_t millis = handler_impl->GetTimeout(); |
| 1382 ASSERT(millis == kInfinityTimeout || millis >= 0); | 1479 ASSERT(millis == kInfinityTimeout || millis >= 0); |
| 1383 if (millis > kMaxInt32) { | 1480 if (millis > kMaxInt32) { |
| 1384 millis = kMaxInt32; | 1481 millis = kMaxInt32; |
| 1385 } | 1482 } |
| 1386 ASSERT(sizeof(int32_t) == sizeof(DWORD)); | 1483 ASSERT(sizeof(int32_t) == sizeof(DWORD)); |
| 1387 BOOL ok = | 1484 DWORD timeout = static_cast<DWORD>(millis); |
| 1388 GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, &key, | 1485 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, |
| 1389 &overlapped, static_cast<DWORD>(millis)); | 1486 &key, &overlapped, timeout); |
| 1390 | 1487 |
| 1391 if (!ok && (overlapped == NULL)) { | 1488 if (!ok && (overlapped == NULL)) { |
| 1392 if (GetLastError() == ERROR_ABANDONED_WAIT_0) { | 1489 if (GetLastError() == ERROR_ABANDONED_WAIT_0) { |
| 1393 // The completion port should never be closed. | 1490 // The completion port should never be closed. |
| 1394 Log::Print("Completion port closed\n"); | 1491 Log::Print("Completion port closed\n"); |
| 1395 UNREACHABLE(); | 1492 UNREACHABLE(); |
| 1396 } else { | 1493 } else { |
| 1397 // Timeout is signalled by false result and NULL in overlapped. | 1494 // Timeout is signalled by false result and NULL in overlapped. |
| 1398 handler_impl->HandleTimeout(); | 1495 handler_impl->HandleTimeout(); |
| 1399 } | 1496 } |
| 1400 } else if (!ok) { | |
| 1401 // Treat ERROR_CONNECTION_ABORTED as connection closed. | |
| 1402 // The error ERROR_OPERATION_ABORTED is set for pending | |
| 1403 // accept requests for a listen socket which is closed. | |
| 1404 // ERROR_NETNAME_DELETED occurs when the client closes | |
| 1405 // the socket it is reading from. | |
| 1406 DWORD last_error = GetLastError(); | |
| 1407 if ((last_error == ERROR_CONNECTION_ABORTED) || | |
| 1408 (last_error == ERROR_OPERATION_ABORTED) || | |
| 1409 (last_error == ERROR_NETNAME_DELETED) || | |
| 1410 (last_error == ERROR_BROKEN_PIPE)) { | |
| 1411 ASSERT(bytes == 0); | |
| 1412 handler_impl->HandleIOCompletion(bytes, key, overlapped); | |
| 1413 } else if (last_error == ERROR_MORE_DATA) { | |
| 1414 // Don't ASSERT no bytes in this case. This can happen if the receive | |
| 1415 // buffer for datagram sockets is to small to contain a full datagram, | |
| 1416 // and in this case bytes hold the bytes that was read. | |
| 1417 handler_impl->HandleIOCompletion(-1, key, overlapped); | |
| 1418 } else { | |
| 1419 ASSERT(bytes == 0); | |
| 1420 handler_impl->HandleIOCompletion(-1, key, overlapped); | |
| 1421 } | |
| 1422 } else if (key == NULL) { | |
| 1423 // A key of NULL signals an interrupt message. | |
| 1424 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped); | |
| 1425 handler_impl->HandleInterrupt(msg); | |
| 1426 delete msg; | |
| 1427 } else { | 1497 } else { |
| 1428 handler_impl->HandleIOCompletion(bytes, key, overlapped); | 1498 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); |
| 1429 } | 1499 } |
| 1430 } | 1500 } |
| 1501 |
| 1502 // In a Debug build, drain the IO completion port to make sure we aren't |
| 1503 // leaking any (non-disconnecting) Handles. In a Release build, we don't care |
| 1504 // because the VM is going down, and the asserts below are Debug-only. |
| 1505 #if defined(DEBUG) |
| 1506 while (true) { |
| 1507 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, |
| 1508 &key, &overlapped, 0); |
| 1509 if (!ok && (overlapped == NULL)) { |
| 1510 // There was an error or nothing is ready. Assume the port is drained. |
| 1511 break; |
| 1512 } |
| 1513 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); |
| 1514 } |
| 1515 #endif |
| 1516 |
| 1517 // The eventhandler thread is going down so there should be no more live |
| 1518 // Handles or Sockets. |
| 1519 // TODO(dart:io): It would be nice to be able to assert here that: |
| 1520 // ReferenceCounted<Handle>::instances() == 0; |
| 1521 // However, we cannot at the moment. See the TODO on: |
| 1522 // ClientSocket::IssueDisconnect() |
| 1523 DEBUG_ASSERT(ReferenceCounted<Handle>::instances() == |
| 1524 ClientSocket::disconnecting()); |
| 1525 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); |
| 1431 handler->NotifyShutdownDone(); | 1526 handler->NotifyShutdownDone(); |
| 1432 } | 1527 } |
| 1433 | 1528 |
| 1434 | 1529 |
| 1435 void EventHandlerImplementation::Start(EventHandler* handler) { | 1530 void EventHandlerImplementation::Start(EventHandler* handler) { |
| 1436 int result = | 1531 int result = |
| 1437 Thread::Start(EventHandlerEntry, reinterpret_cast<uword>(handler)); | 1532 Thread::Start(EventHandlerEntry, reinterpret_cast<uword>(handler)); |
| 1438 if (result != 0) { | 1533 if (result != 0) { |
| 1439 FATAL1("Failed to start event handler thread %d", result); | 1534 FATAL1("Failed to start event handler thread %d", result); |
| 1440 } | 1535 } |
| (...skipping 15 matching lines...) Expand all Loading... |
| 1456 void EventHandlerImplementation::Shutdown() { | 1551 void EventHandlerImplementation::Shutdown() { |
| 1457 SendData(kShutdownId, 0, 0); | 1552 SendData(kShutdownId, 0, 0); |
| 1458 } | 1553 } |
| 1459 | 1554 |
| 1460 } // namespace bin | 1555 } // namespace bin |
| 1461 } // namespace dart | 1556 } // namespace dart |
| 1462 | 1557 |
| 1463 #endif // defined(HOST_OS_WINDOWS) | 1558 #endif // defined(HOST_OS_WINDOWS) |
| 1464 | 1559 |
| 1465 #endif // !defined(DART_IO_DISABLED) | 1560 #endif // !defined(DART_IO_DISABLED) |
| OLD | NEW |