| Index: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| index 5a5f9f12e150ff4125c0470962381325633de58b..bdc4b1020456b3268304b6f88f354df0da9b7ab1 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc | 
| @@ -10,34 +10,191 @@ | 
| #include <string.h> | 
| #include <algorithm> | 
|  | 
| -#include "nacl_io/mount.h" | 
| -#include "nacl_io/mount_node_socket.h" | 
| #include "nacl_io/mount_node_tcp.h" | 
| +#include "nacl_io/mount_stream.h" | 
| #include "nacl_io/pepper_interface.h" | 
|  | 
| +namespace { | 
| +  const size_t kMaxPacketSize = 65536; | 
| +  const size_t kDefaultFifoSize = kMaxPacketSize * 8; | 
| +} | 
| + | 
| namespace nacl_io { | 
|  | 
| -MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {} | 
| +class TCPWork : public MountStream::Work { | 
| + public: | 
| +  explicit TCPWork(const ScopedEventEmitterTCP& emitter) | 
| +      : MountStream::Work(emitter->stream()->mount_stream()), | 
| +        emitter_(emitter), | 
| +        data_(NULL) { | 
| +  } | 
| + | 
| +  ~TCPWork() { | 
| +    delete[] data_; | 
| +  } | 
| + | 
| +  TCPSocketInterface* TCPInterface() { | 
| +    return mount()->ppapi()->GetTCPSocketInterface(); | 
| +  } | 
| + | 
| + protected: | 
| +  ScopedEventEmitterTCP emitter_; | 
| +  char* data_; | 
| +}; | 
| + | 
| + | 
| +class TCPSendWork : public TCPWork { | 
| + public: | 
| +  explicit TCPSendWork(const ScopedEventEmitterTCP& emitter) | 
| +      : TCPWork(emitter) {} | 
| + | 
| +  virtual bool Start(int32_t val) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); | 
| + | 
| +    // Does the stream exist, and can it send? | 
| +    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND)) | 
| +      return false; | 
| + | 
| +    // If not currently sending... | 
| +    if (!stream->TestStreamFlags(SSF_SENDING)) { | 
| +      size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable(); | 
| +      int capped_len = | 
| +          static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize)); | 
| + | 
| +      if (capped_len == 0) | 
| +        return false; | 
| + | 
| +      data_ = new char[capped_len]; | 
| +      emitter_->ReadOut_Locked(data_, capped_len); | 
| + | 
| +      stream->SetStreamFlags(SSF_SENDING); | 
| +      int err = TCPInterface()->Write(stream->socket_resource(), | 
| +                                      data_, | 
| +                                      capped_len, | 
| +                                      mount()->GetRunCompletion(this)); | 
| +      if (err == PP_OK_COMPLETIONPENDING) | 
| +        return true; | 
| + | 
| +      // Anything else, we should assume the socket has gone bad. | 
| +      stream->SetError_Locked(err); | 
| +    } | 
| +    return false; | 
| +  } | 
| + | 
| +  virtual void Run(int32_t length_error) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); | 
| + | 
| +    // If the stream is still there... | 
| +    if (stream) { | 
| +      // And we did send, then Q more work. | 
| +      if (length_error >= 0) { | 
| +        stream->ClearStreamFlags(SSF_SENDING); | 
| +        stream->QueueOutput(); | 
| +      } else { | 
| +        // Otherwise this socket has gone bad. | 
| +        stream->SetError_Locked(length_error); | 
| +      } | 
| +    } | 
| +  } | 
| +}; | 
| + | 
| +class TCPRecvWork : public TCPWork { | 
| + public: | 
| +  explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter) | 
| +      : TCPWork(emitter) {} | 
| + | 
| +  virtual bool Start(int32_t val) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); | 
| + | 
| +    // Does the stream exist, and can it recv? | 
| +    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV)) | 
| +      return false; | 
| + | 
| +    // If we are not currently receiving | 
| +    if (!stream->TestStreamFlags(SSF_RECVING)) { | 
| +      size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable(); | 
| +      int capped_len = | 
| +          static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); | 
| + | 
| +      if (capped_len == 0) | 
| +        return false; | 
| + | 
| +      stream->SetStreamFlags(SSF_RECVING); | 
| +      data_ = new char[capped_len]; | 
| +      int err = TCPInterface()->Read(stream->socket_resource(), | 
| +                                      data_, | 
| +                                      capped_len, | 
| +                                      mount()->GetRunCompletion(this)); | 
| +      if (err == PP_OK_COMPLETIONPENDING) | 
| +        return true; | 
| + | 
| +      // Anything else, we should assume the socket has gone bad. | 
| +      stream->SetError_Locked(err); | 
| +    } | 
| +    return false; | 
| +  } | 
|  | 
| +  virtual void Run(int32_t length_error) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); | 
| + | 
| +    // If the stream is still there, see if we can queue more input | 
| +    if (stream) { | 
| +      if (length_error > 0) { | 
| +        emitter_->WriteIn_Locked(data_, length_error); | 
| +        stream->QueueInput(); | 
| +      } else { | 
| +        stream->SetError_Locked(length_error); | 
| +      } | 
| +    } | 
| +  } | 
| +}; | 
|  | 
| -TCPSocketInterface* MountNodeTCP::TCPSocket() { | 
| -  if (mount_->ppapi() == NULL) | 
| -    return NULL; | 
|  | 
| -  return mount_->ppapi()->GetTCPSocketInterface(); | 
| +MountNodeTCP::MountNodeTCP(Mount* mount) | 
| +    : MountNodeSocket(mount), | 
| +      emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) { | 
| +  emitter_->AttachStream(this); | 
| +} | 
| + | 
| +void MountNodeTCP::Destroy() { | 
| +  emitter_->DetachStream(); | 
| +  MountNodeSocket::Destroy(); | 
| } | 
|  | 
| Error MountNodeTCP::Init(int flags) { | 
| -  if (TCPSocket() == NULL) | 
| +  if (TCPInterface() == NULL) | 
| return EACCES; | 
|  | 
| -  socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance()); | 
| +  socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance()); | 
| if (0 == socket_resource_) | 
| return EACCES; | 
|  | 
| return 0; | 
| } | 
|  | 
| +EventEmitterTCP* MountNodeTCP::GetEventEmitter() { | 
| +  return emitter_.get(); | 
| +} | 
| + | 
| +void MountNodeTCP::QueueInput() { | 
| +  TCPRecvWork* work = new TCPRecvWork(emitter_); | 
| +  mount_stream()->EnqueueWork(work); | 
| +} | 
| + | 
| +void MountNodeTCP::QueueOutput() { | 
| +  TCPSendWork* work = new TCPSendWork(emitter_); | 
| +  mount_stream()->EnqueueWork(work); | 
| +} | 
| + | 
| + | 
| +// We can not bind a client socket with PPAPI.  For now we ignore the | 
| +// bind but report the correct address later, just in case someone is | 
| +// binding without really caring what the address is (for example to | 
| +// select a more optimized interface/route.) | 
| Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { | 
| AUTO_LOCK(node_lock_); | 
|  | 
| @@ -65,9 +222,9 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { | 
| if (0 == remote_addr_) | 
| return EINVAL; | 
|  | 
| -  int err = TCPSocket()->Connect(socket_resource_, | 
| -                                 remote_addr_, | 
| -                                 PP_BlockUntilComplete()); | 
| +  int err = TCPInterface()->Connect(socket_resource_, | 
| +                                    remote_addr_, | 
| +                                    PP_BlockUntilComplete()); | 
|  | 
| // If we fail, release the dest addr resource | 
| if (err != PP_OK) { | 
| @@ -76,71 +233,40 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { | 
| return PPErrorToErrno(err); | 
| } | 
|  | 
| -  local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_); | 
| +  local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); | 
| mount_->ppapi()->AddRefResource(local_addr_); | 
| -  return 0; | 
| -} | 
| - | 
| -Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) { | 
| -  AUTO_LOCK(node_lock_); | 
| -  if (0 == socket_resource_) | 
| -    return EBADF; | 
|  | 
| -  int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| -  int err = TCPSocket()->Read(socket_resource_, | 
| -                              static_cast<char*>(buf), | 
| -                              capped_len, | 
| -                              PP_BlockUntilComplete()); | 
| -  if (err < 0) | 
| -    return PPErrorToErrno(err); | 
| +  // Now that we are connected, we can start sending and receiving. | 
| +  SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); | 
|  | 
| -  *out_len = err; | 
| +  // Begin the input pump | 
| +  QueueInput(); | 
| return 0; | 
| } | 
|  | 
| -Error MountNodeTCP::RecvFrom(void* buf, | 
| -                             size_t len, | 
| -                             int flags, | 
| -                             struct sockaddr* src_addr, | 
| -                             socklen_t* addrlen, | 
| -                             int* out_len) { | 
| -  Error err = Recv(buf, len, flags, out_len); | 
| -  if (err == 0) | 
| -    GetPeerName(src_addr, addrlen); | 
| -  return err; | 
| -} | 
| - | 
| - | 
| -Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) { | 
| -  AUTO_LOCK(node_lock_); | 
| - | 
| -  if (0 == socket_resource_) | 
| -    return EBADF; | 
|  | 
| -  if (0 == remote_addr_) | 
| -    return ENOTCONN; | 
| - | 
| -  int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| -  int err = TCPSocket()->Write(socket_resource_, | 
| -                               static_cast<const char*>(buf), | 
| -                               capped_len, | 
| -                               PP_BlockUntilComplete()); | 
| -  if (err < 0) | 
| -    return PPErrorToErrno(err); | 
| +Error MountNodeTCP::Recv_Locked(void* buf, | 
| +                                size_t len, | 
| +                                PP_Resource* out_addr, | 
| +                                int* out_len) { | 
| +  *out_len = emitter_->in_fifo()->Read(buf, len); | 
| +  *out_addr = remote_addr_; | 
|  | 
| -  *out_len = err; | 
| +  // Ref the address copy we pass back. | 
| +  mount_->ppapi()->AddRefResource(remote_addr_); | 
| return 0; | 
| } | 
|  | 
| -Error MountNodeTCP::SendTo(const void* buf, | 
| -                           size_t len, | 
| -                           int flags, | 
| -                           const struct sockaddr* dest_addr, | 
| -                           socklen_t addrlen, | 
| -                           int* out_len) { | 
| -  return Send(buf, len, flags, out_len); | 
| +// TCP ignores dst addr passed to send_to, and always uses bound address | 
| +Error MountNodeTCP::Send_Locked(const void* buf, | 
| +                                size_t len, | 
| +                                PP_Resource, | 
| +                                int* out_len) { | 
| +  *out_len = emitter_->out_fifo()->Write(buf, len); | 
| +  return 0; | 
| } | 
|  | 
| + | 
| }  // namespace nacl_io | 
|  | 
| #endif  // PROVIDES_SOCKET_API | 
|  |