Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(17)

Unified Diff: dart/runtime/bin/eventhandler_win.cc

Issue 879353003: Introduce optional 'bool shared' parameter to ServerSocket.bind() ... (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge
Patch Set: Addressed comments Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « dart/runtime/bin/eventhandler_win.h ('k') | dart/runtime/bin/io_natives.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: dart/runtime/bin/eventhandler_win.cc
diff --git a/dart/runtime/bin/eventhandler_win.cc b/dart/runtime/bin/eventhandler_win.cc
index fff9d87d9ae8f1db639c3eb777639452c42517e0..c5a332b89bdee622a3118ecb10ca5a7bbe77fde2 100644
--- a/dart/runtime/bin/eventhandler_win.cc
+++ b/dart/runtime/bin/eventhandler_win.cc
@@ -6,6 +6,7 @@
#if defined(TARGET_OS_WINDOWS)
#include "bin/eventhandler.h"
+#include "bin/eventhandler_win.h"
#include <winsock2.h> // NOLINT
#include <ws2tcpip.h> // NOLINT
@@ -29,10 +30,6 @@ namespace bin {
static const int kBufferSize = 64 * 1024;
static const int kStdOverlappedBufferSize = 16 * 1024;
-static const int kInfinityTimeout = -1;
-static const int kTimeoutId = -1;
-static const int kShutdownId = -2;
-
OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
Operation operation) {
OverlappedBuffer* buffer =
@@ -115,26 +112,9 @@ int OverlappedBuffer::GetRemainingLength() {
return data_length_ - index_;
}
-
-Handle::Handle(HANDLE handle)
- : handle_(reinterpret_cast<HANDLE>(handle)),
- port_(0),
- mask_(0),
- completion_port_(INVALID_HANDLE_VALUE),
- event_handler_(NULL),
- data_ready_(NULL),
- pending_read_(NULL),
- pending_write_(NULL),
- last_error_(NOERROR),
- flags_(0) {
- InitializeCriticalSection(&cs_);
-}
-
-
-Handle::Handle(HANDLE handle, Dart_Port port)
- : handle_(reinterpret_cast<HANDLE>(handle)),
- port_(port),
- mask_(0),
+Handle::Handle(intptr_t handle)
+ : DescriptorInfoBase(handle),
+ handle_(reinterpret_cast<HANDLE>(handle)),
completion_port_(INVALID_HANDLE_VALUE),
event_handler_(NULL),
data_ready_(NULL),
@@ -293,7 +273,7 @@ bool Handle::IssueRead() {
// Completing asynchronously through thread.
pending_read_ = buffer;
int result = Thread::Start(ReadFileThread,
- reinterpret_cast<uword>(this));
+ reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start read file thread %d", result);
}
@@ -339,7 +319,7 @@ bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
static void HandleClosed(Handle* handle) {
if (!handle->IsClosing()) {
int event_mask = 1 << kCloseEvent;
- DartUtils::PostInt32(handle->port(), event_mask);
+ DartUtils::PostInt32(handle->NextPort(), event_mask);
}
}
@@ -347,11 +327,8 @@ static void HandleClosed(Handle* handle) {
static void HandleError(Handle* handle) {
handle->set_last_error(WSAGetLastError());
handle->MarkError();
- if (!handle->IsClosing()) {
- Dart_Port port = handle->port();
- if (port != ILLEGAL_PORT) {
- DartUtils::PostInt32(port, 1 << kErrorEvent);
- }
+ if (!handle->IsClosing() && handle->HasNextPort()) {
+ DartUtils::PostInt32(handle->NextPort(), 1 << kErrorEvent);
}
}
@@ -467,6 +444,7 @@ bool ListenSocket::LoadAcceptEx() {
bool ListenSocket::IssueAccept() {
ScopedLock lock(this);
+
// For AcceptEx there needs to be buffer storage for address
// information for two addresses (local and remote address). The
// AcceptEx documentation says: "This value must be at least 16
@@ -515,7 +493,7 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
reinterpret_cast<char*>(&s), sizeof(s));
if (rc == NO_ERROR) {
// Insert the accepted socket into the list.
- ClientSocket* client_socket = new ClientSocket(buffer->client(), 0);
+ ClientSocket* client_socket = new ClientSocket(buffer->client());
client_socket->mark_connected();
client_socket->CreateCompletionPort(completion_port);
if (accepted_head_ == NULL) {
@@ -526,6 +504,7 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
accepted_tail_->set_next(client_socket);
accepted_tail_ = client_socket;
}
+ accepted_count_++;
} else {
closesocket(buffer->client());
}
@@ -541,11 +520,8 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
static void DeleteIfClosed(Handle* handle) {
if (handle->IsClosed()) {
- Dart_Port port = handle->port();
+ handle->SendToAll(1 << kDestroyedEvent);
delete handle;
- if (port != ILLEGAL_PORT) {
- DartUtils::PostInt32(port, 1 << kDestroyedEvent);
- }
}
}
@@ -574,16 +550,23 @@ bool ListenSocket::CanAccept() {
ClientSocket* ListenSocket::Accept() {
ScopedLock lock(this);
- if (accepted_head_ == NULL) return NULL;
- ClientSocket* result = accepted_head_;
- accepted_head_ = accepted_head_->next();
- if (accepted_head_ == NULL) accepted_tail_ = NULL;
- result->set_next(NULL);
+
+ ClientSocket *result = NULL;
+
+ if (accepted_head_ != NULL) {
+ result = accepted_head_;
+ accepted_head_ = accepted_head_->next();
+ if (accepted_head_ == NULL) accepted_tail_ = NULL;
+ result->set_next(NULL);
+ accepted_count_--;
+ }
+
if (!IsClosing()) {
if (!IssueAccept()) {
HandleError(this);
}
}
+
return result;
}
@@ -880,9 +863,11 @@ void ClientSocket::IssueDisconnect() {
if (ok || WSAGetLastError() != WSA_IO_PENDING) {
DisconnectComplete(buffer);
}
- Dart_Port p = port();
- if (p != ILLEGAL_PORT) DartUtils::PostInt32(p, 1 << kDestroyedEvent);
- port_ = ILLEGAL_PORT;
+ if (HasNextPort()) {
+ Dart_Port p = NextPort();
+ DartUtils::PostInt32(p, 1 << kDestroyedEvent);
+ RemovePort(p);
+ }
}
@@ -900,15 +885,14 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
OverlappedBuffer::DisposeBuffer(buffer);
// Update socket to support full socket API, after ConnectEx completed.
setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
- Dart_Port p = port();
- if (p != ILLEGAL_PORT) {
+ if (HasNextPort()) {
// If the port is set, we already listen for this socket in Dart.
// Handle the cases here.
if (!IsClosedRead()) {
IssueRead();
}
if (!IsClosedWrite()) {
- DartUtils::PostInt32(p, 1 << kOutEvent);
+ DartUtils::PostInt32(NextPort(), 1 << kOutEvent);
}
}
}
@@ -1011,42 +995,62 @@ void DatagramSocket::DoClose() {
void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
ASSERT(this != NULL);
- if (msg->id == kTimeoutId) {
+ if (msg->id == kTimerId) {
// Change of timeout request. Just set the new timeout and port as the
// completion thread will use the new timeout value for its next wait.
timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
} else if (msg->id == kShutdownId) {
shutdown_ = true;
} else {
- // No tokens to return on Windows.
- if ((msg->data & (1 << kReturnTokenCommand)) != 0) return;
Handle* handle = reinterpret_cast<Handle*>(msg->id);
ASSERT(handle != NULL);
+
if (handle->is_listen_socket()) {
ListenSocket* listen_socket =
reinterpret_cast<ListenSocket*>(handle);
listen_socket->EnsureInitialized(this);
- listen_socket->SetPortAndMask(msg->dart_port, msg->data);
Handle::ScopedLock lock(listen_socket);
// If incoming connections are requested make sure to post already
// accepted connections.
if ((msg->data & (1 << kInEvent)) != 0) {
- if (listen_socket->CanAccept()) {
- int event_mask = (1 << kInEvent);
- handle->set_mask(handle->mask() & ~event_mask);
- DartUtils::PostInt32(handle->port(), event_mask);
+ listen_socket->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
+ TryDispatchingPendingAccepts(listen_socket);
+ }
+
+ if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
+ int count = TOKEN_COUNT(msg->data);
+ listen_socket->ReturnTokens(msg->dart_port, count);
+ TryDispatchingPendingAccepts(listen_socket);
+ return;
+ } else if (IS_COMMAND(msg->data, kCloseCommand)) {
+ Dart_Port port = msg->dart_port;
+ listen_socket->RemovePort(port);
+
+ MutexLocker locker(globalTcpListeningSocketRegistry.mutex());
+ if (globalTcpListeningSocketRegistry.CloseSafe(
+ reinterpret_cast<intptr_t>(listen_socket))) {
+ handle->Close();
}
+ DartUtils::PostInt32(port, 1 << kDestroyedEvent);
}
} else {
handle->EnsureInitialized(this);
Handle::ScopedLock lock(handle);
+ if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
+ int count = TOKEN_COUNT(msg->data);
+ handle->ReturnTokens(msg->dart_port, count);
+ // TODO(kustermann): How can we continue with sending events
+ // to dart from here?
+ return;
+ }
+
// Only set mask if we turned on kInEvent or kOutEvent.
if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) {
- handle->SetPortAndMask(msg->dart_port, msg->data);
+ handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
}
// Issue a read.
@@ -1069,10 +1073,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
if (!handle->HasPendingWrite()) {
if (handle->is_client_socket()) {
if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
- DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
+ DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
}
} else {
- DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
+ DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
}
}
}
@@ -1087,13 +1091,12 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
client_socket->Shutdown(SD_SEND);
}
}
- }
- if ((msg->data & (1 << kCloseCommand)) != 0) {
- handle->SetPortAndMask(msg->dart_port, msg->data);
- handle->Close();
+ if ((msg->data & (1 << kCloseCommand)) != 0) {
+ handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
+ handle->Close();
+ }
}
-
DeleteIfClosed(handle);
}
}
@@ -1103,17 +1106,30 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
OverlappedBuffer* buffer) {
listen_socket->AcceptComplete(buffer, completion_port_);
- if (!listen_socket->IsClosing()) {
- int event_mask = 1 << kInEvent;
- if ((listen_socket->mask() & event_mask) != 0) {
- DartUtils::PostInt32(listen_socket->port(), event_mask);
- }
- }
+ TryDispatchingPendingAccepts(listen_socket);
DeleteIfClosed(listen_socket);
}
+void EventHandlerImplementation::TryDispatchingPendingAccepts(
+ ListenSocket *listen_socket) {
+ Handle::ScopedLock lock(listen_socket);
+
+ if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
+ for (int i = 0;
+ i < listen_socket->accepted_count() && listen_socket->HasNextPort();
+ i++) {
+ Dart_Port port = listen_socket->NextPort();
+ DartUtils::PostInt32(port, 1 << kInEvent);
+ if (listen_socket->TakeToken()) {
+ break;
+ }
+ }
+ }
+}
+
+
void EventHandlerImplementation::HandleRead(Handle* handle,
int bytes,
OverlappedBuffer* buffer) {
@@ -1122,8 +1138,8 @@ void EventHandlerImplementation::HandleRead(Handle* handle,
if (bytes > 0) {
if (!handle->IsClosing()) {
int event_mask = 1 << kInEvent;
- if ((handle->mask() & event_mask) != 0) {
- DartUtils::PostInt32(handle->port(), event_mask);
+ if ((handle->Mask() & event_mask) != 0) {
+ DartUtils::PostInt32(handle->NextPort(), event_mask);
}
}
} else {
@@ -1147,8 +1163,8 @@ void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
handle->ReadComplete(buffer);
if (!handle->IsClosing()) {
int event_mask = 1 << kInEvent;
- if ((handle->mask() & event_mask) != 0) {
- DartUtils::PostInt32(handle->port(), event_mask);
+ if ((handle->Mask() & event_mask) != 0) {
+ DartUtils::PostInt32(handle->NextPort(), event_mask);
}
}
@@ -1166,8 +1182,8 @@ void EventHandlerImplementation::HandleWrite(Handle* handle,
int event_mask = 1 << kOutEvent;
ASSERT(!handle->is_client_socket() ||
reinterpret_cast<ClientSocket*>(handle)->is_connected());
- if ((handle->mask() & event_mask) != 0) {
- DartUtils::PostInt32(handle->port(), event_mask);
+ if ((handle->Mask() & event_mask) != 0) {
+ DartUtils::PostInt32(handle->NextPort(), event_mask);
}
}
} else {
@@ -1350,7 +1366,7 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
void EventHandlerImplementation::Start(EventHandler* handler) {
int result = Thread::Start(EventHandlerEntry,
- reinterpret_cast<uword>(handler));
+ reinterpret_cast<uword>(handler));
if (result != 0) {
FATAL1("Failed to start event handler thread %d", result);
}
« no previous file with comments | « dart/runtime/bin/eventhandler_win.h ('k') | dart/runtime/bin/io_natives.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698