 Chromium Code Reviews
 Chromium Code Reviews Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src
    
  
    Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src| Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| index 5a5f9f12e150ff4125c0470962381325633de58b..91ffa94a15a38f6a30db4a50cd00615af1ae05fc 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| @@ -10,34 +10,151 @@ | 
| #include <string.h> | 
| #include <algorithm> | 
| -#include "nacl_io/mount.h" | 
| -#include "nacl_io/mount_node_socket.h" | 
| #include "nacl_io/mount_node_tcp.h" | 
| +#include "nacl_io/mount_stream.h" | 
| #include "nacl_io/pepper_interface.h" | 
| +namespace { | 
| + size_t kMaxPacketSize = 65536; | 
| 
binji
2013/09/15 22:18:58
const
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + size_t kDefaultFifoSize = kMaxPacketSize * 8; | 
| +} | 
| + | 
| namespace nacl_io { | 
| -MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {} | 
| +class TCPWork : public MountStreamWork { | 
| + public: | 
| + TCPWork(EventEmitterTCP* emitter) | 
| 
binji
2013/09/15 22:18:58
explicit
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + : MountStreamWork(emitter->stream()->mount_stream()), | 
| + emitter_(emitter) {} | 
| + | 
| + TCPSocketInterface* TCPInterface() { | 
| + return mount()->ppapi()->GetTCPSocketInterface(); | 
| + } | 
| + | 
| + protected: | 
| + ScopedEventEmitterTCP emitter_; | 
| +}; | 
| + | 
| + | 
| +class TCPSendWork : public TCPWork { | 
| + public: | 
| + TCPSendWork(EventEmitterTCP* emitter) : TCPWork(emitter) {} | 
| 
binji
2013/09/15 22:18:58
explicit
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + | 
| + virtual bool Start(int32_t val) { | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeTCP* stream = reinterpret_cast<MountNodeTCP*>(emitter_->stream()); | 
| 
binji
2013/09/15 22:18:58
static_cast (though unnecessary if you use covaria
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + // If the stream is valid, and we are not currently sending | 
| + if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) { | 
| + size_t tx_data_avail = emitter_->tx_fifo()->ReadAvailable(); | 
| + int capped_len = | 
| + static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize)); | 
| + if (capped_len == 0) | 
| + return false; | 
| + | 
| + char* data = new char[capped_len]; | 
| + emitter_->ReadTXBytes_Locked(data, capped_len); | 
| + | 
| + stream->SetStreamFlags(SSF_SENDING); | 
| + int err = TCPInterface()->Write(stream->socket_resource(), | 
| + data, | 
| + capped_len, | 
| + mount()->GetRunCompletion(this)); | 
| + delete data; | 
| 
binji
2013/09/15 22:18:58
Deleting the data sent to an async call?
 
noelallen1
2013/09/17 21:21:54
Consolidated into base class.
 | 
| + if (err == PP_OK_COMPLETIONPENDING) | 
| 
binji
2013/09/15 22:18:58
any way to return this error in case it is not PP_
 
noelallen1
2013/09/17 21:21:54
Will log once we officially have logging.
 | 
| + return true; | 
| + | 
| + stream->ClearStreamFlags(SSF_SENDING); | 
| + } | 
| + return false; | 
| + } | 
| + | 
| + virtual void Run(int32_t val) { | 
| 
binji
2013/09/15 22:18:58
Run is a strange name for this; it isn't meant to
 
noelallen1
2013/09/17 21:21:54
MountStream::GetOnCompletedCompletion?
 | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeTCP* stream = reinterpret_cast<MountNodeTCP*>(emitter_->stream()); | 
| 
binji
2013/09/15 22:18:58
static_cast
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + | 
| + // If the stream is still there, see if we can queue more output | 
| + if (stream) { | 
| + stream->ClearStreamFlags(SSF_SENDING); | 
| + stream->QueueOutput(); | 
| + } | 
| + } | 
| +}; | 
| + | 
| +class TCPRecvWork : public TCPWork { | 
| + public: | 
| + TCPRecvWork(EventEmitterTCP* emitter) : TCPWork(emitter), data_(NULL) {} | 
| 
binji
2013/09/15 22:18:58
explicit
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + | 
| + ~TCPRecvWork() { | 
| + delete[] data_; | 
| + } | 
| + | 
| + virtual bool Start(int32_t val) { | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeTCP* stream = reinterpret_cast<MountNodeTCP*>(emitter_->stream()); | 
| 
binji
2013/09/15 22:18:58
static_cast
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + | 
| + size_t rx_space_avail = emitter_->rx_fifo()->WriteAvailable(); | 
| + int capped_len = | 
| + static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); | 
| + if (NULL == stream || (stream->GetStreamFlags() & SSF_RECVING) == 0) | 
| + return false; | 
| -TCPSocketInterface* MountNodeTCP::TCPSocket() { | 
| - if (mount_->ppapi() == NULL) | 
| - return NULL; | 
| + if (capped_len <= 0) | 
| + return false; | 
| - return mount_->ppapi()->GetTCPSocketInterface(); | 
| + stream->SetStreamFlags(SSF_RECVING); | 
| + data_ = new char[capped_len]; | 
| + int err = TCPInterface()->Read(stream->socket_resource(), | 
| + data_, | 
| 
binji
2013/09/15 22:18:58
nit: align params
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + capped_len, | 
| + mount()->GetRunCompletion(this)); | 
| + if (err == PP_OK_COMPLETIONPENDING) | 
| + return true; | 
| + | 
| + stream->ClearStreamFlags(SSF_RECVING); | 
| + return false; | 
| + } | 
| + | 
| + virtual void Run(int32_t val) { | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeTCP* stream = | 
| + reinterpret_cast<MountNodeTCP*>(emitter_->stream()); | 
| 
binji
2013/09/15 22:18:58
static_cast
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + | 
| + if (NULL == stream) | 
| + return; | 
| + | 
| + // If the stream is still there, see if we can queue more input | 
| + if (val > 0) | 
| 
binji
2013/09/15 22:18:58
better name for val? (bytes_read or something?)
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + emitter_->WriteRXBytes_Locked(data_, val); | 
| + | 
| + stream->ClearStreamFlags(SSF_RECVING); | 
| + stream->QueueInput(); | 
| + } | 
| + | 
| + private: | 
| + char* data_; | 
| 
binji
2013/09/15 22:18:58
std::vector
 | 
| +}; | 
| + | 
| + | 
| +MountNodeTCP::MountNodeTCP(Mount* mount) | 
| + : MountNodeSocket(mount) { | 
| } | 
| Error MountNodeTCP::Init(int flags) { | 
| - if (TCPSocket() == NULL) | 
| + if (TCPInterface() == NULL) | 
| return EACCES; | 
| - socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance()); | 
| + socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance()); | 
| if (0 == socket_resource_) | 
| return EACCES; | 
| return 0; | 
| } | 
| +EventEmitter* MountNodeTCP::GetEventEmitter() { | 
| + return NULL; | 
| 
binji
2013/09/15 22:18:58
Why no emitter?
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| +} | 
| + | 
| 
binji
2013/09/15 22:18:58
Looks like TCP is not yet implemented?
 
