Index: dart/runtime/bin/eventhandler_win.cc |
diff --git a/dart/runtime/bin/eventhandler_win.cc b/dart/runtime/bin/eventhandler_win.cc |
index c5a332b89bdee622a3118ecb10ca5a7bbe77fde2..fff9d87d9ae8f1db639c3eb777639452c42517e0 100644 |
--- a/dart/runtime/bin/eventhandler_win.cc |
+++ b/dart/runtime/bin/eventhandler_win.cc |
@@ -6,7 +6,6 @@ |
#if defined(TARGET_OS_WINDOWS) |
#include "bin/eventhandler.h" |
-#include "bin/eventhandler_win.h" |
#include <winsock2.h> // NOLINT |
#include <ws2tcpip.h> // NOLINT |
@@ -30,6 +29,10 @@ namespace bin { |
static const int kBufferSize = 64 * 1024; |
static const int kStdOverlappedBufferSize = 16 * 1024; |
+static const int kInfinityTimeout = -1; |
+static const int kTimeoutId = -1; |
+static const int kShutdownId = -2; |
+ |
OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size, |
Operation operation) { |
OverlappedBuffer* buffer = |
@@ -112,9 +115,26 @@ int OverlappedBuffer::GetRemainingLength() { |
return data_length_ - index_; |
} |
-Handle::Handle(intptr_t handle) |
- : DescriptorInfoBase(handle), |
- handle_(reinterpret_cast<HANDLE>(handle)), |
+ |
+Handle::Handle(HANDLE handle) |
+ : handle_(reinterpret_cast<HANDLE>(handle)), |
+ port_(0), |
+ mask_(0), |
+ completion_port_(INVALID_HANDLE_VALUE), |
+ event_handler_(NULL), |
+ data_ready_(NULL), |
+ pending_read_(NULL), |
+ pending_write_(NULL), |
+ last_error_(NOERROR), |
+ flags_(0) { |
+ InitializeCriticalSection(&cs_); |
+} |
+ |
+ |
+Handle::Handle(HANDLE handle, Dart_Port port) |
+ : handle_(reinterpret_cast<HANDLE>(handle)), |
+ port_(port), |
+ mask_(0), |
completion_port_(INVALID_HANDLE_VALUE), |
event_handler_(NULL), |
data_ready_(NULL), |
@@ -273,7 +293,7 @@ bool Handle::IssueRead() { |
// Completing asynchronously through thread. |
pending_read_ = buffer; |
int result = Thread::Start(ReadFileThread, |
- reinterpret_cast<uword>(this)); |
+ reinterpret_cast<uword>(this)); |
if (result != 0) { |
FATAL1("Failed to start read file thread %d", result); |
} |
@@ -319,7 +339,7 @@ bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
static void HandleClosed(Handle* handle) { |
if (!handle->IsClosing()) { |
int event_mask = 1 << kCloseEvent; |
- DartUtils::PostInt32(handle->NextPort(), event_mask); |
+ DartUtils::PostInt32(handle->port(), event_mask); |
} |
} |
@@ -327,8 +347,11 @@ static void HandleClosed(Handle* handle) { |
static void HandleError(Handle* handle) { |
handle->set_last_error(WSAGetLastError()); |
handle->MarkError(); |
- if (!handle->IsClosing() && handle->HasNextPort()) { |
- DartUtils::PostInt32(handle->NextPort(), 1 << kErrorEvent); |
+ if (!handle->IsClosing()) { |
+ Dart_Port port = handle->port(); |
+ if (port != ILLEGAL_PORT) { |
+ DartUtils::PostInt32(port, 1 << kErrorEvent); |
+ } |
} |
} |
@@ -444,7 +467,6 @@ bool ListenSocket::LoadAcceptEx() { |
bool ListenSocket::IssueAccept() { |
ScopedLock lock(this); |
- |
// For AcceptEx there needs to be buffer storage for address |
// information for two addresses (local and remote address). The |
// AcceptEx documentation says: "This value must be at least 16 |
@@ -493,7 +515,7 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
reinterpret_cast<char*>(&s), sizeof(s)); |
if (rc == NO_ERROR) { |
// Insert the accepted socket into the list. |
- ClientSocket* client_socket = new ClientSocket(buffer->client()); |
+ ClientSocket* client_socket = new ClientSocket(buffer->client(), 0); |
client_socket->mark_connected(); |
client_socket->CreateCompletionPort(completion_port); |
if (accepted_head_ == NULL) { |
@@ -504,7 +526,6 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
accepted_tail_->set_next(client_socket); |
accepted_tail_ = client_socket; |
} |
- accepted_count_++; |
} else { |
closesocket(buffer->client()); |
} |
@@ -520,8 +541,11 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
static void DeleteIfClosed(Handle* handle) { |
if (handle->IsClosed()) { |
- handle->SendToAll(1 << kDestroyedEvent); |
+ Dart_Port port = handle->port(); |
delete handle; |
+ if (port != ILLEGAL_PORT) { |
+ DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
+ } |
} |
} |
@@ -550,23 +574,16 @@ bool ListenSocket::CanAccept() { |
ClientSocket* ListenSocket::Accept() { |
ScopedLock lock(this); |
- |
- ClientSocket *result = NULL; |
- |
- if (accepted_head_ != NULL) { |
- result = accepted_head_; |
- accepted_head_ = accepted_head_->next(); |
- if (accepted_head_ == NULL) accepted_tail_ = NULL; |
- result->set_next(NULL); |
- accepted_count_--; |
- } |
- |
+ if (accepted_head_ == NULL) return NULL; |
+ ClientSocket* result = accepted_head_; |
+ accepted_head_ = accepted_head_->next(); |
+ if (accepted_head_ == NULL) accepted_tail_ = NULL; |
+ result->set_next(NULL); |
if (!IsClosing()) { |
if (!IssueAccept()) { |
HandleError(this); |
} |
} |
- |
return result; |
} |
@@ -863,11 +880,9 @@ void ClientSocket::IssueDisconnect() { |
if (ok || WSAGetLastError() != WSA_IO_PENDING) { |
DisconnectComplete(buffer); |
} |
- if (HasNextPort()) { |
- Dart_Port p = NextPort(); |
- DartUtils::PostInt32(p, 1 << kDestroyedEvent); |
- RemovePort(p); |
- } |
+ Dart_Port p = port(); |
+ if (p != ILLEGAL_PORT) DartUtils::PostInt32(p, 1 << kDestroyedEvent); |
+ port_ = ILLEGAL_PORT; |
} |
@@ -885,14 +900,15 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { |
OverlappedBuffer::DisposeBuffer(buffer); |
// Update socket to support full socket API, after ConnectEx completed. |
setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); |
- if (HasNextPort()) { |
+ Dart_Port p = port(); |
+ if (p != ILLEGAL_PORT) { |
// If the port is set, we already listen for this socket in Dart. |
// Handle the cases here. |
if (!IsClosedRead()) { |
IssueRead(); |
} |
if (!IsClosedWrite()) { |
- DartUtils::PostInt32(NextPort(), 1 << kOutEvent); |
+ DartUtils::PostInt32(p, 1 << kOutEvent); |
} |
} |
} |
@@ -995,62 +1011,42 @@ void DatagramSocket::DoClose() { |
void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
ASSERT(this != NULL); |
- if (msg->id == kTimerId) { |
+ if (msg->id == kTimeoutId) { |
// Change of timeout request. Just set the new timeout and port as the |
// completion thread will use the new timeout value for its next wait. |
timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); |
} else if (msg->id == kShutdownId) { |
shutdown_ = true; |
} else { |
+ // No tokens to return on Windows. |
+ if ((msg->data & (1 << kReturnTokenCommand)) != 0) return; |
Handle* handle = reinterpret_cast<Handle*>(msg->id); |
ASSERT(handle != NULL); |
- |
if (handle->is_listen_socket()) { |
ListenSocket* listen_socket = |
reinterpret_cast<ListenSocket*>(handle); |
listen_socket->EnsureInitialized(this); |
+ listen_socket->SetPortAndMask(msg->dart_port, msg->data); |
Handle::ScopedLock lock(listen_socket); |
// If incoming connections are requested make sure to post already |
// accepted connections. |
if ((msg->data & (1 << kInEvent)) != 0) { |
- listen_socket->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); |
- TryDispatchingPendingAccepts(listen_socket); |
- } |
- |
- if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
- int count = TOKEN_COUNT(msg->data); |
- listen_socket->ReturnTokens(msg->dart_port, count); |
- TryDispatchingPendingAccepts(listen_socket); |
- return; |
- } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
- Dart_Port port = msg->dart_port; |
- listen_socket->RemovePort(port); |
- |
- MutexLocker locker(globalTcpListeningSocketRegistry.mutex()); |
- if (globalTcpListeningSocketRegistry.CloseSafe( |
- reinterpret_cast<intptr_t>(listen_socket))) { |
- handle->Close(); |
+ if (listen_socket->CanAccept()) { |
+ int event_mask = (1 << kInEvent); |
+ handle->set_mask(handle->mask() & ~event_mask); |
+ DartUtils::PostInt32(handle->port(), event_mask); |
} |
- DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
} |
} else { |
handle->EnsureInitialized(this); |
Handle::ScopedLock lock(handle); |
- if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
- int count = TOKEN_COUNT(msg->data); |
- handle->ReturnTokens(msg->dart_port, count); |
- // TODO(kustermann): How can we continue with sending events |
- // to dart from here? |
- return; |
- } |
- |
// Only set mask if we turned on kInEvent or kOutEvent. |
if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) { |
- handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); |
+ handle->SetPortAndMask(msg->dart_port, msg->data); |
} |
// Issue a read. |
@@ -1073,10 +1069,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
if (!handle->HasPendingWrite()) { |
if (handle->is_client_socket()) { |
if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) { |
- DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent); |
+ DartUtils::PostInt32(handle->port(), 1 << kOutEvent); |
} |
} else { |
- DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent); |
+ DartUtils::PostInt32(handle->port(), 1 << kOutEvent); |
} |
} |
} |
@@ -1091,12 +1087,13 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
client_socket->Shutdown(SD_SEND); |
} |
} |
+ } |
- if ((msg->data & (1 << kCloseCommand)) != 0) { |
- handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); |
- handle->Close(); |
- } |
+ if ((msg->data & (1 << kCloseCommand)) != 0) { |
+ handle->SetPortAndMask(msg->dart_port, msg->data); |
+ handle->Close(); |
} |
+ |
DeleteIfClosed(handle); |
} |
} |
@@ -1106,27 +1103,14 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, |
OverlappedBuffer* buffer) { |
listen_socket->AcceptComplete(buffer, completion_port_); |
- TryDispatchingPendingAccepts(listen_socket); |
- |
- DeleteIfClosed(listen_socket); |
-} |
- |
- |
-void EventHandlerImplementation::TryDispatchingPendingAccepts( |
- ListenSocket *listen_socket) { |
- Handle::ScopedLock lock(listen_socket); |
- |
- if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { |
- for (int i = 0; |
- i < listen_socket->accepted_count() && listen_socket->HasNextPort(); |
- i++) { |
- Dart_Port port = listen_socket->NextPort(); |
- DartUtils::PostInt32(port, 1 << kInEvent); |
- if (listen_socket->TakeToken()) { |
- break; |
- } |
+ if (!listen_socket->IsClosing()) { |
+ int event_mask = 1 << kInEvent; |
+ if ((listen_socket->mask() & event_mask) != 0) { |
+ DartUtils::PostInt32(listen_socket->port(), event_mask); |
} |
} |
+ |
+ DeleteIfClosed(listen_socket); |
} |
@@ -1138,8 +1122,8 @@ void EventHandlerImplementation::HandleRead(Handle* handle, |
if (bytes > 0) { |
if (!handle->IsClosing()) { |
int event_mask = 1 << kInEvent; |
- if ((handle->Mask() & event_mask) != 0) { |
- DartUtils::PostInt32(handle->NextPort(), event_mask); |
+ if ((handle->mask() & event_mask) != 0) { |
+ DartUtils::PostInt32(handle->port(), event_mask); |
} |
} |
} else { |
@@ -1163,8 +1147,8 @@ void EventHandlerImplementation::HandleRecvFrom(Handle* handle, |
handle->ReadComplete(buffer); |
if (!handle->IsClosing()) { |
int event_mask = 1 << kInEvent; |
- if ((handle->Mask() & event_mask) != 0) { |
- DartUtils::PostInt32(handle->NextPort(), event_mask); |
+ if ((handle->mask() & event_mask) != 0) { |
+ DartUtils::PostInt32(handle->port(), event_mask); |
} |
} |
@@ -1182,8 +1166,8 @@ void EventHandlerImplementation::HandleWrite(Handle* handle, |
int event_mask = 1 << kOutEvent; |
ASSERT(!handle->is_client_socket() || |
reinterpret_cast<ClientSocket*>(handle)->is_connected()); |
- if ((handle->Mask() & event_mask) != 0) { |
- DartUtils::PostInt32(handle->NextPort(), event_mask); |
+ if ((handle->mask() & event_mask) != 0) { |
+ DartUtils::PostInt32(handle->port(), event_mask); |
} |
} |
} else { |
@@ -1366,7 +1350,7 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) { |
void EventHandlerImplementation::Start(EventHandler* handler) { |
int result = Thread::Start(EventHandlerEntry, |
- reinterpret_cast<uword>(handler)); |
+ reinterpret_cast<uword>(handler)); |
if (result != 0) { |
FATAL1("Failed to start event handler thread %d", result); |
} |