Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(808)

Unified Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added Fifo Tests Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
index 5a5f9f12e150ff4125c0470962381325633de58b..91ffa94a15a38f6a30db4a50cd00615af1ae05fc 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
@@ -10,34 +10,151 @@
#include <string.h>
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
#include "nacl_io/mount_node_tcp.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ size_t kMaxPacketSize = 65536;
binji 2013/09/15 22:18:58 const
noelallen1 2013/09/17 21:21:54 Done.
+ size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {}
+class TCPWork : public MountStreamWork {
+ public:
+ TCPWork(EventEmitterTCP* emitter)
binji 2013/09/15 22:18:58 explicit
noelallen1 2013/09/17 21:21:54 Done.
+ : MountStreamWork(emitter->stream()->mount_stream()),
+ emitter_(emitter) {}
+
+ TCPSocketInterface* TCPInterface() {
+ return mount()->ppapi()->GetTCPSocketInterface();
+ }
+
+ protected:
+ ScopedEventEmitterTCP emitter_;
+};
+
+
+class TCPSendWork : public TCPWork {
+ public:
+ TCPSendWork(EventEmitterTCP* emitter) : TCPWork(emitter) {}
binji 2013/09/15 22:18:58 explicit
noelallen1 2013/09/17 21:21:54 Done.
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = reinterpret_cast<MountNodeTCP*>(emitter_->stream());
binji 2013/09/15 22:18:58 static_cast (though unnecessary if you use covaria
noelallen1 2013/09/17 21:21:54 Done.
+ // If the stream is valid, and we are not currently sending
+ if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) {
+ size_t tx_data_avail = emitter_->tx_fifo()->ReadAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize));
+ if (capped_len == 0)
+ return false;
+
+ char* data = new char[capped_len];
+ emitter_->ReadTXBytes_Locked(data, capped_len);
+
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = TCPInterface()->Write(stream->socket_resource(),
+ data,
+ capped_len,
+ mount()->GetRunCompletion(this));
+ delete data;
binji 2013/09/15 22:18:58 Deleting the data sent to an async call?
noelallen1 2013/09/17 21:21:54 Consolidated into base class.
+ if (err == PP_OK_COMPLETIONPENDING)
binji 2013/09/15 22:18:58 any way to return this error in case it is not PP_
noelallen1 2013/09/17 21:21:54 Will log once we officially have logging.
+ return true;
+
+ stream->ClearStreamFlags(SSF_SENDING);
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t val) {
binji 2013/09/15 22:18:58 Run is a strange name for this; it isn't meant to
noelallen1 2013/09/17 21:21:54 MountStream::GetOnCompletedCompletion?
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = reinterpret_cast<MountNodeTCP*>(emitter_->stream());
binji 2013/09/15 22:18:58 static_cast
noelallen1 2013/09/17 21:21:54 Done.
+
+ // If the stream is still there, see if we can queue more output
+ if (stream) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ }
+ }
+};
+
+class TCPRecvWork : public TCPWork {
+ public:
+ TCPRecvWork(EventEmitterTCP* emitter) : TCPWork(emitter), data_(NULL) {}
binji 2013/09/15 22:18:58 explicit
noelallen1 2013/09/17 21:21:54 Done.
+
+ ~TCPRecvWork() {
+ delete[] data_;
+ }
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = reinterpret_cast<MountNodeTCP*>(emitter_->stream());
binji 2013/09/15 22:18:58 static_cast
noelallen1 2013/09/17 21:21:54 Done.
+
+ size_t rx_space_avail = emitter_->rx_fifo()->WriteAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
+ if (NULL == stream || (stream->GetStreamFlags() & SSF_RECVING) == 0)
+ return false;
-TCPSocketInterface* MountNodeTCP::TCPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
+ if (capped_len <= 0)
+ return false;
- return mount_->ppapi()->GetTCPSocketInterface();
+ stream->SetStreamFlags(SSF_RECVING);
+ data_ = new char[capped_len];
+ int err = TCPInterface()->Read(stream->socket_resource(),
+ data_,
binji 2013/09/15 22:18:58 nit: align params
noelallen1 2013/09/17 21:21:54 Done.
+ capped_len,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ stream->ClearStreamFlags(SSF_RECVING);
+ return false;
+ }
+
+ virtual void Run(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream =
+ reinterpret_cast<MountNodeTCP*>(emitter_->stream());
binji 2013/09/15 22:18:58 static_cast
noelallen1 2013/09/17 21:21:54 Done.
+
+ if (NULL == stream)
+ return;
+
+ // If the stream is still there, see if we can queue more input
+ if (val > 0)
binji 2013/09/15 22:18:58 better name for val? (bytes_read or something?)
noelallen1 2013/09/17 21:21:54 Done.
+ emitter_->WriteRXBytes_Locked(data_, val);
+
+ stream->ClearStreamFlags(SSF_RECVING);
+ stream->QueueInput();
+ }
+
+ private:
+ char* data_;
binji 2013/09/15 22:18:58 std::vector
+};
+
+
+MountNodeTCP::MountNodeTCP(Mount* mount)
+ : MountNodeSocket(mount) {
}
Error MountNodeTCP::Init(int flags) {
- if (TCPSocket() == NULL)
+ if (TCPInterface() == NULL)
return EACCES;
- socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+EventEmitter* MountNodeTCP::GetEventEmitter() {
+ return NULL;
binji 2013/09/15 22:18:58 Why no emitter?
noelallen1 2013/09/17 21:21:54 Done.
+}
+
binji 2013/09/15 22:18:58 Looks like TCP is not yet implemented?
noelallen1 2013/09/17 21:21:54 Bind is not allowed on client sockets. There's no
binji 2013/09/19 00:48:54 Sorry, I wasn't talking about Bind, I meant non-bl
noelallen1 2013/09/19 21:29:27 Ah I see, fixed.
Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
AUTO_LOCK(node_lock_);
@@ -65,9 +182,9 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
if (0 == remote_addr_)
return EINVAL;
- int err = TCPSocket()->Connect(socket_resource_,
- remote_addr_,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Connect(socket_resource_,
+ remote_addr_,
+ PP_BlockUntilComplete());
// If we fail, release the dest addr resource
if (err != PP_OK) {
@@ -76,7 +193,7 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
return PPErrorToErrno(err);
}
- local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_);
+ local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
mount_->ppapi()->AddRefResource(local_addr_);
return 0;
}
@@ -87,10 +204,10 @@ Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) {
return EBADF;
int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Read(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Read(socket_resource_,
+ static_cast<char*>(buf),
+ capped_len,
+ PP_BlockUntilComplete());
if (err < 0)
return PPErrorToErrno(err);
@@ -121,10 +238,10 @@ Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) {
return ENOTCONN;
int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Write(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Write(socket_resource_,
+ static_cast<const char*>(buf),
+ capped_len,
+ PP_BlockUntilComplete());
if (err < 0)
return PPErrorToErrno(err);

Powered by Google App Engine
This is Rietveld 408576698