Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(51)

Unified Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
index 5a5f9f12e150ff4125c0470962381325633de58b..6fc7bf0b1ebda4a15381faad1dd8d2a18ca49611 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
@@ -10,34 +10,168 @@
#include <string.h>
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
#include "nacl_io/mount_node_tcp.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ const size_t kMaxPacketSize = 65536;
+ const size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {}
+class TCPWork : public MountStream::Work {
+ public:
+ explicit TCPWork(const ScopedEventEmitterTCP& emitter)
+ : MountStream::Work(emitter->stream()->mount_stream()),
+ emitter_(emitter),
+ data_(NULL) {
+ }
+
+ ~TCPWork() {
+ delete[] data_;
+ }
+
+ TCPSocketInterface* TCPInterface() {
+ return mount()->ppapi()->GetTCPSocketInterface();
+ }
+
+ protected:
+ ScopedEventEmitterTCP emitter_;
+ char* data_;
+};
+
+
+class TCPSendWork : public TCPWork {
+ public:
+ explicit TCPSendWork(const ScopedEventEmitterTCP& emitter)
+ : TCPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // If the stream is valid, and we are not currently sending
+ if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) {
+ size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize));
+
+ if (capped_len == 0)
+ return false;
+
+ data_ = new char[capped_len];
+ emitter_->ReadOut_Locked(data_, capped_len);
+
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = TCPInterface()->Write(stream->socket_resource(),
+ data_,
+ capped_len,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ stream->ClearStreamFlags(SSF_SENDING);
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more output
+ if (stream) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ }
+ }
+};
+
+class TCPRecvWork : public TCPWork {
+ public:
+ explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter)
+ : TCPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
+
+ if (NULL == stream || (stream->GetStreamFlags() & SSF_RECVING) == 0)
+ return false;
+
+ if (capped_len <= 0)
+ return false;
+
+ stream->SetStreamFlags(SSF_RECVING);
+ data_ = new char[capped_len];
+ int err = TCPInterface()->Read(stream->socket_resource(),
+ data_,
+ capped_len,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ stream->ClearStreamFlags(SSF_RECVING);
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+ if (NULL == stream)
+ return;
-TCPSocketInterface* MountNodeTCP::TCPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
+ // If the stream is still there, see if we can queue more input
+ if (length_error > 0)
+ emitter_->WriteIn_Locked(data_, length_error);
- return mount_->ppapi()->GetTCPSocketInterface();
+ stream->ClearStreamFlags(SSF_RECVING);
+ stream->QueueInput();
+ }
+};
+
+
+MountNodeTCP::MountNodeTCP(Mount* mount)
+ : MountNodeSocket(mount) {
}
Error MountNodeTCP::Init(int flags) {
- if (TCPSocket() == NULL)
+ if (TCPInterface() == NULL)
return EACCES;
- socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
+ return emitter_.get();
+}
+
+void MountNodeTCP::QueueInput() {
+ TCPRecvWork* work = new TCPRecvWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeTCP::QueueOutput() {
+ TCPSendWork* work = new TCPSendWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+
+// We can not bind a client socket with PPAPI. For now we ignore the
+// bind but report the correct address later, just in case someone is
+// binding without really caring what the address is (for example to
+// select a more optimized interface/route.)
Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
AUTO_LOCK(node_lock_);
@@ -65,9 +199,9 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
if (0 == remote_addr_)
return EINVAL;
- int err = TCPSocket()->Connect(socket_resource_,
- remote_addr_,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Connect(socket_resource_,
+ remote_addr_,
+ PP_BlockUntilComplete());
// If we fail, release the dest addr resource
if (err != PP_OK) {
@@ -76,7 +210,7 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
return PPErrorToErrno(err);
}
- local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_);
+ local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
mount_->ppapi()->AddRefResource(local_addr_);
return 0;
}
@@ -87,10 +221,10 @@ Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) {
return EBADF;
int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Read(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Read(socket_resource_,
+ static_cast<char*>(buf),
+ capped_len,
+ PP_BlockUntilComplete());
if (err < 0)
return PPErrorToErrno(err);
@@ -121,10 +255,10 @@ Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) {
return ENOTCONN;
int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Write(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Write(socket_resource_,
+ static_cast<const char*>(buf),
+ capped_len,
+ PP_BlockUntilComplete());
if (err < 0)
return PPErrorToErrno(err);

Powered by Google App Engine
This is Rietveld 408576698