Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
index 0f7cac89355bd2dec1960186257d82f2f5db1b27..d34cf7ceb2a47f44b69b4e169deef76bd962d9a3 100644 |
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
@@ -6,10 +6,12 @@ |
#include "nacl_io/ossocket.h" |
#ifdef PROVIDES_SOCKET_API |
+#include <assert.h> |
#include <errno.h> |
#include <string.h> |
#include <algorithm> |
+#include "nacl_io/kernel_handle.h" |
#include "nacl_io/mount_node_tcp.h" |
#include "nacl_io/mount_stream.h" |
#include "nacl_io/pepper_interface.h" |
@@ -57,7 +59,7 @@ class TCPSendWork : public TCPWork { |
// If not currently sending... |
if (!stream->TestStreamFlags(SSF_SENDING)) { |
- size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable(); |
+ size_t tx_data_avail = emitter_->BytesInOutputFIFO(); |
int capped_len = std::min(tx_data_avail, kMaxPacketSize); |
if (capped_len == 0) |
@@ -113,7 +115,7 @@ class TCPRecvWork : public TCPWork { |
// If we are not currently receiving |
if (!stream->TestStreamFlags(SSF_RECVING)) { |
- size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable(); |
+ size_t rx_space_avail = emitter_->SpaceInInputFIFO(); |
int capped_len = |
static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); |
@@ -152,6 +154,110 @@ class TCPRecvWork : public TCPWork { |
} |
}; |
+class TCPAcceptWork : public MountStream::Work { |
+ public: |
+ explicit TCPAcceptWork(MountStream* stream, |
+ const ScopedEventEmitterTCP& emitter) |
+ : MountStream::Work(stream), |
+ emitter_(emitter) {} |
+ |
+ TCPSocketInterface* TCPInterface() { |
+ return mount()->ppapi()->GetTCPSocketInterface(); |
+ } |
+ |
+ virtual bool Start(int32_t val) { |
+ AUTO_LOCK(emitter_->GetLock()); |
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); |
+ |
+ // Does the stream exist, and can it accept? |
+ if (NULL == node) |
+ return false; |
+ |
+ // If we are not currently accepting |
+ if (!node->TestStreamFlags(SSF_LISTENING)) |
+ return false; |
+ |
+ int err = TCPInterface()->Accept(node->socket_resource(), |
+ &new_socket_, |
+ mount()->GetRunCompletion(this)); |
+ |
+ if (err != PP_OK_COMPLETIONPENDING) |
+ // Anything else, we should assume the socket has gone bad. |
+ node->SetError_Locked(err); |
+ |
+ return true; |
+ } |
+ |
+ virtual void Run(int32_t error) { |
+ AUTO_LOCK(emitter_->GetLock()); |
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); |
+ |
+ if (node == NULL) |
+ return; |
+ |
+ if (error != PP_OK) { |
+ node->SetError_Locked(error); |
+ return; |
+ } |
+ |
+ emitter_->SetAcceptedSocket_Locked(new_socket_); |
+ } |
+ |
+ protected: |
+ PP_Resource new_socket_; |
+ ScopedEventEmitterTCP emitter_; |
+}; |
+ |
+class TCPConnectWork : public MountStream::Work { |
+ public: |
+ explicit TCPConnectWork(MountStream* stream, |
+ const ScopedEventEmitterTCP& emitter) |
+ : MountStream::Work(stream), |
+ emitter_(emitter) {} |
+ |
+ TCPSocketInterface* TCPInterface() { |
+ return mount()->ppapi()->GetTCPSocketInterface(); |
+ } |
+ |
+ virtual bool Start(int32_t val) { |
+ AUTO_LOCK(emitter_->GetLock()); |
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); |
+ |
+ // Does the stream exist, and can it connect? |
+ if (NULL == node) |
+ return false; |
+ |
+ int err = TCPInterface()->Connect(node->socket_resource(), |
+ node->remote_addr(), |
+ mount()->GetRunCompletion(this)); |
+ |
+ if (err != PP_OK_COMPLETIONPENDING) |
+ // Anything else, we should assume the socket has gone bad. |
+ node->SetError_Locked(err); |
+ |
+ return true; |
+ } |
+ |
+ virtual void Run(int32_t error) { |
+ AUTO_LOCK(emitter_->GetLock()); |
+ MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); |
+ |
+ if (node == NULL) |
+ return; |
+ |
+ if (error != PP_OK) { |
+ node->ConnectFailed_Locked(); |
+ node->SetError_Locked(error); |
+ return; |
+ } |
+ |
+ node->ConnectDone_Locked(); |
+ } |
+ |
+ protected: |
+ ScopedEventEmitterTCP emitter_; |
+}; |
+ |
MountNodeTCP::MountNodeTCP(Mount* mount) |
: MountNodeSocket(mount), |
emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) { |
@@ -177,12 +283,14 @@ Error MountNodeTCP::Init(int open_flags) { |
if (TCPInterface() == NULL) |
return EACCES; |
+ SetStreamFlags(SSF_CAN_CONNECT); |
+ |
if (socket_resource_ != 0) { |
// TCP sockets that are contructed with an existing socket_resource_ |
// are those that generated from calls to Accept() and therefore are |
// already connected. |
remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_); |
- ConnectDone(); |
+ ConnectDone_Locked(); |
} else { |
socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance()); |
if (0 == socket_resource_) |
@@ -192,34 +300,62 @@ Error MountNodeTCP::Init(int open_flags) { |
return 0; |
} |
-EventEmitterTCP* MountNodeTCP::GetEventEmitter() { |
+EventEmitter* MountNodeTCP::GetEventEmitter() { |
return emitter_.get(); |
} |
+void MountNodeTCP::QueueAccept() { |
+ MountStream::Work* work = new TCPAcceptWork(mount_stream(), emitter_); |
+ mount_stream()->EnqueueWork(work); |
+} |
+ |
+void MountNodeTCP::QueueConnect() { |
+ MountStream::Work* work = new TCPConnectWork(mount_stream(), emitter_); |
+ mount_stream()->EnqueueWork(work); |
+} |
+ |
void MountNodeTCP::QueueInput() { |
- TCPRecvWork* work = new TCPRecvWork(emitter_); |
+ MountStream::Work* work = new TCPRecvWork(emitter_); |
mount_stream()->EnqueueWork(work); |
} |
void MountNodeTCP::QueueOutput() { |
- TCPSendWork* work = new TCPSendWork(emitter_); |
+ MountStream::Work* work = new TCPSendWork(emitter_); |
mount_stream()->EnqueueWork(work); |
} |
-Error MountNodeTCP::Accept(PP_Resource* out_sock, |
+Error MountNodeTCP::Accept(const HandleAttr& attr, |
+ PP_Resource* out_sock, |
struct sockaddr* addr, |
socklen_t* len) { |
- AUTO_LOCK(node_lock_); |
- int err = TCPInterface()->Accept(socket_resource_, |
- out_sock, |
- PP_BlockUntilComplete()); |
+ EventListenerLock wait(GetEventEmitter()); |
- if (err != PP_OK) |
- return PPErrorToErrno(err); |
+ if (!TestStreamFlags(SSF_LISTENING)) |
+ return EINVAL; |
+ |
+ // Either block forever or not at all |
+ int ms = attr.IsBlocking() ? -1 : 0; |
+ |
+ Error err = wait.WaitOnEvent(POLLIN, ms); |
+ |
+ if (ETIMEDOUT == err) |
+ return EWOULDBLOCK; |
+ |
+ int s = emitter_->GetAcceptedSocket_Locked(); |
+ // Non-blocking case. |
+ if (s == 0) |
+ return EAGAIN; |
+ // Consume the new socket and start listening for the next one |
+ *out_sock = s; |
+ emitter_->ClearEvents_Locked(POLLIN); |
+ |
+ // Set the out paramaters |
PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock); |
*len = ResourceToSockAddr(remote_addr, *len, addr); |
mount_->ppapi()->ReleaseResource(remote_addr); |
+ |
+ QueueAccept(); |
return 0; |
} |
@@ -249,48 +385,73 @@ Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { |
return 0; |
} |
-Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { |
- AUTO_LOCK(node_lock_); |
+Error MountNodeTCP::Connect(const HandleAttr& attr, |
+ const struct sockaddr* addr, |
+ socklen_t len) { |
+ EventListenerLock wait(GetEventEmitter()); |
- if (remote_addr_ != 0) |
+ if (TestStreamFlags(SSF_CONNECTING)) |
+ return EALREADY; |
+ |
+ if (remote_addr_ != 0) { |
return EISCONN; |
+ } |
remote_addr_ = SockAddrToResource(addr, len); |
if (0 == remote_addr_) |
return EINVAL; |
- int err = TCPInterface()->Connect(socket_resource_, |
- remote_addr_, |
- PP_BlockUntilComplete()); |
+ int ms = attr.IsBlocking() ? -1 : 0; |
+ |
+ SetStreamFlags(SSF_CONNECTING); |
+ QueueConnect(); |
+ |
+ Error err = wait.WaitOnEvent(POLLOUT, ms); |
+ if (ETIMEDOUT == err) |
+ return EINPROGRESS; |
// If we fail, release the dest addr resource |
- if (err != PP_OK) { |
- mount_->ppapi()->ReleaseResource(remote_addr_); |
- remote_addr_ = 0; |
- return PPErrorToErrno(err); |
+ if (err != 0) { |
+ ConnectFailed_Locked(); |
+ return err; |
} |
- ConnectDone(); |
+ ConnectDone_Locked(); |
return 0; |
} |
-void MountNodeTCP::ConnectDone() { |
+void MountNodeTCP::ConnectDone_Locked() { |
local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); |
// Now that we are connected, we can start sending and receiving. |
+ ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT); |
SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); |
+ emitter_->ConnectDone_Locked(); |
+ |
// Begin the input pump |
QueueInput(); |
} |
+void MountNodeTCP::ConnectFailed_Locked() { |
+ mount_->ppapi()->ReleaseResource(remote_addr_); |
+ remote_addr_ = 0; |
+} |
+ |
Error MountNodeTCP::Listen(int backlog) { |
+ AUTO_LOCK(node_lock_); |
+ if (0 == local_addr_) |
+ return EINVAL; |
+ |
int err = TCPInterface()->Listen(socket_resource_, |
backlog, |
PP_BlockUntilComplete()); |
if (err != PP_OK) |
return PPErrorToErrno(err); |
+ ClearStreamFlags(SSF_CAN_CONNECT); |
+ SetStreamFlags(SSF_LISTENING); |
+ QueueAccept(); |
return 0; |
} |
@@ -298,6 +459,7 @@ Error MountNodeTCP::Recv_Locked(void* buf, |
size_t len, |
PP_Resource* out_addr, |
int* out_len) { |
+ assert(emitter_.get()); |
*out_len = emitter_->ReadIn_Locked((char*)buf, len); |
*out_addr = remote_addr_; |
@@ -311,6 +473,7 @@ Error MountNodeTCP::Send_Locked(const void* buf, |
size_t len, |
PP_Resource, |
int* out_len) { |
+ assert(emitter_.get()); |
*out_len = emitter_->WriteOut_Locked((char*)buf, len); |
return 0; |
} |