Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(576)

Unified Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
index 5a5f9f12e150ff4125c0470962381325633de58b..bdc4b1020456b3268304b6f88f354df0da9b7ab1 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
@@ -10,34 +10,191 @@
#include <string.h>
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
#include "nacl_io/mount_node_tcp.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ const size_t kMaxPacketSize = 65536;
+ const size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {}
+class TCPWork : public MountStream::Work {
+ public:
+ explicit TCPWork(const ScopedEventEmitterTCP& emitter)
+ : MountStream::Work(emitter->stream()->mount_stream()),
+ emitter_(emitter),
+ data_(NULL) {
+ }
+
+ ~TCPWork() {
+ delete[] data_;
+ }
+
+ TCPSocketInterface* TCPInterface() {
+ return mount()->ppapi()->GetTCPSocketInterface();
+ }
+
+ protected:
+ ScopedEventEmitterTCP emitter_;
+ char* data_;
+};
+
+
+class TCPSendWork : public TCPWork {
+ public:
+ explicit TCPSendWork(const ScopedEventEmitterTCP& emitter)
+ : TCPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // Does the stream exist, and can it send?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
+ return false;
+
+ // If not currently sending...
+ if (!stream->TestStreamFlags(SSF_SENDING)) {
+ size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize));
+
+ if (capped_len == 0)
+ return false;
+
+ data_ = new char[capped_len];
+ emitter_->ReadOut_Locked(data_, capped_len);
+
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = TCPInterface()->Write(stream->socket_resource(),
+ data_,
+ capped_len,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ // Anything else, we should assume the socket has gone bad.
+ stream->SetError_Locked(err);
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // If the stream is still there...
+ if (stream) {
+ // And we did send, then Q more work.
+ if (length_error >= 0) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ } else {
+ // Otherwise this socket has gone bad.
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+};
+
+class TCPRecvWork : public TCPWork {
+ public:
+ explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter)
+ : TCPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // Does the stream exist, and can it recv?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
+ return false;
+
+ // If we are not currently receiving
+ if (!stream->TestStreamFlags(SSF_RECVING)) {
+ size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
+
+ if (capped_len == 0)
+ return false;
+
+ stream->SetStreamFlags(SSF_RECVING);
+ data_ = new char[capped_len];
+ int err = TCPInterface()->Read(stream->socket_resource(),
+ data_,
+ capped_len,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ // Anything else, we should assume the socket has gone bad.
+ stream->SetError_Locked(err);
+ }
+ return false;
+ }
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more input
+ if (stream) {
+ if (length_error > 0) {
+ emitter_->WriteIn_Locked(data_, length_error);
+ stream->QueueInput();
+ } else {
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+};
-TCPSocketInterface* MountNodeTCP::TCPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
- return mount_->ppapi()->GetTCPSocketInterface();
+MountNodeTCP::MountNodeTCP(Mount* mount)
+ : MountNodeSocket(mount),
+ emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
+ emitter_->AttachStream(this);
+}
+
+void MountNodeTCP::Destroy() {
+ emitter_->DetachStream();
+ MountNodeSocket::Destroy();
}
Error MountNodeTCP::Init(int flags) {
- if (TCPSocket() == NULL)
+ if (TCPInterface() == NULL)
return EACCES;
- socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
+ return emitter_.get();
+}
+
+void MountNodeTCP::QueueInput() {
+ TCPRecvWork* work = new TCPRecvWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeTCP::QueueOutput() {
+ TCPSendWork* work = new TCPSendWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+
+// We can not bind a client socket with PPAPI. For now we ignore the
+// bind but report the correct address later, just in case someone is
+// binding without really caring what the address is (for example to
+// select a more optimized interface/route.)
Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
AUTO_LOCK(node_lock_);
@@ -65,9 +222,9 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
if (0 == remote_addr_)
return EINVAL;
- int err = TCPSocket()->Connect(socket_resource_,
- remote_addr_,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Connect(socket_resource_,
+ remote_addr_,
+ PP_BlockUntilComplete());
// If we fail, release the dest addr resource
if (err != PP_OK) {
@@ -76,71 +233,40 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
return PPErrorToErrno(err);
}
- local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_);
+ local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
mount_->ppapi()->AddRefResource(local_addr_);
- return 0;
-}
-
-Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) {
- AUTO_LOCK(node_lock_);
- if (0 == socket_resource_)
- return EBADF;
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Read(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
+ // Now that we are connected, we can start sending and receiving.
+ SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
- *out_len = err;
+ // Begin the input pump
+ QueueInput();
return 0;
}
-Error MountNodeTCP::RecvFrom(void* buf,
- size_t len,
- int flags,
- struct sockaddr* src_addr,
- socklen_t* addrlen,
- int* out_len) {
- Error err = Recv(buf, len, flags, out_len);
- if (err == 0)
- GetPeerName(src_addr, addrlen);
- return err;
-}
-
-
-Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) {
- AUTO_LOCK(node_lock_);
-
- if (0 == socket_resource_)
- return EBADF;
- if (0 == remote_addr_)
- return ENOTCONN;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Write(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
+Error MountNodeTCP::Recv_Locked(void* buf,
+ size_t len,
+ PP_Resource* out_addr,
+ int* out_len) {
+ *out_len = emitter_->in_fifo()->Read(buf, len);
+ *out_addr = remote_addr_;
- *out_len = err;
+ // Ref the address copy we pass back.
+ mount_->ppapi()->AddRefResource(remote_addr_);
return 0;
}
-Error MountNodeTCP::SendTo(const void* buf,
- size_t len,
- int flags,
- const struct sockaddr* dest_addr,
- socklen_t addrlen,
- int* out_len) {
- return Send(buf, len, flags, out_len);
+// TCP ignores dst addr passed to send_to, and always uses bound address
+Error MountNodeTCP::Send_Locked(const void* buf,
+ size_t len,
+ PP_Resource,
+ int* out_len) {
+ *out_len = emitter_->out_fifo()->Write(buf, len);
+ return 0;
}
+
} // namespace nacl_io
#endif // PROVIDES_SOCKET_API
« no previous file with comments | « native_client_sdk/src/libraries/nacl_io/mount_node_tcp.h ('k') | native_client_sdk/src/libraries/nacl_io/mount_node_tty.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698