Chromium Code Reviews| Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
| diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
| index 5a5f9f12e150ff4125c0470962381325633de58b..6fbc730aa5a6534f26c94c75e988fd56269c45cf 100644 |
| --- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
| +++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc |
| @@ -10,34 +10,192 @@ |
| #include <string.h> |
| #include <algorithm> |
| -#include "nacl_io/mount.h" |
| -#include "nacl_io/mount_node_socket.h" |
| #include "nacl_io/mount_node_tcp.h" |
| +#include "nacl_io/mount_stream.h" |
| #include "nacl_io/pepper_interface.h" |
| +namespace { |
| + const size_t kMaxPacketSize = 65536; |
| + const size_t kDefaultFifoSize = kMaxPacketSize * 8; |
| +} |
| + |
| namespace nacl_io { |
| -MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {} |
| +class TCPWork : public MountStream::Work { |
| + public: |
| + explicit TCPWork(const ScopedEventEmitterTCP& emitter) |
| + : MountStream::Work(emitter->stream()->mount_stream()), |
| + emitter_(emitter), |
| + data_(NULL) { |
| + } |
| + |
| + ~TCPWork() { |
| + delete[] data_; |
| + } |
| + |
| + TCPSocketInterface* TCPInterface() { |
| + return mount()->ppapi()->GetTCPSocketInterface(); |
| + } |
| + |
| + protected: |
| + ScopedEventEmitterTCP emitter_; |
| + char* data_; |
| +}; |
| + |
| + |
| +class TCPSendWork : public TCPWork { |
| + public: |
| + explicit TCPSendWork(const ScopedEventEmitterTCP& emitter) |
| + : TCPWork(emitter) {} |
| + |
| + virtual bool Start(int32_t val) { |
| + AUTO_LOCK(emitter_->GetLock()); |
| + MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); |
| + |
| + // Does the stream exist, and can it send? |
| + if (NULL == stream) |
| + return false; |
| + |
| + bool wth = stream->TestStreamFlags(SSF_CAN_SEND); |
|
binji
2013/09/19 22:40:25
wth?
noelallen1
2013/09/20 00:51:27
Still debugging TCP. I take it you didn't notice t
|
| + if (!wth) |
| + return false; |
| + |
| + // If not currently sending... |
| + if (!stream->TestStreamFlags(SSF_SENDING)) { |
| + size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable(); |
| + int capped_len = |
| + static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize)); |
| + |
| + if (capped_len == 0) |
| + return false; |
| + |
| + data_ = new char[capped_len]; |
| + emitter_->ReadOut_Locked(data_, capped_len); |
| + |
| + stream->SetStreamFlags(SSF_SENDING); |
| + int err = TCPInterface()->Write(stream->socket_resource(), |
| + data_, |
| + capped_len, |
| + mount()->GetRunCompletion(this)); |
| + if (err == PP_OK_COMPLETIONPENDING) |
| + return true; |
| + |
| + // Anything else, we should assume the socket has gone bad. |
| + stream->SetError(err); |
| + } |
| + return false; |
| + } |
| + |
| + virtual void Run(int32_t length_error) { |
| + AUTO_LOCK(emitter_->GetLock()); |
| + MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); |
| + |
| + // If the stream is still there... |
| + if (stream) { |
| + // And we did send, then Q more work. |
| + if (length_error >= 0) { |
| + stream->ClearStreamFlags(SSF_SENDING); |
| + stream->QueueOutput(); |
|
binji
2013/09/19 22:40:25
why does TCPSendWork QueueOutput in the Run callba
noelallen1
2013/09/20 00:51:27
MountNodeSocket handles the foreground thread side
|
| + } else { |
| + // Otherwise this socket has gone bad. |
| + stream->SetError(length_error); |
| + } |
| + } |
| + } |
| +}; |
| + |
| +class TCPRecvWork : public TCPWork { |
| + public: |
| + explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter) |
| + : TCPWork(emitter) {} |
| + |
| + virtual bool Start(int32_t val) { |
| + AUTO_LOCK(emitter_->GetLock()); |
| + MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); |
| + |
| + // Does the stream exist, and can it recv? |
| + if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV)) |
| + return false; |
| + |
| + // If the stream is valid and we are not currently receiving |
|
binji
2013/09/19 22:40:25
bad comment, just "if not currently receiving"
noelallen1
2013/09/20 00:51:27
Done.
|
| + if (!stream->TestStreamFlags(SSF_RECVING)) { |
| + size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable(); |
| + int capped_len = |
| + static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); |
| + |
| + if (capped_len > 0) { |
|
binji
2013/09/19 22:40:25
why did you flip this if? (TCPSendWork uses an if-
noelallen1
2013/09/20 00:51:27
Done.
|
| + stream->SetStreamFlags(SSF_RECVING); |
| + data_ = new char[capped_len]; |
| + int err = TCPInterface()->Read(stream->socket_resource(), |
| + data_, |
| + capped_len, |
| + mount()->GetRunCompletion(this)); |
| + if (err == PP_OK_COMPLETIONPENDING) |
| + return true; |
| + |
| + stream->SetError(err); |
| + } |
| + } |
| + return false; |
| + } |
| + virtual void Run(int32_t length_error) { |
| + AUTO_LOCK(emitter_->GetLock()); |
| + MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); |
| + |
| + // If the stream is still there, see if we can queue more input |
| + if (stream) { |
| + if (length_error > 0) { |
| + emitter_->WriteIn_Locked(data_, length_error); |
| + } else { |
| + stream->SetError(length_error); |
| + } |
| + } |
| + } |
| +}; |
| -TCPSocketInterface* MountNodeTCP::TCPSocket() { |
| - if (mount_->ppapi() == NULL) |
| - return NULL; |
| - return mount_->ppapi()->GetTCPSocketInterface(); |
| +MountNodeTCP::MountNodeTCP(Mount* mount) |
| + : MountNodeSocket(mount), |
| + emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) { |
| + emitter_->AttachStream(this); |
| +} |
| + |
| +void MountNodeTCP::Destroy() { |
| + emitter_->DetachStream(); |
| + MountNodeSocket::Destroy(); |
| } |
| Error MountNodeTCP::Init(int flags) { |
| - if (TCPSocket() == NULL) |
| + if (TCPInterface() == NULL) |
| return EACCES; |
| - socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance()); |
| + socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance()); |
| if (0 == socket_resource_) |
| return EACCES; |
| return 0; |
| } |
| +EventEmitterTCP* MountNodeTCP::GetEventEmitter() { |
| + return emitter_.get(); |
| +} |
| + |
| +void MountNodeTCP::QueueInput() { |
| + TCPRecvWork* work = new TCPRecvWork(emitter_); |
| + mount_stream()->EnqueueWork(work); |
| +} |
| + |
| +void MountNodeTCP::QueueOutput() { |
| + TCPSendWork* work = new TCPSendWork(emitter_); |
| + mount_stream()->EnqueueWork(work); |
| +} |
| + |
| + |
| +// We can not bind a client socket with PPAPI. For now we ignore the |
| +// bind but report the correct address later, just in case someone is |
| +// binding without really caring what the address is (for example to |
| +// select a more optimized interface/route.) |
| Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { |
| AUTO_LOCK(node_lock_); |
| @@ -65,9 +223,9 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { |
| if (0 == remote_addr_) |
| return EINVAL; |
| - int err = TCPSocket()->Connect(socket_resource_, |
| - remote_addr_, |
| - PP_BlockUntilComplete()); |
| + int err = TCPInterface()->Connect(socket_resource_, |
| + remote_addr_, |
| + PP_BlockUntilComplete()); |
| // If we fail, release the dest addr resource |
| if (err != PP_OK) { |
| @@ -76,71 +234,39 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { |
| return PPErrorToErrno(err); |
| } |
| - local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_); |
| + local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); |
| mount_->ppapi()->AddRefResource(local_addr_); |
| - return 0; |
| -} |
| - |
| -Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) { |
| - AUTO_LOCK(node_lock_); |
| - if (0 == socket_resource_) |
| - return EBADF; |
| - int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); |
| - int err = TCPSocket()->Read(socket_resource_, |
| - static_cast<char*>(buf), |
| - capped_len, |
| - PP_BlockUntilComplete()); |
| - if (err < 0) |
| - return PPErrorToErrno(err); |
| + // Now that we are connected, we can start sending and receiving. |
| + SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); |
| - *out_len = err; |
| + // Begin the input pump |
| + QueueInput(); |
| return 0; |
| } |
| -Error MountNodeTCP::RecvFrom(void* buf, |
| - size_t len, |
| - int flags, |
| - struct sockaddr* src_addr, |
| - socklen_t* addrlen, |
| - int* out_len) { |
| - Error err = Recv(buf, len, flags, out_len); |
| - if (err == 0) |
| - GetPeerName(src_addr, addrlen); |
| - return err; |
| -} |
| - |
| - |
| -Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) { |
| - AUTO_LOCK(node_lock_); |
| - |
| - if (0 == socket_resource_) |
| - return EBADF; |
| - if (0 == remote_addr_) |
| - return ENOTCONN; |
| - |
| - int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); |
| - int err = TCPSocket()->Write(socket_resource_, |
| - static_cast<const char*>(buf), |
| - capped_len, |
| - PP_BlockUntilComplete()); |
| - if (err < 0) |
| - return PPErrorToErrno(err); |
| +Error MountNodeTCP::Recv_Locked(void* buf, |
| + size_t len, |
| + PP_Resource* addr, |
|
binji
2013/09/19 22:40:25
out_addr?
noelallen1
2013/09/20 00:51:27
Done.
|
| + int* out_len) { |
| + *out_len = emitter_->in_fifo()->Read(buf, len); |
| + *addr = remote_addr_; |
| - *out_len = err; |
| + mount_->ppapi()->AddRefResource(remote_addr_); |
| return 0; |
| } |
| -Error MountNodeTCP::SendTo(const void* buf, |
| - size_t len, |
| - int flags, |
| - const struct sockaddr* dest_addr, |
| - socklen_t addrlen, |
| - int* out_len) { |
| - return Send(buf, len, flags, out_len); |
| +// TCP ignores dst addr passed to send_to, and always uses bound address |
| +Error MountNodeTCP::Send_Locked(const void* buf, |
| + size_t len, |
| + PP_Resource, |
| + int* out_len) { |
| + *out_len = emitter_->in_fifo()->Write(buf, len); |
| + return 0; |
| } |
| + |
| } // namespace nacl_io |
| #endif // PROVIDES_SOCKET_API |