noelallen1
2013/09/17 21:21:54
Bind is not allowed on client sockets.  There's no
 
binji
2013/09/19 00:48:54
Sorry, I wasn't talking about Bind, I meant non-bl
 
noelallen1
2013/09/19 21:29:27
Ah I see, fixed.
 | 
| Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { | 
| AUTO_LOCK(node_lock_); | 
| @@ -65,9 +182,9 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { | 
| if (0 == remote_addr_) | 
| return EINVAL; | 
| - int err = TCPSocket()->Connect(socket_resource_, | 
| - remote_addr_, | 
| - PP_BlockUntilComplete()); | 
| + int err = TCPInterface()->Connect(socket_resource_, | 
| + remote_addr_, | 
| + PP_BlockUntilComplete()); | 
| // If we fail, release the dest addr resource | 
| if (err != PP_OK) { | 
| @@ -76,7 +193,7 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { | 
| return PPErrorToErrno(err); | 
| } | 
| - local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_); | 
| + local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); | 
| mount_->ppapi()->AddRefResource(local_addr_); | 
| return 0; | 
| } | 
| @@ -87,10 +204,10 @@ Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) { | 
| return EBADF; | 
| int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| - int err = TCPSocket()->Read(socket_resource_, | 
| - static_cast<char*>(buf), | 
| - capped_len, | 
| - PP_BlockUntilComplete()); | 
| + int err = TCPInterface()->Read(socket_resource_, | 
| + static_cast<char*>(buf), | 
| + capped_len, | 
| + PP_BlockUntilComplete()); | 
| if (err < 0) | 
| return PPErrorToErrno(err); | 
| @@ -121,10 +238,10 @@ Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) { | 
| return ENOTCONN; | 
| int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| - int err = TCPSocket()->Write(socket_resource_, | 
| - static_cast<const char*>(buf), | 
| - capped_len, | 
| - PP_BlockUntilComplete()); | 
| + int err = TCPInterface()->Write(socket_resource_, | 
| + static_cast<const char*>(buf), | 
| + capped_len, | 
| + PP_BlockUntilComplete()); | 
| if (err < 0) | 
| return PPErrorToErrno(err); |