Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(200)

Unified Diff: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
index 310a2bdf06581cbfb87039872211109e1af1246d..acf1f61d52be155cd195ab7aa7f823303b06d0f6 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
@@ -3,41 +3,176 @@
// found in the LICENSE file.
-#include "nacl_io/ossocket.h"
-#ifdef PROVIDES_SOCKET_API
+#include "nacl_io/mount_node_udp.h"
#include <errno.h>
#include <string.h>
+#include <sys/fcntl.h>
+
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
-#include "nacl_io/mount_node_udp.h"
+#include "nacl_io/event_emitter_udp.h"
+#include "nacl_io/mount_stream.h"
+#include "nacl_io/packet.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ const size_t kMaxPacketSize = 65536;
+ const size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {}
+class UDPWork : public MountStream::Work {
+ public:
+ explicit UDPWork(const ScopedEventEmitterUDP& emitter)
+ : MountStream::Work(emitter->stream()->mount_stream()),
+ emitter_(emitter),
+ packet_(NULL) {
+ }
+
+ ~UDPWork() {
+ delete packet_;
+ }
+
+ UDPSocketInterface* UDPInterface() {
+ return mount()->ppapi()->GetUDPSocketInterface();
+ }
+
+ protected:
+ ScopedEventEmitterUDP emitter_;
+ Packet* packet_;
+};
+
+
+class UDPSendWork : public UDPWork {
+ public:
+ explicit UDPSendWork(const ScopedEventEmitterUDP& emitter)
+ : UDPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is valid, and we are not currently sending
+ if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) {
+ packet_ = emitter_->ReadTXPacket_Locked();
+ if (packet_) {
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = UDPInterface()->SendTo(stream->socket_resource(),
+ packet_->buffer(),
+ packet_->len(),
+ packet_->addr(),
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+ stream->ClearStreamFlags(SSF_SENDING);
+ }
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more output
+ if (stream) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ }
+ }
+};
+
+class UDPRecvWork : public UDPWork {
+ public:
+ explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter)
+ : UDPWork(emitter) {
+ data_ = new char[kMaxPacketSize];
+ }
-UDPSocketInterface* MountNodeUDP::UDPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
+ ~UDPRecvWork() {
+ delete[] data_;
+ }
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is valid and we are not currently receiving
+ if (stream && ((stream->GetStreamFlags() & SSF_RECVING) == 0)) {
+ stream->SetStreamFlags(SSF_RECVING);
+ int err = UDPInterface()->RecvFrom(stream->socket_resource(),
+ data_,
+ kMaxPacketSize,
+ &addr_,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+ }
+ stream->ClearStreamFlags(SSF_RECVING);
binji 2013/09/19 00:48:55 move inside for failure case
noelallen1 2013/09/19 21:29:27 Done.
+ return false;
+ }
- return mount_->ppapi()->GetUDPSocketInterface();
+ virtual void Run(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more input
+ if (stream) {
+ if (val > 0) {
+ Packet* packet = new Packet(mount()->ppapi());
+ packet->Copy(data_, val, addr_);
+ emitter_->WriteRXPacket_Locked(packet);
+ }
+ stream->ClearStreamFlags(SSF_RECVING);
+ stream->QueueInput();
+ }
+ }
+
+ private:
+ char* data_;
+ PP_Resource addr_;
+};
+
+
+MountNodeUDP::MountNodeUDP(Mount* mount)
+ : MountNodeSocket(mount),
+ emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
+ emitter_->AttachStream(this);
+}
+
+void MountNodeUDP::Destroy() {
+ emitter_->DetachStream();
+ MountNodeSocket::Destroy();
+}
+
+EventEmitterUDP* MountNodeUDP::GetEventEmitter() {
+ return emitter_.get();
}
Error MountNodeUDP::Init(int flags) {
- if (UDPSocket() == NULL)
+ if (UDPInterface() == NULL)
return EACCES;
- socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+void MountNodeUDP::QueueInput() {
+ UDPRecvWork* work = new UDPRecvWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeUDP::QueueOutput() {
+ UDPSendWork* work = new UDPSendWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == socket_resource_)
return EBADF;
@@ -50,9 +185,10 @@ Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == out_addr)
return EINVAL;
- int err = UDPSocket()->Bind(socket_resource_,
- out_addr,
- PP_BlockUntilComplete());
+ int err = UDPInterface()->Bind(socket_resource_,
+ out_addr,
+ PP_BlockUntilComplete());
+ QueueInput();
if (err != 0) {
mount_->ppapi()->ReleaseResource(out_addr);
return PPErrorToErrno(err);
@@ -87,19 +223,65 @@ Error MountNodeUDP::RecvFromHelper(void* buf,
if (0 == socket_resource_)
return EBADF;
+ int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_;
+
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLIN, ms);
+ if (err)
+ return err;
+
+ Packet* packet = emitter_->ReadRXPacket_Locked();
+
+ *out_len = 0;
+ *out_addr = 0;
+ if (packet) {
+ int capped_len =
+ static_cast<int32_t>(std::min<int>(len, packet->len()));
+ memcpy(buf, packet->buffer(), capped_len);
+
+ if (packet->addr() != 0) {
+ mount_->ppapi()->AddRefResource(packet->addr());
+ *out_addr = packet->addr();
+ }
+ *out_len = capped_len;
+
+ delete packet;
+ QueueInput();
+ }
+ return 0;
+}
+
+Error MountNodeUDP::SendToHelper(const void* buf,
+ size_t len,
+ int flags,
+ PP_Resource addr,
+ int* out_len) {
+ *out_len = 0;
+ if (0 == socket_resource_)
+ return EBADF;
+
+ if (0 == addr)
+ return ENOTCONN;
+
int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->RecvFrom(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- out_addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
+ int ms = (GetMode() & O_NONBLOCK) ? 0 : write_timeout_;
- *out_len = err;
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLOUT, ms);
+ if (err)
+ return err;
+
+ Packet* packet = new Packet(mount_->ppapi());
+ packet->Copy(buf, capped_len, addr);
+
+ emitter_->WriteTXPacket_Locked(packet);
+ QueueOutput();
+
+ *out_len = capped_len;
return 0;
}
+
Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) {
while (1) {
int local_len = 0;
@@ -109,8 +291,10 @@ Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) {
if (err < 0)
return PPErrorToErrno(err);
- /* If "connected" then only receive packets from the given remote. */
+ // If "connected" then only receive packets from the given remote.
bool same = IsEquivalentAddress(addr, remote_addr_);
binji 2013/09/19 22:40:25 This behavior is gone now? Is it necessary?
noelallen1 2013/09/20 00:51:27 Not really needed for first pass. BUG=295177
+
+ // Release references from RecvFromHelper
mount_->ppapi()->ReleaseResource(addr);
if (remote_addr_ != 0 && same)
@@ -139,31 +323,6 @@ Error MountNodeUDP::RecvFrom(void* buf,
return 0;
}
-
-Error MountNodeUDP::SendToHelper(const void* buf,
- size_t len,
- int flags,
- PP_Resource addr,
- int* out_len) {
- if (0 == socket_resource_)
- return EBADF;
-
- if (0 == addr)
- return ENOTCONN;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->SendTo(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
-
- *out_len = err;
- return 0;
-}
-
Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) {
return SendToHelper(buf, len, flags, remote_addr_, out_len);
}
@@ -185,4 +344,3 @@ Error MountNodeUDP::SendTo(const void* buf,
} // namespace nacl_io
-#endif // PROVIDES_SOCKET_API

Powered by Google App Engine
This is Rietveld 408576698