Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Unified Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 26703008: [NaCl SDK] nacl_io: Add support for non-blocking connect/accept (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
index 0f7cac89355bd2dec1960186257d82f2f5db1b27..d34cf7ceb2a47f44b69b4e169deef76bd962d9a3 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
@@ -6,10 +6,12 @@
#include "nacl_io/ossocket.h"
#ifdef PROVIDES_SOCKET_API
+#include <assert.h>
#include <errno.h>
#include <string.h>
#include <algorithm>
+#include "nacl_io/kernel_handle.h"
#include "nacl_io/mount_node_tcp.h"
#include "nacl_io/mount_stream.h"
#include "nacl_io/pepper_interface.h"
@@ -57,7 +59,7 @@ class TCPSendWork : public TCPWork {
// If not currently sending...
if (!stream->TestStreamFlags(SSF_SENDING)) {
- size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
+ size_t tx_data_avail = emitter_->BytesInOutputFIFO();
int capped_len = std::min(tx_data_avail, kMaxPacketSize);
if (capped_len == 0)
@@ -113,7 +115,7 @@ class TCPRecvWork : public TCPWork {
// If we are not currently receiving
if (!stream->TestStreamFlags(SSF_RECVING)) {
- size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
+ size_t rx_space_avail = emitter_->SpaceInInputFIFO();
int capped_len =
static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
@@ -152,6 +154,110 @@ class TCPRecvWork : public TCPWork {
}
};
+class TCPAcceptWork : public MountStream::Work {
+ public:
+ explicit TCPAcceptWork(MountStream* stream,
+ const ScopedEventEmitterTCP& emitter)
+ : MountStream::Work(stream),
+ emitter_(emitter) {}
+
+ TCPSocketInterface* TCPInterface() {
+ return mount()->ppapi()->GetTCPSocketInterface();
+ }
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // Does the stream exist, and can it accept?
+ if (NULL == node)
+ return false;
+
+ // If we are not currently accepting
+ if (!node->TestStreamFlags(SSF_LISTENING))
+ return false;
+
+ int err = TCPInterface()->Accept(node->socket_resource(),
+ &new_socket_,
+ mount()->GetRunCompletion(this));
+
+ if (err != PP_OK_COMPLETIONPENDING)
+ // Anything else, we should assume the socket has gone bad.
+ node->SetError_Locked(err);
+
+ return true;
+ }
+
+ virtual void Run(int32_t error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ if (node == NULL)
+ return;
+
+ if (error != PP_OK) {
+ node->SetError_Locked(error);
+ return;
+ }
+
+ emitter_->SetAcceptedSocket_Locked(new_socket_);
+ }
+
+ protected:
+ PP_Resource new_socket_;
+ ScopedEventEmitterTCP emitter_;
+};
+
+class TCPConnectWork : public MountStream::Work {
+ public:
+ explicit TCPConnectWork(MountStream* stream,
+ const ScopedEventEmitterTCP& emitter)
+ : MountStream::Work(stream),
+ emitter_(emitter) {}
+
+ TCPSocketInterface* TCPInterface() {
+ return mount()->ppapi()->GetTCPSocketInterface();
+ }
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // Does the stream exist, and can it connect?
+ if (NULL == node)
+ return false;
+
+ int err = TCPInterface()->Connect(node->socket_resource(),
+ node->remote_addr(),
+ mount()->GetRunCompletion(this));
+
+ if (err != PP_OK_COMPLETIONPENDING)
+ // Anything else, we should assume the socket has gone bad.
+ node->SetError_Locked(err);
+
+ return true;
+ }
+
+ virtual void Run(int32_t error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ if (node == NULL)
+ return;
+
+ if (error != PP_OK) {
+ node->ConnectFailed_Locked();
+ node->SetError_Locked(error);
+ return;
+ }
+
+ node->ConnectDone_Locked();
+ }
+
+ protected:
+ ScopedEventEmitterTCP emitter_;
+};
+
MountNodeTCP::MountNodeTCP(Mount* mount)
: MountNodeSocket(mount),
emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
@@ -177,12 +283,14 @@ Error MountNodeTCP::Init(int open_flags) {
if (TCPInterface() == NULL)
return EACCES;
+ SetStreamFlags(SSF_CAN_CONNECT);
+
if (socket_resource_ != 0) {
// TCP sockets that are contructed with an existing socket_resource_
// are those that generated from calls to Accept() and therefore are
// already connected.
remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
- ConnectDone();
+ ConnectDone_Locked();
} else {
socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
@@ -192,34 +300,62 @@ Error MountNodeTCP::Init(int open_flags) {
return 0;
}
-EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
+EventEmitter* MountNodeTCP::GetEventEmitter() {
return emitter_.get();
}
+void MountNodeTCP::QueueAccept() {
+ MountStream::Work* work = new TCPAcceptWork(mount_stream(), emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeTCP::QueueConnect() {
+ MountStream::Work* work = new TCPConnectWork(mount_stream(), emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
void MountNodeTCP::QueueInput() {
- TCPRecvWork* work = new TCPRecvWork(emitter_);
+ MountStream::Work* work = new TCPRecvWork(emitter_);
mount_stream()->EnqueueWork(work);
}
void MountNodeTCP::QueueOutput() {
- TCPSendWork* work = new TCPSendWork(emitter_);
+ MountStream::Work* work = new TCPSendWork(emitter_);
mount_stream()->EnqueueWork(work);
}
-Error MountNodeTCP::Accept(PP_Resource* out_sock,
+Error MountNodeTCP::Accept(const HandleAttr& attr,
+ PP_Resource* out_sock,
struct sockaddr* addr,
socklen_t* len) {
- AUTO_LOCK(node_lock_);
- int err = TCPInterface()->Accept(socket_resource_,
- out_sock,
- PP_BlockUntilComplete());
+ EventListenerLock wait(GetEventEmitter());
- if (err != PP_OK)
- return PPErrorToErrno(err);
+ if (!TestStreamFlags(SSF_LISTENING))
+ return EINVAL;
+
+ // Either block forever or not at all
+ int ms = attr.IsBlocking() ? -1 : 0;
+
+ Error err = wait.WaitOnEvent(POLLIN, ms);
+
+ if (ETIMEDOUT == err)
+ return EWOULDBLOCK;
+
+ int s = emitter_->GetAcceptedSocket_Locked();
+ // Non-blocking case.
+ if (s == 0)
+ return EAGAIN;
+ // Consume the new socket and start listening for the next one
+ *out_sock = s;
+ emitter_->ClearEvents_Locked(POLLIN);
+
+ // Set the out paramaters
PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
*len = ResourceToSockAddr(remote_addr, *len, addr);
mount_->ppapi()->ReleaseResource(remote_addr);
+
+ QueueAccept();
return 0;
}
@@ -249,48 +385,73 @@ Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
return 0;
}
-Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
- AUTO_LOCK(node_lock_);
+Error MountNodeTCP::Connect(const HandleAttr& attr,
+ const struct sockaddr* addr,
+ socklen_t len) {
+ EventListenerLock wait(GetEventEmitter());
- if (remote_addr_ != 0)
+ if (TestStreamFlags(SSF_CONNECTING))
+ return EALREADY;
+
+ if (remote_addr_ != 0) {
return EISCONN;
+ }
remote_addr_ = SockAddrToResource(addr, len);
if (0 == remote_addr_)
return EINVAL;
- int err = TCPInterface()->Connect(socket_resource_,
- remote_addr_,
- PP_BlockUntilComplete());
+ int ms = attr.IsBlocking() ? -1 : 0;
+
+ SetStreamFlags(SSF_CONNECTING);
+ QueueConnect();
+
+ Error err = wait.WaitOnEvent(POLLOUT, ms);
+ if (ETIMEDOUT == err)
+ return EINPROGRESS;
// If we fail, release the dest addr resource
- if (err != PP_OK) {
- mount_->ppapi()->ReleaseResource(remote_addr_);
- remote_addr_ = 0;
- return PPErrorToErrno(err);
+ if (err != 0) {
+ ConnectFailed_Locked();
+ return err;
}
- ConnectDone();
+ ConnectDone_Locked();
return 0;
}
-void MountNodeTCP::ConnectDone() {
+void MountNodeTCP::ConnectDone_Locked() {
local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
// Now that we are connected, we can start sending and receiving.
+ ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
+ emitter_->ConnectDone_Locked();
+
// Begin the input pump
QueueInput();
}
+void MountNodeTCP::ConnectFailed_Locked() {
+ mount_->ppapi()->ReleaseResource(remote_addr_);
+ remote_addr_ = 0;
+}
+
Error MountNodeTCP::Listen(int backlog) {
+ AUTO_LOCK(node_lock_);
+ if (0 == local_addr_)
+ return EINVAL;
+
int err = TCPInterface()->Listen(socket_resource_,
backlog,
PP_BlockUntilComplete());
if (err != PP_OK)
return PPErrorToErrno(err);
+ ClearStreamFlags(SSF_CAN_CONNECT);
+ SetStreamFlags(SSF_LISTENING);
+ QueueAccept();
return 0;
}
@@ -298,6 +459,7 @@ Error MountNodeTCP::Recv_Locked(void* buf,
size_t len,
PP_Resource* out_addr,
int* out_len) {
+ assert(emitter_.get());
*out_len = emitter_->ReadIn_Locked((char*)buf, len);
*out_addr = remote_addr_;
@@ -311,6 +473,7 @@ Error MountNodeTCP::Send_Locked(const void* buf,
size_t len,
PP_Resource,
int* out_len) {
+ assert(emitter_.get());
*out_len = emitter_->WriteOut_Locked((char*)buf, len);
return 0;
}

Powered by Google App Engine
This is Rietveld 408576698