 Chromium Code Reviews
 Chromium Code Reviews Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src
    
  
    Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src| Index: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| index a929a6a169a53ec30a4fb115c0f274d610ba101b..9818f83dd0b46437682fad3d3c088cb1614acbf4 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc | 
| @@ -29,9 +29,11 @@ | 
| #include "nacl_io/mount_http.h" | 
| #include "nacl_io/mount_mem.h" | 
| #include "nacl_io/mount_node.h" | 
| +#include "nacl_io/mount_node_pipe.h" | 
| #include "nacl_io/mount_node_tcp.h" | 
| #include "nacl_io/mount_node_udp.h" | 
| #include "nacl_io/mount_passthrough.h" | 
| +#include "nacl_io/mount_stream.h" | 
| #include "nacl_io/osmman.h" | 
| #include "nacl_io/ossocket.h" | 
| #include "nacl_io/osstat.h" | 
| @@ -48,30 +50,10 @@ | 
| namespace nacl_io { | 
| -class SignalEmitter : public EventEmitter { | 
| - public: | 
| - // From EventEmitter. The SignalEmitter exists in order | 
| - // to inturrupt anything waiting in select()/poll() when kill() | 
| - // is called. It is an edge trigger only and therefore has no | 
| - // persistent readable/wriable/error state. | 
| - uint32_t GetEventStatus() { | 
| - return 0; | 
| - } | 
| - | 
| - int GetType() { | 
| - // For lack of a better type, report socket to signify it can be in an | 
| - // used to signal. | 
| - return S_IFSOCK; | 
| - } | 
| - | 
| - void SignalOccurred() { | 
| - RaiseEvent(POLLERR); | 
| - } | 
| -}; | 
| KernelProxy::KernelProxy() : dev_(0), ppapi_(NULL), | 
| sigwinch_handler_(SIG_IGN), | 
| - signal_emitter_(new SignalEmitter) { | 
| + signal_emitter_(new EventEmitterSignal) { | 
| } | 
| @@ -132,8 +114,8 @@ Error KernelProxy::Init(PepperInterface* ppapi) { | 
| #endif | 
| StringMap_t args; | 
| - socket_mount_.reset(new MountSocket()); | 
| - result = socket_mount_->Init(0, args, ppapi); | 
| + stream_mount_.reset(new MountStream()); | 
| + result = stream_mount_->Init(0, args, ppapi); | 
| if (result != 0) { | 
| assert(false); | 
| rtn = result; | 
| @@ -193,6 +175,33 @@ int KernelProxy::open(const char* path, int oflags) { | 
| return AllocateFD(handle); | 
| } | 
| +int KernelProxy::pipe(int pipefds[2]) { | 
| + MountNodePipe* pipe = new MountNodePipe(stream_mount_.get()); | 
| + ScopedMountNode node(pipe); | 
| + | 
| + if (pipe->Init(S_IREAD | S_IWRITE) == 0) { | 
| + ScopedKernelHandle handle0(new KernelHandle(stream_mount_, node)); | 
| + ScopedKernelHandle handle1(new KernelHandle(stream_mount_, node)); | 
| + | 
| + handle0->Init(S_IREAD); | 
| 
binji
2013/09/12 01:47:57
check return from Init?
 
noelallen1
2013/09/12 23:19:03
Done.
 | 
| + handle1->Init(S_IWRITE); | 
| + | 
| + pipefds[0] = AllocateFD(handle0); | 
| + if (pipefds[0] == 0) | 
| 
binji
2013/09/12 01:47:57
AllocateFD can never return a zero FD.
 
noelallen1
2013/09/12 23:19:03
Done.
 | 
| + return -1; | 
| + | 
| + pipefds[1] = AllocateFD(handle1); | 
| + if (pipefds[1] == 0) { | 
| + close(pipefds[0]); | 
| + return -1; | 
| + } | 
| + | 
| + return 0; | 
| + } | 
| + | 
| + return -1; | 
| 
binji
2013/09/12 01:47:57
set errno?
 
noelallen1
2013/09/12 23:19:03
Done.
 | 
| +} | 
| + | 
| int KernelProxy::close(int fd) { | 
| ScopedKernelHandle handle; | 
| Error error = AcquireHandle(fd, &handle); | 
| @@ -773,7 +782,7 @@ int KernelProxy::kill(pid_t pid, int sig) { | 
| } | 
| // Raise an event so that select/poll get interrupted. | 
| - signal_emitter_->SignalOccurred(); | 
| + signal_emitter_->Raise(POLLERR); | 
| switch (sig) { | 
| case SIGWINCH: | 
| if (sigwinch_handler_ != SIG_IGN) | 
| @@ -788,7 +797,6 @@ int KernelProxy::kill(pid_t pid, int sig) { | 
| errno = EINVAL; | 
| return -1; | 
| } | 
| - | 
| return 0; | 
| } | 
| @@ -831,22 +839,10 @@ sighandler_t KernelProxy::sigset(int signum, sighandler_t handler) { | 
| int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds, | 
| fd_set* exceptfds, struct timeval* timeout) { | 
| - ScopedEventListener listener(new EventListener); | 
| - | 
| - std::vector<struct pollfd> fds; | 
| + std::vector<pollfd> pollfds; | 
| - fd_set readout, writeout, exceptout; | 
| - | 
| - FD_ZERO(&readout); | 
| - FD_ZERO(&writeout); | 
| - FD_ZERO(&exceptout); | 
| - | 
| - int fd; | 
| - size_t event_cnt = 0; | 
| - int event_track = 0; | 
| - for (fd = 0; fd < nfds; fd++) { | 
| + for (int fd = 0; fd < nfds; fd++) { | 
| int events = 0; | 
| - | 
| if (readfds != NULL && FD_ISSET(fd, readfds)) | 
| events |= POLLIN; | 
| @@ -856,179 +852,141 @@ int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds, | 
| if (exceptfds != NULL && FD_ISSET(fd, exceptfds)) | 
| events |= POLLERR | POLLHUP; | 
| - // If we are not interested in this FD, skip it | 
| - if (0 == events) continue; | 
| - | 
| - ScopedKernelHandle handle; | 
| - Error err = AcquireHandle(fd, &handle); | 
| - | 
| - // Select will return immediately if there are bad FDs. | 
| - if (err != 0) { | 
| - errno = EBADF; | 
| - return -1; | 
| - } | 
| - | 
| - int status = handle->node()->GetEventStatus() & events; | 
| - if (status & POLLIN) { | 
| - FD_SET(fd, &readout); | 
| - event_cnt++; | 
| - } | 
| - | 
| - if (status & POLLOUT) { | 
| - FD_SET(fd, &writeout); | 
| - event_cnt++; | 
| - } | 
| - | 
| - if (status & (POLLERR | POLLHUP)) { | 
| - FD_SET(fd, &exceptout); | 
| - event_cnt++; | 
| - } | 
| - | 
| - // Otherwise track it. | 
| - if (0 == status) { | 
| - err = listener->Track(fd, handle->node(), events, fd); | 
| - if (err != 0) { | 
| - errno = EBADF; | 
| - return -1; | 
| - } | 
| - event_track++; | 
| + if (events) { | 
| + pollfd info; | 
| + info.fd = fd; | 
| + info.events = events; | 
| + pollfds.push_back(info); | 
| } | 
| } | 
| - // If nothing is signaled, then we must wait. | 
| - if (event_cnt == 0) { | 
| - std::vector<EventData> events; | 
| - int ready_cnt; | 
| - int ms_timeout; | 
| - | 
| - // NULL timeout signals wait forever. | 
| - if (timeout == NULL) { | 
| - ms_timeout = -1; | 
| - } else { | 
| - int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000); | 
| - | 
| - // If the timeout is invalid or too long (larger than signed 32 bit). | 
| - if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) || | 
| - (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) || | 
| - (ms < 0) || (ms >= INT_MAX)) { | 
| - errno = EINVAL; | 
| - return -1; | 
| - } | 
| + // NULL timeout signals wait forever. | 
| + int ms_timeout = -1; | 
| + if (timeout != NULL) { | 
| + int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000); | 
| - ms_timeout = static_cast<int>(ms); | 
| + // If the timeout is invalid or too long (larger than signed 32 bit). | 
| + if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) || | 
| + (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) || | 
| + (ms < 0) || (ms >= INT_MAX)) { | 
| + errno = EINVAL; | 
| + return -1; | 
| } | 
| - // Add a special node to listen for events | 
| - // coming from the KernelProxy itself (kill will | 
| - // generated a SIGERR event). | 
| - listener->Track(-1, signal_emitter_, POLLERR, -1); | 
| - event_track += 1; | 
| - | 
| - events.resize(event_track); | 
| + ms_timeout = static_cast<int>(ms); | 
| + } | 
| - bool interrupted = false; | 
| - listener->Wait(events.data(), event_track, ms_timeout, &ready_cnt); | 
| - for (fd = 0; static_cast<int>(fd) < ready_cnt; fd++) { | 
| - if (events[fd].user_data == static_cast<uint64_t>(-1)) { | 
| - if (events[fd].events & POLLERR) { | 
| - interrupted = true; | 
| - } | 
| - continue; | 
| - } | 
| + int result = poll(&pollfds[0], pollfds.size(), ms_timeout); | 
| + if (result == -1) | 
| + return -1; | 
| - if (events[fd].events & POLLIN) { | 
| - FD_SET(events[fd].user_data, &readout); | 
| - event_cnt++; | 
| - } | 
| + FD_ZERO(readfds); | 
| + FD_ZERO(writefds); | 
| + FD_ZERO(exceptfds); | 
| - if (events[fd].events & POLLOUT) { | 
| - FD_SET(events[fd].user_data, &writeout); | 
| - event_cnt++; | 
| - } | 
| - | 
| - if (events[fd].events & (POLLERR | POLLHUP)) { | 
| - FD_SET(events[fd].user_data, &exceptout); | 
| - event_cnt++; | 
| - } | 
| + int event_cnt = 0; | 
| + for (size_t index = 0; index < pollfds.size(); index++) { | 
| + pollfd* info = &pollfds[index]; | 
| + if (info->revents & POLLIN) { | 
| + FD_SET(info->fd, readfds); | 
| + event_cnt++; | 
| } | 
| - | 
| - if (0 == event_cnt && interrupted) { | 
| - errno = EINTR; | 
| - return -1; | 
| + if (info->revents & POLLOUT) { | 
| + FD_SET(info->fd, writefds); | 
| + event_cnt++; | 
| + } | 
| + if (info->revents & POLLHUP) { | 
| 
binji
2013/09/12 01:47:57
not POLLERR? You request it above.
 
noelallen1
2013/09/12 23:19:03
Done.
 | 
| + FD_SET(info->fd, exceptfds); | 
| + event_cnt++; | 
| } | 
| } | 
| - // Copy out the results | 
| - if (readfds != NULL) | 
| - *readfds = readout; | 
| + return event_cnt; | 
| +} | 
| - if (writefds != NULL) | 
| - *writefds = writeout; | 
| +struct PollInfo { | 
| + PollInfo() : index(-1) {}; | 
| - if (exceptfds != NULL) | 
| - *exceptfds = exceptout; | 
| + std::vector<struct pollfd*> fds; | 
| + int index; | 
| +}; | 
| - return event_cnt; | 
| -} | 
| +typedef std::map<EventEmitter*, PollInfo> EventPollMap_t; | 
| int KernelProxy::poll(struct pollfd *fds, nfds_t nfds, int timeout) { | 
| - ScopedEventListener listener(new EventListener); | 
| - listener->Track(-1, signal_emitter_, POLLERR, 0); | 
| + EventPollMap_t event_map; | 
| - int index; | 
| + std::vector<EventRequest> requests; | 
| size_t event_cnt = 0; | 
| - size_t event_track = 1; | 
| - for (index = 0; static_cast<nfds_t>(index) < nfds; index++) { | 
| + | 
| + for (int index = 0; static_cast<nfds_t>(index) < nfds; index++) { | 
| ScopedKernelHandle handle; | 
| - struct pollfd* info = &fds[index]; | 
| - Error err = AcquireHandle(info->fd, &handle); | 
| + struct pollfd* fd_info = &fds[index]; | 
| + Error err = AcquireHandle(fd_info->fd, &handle); | 
| // If the node isn't open, or somehow invalid, mark it so. | 
| if (err != 0) { | 
| - info->revents = POLLNVAL; | 
| + fd_info->revents = POLLNVAL; | 
| event_cnt++; | 
| continue; | 
| } | 
| // If it's already signaled, then just capture the event | 
| - if (handle->node()->GetEventStatus() & info->events) { | 
| - info->revents = info->events & handle->node()->GetEventStatus(); | 
| + ScopedEventEmitter emitter(handle->node()->GetEventEmitter()); | 
| + int events = POLLIN | POLLOUT; | 
| + if (emitter) | 
| + events = emitter->GetEventStatus(); | 
| + | 
| + if (events & fd_info->events) { | 
| + fd_info->revents = fd_info->events & events; | 
| 
binji
2013/09/12 01:47:57
change to "events & fd_info->events" like above?
 
noelallen1
2013/09/12 23:19:03
Done.
 | 
| event_cnt++; | 
| continue; | 
| } | 
| - // Otherwise try to track it. | 
| - err = listener->Track(info->fd, handle->node(), info->events, index); | 
| - if (err != 0) { | 
| - info->revents = POLLNVAL; | 
| + if (NULL == emitter) { | 
| + fd_info->revents = POLLNVAL; | 
| event_cnt++; | 
| continue; | 
| } | 
| - event_track++; | 
| + | 
| + // Otherwise try to track it. | 
| + PollInfo* info = &event_map[emitter.get()]; | 
| 
binji
2013/09/12 01:47:57
this is a lot of extra work to prevent sending the
 
noelallen1
2013/09/12 23:19:03
Having a one-to-one mapping between Listener and E
 | 
| + if (info->index == -1) { | 
| + EventRequest request; | 
| + request.emitter = emitter; | 
| + request.filter = fd_info->events; | 
| + request.events = 0; | 
| + | 
| + info->index = requests.size(); | 
| + requests.push_back(request); | 
| + } | 
| + info->fds.push_back(fd_info); | 
| + requests[info->index].filter |= fd_info->events; | 
| } | 
| - // If nothing is signaled, then we must wait. | 
| + // If nothing is signaled, then we must wait on the event map | 
| if (0 == event_cnt) { | 
| - std::vector<EventData> events; | 
| - int ready_cnt; | 
| - | 
| - bool interrupted = false; | 
| - events.resize(event_track); | 
| - listener->Wait(events.data(), event_track, timeout, &ready_cnt); | 
| - for (index = 0; index < ready_cnt; index++) { | 
| - struct pollfd* info = &fds[events[index].user_data]; | 
| - if (!info) { | 
| - interrupted = true; | 
| - continue; | 
| - } | 
| - | 
| - info->revents = events[index].events; | 
| - event_cnt++; | 
| - } | 
| - if (0 == event_cnt && interrupted) { | 
| - errno = EINTR; | 
| + EventListenerPoll wait; | 
| + Error err = wait.WaitOnAny(&requests[0], requests.size(), timeout); | 
| + if (err != 0) { | 
| + errno = err; | 
| return -1; | 
| } | 
| + | 
| + for (size_t rindex = 0; rindex < requests.size(); rindex++) { | 
| + EventRequest* request = &requests[rindex]; | 
| + if (request->events) { | 
| + PollInfo* poll_info = &event_map[request->emitter.get()]; | 
| + for (size_t findex = 0; findex < poll_info->fds.size(); findex++) { | 
| + struct pollfd* fd_info = poll_info->fds[findex]; | 
| + uint32_t events = fd_info->events & request->events; | 
| + if (events) { | 
| + fd_info->revents = events; | 
| + event_cnt++; | 
| + } | 
| + } | 
| + } | 
| + } | 
| } | 
| return event_cnt; | 
| @@ -1337,11 +1295,11 @@ int KernelProxy::socket(int domain, int type, int protocol) { | 
| MountNodeSocket* sock = NULL; | 
| switch (type) { | 
| case SOCK_DGRAM: | 
| - sock = new MountNodeUDP(socket_mount_.get()); | 
| + sock = new MountNodeUDP(stream_mount_.get()); | 
| break; | 
| case SOCK_STREAM: | 
| - sock = new MountNodeTCP(socket_mount_.get()); | 
| + sock = new MountNodeTCP(stream_mount_.get()); | 
| break; | 
| default: | 
| @@ -1351,7 +1309,7 @@ int KernelProxy::socket(int domain, int type, int protocol) { | 
| ScopedMountNode node(sock); | 
| if (sock->Init(S_IREAD | S_IWRITE) == 0) { | 
| - ScopedKernelHandle handle(new KernelHandle(socket_mount_, node)); | 
| + ScopedKernelHandle handle(new KernelHandle(stream_mount_, node)); | 
| return AllocateFD(handle); | 
| } |