Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(28)

Unified Diff: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
index 310a2bdf06581cbfb87039872211109e1af1246d..cb8c23e78cbf782006ba1fd8286b945d45ea52b2 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
@@ -3,41 +3,194 @@
// found in the LICENSE file.
-#include "nacl_io/ossocket.h"
-#ifdef PROVIDES_SOCKET_API
+#include "nacl_io/mount_node_udp.h"
#include <errno.h>
#include <string.h>
+
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
-#include "nacl_io/mount_node_udp.h"
+#include "nacl_io/event_emitter_udp.h"
+#include "nacl_io/mount_stream.h"
+#include "nacl_io/packet.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ const size_t kMaxPacketSize = 65536;
+ const size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {}
+class UDPWork : public MountStream::Work {
+ public:
+ explicit UDPWork(const ScopedEventEmitterUDP& emitter)
+ : MountStream::Work(emitter->stream()->mount_stream()),
+ emitter_(emitter),
+ packet_(NULL) {
+ }
+
+ ~UDPWork() {
+ delete packet_;
+ }
+
+ UDPSocketInterface* UDPInterface() {
+ return mount()->ppapi()->GetUDPSocketInterface();
+ }
+
+ protected:
+ ScopedEventEmitterUDP emitter_;
+ Packet* packet_;
+};
+
+
+class UDPSendWork : public UDPWork {
+ public:
+ explicit UDPSendWork(const ScopedEventEmitterUDP& emitter)
+ : UDPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // Does the stream exist, and can it send?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
+ return false;
+
+ // If not currently sending...
+ if (!stream->TestStreamFlags(SSF_SENDING)) {
+ packet_ = emitter_->ReadTXPacket_Locked();
+ if (packet_) {
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = UDPInterface()->SendTo(stream->socket_resource(),
+ packet_->buffer(),
+ packet_->len(),
+ packet_->addr(),
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ // Anything else, we should assume the socket has gone bad.
+ stream->SetError_Locked(err);
+ }
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there...
+ if (stream) {
+ // And we did send, then Q more work.
+ if (length_error >= 0) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ } else {
+ // Otherwise this socket has gone bad.
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+};
+
+
+class UDPRecvWork : public UDPWork {
+ public:
+ explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter)
+ : UDPWork(emitter) {
+ data_ = new char[kMaxPacketSize];
+ }
+
+ ~UDPRecvWork() {
+ delete[] data_;
+ }
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // Does the stream exist, and can it recv?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
+ return false;
+
+ // If the stream is valid and we are not currently receiving
+ if (!stream->TestStreamFlags(SSF_RECVING)) {
+ stream->SetStreamFlags(SSF_RECVING);
+ int err = UDPInterface()->RecvFrom(stream->socket_resource(),
+ data_,
+ kMaxPacketSize,
+ &addr_,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ stream->SetError_Locked(err);
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more input
+ if (stream) {
+ if (length_error > 0) {
+ Packet* packet = new Packet(mount()->ppapi());
+ packet->Copy(data_, length_error, addr_);
+ emitter_->WriteRXPacket_Locked(packet);
+ stream->ClearStreamFlags(SSF_RECVING);
+ stream->QueueInput();
+ } else {
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+
+ private:
+ char* data_;
+ PP_Resource addr_;
+};
-UDPSocketInterface* MountNodeUDP::UDPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
+MountNodeUDP::MountNodeUDP(Mount* mount)
+ : MountNodeSocket(mount),
+ emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
+ emitter_->AttachStream(this);
+}
+
+void MountNodeUDP::Destroy() {
+ emitter_->DetachStream();
+ MountNodeSocket::Destroy();
+}
- return mount_->ppapi()->GetUDPSocketInterface();
+EventEmitterUDP* MountNodeUDP::GetEventEmitter() {
+ return emitter_.get();
}
Error MountNodeUDP::Init(int flags) {
- if (UDPSocket() == NULL)
+ if (UDPInterface() == NULL)
return EACCES;
- socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+void MountNodeUDP::QueueInput() {
+ UDPRecvWork* work = new UDPRecvWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeUDP::QueueOutput() {
+ UDPSendWork* work = new UDPSendWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == socket_resource_)
return EBADF;
@@ -50,14 +203,18 @@ Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == out_addr)
return EINVAL;
- int err = UDPSocket()->Bind(socket_resource_,
- out_addr,
- PP_BlockUntilComplete());
+ int err = UDPInterface()->Bind(socket_resource_,
+ out_addr,
+ PP_BlockUntilComplete());
if (err != 0) {
mount_->ppapi()->ReleaseResource(out_addr);
return PPErrorToErrno(err);
}
+ // Now that we are bound, we can start sending and receiving.
+ SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
+ QueueInput();
+
local_addr_ = out_addr;
return 0;
}
@@ -79,110 +236,48 @@ Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) {
return 0;
}
-Error MountNodeUDP::RecvFromHelper(void* buf,
- size_t len,
- int flags,
- PP_Resource* out_addr,
- int* out_len) {
- if (0 == socket_resource_)
- return EBADF;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->RecvFrom(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- out_addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
-
- *out_len = err;
- return 0;
-}
-
-Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) {
- while (1) {
- int local_len = 0;
- PP_Resource addr = 0;
-
- int err = RecvFromHelper(buf, len, flags, &addr, &local_len);
- if (err < 0)
- return PPErrorToErrno(err);
-
- /* If "connected" then only receive packets from the given remote. */
- bool same = IsEquivalentAddress(addr, remote_addr_);
- mount_->ppapi()->ReleaseResource(addr);
-
- if (remote_addr_ != 0 && same)
- continue;
-
- *out_len = local_len;
+Error MountNodeUDP::Recv_Locked(void* buf,
+ size_t len,
+ PP_Resource* out_addr,
+ int* out_len) {
+ Packet* packet = emitter_->ReadRXPacket_Locked();
+ *out_len = 0;
+ *out_addr = 0;
+
+ if (packet) {
+ int capped_len =
+ static_cast<int32_t>(std::min<int>(len, packet->len()));
+ memcpy(buf, packet->buffer(), capped_len);
+
+ if (packet->addr() != 0) {
+ mount_->ppapi()->AddRefResource(packet->addr());
+ *out_addr = packet->addr();
+ }
+
+ *out_len = capped_len;
+ delete packet;
return 0;
}
-}
-
-Error MountNodeUDP::RecvFrom(void* buf,
- size_t len,
- int flags,
- struct sockaddr* src_addr,
- socklen_t* addrlen,
- int* out_len) {
- PP_Resource addr = 0;
- int err = RecvFromHelper(buf, len, flags, &addr, out_len);
- if (err < 0)
- return PPErrorToErrno(err);
-
- if (src_addr)
- *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr);
- mount_->ppapi()->ReleaseResource(addr);
- return 0;
+ // Should never happen, Recv_Locked should not be called
+ // unless already in a POLLIN state.
+ return EBADF;
}
-
-Error MountNodeUDP::SendToHelper(const void* buf,
- size_t len,
- int flags,
- PP_Resource addr,
- int* out_len) {
- if (0 == socket_resource_)
- return EBADF;
-
- if (0 == addr)
- return ENOTCONN;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->SendTo(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
-
- *out_len = err;
+Error MountNodeUDP::Send_Locked(const void* buf,
+ size_t len,
+ PP_Resource addr,
+ int* out_len) {
+ *out_len = 0;
+ int capped_len =
+ static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
+ Packet* packet = new Packet(mount_->ppapi());
+ packet->Copy(buf, capped_len, addr);
+
+ emitter_->WriteTXPacket_Locked(packet);
+ *out_len = capped_len;
return 0;
}
-Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) {
- return SendToHelper(buf, len, flags, remote_addr_, out_len);
-}
-
-Error MountNodeUDP::SendTo(const void* buf,
- size_t len,
- int flags,
- const struct sockaddr* dest_addr,
- socklen_t addrlen,
- int* out_len) {
- PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen);
- if (0 == out_addr)
- return EINVAL;
-
- Error err = SendToHelper(buf, len, flags, out_addr, out_len);
- mount_->ppapi()->ReleaseResource(out_addr);
- return err;
-}
-
} // namespace nacl_io
-#endif // PROVIDES_SOCKET_API
« no previous file with comments | « native_client_sdk/src/libraries/nacl_io/mount_node_udp.h ('k') | native_client_sdk/src/libraries/nacl_io/mount_socket.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698