| Index: dart/runtime/bin/eventhandler_win.cc
|
| diff --git a/dart/runtime/bin/eventhandler_win.cc b/dart/runtime/bin/eventhandler_win.cc
|
| index fff9d87d9ae8f1db639c3eb777639452c42517e0..c5a332b89bdee622a3118ecb10ca5a7bbe77fde2 100644
|
| --- a/dart/runtime/bin/eventhandler_win.cc
|
| +++ b/dart/runtime/bin/eventhandler_win.cc
|
| @@ -6,6 +6,7 @@
|
| #if defined(TARGET_OS_WINDOWS)
|
|
|
| #include "bin/eventhandler.h"
|
| +#include "bin/eventhandler_win.h"
|
|
|
| #include <winsock2.h> // NOLINT
|
| #include <ws2tcpip.h> // NOLINT
|
| @@ -29,10 +30,6 @@ namespace bin {
|
| static const int kBufferSize = 64 * 1024;
|
| static const int kStdOverlappedBufferSize = 16 * 1024;
|
|
|
| -static const int kInfinityTimeout = -1;
|
| -static const int kTimeoutId = -1;
|
| -static const int kShutdownId = -2;
|
| -
|
| OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
|
| Operation operation) {
|
| OverlappedBuffer* buffer =
|
| @@ -115,26 +112,9 @@ int OverlappedBuffer::GetRemainingLength() {
|
| return data_length_ - index_;
|
| }
|
|
|
| -
|
| -Handle::Handle(HANDLE handle)
|
| - : handle_(reinterpret_cast<HANDLE>(handle)),
|
| - port_(0),
|
| - mask_(0),
|
| - completion_port_(INVALID_HANDLE_VALUE),
|
| - event_handler_(NULL),
|
| - data_ready_(NULL),
|
| - pending_read_(NULL),
|
| - pending_write_(NULL),
|
| - last_error_(NOERROR),
|
| - flags_(0) {
|
| - InitializeCriticalSection(&cs_);
|
| -}
|
| -
|
| -
|
| -Handle::Handle(HANDLE handle, Dart_Port port)
|
| - : handle_(reinterpret_cast<HANDLE>(handle)),
|
| - port_(port),
|
| - mask_(0),
|
| +Handle::Handle(intptr_t handle)
|
| + : DescriptorInfoBase(handle),
|
| + handle_(reinterpret_cast<HANDLE>(handle)),
|
| completion_port_(INVALID_HANDLE_VALUE),
|
| event_handler_(NULL),
|
| data_ready_(NULL),
|
| @@ -293,7 +273,7 @@ bool Handle::IssueRead() {
|
| // Completing asynchronously through thread.
|
| pending_read_ = buffer;
|
| int result = Thread::Start(ReadFileThread,
|
| - reinterpret_cast<uword>(this));
|
| + reinterpret_cast<uword>(this));
|
| if (result != 0) {
|
| FATAL1("Failed to start read file thread %d", result);
|
| }
|
| @@ -339,7 +319,7 @@ bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
|
| static void HandleClosed(Handle* handle) {
|
| if (!handle->IsClosing()) {
|
| int event_mask = 1 << kCloseEvent;
|
| - DartUtils::PostInt32(handle->port(), event_mask);
|
| + DartUtils::PostInt32(handle->NextPort(), event_mask);
|
| }
|
| }
|
|
|
| @@ -347,11 +327,8 @@ static void HandleClosed(Handle* handle) {
|
| static void HandleError(Handle* handle) {
|
| handle->set_last_error(WSAGetLastError());
|
| handle->MarkError();
|
| - if (!handle->IsClosing()) {
|
| - Dart_Port port = handle->port();
|
| - if (port != ILLEGAL_PORT) {
|
| - DartUtils::PostInt32(port, 1 << kErrorEvent);
|
| - }
|
| + if (!handle->IsClosing() && handle->HasNextPort()) {
|
| + DartUtils::PostInt32(handle->NextPort(), 1 << kErrorEvent);
|
| }
|
| }
|
|
|
| @@ -467,6 +444,7 @@ bool ListenSocket::LoadAcceptEx() {
|
|
|
| bool ListenSocket::IssueAccept() {
|
| ScopedLock lock(this);
|
| +
|
| // For AcceptEx there needs to be buffer storage for address
|
| // information for two addresses (local and remote address). The
|
| // AcceptEx documentation says: "This value must be at least 16
|
| @@ -515,7 +493,7 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
|
| reinterpret_cast<char*>(&s), sizeof(s));
|
| if (rc == NO_ERROR) {
|
| // Insert the accepted socket into the list.
|
| - ClientSocket* client_socket = new ClientSocket(buffer->client(), 0);
|
| + ClientSocket* client_socket = new ClientSocket(buffer->client());
|
| client_socket->mark_connected();
|
| client_socket->CreateCompletionPort(completion_port);
|
| if (accepted_head_ == NULL) {
|
| @@ -526,6 +504,7 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
|
| accepted_tail_->set_next(client_socket);
|
| accepted_tail_ = client_socket;
|
| }
|
| + accepted_count_++;
|
| } else {
|
| closesocket(buffer->client());
|
| }
|
| @@ -541,11 +520,8 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
|
|
|
| static void DeleteIfClosed(Handle* handle) {
|
| if (handle->IsClosed()) {
|
| - Dart_Port port = handle->port();
|
| + handle->SendToAll(1 << kDestroyedEvent);
|
| delete handle;
|
| - if (port != ILLEGAL_PORT) {
|
| - DartUtils::PostInt32(port, 1 << kDestroyedEvent);
|
| - }
|
| }
|
| }
|
|
|
| @@ -574,16 +550,23 @@ bool ListenSocket::CanAccept() {
|
|
|
| ClientSocket* ListenSocket::Accept() {
|
| ScopedLock lock(this);
|
| - if (accepted_head_ == NULL) return NULL;
|
| - ClientSocket* result = accepted_head_;
|
| - accepted_head_ = accepted_head_->next();
|
| - if (accepted_head_ == NULL) accepted_tail_ = NULL;
|
| - result->set_next(NULL);
|
| +
|
| + ClientSocket *result = NULL;
|
| +
|
| + if (accepted_head_ != NULL) {
|
| + result = accepted_head_;
|
| + accepted_head_ = accepted_head_->next();
|
| + if (accepted_head_ == NULL) accepted_tail_ = NULL;
|
| + result->set_next(NULL);
|
| + accepted_count_--;
|
| + }
|
| +
|
| if (!IsClosing()) {
|
| if (!IssueAccept()) {
|
| HandleError(this);
|
| }
|
| }
|
| +
|
| return result;
|
| }
|
|
|
| @@ -880,9 +863,11 @@ void ClientSocket::IssueDisconnect() {
|
| if (ok || WSAGetLastError() != WSA_IO_PENDING) {
|
| DisconnectComplete(buffer);
|
| }
|
| - Dart_Port p = port();
|
| - if (p != ILLEGAL_PORT) DartUtils::PostInt32(p, 1 << kDestroyedEvent);
|
| - port_ = ILLEGAL_PORT;
|
| + if (HasNextPort()) {
|
| + Dart_Port p = NextPort();
|
| + DartUtils::PostInt32(p, 1 << kDestroyedEvent);
|
| + RemovePort(p);
|
| + }
|
| }
|
|
|
|
|
| @@ -900,15 +885,14 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
|
| OverlappedBuffer::DisposeBuffer(buffer);
|
| // Update socket to support full socket API, after ConnectEx completed.
|
| setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
|
| - Dart_Port p = port();
|
| - if (p != ILLEGAL_PORT) {
|
| + if (HasNextPort()) {
|
| // If the port is set, we already listen for this socket in Dart.
|
| // Handle the cases here.
|
| if (!IsClosedRead()) {
|
| IssueRead();
|
| }
|
| if (!IsClosedWrite()) {
|
| - DartUtils::PostInt32(p, 1 << kOutEvent);
|
| + DartUtils::PostInt32(NextPort(), 1 << kOutEvent);
|
| }
|
| }
|
| }
|
| @@ -1011,42 +995,62 @@ void DatagramSocket::DoClose() {
|
|
|
| void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| ASSERT(this != NULL);
|
| - if (msg->id == kTimeoutId) {
|
| + if (msg->id == kTimerId) {
|
| // Change of timeout request. Just set the new timeout and port as the
|
| // completion thread will use the new timeout value for its next wait.
|
| timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
|
| } else if (msg->id == kShutdownId) {
|
| shutdown_ = true;
|
| } else {
|
| - // No tokens to return on Windows.
|
| - if ((msg->data & (1 << kReturnTokenCommand)) != 0) return;
|
| Handle* handle = reinterpret_cast<Handle*>(msg->id);
|
| ASSERT(handle != NULL);
|
| +
|
| if (handle->is_listen_socket()) {
|
| ListenSocket* listen_socket =
|
| reinterpret_cast<ListenSocket*>(handle);
|
| listen_socket->EnsureInitialized(this);
|
| - listen_socket->SetPortAndMask(msg->dart_port, msg->data);
|
|
|
| Handle::ScopedLock lock(listen_socket);
|
|
|
| // If incoming connections are requested make sure to post already
|
| // accepted connections.
|
| if ((msg->data & (1 << kInEvent)) != 0) {
|
| - if (listen_socket->CanAccept()) {
|
| - int event_mask = (1 << kInEvent);
|
| - handle->set_mask(handle->mask() & ~event_mask);
|
| - DartUtils::PostInt32(handle->port(), event_mask);
|
| + listen_socket->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
|
| + TryDispatchingPendingAccepts(listen_socket);
|
| + }
|
| +
|
| + if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
|
| + int count = TOKEN_COUNT(msg->data);
|
| + listen_socket->ReturnTokens(msg->dart_port, count);
|
| + TryDispatchingPendingAccepts(listen_socket);
|
| + return;
|
| + } else if (IS_COMMAND(msg->data, kCloseCommand)) {
|
| + Dart_Port port = msg->dart_port;
|
| + listen_socket->RemovePort(port);
|
| +
|
| + MutexLocker locker(globalTcpListeningSocketRegistry.mutex());
|
| + if (globalTcpListeningSocketRegistry.CloseSafe(
|
| + reinterpret_cast<intptr_t>(listen_socket))) {
|
| + handle->Close();
|
| }
|
| + DartUtils::PostInt32(port, 1 << kDestroyedEvent);
|
| }
|
| } else {
|
| handle->EnsureInitialized(this);
|
|
|
| Handle::ScopedLock lock(handle);
|
|
|
| + if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
|
| + int count = TOKEN_COUNT(msg->data);
|
| + handle->ReturnTokens(msg->dart_port, count);
|
| + // TODO(kustermann): How can we continue with sending events
|
| + // to dart from here?
|
| + return;
|
| + }
|
| +
|
| // Only set mask if we turned on kInEvent or kOutEvent.
|
| if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) {
|
| - handle->SetPortAndMask(msg->dart_port, msg->data);
|
| + handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
|
| }
|
|
|
| // Issue a read.
|
| @@ -1069,10 +1073,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| if (!handle->HasPendingWrite()) {
|
| if (handle->is_client_socket()) {
|
| if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
|
| - DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
|
| + DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
|
| }
|
| } else {
|
| - DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
|
| + DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
|
| }
|
| }
|
| }
|
| @@ -1087,13 +1091,12 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| client_socket->Shutdown(SD_SEND);
|
| }
|
| }
|
| - }
|
|
|
| - if ((msg->data & (1 << kCloseCommand)) != 0) {
|
| - handle->SetPortAndMask(msg->dart_port, msg->data);
|
| - handle->Close();
|
| + if ((msg->data & (1 << kCloseCommand)) != 0) {
|
| + handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
|
| + handle->Close();
|
| + }
|
| }
|
| -
|
| DeleteIfClosed(handle);
|
| }
|
| }
|
| @@ -1103,17 +1106,30 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
|
| OverlappedBuffer* buffer) {
|
| listen_socket->AcceptComplete(buffer, completion_port_);
|
|
|
| - if (!listen_socket->IsClosing()) {
|
| - int event_mask = 1 << kInEvent;
|
| - if ((listen_socket->mask() & event_mask) != 0) {
|
| - DartUtils::PostInt32(listen_socket->port(), event_mask);
|
| - }
|
| - }
|
| + TryDispatchingPendingAccepts(listen_socket);
|
|
|
| DeleteIfClosed(listen_socket);
|
| }
|
|
|
|
|
| +void EventHandlerImplementation::TryDispatchingPendingAccepts(
|
| + ListenSocket *listen_socket) {
|
| + Handle::ScopedLock lock(listen_socket);
|
| +
|
| + if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
|
| + for (int i = 0;
|
| + i < listen_socket->accepted_count() && listen_socket->HasNextPort();
|
| + i++) {
|
| + Dart_Port port = listen_socket->NextPort();
|
| + DartUtils::PostInt32(port, 1 << kInEvent);
|
| + if (listen_socket->TakeToken()) {
|
| + break;
|
| + }
|
| + }
|
| + }
|
| +}
|
| +
|
| +
|
| void EventHandlerImplementation::HandleRead(Handle* handle,
|
| int bytes,
|
| OverlappedBuffer* buffer) {
|
| @@ -1122,8 +1138,8 @@ void EventHandlerImplementation::HandleRead(Handle* handle,
|
| if (bytes > 0) {
|
| if (!handle->IsClosing()) {
|
| int event_mask = 1 << kInEvent;
|
| - if ((handle->mask() & event_mask) != 0) {
|
| - DartUtils::PostInt32(handle->port(), event_mask);
|
| + if ((handle->Mask() & event_mask) != 0) {
|
| + DartUtils::PostInt32(handle->NextPort(), event_mask);
|
| }
|
| }
|
| } else {
|
| @@ -1147,8 +1163,8 @@ void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
|
| handle->ReadComplete(buffer);
|
| if (!handle->IsClosing()) {
|
| int event_mask = 1 << kInEvent;
|
| - if ((handle->mask() & event_mask) != 0) {
|
| - DartUtils::PostInt32(handle->port(), event_mask);
|
| + if ((handle->Mask() & event_mask) != 0) {
|
| + DartUtils::PostInt32(handle->NextPort(), event_mask);
|
| }
|
| }
|
|
|
| @@ -1166,8 +1182,8 @@ void EventHandlerImplementation::HandleWrite(Handle* handle,
|
| int event_mask = 1 << kOutEvent;
|
| ASSERT(!handle->is_client_socket() ||
|
| reinterpret_cast<ClientSocket*>(handle)->is_connected());
|
| - if ((handle->mask() & event_mask) != 0) {
|
| - DartUtils::PostInt32(handle->port(), event_mask);
|
| + if ((handle->Mask() & event_mask) != 0) {
|
| + DartUtils::PostInt32(handle->NextPort(), event_mask);
|
| }
|
| }
|
| } else {
|
| @@ -1350,7 +1366,7 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
|
|
|
| void EventHandlerImplementation::Start(EventHandler* handler) {
|
| int result = Thread::Start(EventHandlerEntry,
|
| - reinterpret_cast<uword>(handler));
|
| + reinterpret_cast<uword>(handler));
|
| if (result != 0) {
|
| FATAL1("Failed to start event handler thread %d", result);
|
| }
|
|
|