 Chromium Code Reviews
 Chromium Code Reviews Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src
    
  
    Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src| Index: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| index 310a2bdf06581cbfb87039872211109e1af1246d..8e5fca0bf737a4212ec09393abc7a6d1a5658f62 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| @@ -3,41 +3,174 @@ | 
| // found in the LICENSE file. | 
| -#include "nacl_io/ossocket.h" | 
| -#ifdef PROVIDES_SOCKET_API | 
| +#include "nacl_io/mount_node_udp.h" | 
| #include <errno.h> | 
| #include <string.h> | 
| +#include <sys/fcntl.h> | 
| + | 
| #include <algorithm> | 
| -#include "nacl_io/mount.h" | 
| -#include "nacl_io/mount_node_socket.h" | 
| -#include "nacl_io/mount_node_udp.h" | 
| +#include "nacl_io/event_emitter_udp.h" | 
| +#include "nacl_io/mount_stream.h" | 
| #include "nacl_io/pepper_interface.h" | 
| +namespace { | 
| + const size_t kMaxPacketSize = 65536; | 
| + const size_t kDefaultFifoSize = kMaxPacketSize * 8; | 
| +} | 
| + | 
| namespace nacl_io { | 
| -MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {} | 
| +class UDPWork : public MountStreamWork { | 
| + public: | 
| + UDPWork(EventEmitterUDP* emitter) | 
| 
binji
2013/09/15 22:18:58
many of the comments on TCP apply here too (explic
 
binji
2013/09/15 22:18:58
UDPWork(const ScopedEventEmitterUDP& emitter)
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + : MountStreamWork(emitter->stream()->mount_stream()), | 
| + emitter_(emitter) { | 
| + } | 
| + UDPSocketInterface* UDPInterface() { | 
| + return mount()->ppapi()->GetUDPSocketInterface(); | 
| + } | 
| -UDPSocketInterface* MountNodeUDP::UDPSocket() { | 
| - if (mount_->ppapi() == NULL) | 
| - return NULL; | 
| + protected: | 
| + ScopedEventEmitterUDP emitter_; | 
| +}; | 
| + | 
| + | 
| +class UDPSendWork : public UDPWork { | 
| + public: | 
| + UDPSendWork(EventEmitterUDP* emitter) : UDPWork(emitter) {} | 
| + | 
| + virtual bool Start(int32_t val) { | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeUDP* stream = | 
| + reinterpret_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| + // If the stream is valid, and we are not currently sending | 
| + if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) { | 
| + Packet* packet = emitter_->ReadTXPacket_Locked(); | 
| + if (packet) { | 
| + stream->SetStreamFlags(SSF_SENDING); | 
| + int err = UDPInterface()->SendTo(stream->socket_resource(), | 
| + packet->buffer_.data(), | 
| + packet->buffer_.size(), | 
| + packet->addr_, | 
| + mount()->GetRunCompletion(this)); | 
| + | 
| + mount()->ppapi()->ReleaseResource(packet->addr_); | 
| 
binji
2013/09/15 22:18:58
postpone cleanup until after SendTo completes?
 
noelallen1
2013/09/17 21:21:54
Moved referencing to Packet class.
 | 
| + delete packet; | 
| + if (err == PP_OK_COMPLETIONPENDING) | 
| + return true; | 
| + stream->ClearStreamFlags(SSF_SENDING); | 
| + } | 
| + } | 
| + return false; | 
| + } | 
| - return mount_->ppapi()->GetUDPSocketInterface(); | 
| + virtual void Run(int32_t val) { | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeUDP* stream = | 
| + reinterpret_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| + // If the stream is still there, see if we can queue more output | 
| + if (stream) { | 
| + stream->ClearStreamFlags(SSF_SENDING); | 
| + stream->QueueOutput(); | 
| + } | 
| + } | 
| +}; | 
| + | 
| + | 
| +class UDPRecvWork : public UDPWork { | 
| + public: | 
| + UDPRecvWork(EventEmitterUDP* emitter) : UDPWork(emitter) { | 
| + data_ = new char[kMaxPacketSize]; | 
| + } | 
| + | 
| + ~UDPRecvWork() { | 
| + delete[] data_; | 
| + } | 
| + | 
| + virtual bool Start(int32_t val) { | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeUDP* stream = | 
| + reinterpret_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| + // If the stream is valid and we are not currently receiving | 
| + if (stream && ((stream->GetStreamFlags() & SSF_RECVING) == 0)) { | 
| + stream->SetStreamFlags(SSF_RECVING); | 
| + int err = UDPInterface()->RecvFrom(stream->socket_resource(), | 
| + data_, | 
| + kMaxPacketSize, | 
| + &addr_, | 
| + mount()->GetRunCompletion(this)); | 
| + if (err == PP_OK_COMPLETIONPENDING) | 
| + return true; | 
| + } | 
| + stream->ClearStreamFlags(SSF_RECVING); | 
| + return false; | 
| + } | 
| + | 
| + virtual void Run(int32_t val) { | 
| + AUTO_LOCK(emitter_->GetLock()); | 
| + MountNodeUDP* stream = | 
| + reinterpret_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| + // If the stream is still there, see if we can queue more input | 
| + if (stream) { | 
| + if (val > 0) { | 
| + mount()->ppapi()->AddRefResource(addr_); | 
| + Packet* packet = new Packet(data_, val, addr_); | 
| + emitter_->WriteRXPacket_Locked(packet); | 
| + } | 
| + stream->ClearStreamFlags(SSF_RECVING); | 
| + stream->QueueInput(); | 
| + } | 
| + } | 
| + | 
| + private: | 
| + char* data_; | 
| 
binji
2013/09/15 22:18:58
std::vector
 
noelallen1
2013/09/17 21:21:54
Using char* allows me to "take" the buffer.  Other
 | 
| + PP_Resource addr_; | 
| +}; | 
| + | 
| + | 
| +MountNodeUDP::MountNodeUDP(Mount* mount) | 
| + : MountNodeSocket(mount), | 
| + emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) { | 
| + emitter_->AttachStream(this); | 
| +} | 
| + | 
| +void MountNodeUDP::Destroy() { | 
| + emitter_->DetachStream(); | 
| + MountNodeSocket::Destroy(); | 
| +} | 
| + | 
| +EventEmitter* MountNodeUDP::GetEventEmitter() { | 
| + return emitter_.get(); | 
| } | 
| Error MountNodeUDP::Init(int flags) { | 
| - if (UDPSocket() == NULL) | 
| + if (UDPInterface() == NULL) | 
| return EACCES; | 
| - socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance()); | 
| + socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance()); | 
| if (0 == socket_resource_) | 
| return EACCES; | 
| return 0; | 
| } | 
| +void MountNodeUDP::QueueInput() { | 
| + UDPRecvWork* work = new UDPRecvWork(emitter_.get()); | 
| 
binji
2013/09/15 22:18:58
pass scoped emitter directly instead of raw pointe
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + mount_stream()->EnqueueWork(work); | 
| +} | 
| + | 
| +void MountNodeUDP::QueueOutput() { | 
| + UDPSendWork* work = new UDPSendWork(emitter_.get()); | 
| + mount_stream()->EnqueueWork(work); | 
| +} | 
| + | 
| Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) { | 
| if (0 == socket_resource_) | 
| return EBADF; | 
| @@ -50,9 +183,10 @@ Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) { | 
| if (0 == out_addr) | 
| return EINVAL; | 
| - int err = UDPSocket()->Bind(socket_resource_, | 
| - out_addr, | 
| - PP_BlockUntilComplete()); | 
| + int err = UDPInterface()->Bind(socket_resource_, | 
| + out_addr, | 
| + PP_BlockUntilComplete()); | 
| + QueueInput(); | 
| 
binji
2013/09/15 22:18:58
why not queue input and output here?
 
noelallen1
2013/09/17 21:21:54
Output doesn't make sense.  There's no data yet.
 | 
| if (err != 0) { | 
| mount_->ppapi()->ReleaseResource(out_addr); | 
| return PPErrorToErrno(err); | 
| @@ -87,19 +221,62 @@ Error MountNodeUDP::RecvFromHelper(void* buf, | 
| if (0 == socket_resource_) | 
| return EBADF; | 
| + int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_; | 
| + | 
| + EventListenerLock wait(GetEventEmitter()); | 
| + Error err = wait.WaitOnEvent(POLLIN, ms); | 
| + if (err) | 
| + return err; | 
| + | 
| + Packet* packet = emitter_->ReadRXPacket_Locked(); | 
| + | 
| + *out_len = 0; | 
| + if (packet) { | 
| + int capped_len = | 
| + static_cast<int32_t>(std::min<int>(len, packet->buffer_.size())); | 
| + memcpy(buf, packet->buffer_.data(), capped_len); | 
| + *out_addr = packet->addr_; | 
| + *out_len = capped_len; | 
| + delete packet; | 
| + | 
| + QueueInput(); | 
| 
binji
2013/09/15 22:18:58
The worker is already queuing up more work. Why he
 
noelallen1
2013/09/17 21:21:54
This is safer.  Instead of making the requesting l
 | 
| + } | 
| + return 0; | 
| +} | 
| + | 
| +Error MountNodeUDP::SendToHelper(const void* buf, | 
| + size_t len, | 
| + int flags, | 
| + PP_Resource addr, | 
| + int* out_len) { | 
| + *out_len = 0; | 
| + if (0 == socket_resource_) | 
| + return EBADF; | 
| + | 
| + if (0 == addr) | 
| + return ENOTCONN; | 
| + | 
| int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| - int err = UDPSocket()->RecvFrom(socket_resource_, | 
| - static_cast<char*>(buf), | 
| - capped_len, | 
| - out_addr, | 
| - PP_BlockUntilComplete()); | 
| - if (err < 0) | 
| - return PPErrorToErrno(err); | 
| + int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_; | 
| 
binji
2013/09/15 22:18:58
write_timeout_?
 
noelallen1
2013/09/17 21:21:54
Done.
 | 
| + | 
| + EventListenerLock wait(GetEventEmitter()); | 
| + Error err = wait.WaitOnEvent(POLLOUT, ms); | 
| + if (err) | 
| + return err; | 
| - *out_len = err; | 
| + mount_->ppapi()->AddRefResource(addr); | 
| + Packet* packet = new Packet(static_cast<const char *>(buf), | 
| + capped_len, | 
| + addr); | 
| + | 
| + emitter_->WriteTXPacket_Locked(packet); | 
| + QueueOutput(); | 
| + | 
| + *out_len = capped_len; | 
| return 0; | 
| } | 
| + | 
| Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) { | 
| while (1) { | 
| int local_len = 0; | 
| @@ -139,31 +316,6 @@ Error MountNodeUDP::RecvFrom(void* buf, | 
| return 0; | 
| } | 
| - | 
| -Error MountNodeUDP::SendToHelper(const void* buf, | 
| - size_t len, | 
| - int flags, | 
| - PP_Resource addr, | 
| - int* out_len) { | 
| - if (0 == socket_resource_) | 
| - return EBADF; | 
| - | 
| - if (0 == addr) | 
| - return ENOTCONN; | 
| - | 
| - int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| - int err = UDPSocket()->SendTo(socket_resource_, | 
| - static_cast<const char*>(buf), | 
| - capped_len, | 
| - addr, | 
| - PP_BlockUntilComplete()); | 
| - if (err < 0) | 
| - return PPErrorToErrno(err); | 
| - | 
| - *out_len = err; | 
| - return 0; | 
| -} | 
| - | 
| Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) { | 
| return SendToHelper(buf, len, flags, remote_addr_, out_len); | 
| } | 
| @@ -185,4 +337,3 @@ Error MountNodeUDP::SendTo(const void* buf, | 
| } // namespace nacl_io | 
| -#endif // PROVIDES_SOCKET_API |