| Index: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| index 310a2bdf06581cbfb87039872211109e1af1246d..cb8c23e78cbf782006ba1fd8286b945d45ea52b2 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc | 
| @@ -3,41 +3,194 @@ | 
| // found in the LICENSE file. | 
|  | 
|  | 
| -#include "nacl_io/ossocket.h" | 
| -#ifdef PROVIDES_SOCKET_API | 
| +#include "nacl_io/mount_node_udp.h" | 
|  | 
| #include <errno.h> | 
| #include <string.h> | 
| + | 
| #include <algorithm> | 
|  | 
| -#include "nacl_io/mount.h" | 
| -#include "nacl_io/mount_node_socket.h" | 
| -#include "nacl_io/mount_node_udp.h" | 
| +#include "nacl_io/event_emitter_udp.h" | 
| +#include "nacl_io/mount_stream.h" | 
| +#include "nacl_io/packet.h" | 
| #include "nacl_io/pepper_interface.h" | 
|  | 
| +namespace { | 
| +  const size_t kMaxPacketSize = 65536; | 
| +  const size_t kDefaultFifoSize = kMaxPacketSize * 8; | 
| +} | 
| + | 
| namespace nacl_io { | 
|  | 
| -MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {} | 
| +class UDPWork : public MountStream::Work { | 
| + public: | 
| +  explicit UDPWork(const ScopedEventEmitterUDP& emitter) | 
| +      : MountStream::Work(emitter->stream()->mount_stream()), | 
| +        emitter_(emitter), | 
| +        packet_(NULL) { | 
| +  } | 
| + | 
| +  ~UDPWork() { | 
| +    delete packet_; | 
| +  } | 
| + | 
| +  UDPSocketInterface* UDPInterface() { | 
| +    return mount()->ppapi()->GetUDPSocketInterface(); | 
| +  } | 
| + | 
| + protected: | 
| +  ScopedEventEmitterUDP emitter_; | 
| +  Packet* packet_; | 
| +}; | 
| + | 
| + | 
| +class UDPSendWork : public UDPWork { | 
| + public: | 
| +  explicit UDPSendWork(const ScopedEventEmitterUDP& emitter) | 
| +      : UDPWork(emitter) {} | 
| + | 
| +  virtual bool Start(int32_t val) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| +    // Does the stream exist, and can it send? | 
| +    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND)) | 
| +      return false; | 
| + | 
| +    // If not currently sending... | 
| +    if (!stream->TestStreamFlags(SSF_SENDING)) { | 
| +      packet_ = emitter_->ReadTXPacket_Locked(); | 
| +      if (packet_) { | 
| +        stream->SetStreamFlags(SSF_SENDING); | 
| +        int err = UDPInterface()->SendTo(stream->socket_resource(), | 
| +                                         packet_->buffer(), | 
| +                                         packet_->len(), | 
| +                                         packet_->addr(), | 
| +                                         mount()->GetRunCompletion(this)); | 
| +        if (err == PP_OK_COMPLETIONPENDING) | 
| +          return true; | 
| + | 
| +        // Anything else, we should assume the socket has gone bad. | 
| +        stream->SetError_Locked(err); | 
| +      } | 
| +    } | 
| +    return false; | 
| +  } | 
| + | 
| +  virtual void Run(int32_t length_error) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| +    // If the stream is still there... | 
| +    if (stream) { | 
| +      // And we did send, then Q more work. | 
| +      if (length_error >= 0) { | 
| +        stream->ClearStreamFlags(SSF_SENDING); | 
| +        stream->QueueOutput(); | 
| +      } else { | 
| +        // Otherwise this socket has gone bad. | 
| +        stream->SetError_Locked(length_error); | 
| +      } | 
| +    } | 
| +  } | 
| +}; | 
| + | 
| + | 
| +class UDPRecvWork : public UDPWork { | 
| + public: | 
| +  explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter) | 
| +      : UDPWork(emitter) { | 
| +    data_ = new char[kMaxPacketSize]; | 
| +  } | 
| + | 
| +  ~UDPRecvWork() { | 
| +    delete[] data_; | 
| +  } | 
| + | 
| +  virtual bool Start(int32_t val) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| +    // Does the stream exist, and can it recv? | 
| +    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV)) | 
| +      return false; | 
| + | 
| +    // If the stream is valid and we are not currently receiving | 
| +    if (!stream->TestStreamFlags(SSF_RECVING)) { | 
| +      stream->SetStreamFlags(SSF_RECVING); | 
| +      int err = UDPInterface()->RecvFrom(stream->socket_resource(), | 
| +                                         data_, | 
| +                                         kMaxPacketSize, | 
| +                                         &addr_, | 
| +                                         mount()->GetRunCompletion(this)); | 
| +      if (err == PP_OK_COMPLETIONPENDING) | 
| +        return true; | 
| + | 
| +      stream->SetError_Locked(err); | 
| +    } | 
| +    return false; | 
| +  } | 
| + | 
| +  virtual void Run(int32_t length_error) { | 
| +    AUTO_LOCK(emitter_->GetLock()); | 
| +    MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream()); | 
| + | 
| +    // If the stream is still there, see if we can queue more input | 
| +    if (stream) { | 
| +      if (length_error > 0) { | 
| +        Packet* packet = new Packet(mount()->ppapi()); | 
| +        packet->Copy(data_, length_error, addr_); | 
| +        emitter_->WriteRXPacket_Locked(packet); | 
| +        stream->ClearStreamFlags(SSF_RECVING); | 
| +        stream->QueueInput(); | 
| +      } else { | 
| +        stream->SetError_Locked(length_error); | 
| +      } | 
| +    } | 
| +  } | 
| + | 
| + private: | 
| +  char* data_; | 
| +  PP_Resource addr_; | 
| +}; | 
|  | 
|  | 
| -UDPSocketInterface* MountNodeUDP::UDPSocket() { | 
| -  if (mount_->ppapi() == NULL) | 
| -    return NULL; | 
| +MountNodeUDP::MountNodeUDP(Mount* mount) | 
| +    : MountNodeSocket(mount), | 
| +      emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) { | 
| +  emitter_->AttachStream(this); | 
| +} | 
| + | 
| +void MountNodeUDP::Destroy() { | 
| +  emitter_->DetachStream(); | 
| +  MountNodeSocket::Destroy(); | 
| +} | 
|  | 
| -  return mount_->ppapi()->GetUDPSocketInterface(); | 
| +EventEmitterUDP* MountNodeUDP::GetEventEmitter() { | 
| +  return emitter_.get(); | 
| } | 
|  | 
| Error MountNodeUDP::Init(int flags) { | 
| -  if (UDPSocket() == NULL) | 
| +  if (UDPInterface() == NULL) | 
| return EACCES; | 
|  | 
| -  socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance()); | 
| +  socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance()); | 
| if (0 == socket_resource_) | 
| return EACCES; | 
|  | 
| return 0; | 
| } | 
|  | 
| +void MountNodeUDP::QueueInput() { | 
| +  UDPRecvWork* work = new UDPRecvWork(emitter_); | 
| +  mount_stream()->EnqueueWork(work); | 
| +} | 
| + | 
| +void MountNodeUDP::QueueOutput() { | 
| +  UDPSendWork* work = new UDPSendWork(emitter_); | 
| +  mount_stream()->EnqueueWork(work); | 
| +} | 
| + | 
| Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) { | 
| if (0 == socket_resource_) | 
| return EBADF; | 
| @@ -50,14 +203,18 @@ Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) { | 
| if (0 == out_addr) | 
| return EINVAL; | 
|  | 
| -  int err = UDPSocket()->Bind(socket_resource_, | 
| -                              out_addr, | 
| -                              PP_BlockUntilComplete()); | 
| +  int err = UDPInterface()->Bind(socket_resource_, | 
| +                                 out_addr, | 
| +                                 PP_BlockUntilComplete()); | 
| if (err != 0) { | 
| mount_->ppapi()->ReleaseResource(out_addr); | 
| return PPErrorToErrno(err); | 
| } | 
|  | 
| +  // Now that we are bound, we can start sending and receiving. | 
| +  SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); | 
| +  QueueInput(); | 
| + | 
| local_addr_ = out_addr; | 
| return 0; | 
| } | 
| @@ -79,110 +236,48 @@ Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) { | 
| return 0; | 
| } | 
|  | 
| -Error MountNodeUDP::RecvFromHelper(void* buf, | 
| -                                   size_t len, | 
| -                                   int flags, | 
| -                                   PP_Resource* out_addr, | 
| -                                   int* out_len) { | 
| -  if (0 == socket_resource_) | 
| -    return EBADF; | 
| - | 
| -  int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| -  int err =  UDPSocket()->RecvFrom(socket_resource_, | 
| -                                   static_cast<char*>(buf), | 
| -                                   capped_len, | 
| -                                   out_addr, | 
| -                                   PP_BlockUntilComplete()); | 
| -  if (err < 0) | 
| -    return PPErrorToErrno(err); | 
| - | 
| -  *out_len = err; | 
| -  return 0; | 
| -} | 
| - | 
| -Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) { | 
| -  while (1) { | 
| -    int local_len = 0; | 
| -    PP_Resource addr = 0; | 
| - | 
| -    int err = RecvFromHelper(buf, len, flags, &addr, &local_len); | 
| -    if (err < 0) | 
| -      return PPErrorToErrno(err); | 
| - | 
| -    /* If "connected" then only receive packets from the given remote. */ | 
| -    bool same = IsEquivalentAddress(addr, remote_addr_); | 
| -    mount_->ppapi()->ReleaseResource(addr); | 
| - | 
| -    if (remote_addr_ != 0 && same) | 
| -      continue; | 
| - | 
| -    *out_len = local_len; | 
| +Error MountNodeUDP::Recv_Locked(void* buf, | 
| +                                size_t len, | 
| +                                PP_Resource* out_addr, | 
| +                                int* out_len) { | 
| +  Packet* packet = emitter_->ReadRXPacket_Locked(); | 
| +  *out_len = 0; | 
| +  *out_addr = 0; | 
| + | 
| +  if (packet) { | 
| +    int capped_len = | 
| +        static_cast<int32_t>(std::min<int>(len, packet->len())); | 
| +    memcpy(buf, packet->buffer(), capped_len); | 
| + | 
| +    if (packet->addr() != 0) { | 
| +      mount_->ppapi()->AddRefResource(packet->addr()); | 
| +      *out_addr = packet->addr(); | 
| +    } | 
| + | 
| +    *out_len = capped_len; | 
| +    delete packet; | 
| return 0; | 
| } | 
| -} | 
| - | 
| -Error MountNodeUDP::RecvFrom(void* buf, | 
| -                             size_t len, | 
| -                             int flags, | 
| -                             struct sockaddr* src_addr, | 
| -                             socklen_t* addrlen, | 
| -                             int* out_len) { | 
| -  PP_Resource addr = 0; | 
| -  int err = RecvFromHelper(buf, len, flags, &addr, out_len); | 
| -  if (err < 0) | 
| -    return PPErrorToErrno(err); | 
| - | 
| -  if (src_addr) | 
| -    *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr); | 
|  | 
| -  mount_->ppapi()->ReleaseResource(addr); | 
| -  return 0; | 
| +  // Should never happen, Recv_Locked should not be called | 
| +  // unless already in a POLLIN state. | 
| +  return EBADF; | 
| } | 
|  | 
| - | 
| -Error MountNodeUDP::SendToHelper(const void* buf, | 
| -                                 size_t len, | 
| -                                 int flags, | 
| -                                 PP_Resource addr, | 
| -                                 int* out_len) { | 
| -  if (0 == socket_resource_) | 
| -    return EBADF; | 
| - | 
| -  if (0 == addr) | 
| -    return ENOTCONN; | 
| - | 
| -  int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); | 
| -  int err = UDPSocket()->SendTo(socket_resource_, | 
| -                                static_cast<const char*>(buf), | 
| -                                capped_len, | 
| -                                addr, | 
| -                                PP_BlockUntilComplete()); | 
| -  if (err < 0) | 
| -    return PPErrorToErrno(err); | 
| - | 
| -  *out_len = err; | 
| +Error MountNodeUDP::Send_Locked(const void* buf, | 
| +                                size_t len, | 
| +                                PP_Resource addr, | 
| +                                int* out_len) { | 
| +  *out_len = 0; | 
| +  int capped_len = | 
| +      static_cast<int32_t>(std::min<int>(len, kMaxPacketSize)); | 
| +  Packet* packet = new Packet(mount_->ppapi()); | 
| +  packet->Copy(buf, capped_len, addr); | 
| + | 
| +  emitter_->WriteTXPacket_Locked(packet); | 
| +  *out_len = capped_len; | 
| return 0; | 
| } | 
|  | 
| -Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) { | 
| -  return SendToHelper(buf, len, flags, remote_addr_, out_len); | 
| -} | 
| - | 
| -Error MountNodeUDP::SendTo(const void* buf, | 
| -                           size_t len, | 
| -                           int flags, | 
| -                           const struct sockaddr* dest_addr, | 
| -                           socklen_t addrlen, | 
| -                           int* out_len) { | 
| -  PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen); | 
| -  if (0 == out_addr) | 
| -    return EINVAL; | 
| - | 
| -  Error err = SendToHelper(buf, len, flags, out_addr, out_len); | 
| -  mount_->ppapi()->ReleaseResource(out_addr); | 
| -  return err; | 
| -} | 
| - | 
| }  // namespace nacl_io | 
|  | 
| -#endif  // PROVIDES_SOCKET_API | 
|  |