| Index: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| index a929a6a169a53ec30a4fb115c0f274d610ba101b..4445d397cce192fe66a86b2c11a21bea91e90fc9 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| @@ -29,9 +29,11 @@ | 
| #include "nacl_io/mount_http.h" | 
| #include "nacl_io/mount_mem.h" | 
| #include "nacl_io/mount_node.h" | 
| +#include "nacl_io/mount_node_pipe.h" | 
| #include "nacl_io/mount_node_tcp.h" | 
| #include "nacl_io/mount_node_udp.h" | 
| #include "nacl_io/mount_passthrough.h" | 
| +#include "nacl_io/mount_stream.h" | 
| #include "nacl_io/osmman.h" | 
| #include "nacl_io/ossocket.h" | 
| #include "nacl_io/osstat.h" | 
| @@ -48,30 +50,10 @@ | 
|  | 
| namespace nacl_io { | 
|  | 
| -class SignalEmitter : public EventEmitter { | 
| - public: | 
| -  // From EventEmitter.  The SignalEmitter exists in order | 
| -  // to inturrupt anything waiting in select()/poll() when kill() | 
| -  // is called.  It is an edge trigger only and therefore has no | 
| -  // persistent readable/wriable/error state. | 
| -  uint32_t GetEventStatus() { | 
| -     return 0; | 
| -  } | 
| - | 
| -  int GetType() { | 
| -    // For lack of a better type, report socket to signify it can be in an | 
| -    // used to signal. | 
| -    return S_IFSOCK; | 
| -  } | 
| - | 
| -  void SignalOccurred() { | 
| -    RaiseEvent(POLLERR); | 
| -  } | 
| -}; | 
|  | 
| KernelProxy::KernelProxy() : dev_(0), ppapi_(NULL), | 
| sigwinch_handler_(SIG_IGN), | 
| -                             signal_emitter_(new SignalEmitter) { | 
| +                             signal_emitter_(new EventEmitter) { | 
|  | 
| } | 
|  | 
| @@ -132,8 +114,8 @@ Error KernelProxy::Init(PepperInterface* ppapi) { | 
| #endif | 
|  | 
| StringMap_t args; | 
| -  socket_mount_.reset(new MountSocket()); | 
| -  result = socket_mount_->Init(0, args, ppapi); | 
| +  stream_mount_.reset(new MountStream()); | 
| +  result = stream_mount_->Init(0, args, ppapi); | 
| if (result != 0) { | 
| assert(false); | 
| rtn = result; | 
| @@ -193,6 +175,29 @@ int KernelProxy::open(const char* path, int oflags) { | 
| return AllocateFD(handle); | 
| } | 
|  | 
| +int KernelProxy::pipe(int pipefds[2]) { | 
| +  MountNodePipe* pipe = new MountNodePipe(stream_mount_.get()); | 
| +  ScopedMountNode node(pipe); | 
| + | 
| +  if (pipe->Init(S_IREAD | S_IWRITE) == 0) { | 
| +    ScopedKernelHandle handle0(new KernelHandle(stream_mount_, node)); | 
| +    ScopedKernelHandle handle1(new KernelHandle(stream_mount_, node)); | 
| + | 
| +    // Should never fail, but... | 
| +    if (handle0->Init(S_IREAD) || handle1->Init(S_IWRITE)) { | 
| +      errno = EACCES; | 
| +      return -1; | 
| +    } | 
| + | 
| +    pipefds[0] = AllocateFD(handle0); | 
| +    pipefds[1] = AllocateFD(handle1); | 
| +    return 0; | 
| +  } | 
| + | 
| +  errno = ENOSYS; | 
| +  return -1; | 
| +} | 
| + | 
| int KernelProxy::close(int fd) { | 
| ScopedKernelHandle handle; | 
| Error error = AcquireHandle(fd, &handle); | 
| @@ -773,7 +778,8 @@ int KernelProxy::kill(pid_t pid, int sig) { | 
| } | 
|  | 
| // Raise an event so that select/poll get interrupted. | 
| -  signal_emitter_->SignalOccurred(); | 
| +  AUTO_LOCK(signal_emitter_->GetLock()) | 
| +  signal_emitter_->RaiseEvents_Locked(POLLERR); | 
| switch (sig) { | 
| case SIGWINCH: | 
| if (sigwinch_handler_ != SIG_IGN) | 
| @@ -788,7 +794,6 @@ int KernelProxy::kill(pid_t pid, int sig) { | 
| errno = EINVAL; | 
| return -1; | 
| } | 
| - | 
| return 0; | 
| } | 
|  | 
| @@ -831,204 +836,166 @@ sighandler_t KernelProxy::sigset(int signum, sighandler_t handler) { | 
|  | 
| int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds, | 
| fd_set* exceptfds, struct timeval* timeout) { | 
| -  ScopedEventListener listener(new EventListener); | 
| - | 
| -  std::vector<struct pollfd> fds; | 
| - | 
| -  fd_set readout, writeout, exceptout; | 
| - | 
| -  FD_ZERO(&readout); | 
| -  FD_ZERO(&writeout); | 
| -  FD_ZERO(&exceptout); | 
| - | 
| -  int fd; | 
| -  size_t event_cnt = 0; | 
| -  int event_track = 0; | 
| -  for (fd = 0; fd < nfds; fd++) { | 
| +  fd_set ignore; | 
| +  std::vector<pollfd> pollfds; | 
| + | 
| +  // Simplify logic, by using an IGNORE set for any undefined set | 
| +  FD_ZERO(&ignore); | 
| +  if (NULL == readfds) | 
| +    readfds = &ignore; | 
| +  if (NULL == writefds) | 
| +    writefds = &ignore; | 
| +  if (NULL == exceptfds) | 
| +    exceptfds = &ignore; | 
| + | 
| +  for (int fd = 0; fd < nfds; fd++) { | 
| int events = 0; | 
| - | 
| -    if (readfds != NULL && FD_ISSET(fd, readfds)) | 
| +    if (FD_ISSET(fd, readfds)) | 
| events |= POLLIN; | 
|  | 
| -    if (writefds != NULL && FD_ISSET(fd, writefds)) | 
| +    if (FD_ISSET(fd, writefds)) | 
| events |= POLLOUT; | 
|  | 
| -    if (exceptfds != NULL && FD_ISSET(fd, exceptfds)) | 
| +    if (FD_ISSET(fd, exceptfds)) | 
| events |= POLLERR | POLLHUP; | 
|  | 
| -    // If we are not interested in this FD, skip it | 
| -    if (0 == events) continue; | 
| - | 
| -    ScopedKernelHandle handle; | 
| -    Error err = AcquireHandle(fd, &handle); | 
| - | 
| -    // Select will return immediately if there are bad FDs. | 
| -    if (err != 0) { | 
| -      errno = EBADF; | 
| -      return -1; | 
| +    if (events) { | 
| +      pollfd info; | 
| +      info.fd = fd; | 
| +      info.events = events; | 
| +      pollfds.push_back(info); | 
| } | 
| +  } | 
|  | 
| -    int status = handle->node()->GetEventStatus() & events; | 
| -    if (status & POLLIN) { | 
| -      FD_SET(fd, &readout); | 
| -      event_cnt++; | 
| -    } | 
| +  FD_ZERO(readfds); | 
| +  FD_ZERO(writefds); | 
| +  FD_ZERO(exceptfds); | 
|  | 
| -    if (status & POLLOUT) { | 
| -      FD_SET(fd, &writeout); | 
| -      event_cnt++; | 
| -    } | 
| +  // NULL timeout signals wait forever. | 
| +  int ms_timeout = -1; | 
| +  if (timeout != NULL) { | 
| +    int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000); | 
|  | 
| -    if (status & (POLLERR | POLLHUP)) { | 
| -      FD_SET(fd, &exceptout); | 
| -      event_cnt++; | 
| +    // If the timeout is invalid or too long (larger than signed 32 bit). | 
| +    if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) || | 
| +        (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) || | 
| +        (ms < 0) || (ms >= INT_MAX)) { | 
| +      errno = EINVAL; | 
| +      return -1; | 
| } | 
|  | 
| -    // Otherwise track it. | 
| -    if (0 == status) { | 
| -      err = listener->Track(fd, handle->node(), events, fd); | 
| -      if (err != 0) { | 
| -        errno = EBADF; | 
| -        return -1; | 
| -      } | 
| -      event_track++; | 
| -    } | 
| +    ms_timeout = static_cast<int>(ms); | 
| } | 
|  | 
| -  // If nothing is signaled, then we must wait. | 
| -  if (event_cnt == 0) { | 
| -    std::vector<EventData> events; | 
| -    int ready_cnt; | 
| -    int ms_timeout; | 
| - | 
| -    // NULL timeout signals wait forever. | 
| -    if (timeout == NULL) { | 
| -      ms_timeout = -1; | 
| -    } else { | 
| -      int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000); | 
| - | 
| -      // If the timeout is invalid or too long (larger than signed 32 bit). | 
| -      if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) || | 
| -          (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) || | 
| -          (ms < 0) || (ms >= INT_MAX)) { | 
| -        errno = EINVAL; | 
| -        return -1; | 
| -      } | 
| +  int result = poll(&pollfds[0], pollfds.size(), ms_timeout); | 
| +  if (result == -1) | 
| +    return -1; | 
|  | 
| -      ms_timeout = static_cast<int>(ms); | 
| +  int event_cnt = 0; | 
| +  for (size_t index = 0; index < pollfds.size(); index++) { | 
| +    pollfd* info = &pollfds[index]; | 
| +    if (info->revents & POLLIN) { | 
| +      FD_SET(info->fd, readfds); | 
| +      event_cnt++; | 
| } | 
| - | 
| -    // Add a special node to listen for events | 
| -    // coming from the KernelProxy itself (kill will | 
| -    // generated a SIGERR event). | 
| -    listener->Track(-1, signal_emitter_, POLLERR, -1); | 
| -    event_track += 1; | 
| - | 
| -    events.resize(event_track); | 
| - | 
| -    bool interrupted = false; | 
| -    listener->Wait(events.data(), event_track, ms_timeout, &ready_cnt); | 
| -    for (fd = 0; static_cast<int>(fd) < ready_cnt; fd++) { | 
| -      if (events[fd].user_data == static_cast<uint64_t>(-1)) { | 
| -        if (events[fd].events & POLLERR) { | 
| -          interrupted = true; | 
| -        } | 
| -        continue; | 
| -      } | 
| - | 
| -      if (events[fd].events & POLLIN) { | 
| -        FD_SET(events[fd].user_data, &readout); | 
| -        event_cnt++; | 
| -      } | 
| - | 
| -      if (events[fd].events & POLLOUT) { | 
| -        FD_SET(events[fd].user_data, &writeout); | 
| -        event_cnt++; | 
| -      } | 
| - | 
| -      if (events[fd].events & (POLLERR | POLLHUP)) { | 
| -        FD_SET(events[fd].user_data, &exceptout); | 
| -        event_cnt++; | 
| -      } | 
| +    if (info->revents & POLLOUT) { | 
| +      FD_SET(info->fd, writefds); | 
| +      event_cnt++; | 
| } | 
| - | 
| -    if (0 == event_cnt && interrupted) { | 
| -      errno = EINTR; | 
| -      return -1; | 
| +    if (info->revents & (POLLHUP | POLLERR)) { | 
| +      FD_SET(info->fd, exceptfds); | 
| +      event_cnt++; | 
| } | 
| } | 
|  | 
| -  // Copy out the results | 
| -  if (readfds != NULL) | 
| -    *readfds = readout; | 
| +  return event_cnt; | 
| +} | 
|  | 
| -  if (writefds != NULL) | 
| -    *writefds = writeout; | 
| +struct PollInfo { | 
| +  PollInfo() : index(-1) {}; | 
|  | 
| -  if (exceptfds != NULL) | 
| -    *exceptfds = exceptout; | 
| +  std::vector<struct pollfd*> fds; | 
| +  int index; | 
| +}; | 
|  | 
| -  return event_cnt; | 
| -} | 
| +typedef std::map<EventEmitter*, PollInfo> EventPollMap_t; | 
|  | 
| int KernelProxy::poll(struct pollfd *fds, nfds_t nfds, int timeout) { | 
| -  ScopedEventListener listener(new EventListener); | 
| -  listener->Track(-1, signal_emitter_, POLLERR, 0); | 
| +  EventPollMap_t event_map; | 
|  | 
| -  int index; | 
| +  std::vector<EventRequest> requests; | 
| size_t event_cnt = 0; | 
| -  size_t event_track = 1; | 
| -  for (index = 0; static_cast<nfds_t>(index) < nfds; index++) { | 
| + | 
| +  for (int index = 0; static_cast<nfds_t>(index) < nfds; index++) { | 
| ScopedKernelHandle handle; | 
| -    struct pollfd* info = &fds[index]; | 
| -    Error err = AcquireHandle(info->fd, &handle); | 
| +    struct pollfd* fd_info = &fds[index]; | 
| +    Error err = AcquireHandle(fd_info->fd, &handle); | 
| + | 
| +    fd_info->revents = 0; | 
|  | 
| // If the node isn't open, or somehow invalid, mark it so. | 
| if (err != 0) { | 
| -      info->revents = POLLNVAL; | 
| +      fd_info->revents = POLLNVAL; | 
| event_cnt++; | 
| continue; | 
| } | 
|  | 
| // If it's already signaled, then just capture the event | 
| -    if (handle->node()->GetEventStatus() & info->events) { | 
| -      info->revents = info->events & handle->node()->GetEventStatus(); | 
| +    ScopedEventEmitter emitter(handle->node()->GetEventEmitter()); | 
| +    int events = POLLIN | POLLOUT; | 
| +    if (emitter) | 
| +      events = emitter->GetEventStatus(); | 
| + | 
| +    if (events & fd_info->events) { | 
| +      fd_info->revents = events & fd_info->events; | 
| event_cnt++; | 
| continue; | 
| } | 
|  | 
| -    // Otherwise try to track it. | 
| -    err = listener->Track(info->fd, handle->node(), info->events, index); | 
| -    if (err != 0) { | 
| -      info->revents = POLLNVAL; | 
| +    if (NULL == emitter) { | 
| +      fd_info->revents = POLLNVAL; | 
| event_cnt++; | 
| continue; | 
| } | 
| -    event_track++; | 
| + | 
| +    // Otherwise try to track it. | 
| +    PollInfo* info = &event_map[emitter.get()]; | 
| +    if (info->index == -1) { | 
| +      EventRequest request; | 
| +      request.emitter = emitter; | 
| +      request.filter = fd_info->events; | 
| +      request.events = 0; | 
| + | 
| +      info->index = requests.size(); | 
| +      requests.push_back(request); | 
| +    } | 
| +    info->fds.push_back(fd_info); | 
| +    requests[info->index].filter |= fd_info->events; | 
| } | 
|  | 
| -  // If nothing is signaled, then we must wait. | 
| +  // If nothing is signaled, then we must wait on the event map | 
| if (0 == event_cnt) { | 
| -    std::vector<EventData> events; | 
| -    int ready_cnt; | 
| - | 
| -    bool interrupted = false; | 
| -    events.resize(event_track); | 
| -    listener->Wait(events.data(), event_track, timeout, &ready_cnt); | 
| -    for (index = 0; index < ready_cnt; index++) { | 
| -      struct pollfd* info = &fds[events[index].user_data]; | 
| -      if (!info) { | 
| -        interrupted = true; | 
| -        continue; | 
| -      } | 
| - | 
| -      info->revents = events[index].events; | 
| -      event_cnt++; | 
| -    } | 
| -    if (0 == event_cnt && interrupted) { | 
| -      errno = EINTR; | 
| +    EventListenerPoll wait; | 
| +    Error err = wait.WaitOnAny(&requests[0], requests.size(), timeout); | 
| +    if ((err != 0) && (err != ETIMEDOUT)) { | 
| +      errno = err; | 
| return -1; | 
| } | 
| + | 
| +    for (size_t rindex = 0; rindex < requests.size(); rindex++) { | 
| +      EventRequest* request = &requests[rindex]; | 
| +      if (request->events) { | 
| +        PollInfo* poll_info = &event_map[request->emitter.get()]; | 
| +        for (size_t findex = 0; findex < poll_info->fds.size(); findex++) { | 
| +          struct pollfd* fd_info = poll_info->fds[findex]; | 
| +          uint32_t events = fd_info->events & request->events; | 
| +          if (events) { | 
| +            fd_info->revents = events; | 
| +            event_cnt++; | 
| +          } | 
| +        } | 
| +      } | 
| +    } | 
| } | 
|  | 
| return event_cnt; | 
| @@ -1337,11 +1304,11 @@ int KernelProxy::socket(int domain, int type, int protocol) { | 
| MountNodeSocket* sock = NULL; | 
| switch (type) { | 
| case SOCK_DGRAM: | 
| -      sock = new MountNodeUDP(socket_mount_.get()); | 
| +      sock = new MountNodeUDP(stream_mount_.get()); | 
| break; | 
|  | 
| case SOCK_STREAM: | 
| -      sock = new MountNodeTCP(socket_mount_.get()); | 
| +      sock = new MountNodeTCP(stream_mount_.get()); | 
| break; | 
|  | 
| default: | 
| @@ -1351,7 +1318,7 @@ int KernelProxy::socket(int domain, int type, int protocol) { | 
|  | 
| ScopedMountNode node(sock); | 
| if (sock->Init(S_IREAD | S_IWRITE) == 0) { | 
| -    ScopedKernelHandle handle(new KernelHandle(socket_mount_, node)); | 
| +    ScopedKernelHandle handle(new KernelHandle(stream_mount_, node)); | 
| return AllocateFD(handle); | 
| } | 
|  | 
|  |