Chromium Code Reviews| Index: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc |
| diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc |
| index a929a6a169a53ec30a4fb115c0f274d610ba101b..9818f83dd0b46437682fad3d3c088cb1614acbf4 100644 |
| --- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc |
| +++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc |
| @@ -29,9 +29,11 @@ |
| #include "nacl_io/mount_http.h" |
| #include "nacl_io/mount_mem.h" |
| #include "nacl_io/mount_node.h" |
| +#include "nacl_io/mount_node_pipe.h" |
| #include "nacl_io/mount_node_tcp.h" |
| #include "nacl_io/mount_node_udp.h" |
| #include "nacl_io/mount_passthrough.h" |
| +#include "nacl_io/mount_stream.h" |
| #include "nacl_io/osmman.h" |
| #include "nacl_io/ossocket.h" |
| #include "nacl_io/osstat.h" |
| @@ -48,30 +50,10 @@ |
| namespace nacl_io { |
| -class SignalEmitter : public EventEmitter { |
| - public: |
| - // From EventEmitter. The SignalEmitter exists in order |
| - // to inturrupt anything waiting in select()/poll() when kill() |
| - // is called. It is an edge trigger only and therefore has no |
| - // persistent readable/wriable/error state. |
| - uint32_t GetEventStatus() { |
| - return 0; |
| - } |
| - |
| - int GetType() { |
| - // For lack of a better type, report socket to signify it can be in an |
| - // used to signal. |
| - return S_IFSOCK; |
| - } |
| - |
| - void SignalOccurred() { |
| - RaiseEvent(POLLERR); |
| - } |
| -}; |
| KernelProxy::KernelProxy() : dev_(0), ppapi_(NULL), |
| sigwinch_handler_(SIG_IGN), |
| - signal_emitter_(new SignalEmitter) { |
| + signal_emitter_(new EventEmitterSignal) { |
| } |
| @@ -132,8 +114,8 @@ Error KernelProxy::Init(PepperInterface* ppapi) { |
| #endif |
| StringMap_t args; |
| - socket_mount_.reset(new MountSocket()); |
| - result = socket_mount_->Init(0, args, ppapi); |
| + stream_mount_.reset(new MountStream()); |
| + result = stream_mount_->Init(0, args, ppapi); |
| if (result != 0) { |
| assert(false); |
| rtn = result; |
| @@ -193,6 +175,33 @@ int KernelProxy::open(const char* path, int oflags) { |
| return AllocateFD(handle); |
| } |
| +int KernelProxy::pipe(int pipefds[2]) { |
| + MountNodePipe* pipe = new MountNodePipe(stream_mount_.get()); |
| + ScopedMountNode node(pipe); |
| + |
| + if (pipe->Init(S_IREAD | S_IWRITE) == 0) { |
| + ScopedKernelHandle handle0(new KernelHandle(stream_mount_, node)); |
| + ScopedKernelHandle handle1(new KernelHandle(stream_mount_, node)); |
| + |
| + handle0->Init(S_IREAD); |
|
binji
2013/09/12 01:47:57
check return from Init?
noelallen1
2013/09/12 23:19:03
Done.
|
| + handle1->Init(S_IWRITE); |
| + |
| + pipefds[0] = AllocateFD(handle0); |
| + if (pipefds[0] == 0) |
|
binji
2013/09/12 01:47:57
AllocateFD can never return a zero FD.
noelallen1
2013/09/12 23:19:03
Done.
|
| + return -1; |
| + |
| + pipefds[1] = AllocateFD(handle1); |
| + if (pipefds[1] == 0) { |
| + close(pipefds[0]); |
| + return -1; |
| + } |
| + |
| + return 0; |
| + } |
| + |
| + return -1; |
|
binji
2013/09/12 01:47:57
set errno?
noelallen1
2013/09/12 23:19:03
Done.
|
| +} |
| + |
| int KernelProxy::close(int fd) { |
| ScopedKernelHandle handle; |
| Error error = AcquireHandle(fd, &handle); |
| @@ -773,7 +782,7 @@ int KernelProxy::kill(pid_t pid, int sig) { |
| } |
| // Raise an event so that select/poll get interrupted. |
| - signal_emitter_->SignalOccurred(); |
| + signal_emitter_->Raise(POLLERR); |
| switch (sig) { |
| case SIGWINCH: |
| if (sigwinch_handler_ != SIG_IGN) |
| @@ -788,7 +797,6 @@ int KernelProxy::kill(pid_t pid, int sig) { |
| errno = EINVAL; |
| return -1; |
| } |
| - |
| return 0; |
| } |
| @@ -831,22 +839,10 @@ sighandler_t KernelProxy::sigset(int signum, sighandler_t handler) { |
| int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds, |
| fd_set* exceptfds, struct timeval* timeout) { |
| - ScopedEventListener listener(new EventListener); |
| - |
| - std::vector<struct pollfd> fds; |
| + std::vector<pollfd> pollfds; |
| - fd_set readout, writeout, exceptout; |
| - |
| - FD_ZERO(&readout); |
| - FD_ZERO(&writeout); |
| - FD_ZERO(&exceptout); |
| - |
| - int fd; |
| - size_t event_cnt = 0; |
| - int event_track = 0; |
| - for (fd = 0; fd < nfds; fd++) { |
| + for (int fd = 0; fd < nfds; fd++) { |
| int events = 0; |
| - |
| if (readfds != NULL && FD_ISSET(fd, readfds)) |
| events |= POLLIN; |
| @@ -856,179 +852,141 @@ int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds, |
| if (exceptfds != NULL && FD_ISSET(fd, exceptfds)) |
| events |= POLLERR | POLLHUP; |
| - // If we are not interested in this FD, skip it |
| - if (0 == events) continue; |
| - |
| - ScopedKernelHandle handle; |
| - Error err = AcquireHandle(fd, &handle); |
| - |
| - // Select will return immediately if there are bad FDs. |
| - if (err != 0) { |
| - errno = EBADF; |
| - return -1; |
| - } |
| - |
| - int status = handle->node()->GetEventStatus() & events; |
| - if (status & POLLIN) { |
| - FD_SET(fd, &readout); |
| - event_cnt++; |
| - } |
| - |
| - if (status & POLLOUT) { |
| - FD_SET(fd, &writeout); |
| - event_cnt++; |
| - } |
| - |
| - if (status & (POLLERR | POLLHUP)) { |
| - FD_SET(fd, &exceptout); |
| - event_cnt++; |
| - } |
| - |
| - // Otherwise track it. |
| - if (0 == status) { |
| - err = listener->Track(fd, handle->node(), events, fd); |
| - if (err != 0) { |
| - errno = EBADF; |
| - return -1; |
| - } |
| - event_track++; |
| + if (events) { |
| + pollfd info; |
| + info.fd = fd; |
| + info.events = events; |
| + pollfds.push_back(info); |
| } |
| } |
| - // If nothing is signaled, then we must wait. |
| - if (event_cnt == 0) { |
| - std::vector<EventData> events; |
| - int ready_cnt; |
| - int ms_timeout; |
| - |
| - // NULL timeout signals wait forever. |
| - if (timeout == NULL) { |
| - ms_timeout = -1; |
| - } else { |
| - int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000); |
| - |
| - // If the timeout is invalid or too long (larger than signed 32 bit). |
| - if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) || |
| - (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) || |
| - (ms < 0) || (ms >= INT_MAX)) { |
| - errno = EINVAL; |
| - return -1; |
| - } |
| + // NULL timeout signals wait forever. |
| + int ms_timeout = -1; |
| + if (timeout != NULL) { |
| + int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000); |
| - ms_timeout = static_cast<int>(ms); |
| + // If the timeout is invalid or too long (larger than signed 32 bit). |
| + if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) || |
| + (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) || |
| + (ms < 0) || (ms >= INT_MAX)) { |
| + errno = EINVAL; |
| + return -1; |
| } |
| - // Add a special node to listen for events |
| - // coming from the KernelProxy itself (kill will |
| - // generated a SIGERR event). |
| - listener->Track(-1, signal_emitter_, POLLERR, -1); |
| - event_track += 1; |
| - |
| - events.resize(event_track); |
| + ms_timeout = static_cast<int>(ms); |
| + } |
| - bool interrupted = false; |
| - listener->Wait(events.data(), event_track, ms_timeout, &ready_cnt); |
| - for (fd = 0; static_cast<int>(fd) < ready_cnt; fd++) { |
| - if (events[fd].user_data == static_cast<uint64_t>(-1)) { |
| - if (events[fd].events & POLLERR) { |
| - interrupted = true; |
| - } |
| - continue; |
| - } |
| + int result = poll(&pollfds[0], pollfds.size(), ms_timeout); |
| + if (result == -1) |
| + return -1; |
| - if (events[fd].events & POLLIN) { |
| - FD_SET(events[fd].user_data, &readout); |
| - event_cnt++; |
| - } |
| + FD_ZERO(readfds); |
| + FD_ZERO(writefds); |
| + FD_ZERO(exceptfds); |
| - if (events[fd].events & POLLOUT) { |
| - FD_SET(events[fd].user_data, &writeout); |
| - event_cnt++; |
| - } |
| - |
| - if (events[fd].events & (POLLERR | POLLHUP)) { |
| - FD_SET(events[fd].user_data, &exceptout); |
| - event_cnt++; |
| - } |
| + int event_cnt = 0; |
| + for (size_t index = 0; index < pollfds.size(); index++) { |
| + pollfd* info = &pollfds[index]; |
| + if (info->revents & POLLIN) { |
| + FD_SET(info->fd, readfds); |
| + event_cnt++; |
| } |
| - |
| - if (0 == event_cnt && interrupted) { |
| - errno = EINTR; |
| - return -1; |
| + if (info->revents & POLLOUT) { |
| + FD_SET(info->fd, writefds); |
| + event_cnt++; |
| + } |
| + if (info->revents & POLLHUP) { |
|
binji
2013/09/12 01:47:57
not POLLERR? You request it above.
noelallen1
2013/09/12 23:19:03
Done.
|
| + FD_SET(info->fd, exceptfds); |
| + event_cnt++; |
| } |
| } |
| - // Copy out the results |
| - if (readfds != NULL) |
| - *readfds = readout; |
| + return event_cnt; |
| +} |
| - if (writefds != NULL) |
| - *writefds = writeout; |
| +struct PollInfo { |
| + PollInfo() : index(-1) {}; |
| - if (exceptfds != NULL) |
| - *exceptfds = exceptout; |
| + std::vector<struct pollfd*> fds; |
| + int index; |
| +}; |
| - return event_cnt; |
| -} |
| +typedef std::map<EventEmitter*, PollInfo> EventPollMap_t; |
| int KernelProxy::poll(struct pollfd *fds, nfds_t nfds, int timeout) { |
| - ScopedEventListener listener(new EventListener); |
| - listener->Track(-1, signal_emitter_, POLLERR, 0); |
| + EventPollMap_t event_map; |
| - int index; |
| + std::vector<EventRequest> requests; |
| size_t event_cnt = 0; |
| - size_t event_track = 1; |
| - for (index = 0; static_cast<nfds_t>(index) < nfds; index++) { |
| + |
| + for (int index = 0; static_cast<nfds_t>(index) < nfds; index++) { |
| ScopedKernelHandle handle; |
| - struct pollfd* info = &fds[index]; |
| - Error err = AcquireHandle(info->fd, &handle); |
| + struct pollfd* fd_info = &fds[index]; |
| + Error err = AcquireHandle(fd_info->fd, &handle); |
| // If the node isn't open, or somehow invalid, mark it so. |
| if (err != 0) { |
| - info->revents = POLLNVAL; |
| + fd_info->revents = POLLNVAL; |
| event_cnt++; |
| continue; |
| } |
| // If it's already signaled, then just capture the event |
| - if (handle->node()->GetEventStatus() & info->events) { |
| - info->revents = info->events & handle->node()->GetEventStatus(); |
| + ScopedEventEmitter emitter(handle->node()->GetEventEmitter()); |
| + int events = POLLIN | POLLOUT; |
| + if (emitter) |
| + events = emitter->GetEventStatus(); |
| + |
| + if (events & fd_info->events) { |
| + fd_info->revents = fd_info->events & events; |
|
binji
2013/09/12 01:47:57
change to "events & fd_info->events" like above?
noelallen1
2013/09/12 23:19:03
Done.
|
| event_cnt++; |
| continue; |
| } |
| - // Otherwise try to track it. |
| - err = listener->Track(info->fd, handle->node(), info->events, index); |
| - if (err != 0) { |
| - info->revents = POLLNVAL; |
| + if (NULL == emitter) { |
| + fd_info->revents = POLLNVAL; |
| event_cnt++; |
| continue; |
| } |
| - event_track++; |
| + |
| + // Otherwise try to track it. |
| + PollInfo* info = &event_map[emitter.get()]; |
|
binji
2013/09/12 01:47:57
this is a lot of extra work to prevent sending the
noelallen1
2013/09/12 23:19:03
Having a one-to-one mapping between Listener and E
|
| + if (info->index == -1) { |
| + EventRequest request; |
| + request.emitter = emitter; |
| + request.filter = fd_info->events; |
| + request.events = 0; |
| + |
| + info->index = requests.size(); |
| + requests.push_back(request); |
| + } |
| + info->fds.push_back(fd_info); |
| + requests[info->index].filter |= fd_info->events; |
| } |
| - // If nothing is signaled, then we must wait. |
| + // If nothing is signaled, then we must wait on the event map |
| if (0 == event_cnt) { |
| - std::vector<EventData> events; |
| - int ready_cnt; |
| - |
| - bool interrupted = false; |
| - events.resize(event_track); |
| - listener->Wait(events.data(), event_track, timeout, &ready_cnt); |
| - for (index = 0; index < ready_cnt; index++) { |
| - struct pollfd* info = &fds[events[index].user_data]; |
| - if (!info) { |
| - interrupted = true; |
| - continue; |
| - } |
| - |
| - info->revents = events[index].events; |
| - event_cnt++; |
| - } |
| - if (0 == event_cnt && interrupted) { |
| - errno = EINTR; |
| + EventListenerPoll wait; |
| + Error err = wait.WaitOnAny(&requests[0], requests.size(), timeout); |
| + if (err != 0) { |
| + errno = err; |
| return -1; |
| } |
| + |
| + for (size_t rindex = 0; rindex < requests.size(); rindex++) { |
| + EventRequest* request = &requests[rindex]; |
| + if (request->events) { |
| + PollInfo* poll_info = &event_map[request->emitter.get()]; |
| + for (size_t findex = 0; findex < poll_info->fds.size(); findex++) { |
| + struct pollfd* fd_info = poll_info->fds[findex]; |
| + uint32_t events = fd_info->events & request->events; |
| + if (events) { |
| + fd_info->revents = events; |
| + event_cnt++; |
| + } |
| + } |
| + } |
| + } |
| } |
| return event_cnt; |
| @@ -1337,11 +1295,11 @@ int KernelProxy::socket(int domain, int type, int protocol) { |
| MountNodeSocket* sock = NULL; |
| switch (type) { |
| case SOCK_DGRAM: |
| - sock = new MountNodeUDP(socket_mount_.get()); |
| + sock = new MountNodeUDP(stream_mount_.get()); |
| break; |
| case SOCK_STREAM: |
| - sock = new MountNodeTCP(socket_mount_.get()); |
| + sock = new MountNodeTCP(stream_mount_.get()); |
| break; |
| default: |
| @@ -1351,7 +1309,7 @@ int KernelProxy::socket(int domain, int type, int protocol) { |
| ScopedMountNode node(sock); |
| if (sock->Init(S_IREAD | S_IWRITE) == 0) { |
| - ScopedKernelHandle handle(new KernelHandle(socket_mount_, node)); |
| + ScopedKernelHandle handle(new KernelHandle(stream_mount_, node)); |
| return AllocateFD(handle); |
| } |