Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(23)

Unified Diff: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added Fifo Tests Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
index 310a2bdf06581cbfb87039872211109e1af1246d..8e5fca0bf737a4212ec09393abc7a6d1a5658f62 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
@@ -3,41 +3,174 @@
// found in the LICENSE file.
-#include "nacl_io/ossocket.h"
-#ifdef PROVIDES_SOCKET_API
+#include "nacl_io/mount_node_udp.h"
#include <errno.h>
#include <string.h>
+#include <sys/fcntl.h>
+
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
-#include "nacl_io/mount_node_udp.h"
+#include "nacl_io/event_emitter_udp.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ const size_t kMaxPacketSize = 65536;
+ const size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {}
+class UDPWork : public MountStreamWork {
+ public:
+ UDPWork(EventEmitterUDP* emitter)
binji 2013/09/15 22:18:58 many of the comments on TCP apply here too (explic
binji 2013/09/15 22:18:58 UDPWork(const ScopedEventEmitterUDP& emitter)
noelallen1 2013/09/17 21:21:54 Done.
+ : MountStreamWork(emitter->stream()->mount_stream()),
+ emitter_(emitter) {
+ }
+ UDPSocketInterface* UDPInterface() {
+ return mount()->ppapi()->GetUDPSocketInterface();
+ }
-UDPSocketInterface* MountNodeUDP::UDPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
+ protected:
+ ScopedEventEmitterUDP emitter_;
+};
+
+
+class UDPSendWork : public UDPWork {
+ public:
+ UDPSendWork(EventEmitterUDP* emitter) : UDPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream =
+ reinterpret_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is valid, and we are not currently sending
+ if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) {
+ Packet* packet = emitter_->ReadTXPacket_Locked();
+ if (packet) {
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = UDPInterface()->SendTo(stream->socket_resource(),
+ packet->buffer_.data(),
+ packet->buffer_.size(),
+ packet->addr_,
+ mount()->GetRunCompletion(this));
+
+ mount()->ppapi()->ReleaseResource(packet->addr_);
binji 2013/09/15 22:18:58 postpone cleanup until after SendTo completes?
noelallen1 2013/09/17 21:21:54 Moved referencing to Packet class.
+ delete packet;
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+ stream->ClearStreamFlags(SSF_SENDING);
+ }
+ }
+ return false;
+ }
- return mount_->ppapi()->GetUDPSocketInterface();
+ virtual void Run(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream =
+ reinterpret_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more output
+ if (stream) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ }
+ }
+};
+
+
+class UDPRecvWork : public UDPWork {
+ public:
+ UDPRecvWork(EventEmitterUDP* emitter) : UDPWork(emitter) {
+ data_ = new char[kMaxPacketSize];
+ }
+
+ ~UDPRecvWork() {
+ delete[] data_;
+ }
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream =
+ reinterpret_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is valid and we are not currently receiving
+ if (stream && ((stream->GetStreamFlags() & SSF_RECVING) == 0)) {
+ stream->SetStreamFlags(SSF_RECVING);
+ int err = UDPInterface()->RecvFrom(stream->socket_resource(),
+ data_,
+ kMaxPacketSize,
+ &addr_,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+ }
+ stream->ClearStreamFlags(SSF_RECVING);
+ return false;
+ }
+
+ virtual void Run(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream =
+ reinterpret_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more input
+ if (stream) {
+ if (val > 0) {
+ mount()->ppapi()->AddRefResource(addr_);
+ Packet* packet = new Packet(data_, val, addr_);
+ emitter_->WriteRXPacket_Locked(packet);
+ }
+ stream->ClearStreamFlags(SSF_RECVING);
+ stream->QueueInput();
+ }
+ }
+
+ private:
+ char* data_;
binji 2013/09/15 22:18:58 std::vector
noelallen1 2013/09/17 21:21:54 Using char* allows me to "take" the buffer. Other
+ PP_Resource addr_;
+};
+
+
+MountNodeUDP::MountNodeUDP(Mount* mount)
+ : MountNodeSocket(mount),
+ emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
+ emitter_->AttachStream(this);
+}
+
+void MountNodeUDP::Destroy() {
+ emitter_->DetachStream();
+ MountNodeSocket::Destroy();
+}
+
+EventEmitter* MountNodeUDP::GetEventEmitter() {
+ return emitter_.get();
}
Error MountNodeUDP::Init(int flags) {
- if (UDPSocket() == NULL)
+ if (UDPInterface() == NULL)
return EACCES;
- socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+void MountNodeUDP::QueueInput() {
+ UDPRecvWork* work = new UDPRecvWork(emitter_.get());
binji 2013/09/15 22:18:58 pass scoped emitter directly instead of raw pointe
noelallen1 2013/09/17 21:21:54 Done.
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeUDP::QueueOutput() {
+ UDPSendWork* work = new UDPSendWork(emitter_.get());
+ mount_stream()->EnqueueWork(work);
+}
+
Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == socket_resource_)
return EBADF;
@@ -50,9 +183,10 @@ Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == out_addr)
return EINVAL;
- int err = UDPSocket()->Bind(socket_resource_,
- out_addr,
- PP_BlockUntilComplete());
+ int err = UDPInterface()->Bind(socket_resource_,
+ out_addr,
+ PP_BlockUntilComplete());
+ QueueInput();
binji 2013/09/15 22:18:58 why not queue input and output here?
noelallen1 2013/09/17 21:21:54 Output doesn't make sense. There's no data yet.
if (err != 0) {
mount_->ppapi()->ReleaseResource(out_addr);
return PPErrorToErrno(err);
@@ -87,19 +221,62 @@ Error MountNodeUDP::RecvFromHelper(void* buf,
if (0 == socket_resource_)
return EBADF;
+ int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_;
+
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLIN, ms);
+ if (err)
+ return err;
+
+ Packet* packet = emitter_->ReadRXPacket_Locked();
+
+ *out_len = 0;
+ if (packet) {
+ int capped_len =
+ static_cast<int32_t>(std::min<int>(len, packet->buffer_.size()));
+ memcpy(buf, packet->buffer_.data(), capped_len);
+ *out_addr = packet->addr_;
+ *out_len = capped_len;
+ delete packet;
+
+ QueueInput();
binji 2013/09/15 22:18:58 The worker is already queuing up more work. Why he
noelallen1 2013/09/17 21:21:54 This is safer. Instead of making the requesting l
+ }
+ return 0;
+}
+
+Error MountNodeUDP::SendToHelper(const void* buf,
+ size_t len,
+ int flags,
+ PP_Resource addr,
+ int* out_len) {
+ *out_len = 0;
+ if (0 == socket_resource_)
+ return EBADF;
+
+ if (0 == addr)
+ return ENOTCONN;
+
int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->RecvFrom(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- out_addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
+ int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_;
binji 2013/09/15 22:18:58 write_timeout_?
noelallen1 2013/09/17 21:21:54 Done.
+
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLOUT, ms);
+ if (err)
+ return err;
- *out_len = err;
+ mount_->ppapi()->AddRefResource(addr);
+ Packet* packet = new Packet(static_cast<const char *>(buf),
+ capped_len,
+ addr);
+
+ emitter_->WriteTXPacket_Locked(packet);
+ QueueOutput();
+
+ *out_len = capped_len;
return 0;
}
+
Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) {
while (1) {
int local_len = 0;
@@ -139,31 +316,6 @@ Error MountNodeUDP::RecvFrom(void* buf,
return 0;
}
-
-Error MountNodeUDP::SendToHelper(const void* buf,
- size_t len,
- int flags,
- PP_Resource addr,
- int* out_len) {
- if (0 == socket_resource_)
- return EBADF;
-
- if (0 == addr)
- return ENOTCONN;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->SendTo(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
-
- *out_len = err;
- return 0;
-}
-
Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) {
return SendToHelper(buf, len, flags, remote_addr_, out_len);
}
@@ -185,4 +337,3 @@ Error MountNodeUDP::SendTo(const void* buf,
} // namespace nacl_io
-#endif // PROVIDES_SOCKET_API

Powered by Google App Engine
This is Rietveld 408576698