| Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
|
| diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
|
| index 0f7cac89355bd2dec1960186257d82f2f5db1b27..f96954da38f50a7d3a4dabb19e8105d3acf605da 100644
|
| --- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
|
| +++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
|
| @@ -6,10 +6,13 @@
|
| #include "nacl_io/ossocket.h"
|
| #ifdef PROVIDES_SOCKET_API
|
|
|
| +#include <assert.h>
|
| #include <errno.h>
|
| #include <string.h>
|
| #include <algorithm>
|
|
|
| +#include "nacl_io/dbgprint.h"
|
| +#include "nacl_io/kernel_handle.h"
|
| #include "nacl_io/mount_node_tcp.h"
|
| #include "nacl_io/mount_stream.h"
|
| #include "nacl_io/pepper_interface.h"
|
| @@ -57,7 +60,7 @@ class TCPSendWork : public TCPWork {
|
|
|
| // If not currently sending...
|
| if (!stream->TestStreamFlags(SSF_SENDING)) {
|
| - size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
|
| + size_t tx_data_avail = emitter_->BytesInOutputFIFO();
|
| int capped_len = std::min(tx_data_avail, kMaxPacketSize);
|
|
|
| if (capped_len == 0)
|
| @@ -113,7 +116,7 @@ class TCPRecvWork : public TCPWork {
|
|
|
| // If we are not currently receiving
|
| if (!stream->TestStreamFlags(SSF_RECVING)) {
|
| - size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
|
| + size_t rx_space_avail = emitter_->SpaceInInputFIFO();
|
| int capped_len =
|
| static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
|
|
|
| @@ -152,6 +155,117 @@ class TCPRecvWork : public TCPWork {
|
| }
|
| };
|
|
|
| +class TCPAcceptWork : public MountStream::Work {
|
| + public:
|
| + explicit TCPAcceptWork(MountStream* stream,
|
| + const ScopedEventEmitterTCP& emitter)
|
| + : MountStream::Work(stream),
|
| + emitter_(emitter) {}
|
| +
|
| + TCPSocketInterface* TCPInterface() {
|
| + return mount()->ppapi()->GetTCPSocketInterface();
|
| + }
|
| +
|
| + virtual bool Start(int32_t val) {
|
| + AUTO_LOCK(emitter_->GetLock());
|
| + MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
|
| +
|
| + // Does the stream exist, and can it accept?
|
| + if (NULL == node)
|
| + return false;
|
| +
|
| + // If we are not currently accepting
|
| + if (!node->TestStreamFlags(SSF_LISTENING))
|
| + return false;
|
| +
|
| + int err = TCPInterface()->Accept(node->socket_resource(),
|
| + &new_socket_,
|
| + mount()->GetRunCompletion(this));
|
| +
|
| + if (err != PP_OK_COMPLETIONPENDING)
|
| + // Anything else, we should assume the socket has gone bad.
|
| + node->SetError_Locked(err);
|
| +
|
| + return true;
|
| + }
|
| +
|
| + virtual void Run(int32_t error) {
|
| + AUTO_LOCK(emitter_->GetLock());
|
| + MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
|
| +
|
| + //dbgprintf("accept completed\n");
|
| +
|
| + if (node == NULL)
|
| + return;
|
| +
|
| + if (error != PP_OK) {
|
| + //dbgprintf("accept error\n");
|
| + node->SetError_Locked(error);
|
| + return;
|
| + }
|
| +
|
| + emitter_->SetAcceptedSocket_Locked(new_socket_);
|
| + }
|
| +
|
| + protected:
|
| + PP_Resource new_socket_;
|
| + ScopedEventEmitterTCP emitter_;
|
| +};
|
| +
|
| +class TCPConnectWork : public MountStream::Work {
|
| + public:
|
| + explicit TCPConnectWork(MountStream* stream,
|
| + const ScopedEventEmitterTCP& emitter)
|
| + : MountStream::Work(stream),
|
| + emitter_(emitter) {}
|
| +
|
| + TCPSocketInterface* TCPInterface() {
|
| + return mount()->ppapi()->GetTCPSocketInterface();
|
| + }
|
| +
|
| + virtual bool Start(int32_t val) {
|
| + //dbgprintf("connect start 1\n");
|
| + AUTO_LOCK(emitter_->GetLock());
|
| + MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
|
| +
|
| + // Does the stream exist, and can it connect?
|
| + if (NULL == node)
|
| + return false;
|
| +
|
| + //dbgprintf("connect start 2\n");
|
| + int err = TCPInterface()->Connect(node->socket_resource(),
|
| + node->remote_addr(),
|
| + mount()->GetRunCompletion(this));
|
| +
|
| + if (err != PP_OK_COMPLETIONPENDING)
|
| + // Anything else, we should assume the socket has gone bad.
|
| + node->SetError_Locked(err);
|
| +
|
| + return true;
|
| + }
|
| +
|
| + virtual void Run(int32_t error) {
|
| + AUTO_LOCK(emitter_->GetLock());
|
| + MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
|
| +
|
| + //dbgprintf("connect completed\n");
|
| + if (node == NULL)
|
| + return;
|
| +
|
| + if (error != PP_OK) {
|
| + //dbgprintf("connect error\n");
|
| + node->ConnectFailed_Locked();
|
| + node->SetError_Locked(error);
|
| + return;
|
| + }
|
| +
|
| + node->ConnectDone_Locked();
|
| + }
|
| +
|
| + protected:
|
| + ScopedEventEmitterTCP emitter_;
|
| +};
|
| +
|
| MountNodeTCP::MountNodeTCP(Mount* mount)
|
| : MountNodeSocket(mount),
|
| emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
|
| @@ -177,12 +291,14 @@ Error MountNodeTCP::Init(int open_flags) {
|
| if (TCPInterface() == NULL)
|
| return EACCES;
|
|
|
| + SetStreamFlags(SSF_CAN_CONNECT);
|
| +
|
| if (socket_resource_ != 0) {
|
| // TCP sockets that are contructed with an existing socket_resource_
|
| // are those that generated from calls to Accept() and therefore are
|
| // already connected.
|
| remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
|
| - ConnectDone();
|
| + ConnectDone_Locked();
|
| } else {
|
| socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
|
| if (0 == socket_resource_)
|
| @@ -192,34 +308,64 @@ Error MountNodeTCP::Init(int open_flags) {
|
| return 0;
|
| }
|
|
|
| -EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
|
| +EventEmitter* MountNodeTCP::GetEventEmitter() {
|
| return emitter_.get();
|
| }
|
|
|
| +void MountNodeTCP::QueueAccept() {
|
| + MountStream::Work* work = new TCPAcceptWork(mount_stream(), emitter_);
|
| + mount_stream()->EnqueueWork(work);
|
| +}
|
| +
|
| +void MountNodeTCP::QueueConnect() {
|
| + MountStream::Work* work = new TCPConnectWork(mount_stream(), emitter_);
|
| + mount_stream()->EnqueueWork(work);
|
| +}
|
| +
|
| void MountNodeTCP::QueueInput() {
|
| - TCPRecvWork* work = new TCPRecvWork(emitter_);
|
| + MountStream::Work* work = new TCPRecvWork(emitter_);
|
| mount_stream()->EnqueueWork(work);
|
| }
|
|
|
| void MountNodeTCP::QueueOutput() {
|
| - TCPSendWork* work = new TCPSendWork(emitter_);
|
| + MountStream::Work* work = new TCPSendWork(emitter_);
|
| mount_stream()->EnqueueWork(work);
|
| }
|
|
|
| -Error MountNodeTCP::Accept(PP_Resource* out_sock,
|
| +Error MountNodeTCP::Accept(const HandleAttr& attr,
|
| + PP_Resource* out_sock,
|
| struct sockaddr* addr,
|
| socklen_t* len) {
|
| - AUTO_LOCK(node_lock_);
|
| - int err = TCPInterface()->Accept(socket_resource_,
|
| - out_sock,
|
| - PP_BlockUntilComplete());
|
| + EventListenerLock wait(GetEventEmitter());
|
|
|
| - if (err != PP_OK)
|
| - return PPErrorToErrno(err);
|
| + if (!TestStreamFlags(SSF_LISTENING))
|
| + return EINVAL;
|
| +
|
| + // Either block forever or not at all
|
| + int ms = attr.IsBlocking() ? -1 : 0;
|
| +
|
| + //dbgprintf("Accept: Waiting: %d\n", ms);
|
| + Error err = wait.WaitOnEvent(POLLIN, ms);
|
| + //dbgprintf("Accept: wait done: %d\n", (int)err);
|
| +
|
| + if (ETIMEDOUT == err)
|
| + return EWOULDBLOCK;
|
| +
|
| + int s = emitter_->GetAcceptedSocket_Locked();
|
| + // Non-blocking case.
|
| + if (s == 0)
|
| + return EAGAIN;
|
|
|
| + // Consume the new socket and start listening for the next one
|
| + *out_sock = s;
|
| + emitter_->ClearEvents_Locked(POLLIN);
|
| +
|
| + // Set the out paramaters
|
| PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
|
| *len = ResourceToSockAddr(remote_addr, *len, addr);
|
| mount_->ppapi()->ReleaseResource(remote_addr);
|
| +
|
| + QueueAccept();
|
| return 0;
|
| }
|
|
|
| @@ -243,54 +389,87 @@ Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
|
| if (err != PP_OK) {
|
| mount_->ppapi()->ReleaseResource(local_addr_);
|
| local_addr_ = 0;
|
| + //dbgprintf("Bind error: %d\n", err);
|
| return PPErrorToErrno(err);
|
| }
|
|
|
| return 0;
|
| }
|
|
|
| -Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
|
| - AUTO_LOCK(node_lock_);
|
| +Error MountNodeTCP::Connect(const HandleAttr& attr,
|
| + const struct sockaddr* addr,
|
| + socklen_t len) {
|
| + EventListenerLock wait(GetEventEmitter());
|
| + //dbgprintf("Connect\n");
|
|
|
| - if (remote_addr_ != 0)
|
| + if (TestStreamFlags(SSF_CONNECTING))
|
| + return EALREADY;
|
| +
|
| + if (remote_addr_ != 0) {
|
| + //dbgprintf("Already connected: %d\n", EISCONN);
|
| return EISCONN;
|
| + }
|
|
|
| remote_addr_ = SockAddrToResource(addr, len);
|
| if (0 == remote_addr_)
|
| return EINVAL;
|
|
|
| - int err = TCPInterface()->Connect(socket_resource_,
|
| - remote_addr_,
|
| - PP_BlockUntilComplete());
|
| + //dbgprintf("Connect blocking %d flags=%d\n", attr.IsBlocking(), attr.flags);
|
| + int ms = attr.IsBlocking() ? -1 : 0;
|
| +
|
| + SetStreamFlags(SSF_CONNECTING);
|
| + QueueConnect();
|
| +
|
| + //dbgprintf("Waiting on connect: %d\n", ms);
|
| + Error err = wait.WaitOnEvent(POLLOUT, ms);
|
| + //dbgprintf("connect: done waiting: %d\n", (int)err);
|
| +
|
| + if (ETIMEDOUT == err)
|
| + return EINPROGRESS;
|
|
|
| // If we fail, release the dest addr resource
|
| - if (err != PP_OK) {
|
| - mount_->ppapi()->ReleaseResource(remote_addr_);
|
| - remote_addr_ = 0;
|
| - return PPErrorToErrno(err);
|
| + if (err != 0) {
|
| + ConnectFailed_Locked();
|
| + return err;
|
| }
|
|
|
| - ConnectDone();
|
| + ConnectDone_Locked();
|
| return 0;
|
| }
|
|
|
| -void MountNodeTCP::ConnectDone() {
|
| +void MountNodeTCP::ConnectDone_Locked() {
|
| local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
|
|
|
| // Now that we are connected, we can start sending and receiving.
|
| + ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
|
| SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
|
|
|
| + emitter_->ConnectDone();
|
| +
|
| // Begin the input pump
|
| QueueInput();
|
| }
|
|
|
| +void MountNodeTCP::ConnectFailed_Locked() {
|
| + mount_->ppapi()->ReleaseResource(remote_addr_);
|
| + remote_addr_ = 0;
|
| +}
|
| +
|
| Error MountNodeTCP::Listen(int backlog) {
|
| + AUTO_LOCK(node_lock_);
|
| + if (0 == local_addr_)
|
| + return EINVAL;
|
| +
|
| int err = TCPInterface()->Listen(socket_resource_,
|
| backlog,
|
| PP_BlockUntilComplete());
|
| if (err != PP_OK)
|
| return PPErrorToErrno(err);
|
|
|
| + //dbgprintf("listening\n");
|
| + ClearStreamFlags(SSF_CAN_CONNECT);
|
| + SetStreamFlags(SSF_LISTENING);
|
| + QueueAccept();
|
| return 0;
|
| }
|
|
|
| @@ -298,6 +477,7 @@ Error MountNodeTCP::Recv_Locked(void* buf,
|
| size_t len,
|
| PP_Resource* out_addr,
|
| int* out_len) {
|
| + assert(emitter_.get());
|
| *out_len = emitter_->ReadIn_Locked((char*)buf, len);
|
| *out_addr = remote_addr_;
|
|
|
| @@ -311,6 +491,7 @@ Error MountNodeTCP::Send_Locked(const void* buf,
|
| size_t len,
|
| PP_Resource,
|
| int* out_len) {
|
| + assert(emitter_.get());
|
| *out_len = emitter_->WriteOut_Locked((char*)buf, len);
|
| return 0;
|
| }
|
|
|