| Index: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
|
| diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
|
| index a929a6a169a53ec30a4fb115c0f274d610ba101b..86fbac752af2787bc8b2f90cb39ff481616b4ab7 100644
|
| --- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
|
| +++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
|
| @@ -29,9 +29,11 @@
|
| #include "nacl_io/mount_http.h"
|
| #include "nacl_io/mount_mem.h"
|
| #include "nacl_io/mount_node.h"
|
| +#include "nacl_io/mount_node_pipe.h"
|
| #include "nacl_io/mount_node_tcp.h"
|
| #include "nacl_io/mount_node_udp.h"
|
| #include "nacl_io/mount_passthrough.h"
|
| +#include "nacl_io/mount_stream.h"
|
| #include "nacl_io/osmman.h"
|
| #include "nacl_io/ossocket.h"
|
| #include "nacl_io/osstat.h"
|
| @@ -48,30 +50,10 @@
|
|
|
| namespace nacl_io {
|
|
|
| -class SignalEmitter : public EventEmitter {
|
| - public:
|
| - // From EventEmitter. The SignalEmitter exists in order
|
| - // to inturrupt anything waiting in select()/poll() when kill()
|
| - // is called. It is an edge trigger only and therefore has no
|
| - // persistent readable/wriable/error state.
|
| - uint32_t GetEventStatus() {
|
| - return 0;
|
| - }
|
| -
|
| - int GetType() {
|
| - // For lack of a better type, report socket to signify it can be in an
|
| - // used to signal.
|
| - return S_IFSOCK;
|
| - }
|
| -
|
| - void SignalOccurred() {
|
| - RaiseEvent(POLLERR);
|
| - }
|
| -};
|
|
|
| KernelProxy::KernelProxy() : dev_(0), ppapi_(NULL),
|
| sigwinch_handler_(SIG_IGN),
|
| - signal_emitter_(new SignalEmitter) {
|
| + signal_emitter_(new EventEmitter) {
|
|
|
| }
|
|
|
| @@ -132,8 +114,8 @@ Error KernelProxy::Init(PepperInterface* ppapi) {
|
| #endif
|
|
|
| StringMap_t args;
|
| - socket_mount_.reset(new MountSocket());
|
| - result = socket_mount_->Init(0, args, ppapi);
|
| + stream_mount_.reset(new MountStream());
|
| + result = stream_mount_->Init(0, args, ppapi);
|
| if (result != 0) {
|
| assert(false);
|
| rtn = result;
|
| @@ -193,6 +175,29 @@ int KernelProxy::open(const char* path, int oflags) {
|
| return AllocateFD(handle);
|
| }
|
|
|
| +int KernelProxy::pipe(int pipefds[2]) {
|
| + MountNodePipe* pipe = new MountNodePipe(stream_mount_.get());
|
| + ScopedMountNode node(pipe);
|
| +
|
| + if (pipe->Init(S_IREAD | S_IWRITE) == 0) {
|
| + ScopedKernelHandle handle0(new KernelHandle(stream_mount_, node));
|
| + ScopedKernelHandle handle1(new KernelHandle(stream_mount_, node));
|
| +
|
| + // Should never fail, but...
|
| + if (handle0->Init(S_IREAD) || handle1->Init(S_IWRITE)) {
|
| + errno = EACCES;
|
| + return -1;
|
| + }
|
| +
|
| + pipefds[0] = AllocateFD(handle0);
|
| + pipefds[1] = AllocateFD(handle1);
|
| + return 0;
|
| + }
|
| +
|
| + errno = ENOSYS;
|
| + return -1;
|
| +}
|
| +
|
| int KernelProxy::close(int fd) {
|
| ScopedKernelHandle handle;
|
| Error error = AcquireHandle(fd, &handle);
|
| @@ -773,7 +778,8 @@ int KernelProxy::kill(pid_t pid, int sig) {
|
| }
|
|
|
| // Raise an event so that select/poll get interrupted.
|
| - signal_emitter_->SignalOccurred();
|
| + AUTO_LOCK(signal_emitter_->GetLock())
|
| + signal_emitter_->RaiseEvents_Locked(POLLERR);
|
| switch (sig) {
|
| case SIGWINCH:
|
| if (sigwinch_handler_ != SIG_IGN)
|
| @@ -788,7 +794,6 @@ int KernelProxy::kill(pid_t pid, int sig) {
|
| errno = EINVAL;
|
| return -1;
|
| }
|
| -
|
| return 0;
|
| }
|
|
|
| @@ -831,204 +836,164 @@ sighandler_t KernelProxy::sigset(int signum, sighandler_t handler) {
|
|
|
| int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds,
|
| fd_set* exceptfds, struct timeval* timeout) {
|
| - ScopedEventListener listener(new EventListener);
|
| -
|
| - std::vector<struct pollfd> fds;
|
| -
|
| - fd_set readout, writeout, exceptout;
|
| -
|
| - FD_ZERO(&readout);
|
| - FD_ZERO(&writeout);
|
| - FD_ZERO(&exceptout);
|
| -
|
| - int fd;
|
| - size_t event_cnt = 0;
|
| - int event_track = 0;
|
| - for (fd = 0; fd < nfds; fd++) {
|
| + fd_set ignore;
|
| + std::vector<pollfd> pollfds;
|
| +
|
| + // Simplify logic, by using an IGNORE set for any undefined set
|
| + FD_ZERO(&ignore);
|
| + if (NULL == readfds)
|
| + readfds = &ignore;
|
| + if (NULL == writefds)
|
| + writefds = &ignore;
|
| + if (NULL == exceptfds)
|
| + exceptfds = &ignore;
|
| +
|
| + for (int fd = 0; fd < nfds; fd++) {
|
| int events = 0;
|
| -
|
| - if (readfds != NULL && FD_ISSET(fd, readfds))
|
| + if (FD_ISSET(fd, readfds))
|
| events |= POLLIN;
|
|
|
| - if (writefds != NULL && FD_ISSET(fd, writefds))
|
| + if (FD_ISSET(fd, writefds))
|
| events |= POLLOUT;
|
|
|
| - if (exceptfds != NULL && FD_ISSET(fd, exceptfds))
|
| + if (FD_ISSET(fd, exceptfds))
|
| events |= POLLERR | POLLHUP;
|
|
|
| - // If we are not interested in this FD, skip it
|
| - if (0 == events) continue;
|
| -
|
| - ScopedKernelHandle handle;
|
| - Error err = AcquireHandle(fd, &handle);
|
| -
|
| - // Select will return immediately if there are bad FDs.
|
| - if (err != 0) {
|
| - errno = EBADF;
|
| - return -1;
|
| + if (events) {
|
| + pollfd info;
|
| + info.fd = fd;
|
| + info.events = events;
|
| + pollfds.push_back(info);
|
| }
|
| + }
|
|
|
| - int status = handle->node()->GetEventStatus() & events;
|
| - if (status & POLLIN) {
|
| - FD_SET(fd, &readout);
|
| - event_cnt++;
|
| - }
|
| + FD_ZERO(readfds);
|
| + FD_ZERO(writefds);
|
| + FD_ZERO(exceptfds);
|
|
|
| - if (status & POLLOUT) {
|
| - FD_SET(fd, &writeout);
|
| - event_cnt++;
|
| - }
|
| + // NULL timeout signals wait forever.
|
| + int ms_timeout = -1;
|
| + if (timeout != NULL) {
|
| + int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
|
|
|
| - if (status & (POLLERR | POLLHUP)) {
|
| - FD_SET(fd, &exceptout);
|
| - event_cnt++;
|
| + // If the timeout is invalid or too long (larger than signed 32 bit).
|
| + if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
|
| + (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
|
| + (ms < 0) || (ms >= INT_MAX)) {
|
| + errno = EINVAL;
|
| + return -1;
|
| }
|
|
|
| - // Otherwise track it.
|
| - if (0 == status) {
|
| - err = listener->Track(fd, handle->node(), events, fd);
|
| - if (err != 0) {
|
| - errno = EBADF;
|
| - return -1;
|
| - }
|
| - event_track++;
|
| - }
|
| + ms_timeout = static_cast<int>(ms);
|
| }
|
|
|
| - // If nothing is signaled, then we must wait.
|
| - if (event_cnt == 0) {
|
| - std::vector<EventData> events;
|
| - int ready_cnt;
|
| - int ms_timeout;
|
| -
|
| - // NULL timeout signals wait forever.
|
| - if (timeout == NULL) {
|
| - ms_timeout = -1;
|
| - } else {
|
| - int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
|
| -
|
| - // If the timeout is invalid or too long (larger than signed 32 bit).
|
| - if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
|
| - (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
|
| - (ms < 0) || (ms >= INT_MAX)) {
|
| - errno = EINVAL;
|
| - return -1;
|
| - }
|
| + int result = poll(&pollfds[0], pollfds.size(), ms_timeout);
|
| + if (result == -1)
|
| + return -1;
|
|
|
| - ms_timeout = static_cast<int>(ms);
|
| + int event_cnt = 0;
|
| + for (size_t index = 0; index < pollfds.size(); index++) {
|
| + pollfd* info = &pollfds[index];
|
| + if (info->revents & POLLIN) {
|
| + FD_SET(info->fd, readfds);
|
| + event_cnt++;
|
| }
|
| -
|
| - // Add a special node to listen for events
|
| - // coming from the KernelProxy itself (kill will
|
| - // generated a SIGERR event).
|
| - listener->Track(-1, signal_emitter_, POLLERR, -1);
|
| - event_track += 1;
|
| -
|
| - events.resize(event_track);
|
| -
|
| - bool interrupted = false;
|
| - listener->Wait(events.data(), event_track, ms_timeout, &ready_cnt);
|
| - for (fd = 0; static_cast<int>(fd) < ready_cnt; fd++) {
|
| - if (events[fd].user_data == static_cast<uint64_t>(-1)) {
|
| - if (events[fd].events & POLLERR) {
|
| - interrupted = true;
|
| - }
|
| - continue;
|
| - }
|
| -
|
| - if (events[fd].events & POLLIN) {
|
| - FD_SET(events[fd].user_data, &readout);
|
| - event_cnt++;
|
| - }
|
| -
|
| - if (events[fd].events & POLLOUT) {
|
| - FD_SET(events[fd].user_data, &writeout);
|
| - event_cnt++;
|
| - }
|
| -
|
| - if (events[fd].events & (POLLERR | POLLHUP)) {
|
| - FD_SET(events[fd].user_data, &exceptout);
|
| - event_cnt++;
|
| - }
|
| + if (info->revents & POLLOUT) {
|
| + FD_SET(info->fd, writefds);
|
| + event_cnt++;
|
| }
|
| -
|
| - if (0 == event_cnt && interrupted) {
|
| - errno = EINTR;
|
| - return -1;
|
| + if (info->revents & (POLLHUP | POLLERR)) {
|
| + FD_SET(info->fd, exceptfds);
|
| + event_cnt++;
|
| }
|
| }
|
|
|
| - // Copy out the results
|
| - if (readfds != NULL)
|
| - *readfds = readout;
|
| + return event_cnt;
|
| +}
|
|
|
| - if (writefds != NULL)
|
| - *writefds = writeout;
|
| +struct PollInfo {
|
| + PollInfo() : index(-1) {};
|
|
|
| - if (exceptfds != NULL)
|
| - *exceptfds = exceptout;
|
| + std::vector<struct pollfd*> fds;
|
| + int index;
|
| +};
|
|
|
| - return event_cnt;
|
| -}
|
| +typedef std::map<EventEmitter*, PollInfo> EventPollMap_t;
|
|
|
| int KernelProxy::poll(struct pollfd *fds, nfds_t nfds, int timeout) {
|
| - ScopedEventListener listener(new EventListener);
|
| - listener->Track(-1, signal_emitter_, POLLERR, 0);
|
| + EventPollMap_t event_map;
|
|
|
| - int index;
|
| + std::vector<EventRequest> requests;
|
| size_t event_cnt = 0;
|
| - size_t event_track = 1;
|
| - for (index = 0; static_cast<nfds_t>(index) < nfds; index++) {
|
| +
|
| + for (int index = 0; static_cast<nfds_t>(index) < nfds; index++) {
|
| ScopedKernelHandle handle;
|
| - struct pollfd* info = &fds[index];
|
| - Error err = AcquireHandle(info->fd, &handle);
|
| + struct pollfd* fd_info = &fds[index];
|
| + Error err = AcquireHandle(fd_info->fd, &handle);
|
|
|
| // If the node isn't open, or somehow invalid, mark it so.
|
| if (err != 0) {
|
| - info->revents = POLLNVAL;
|
| + fd_info->revents = POLLNVAL;
|
| event_cnt++;
|
| continue;
|
| }
|
|
|
| // If it's already signaled, then just capture the event
|
| - if (handle->node()->GetEventStatus() & info->events) {
|
| - info->revents = info->events & handle->node()->GetEventStatus();
|
| + ScopedEventEmitter emitter(handle->node()->GetEventEmitter());
|
| + int events = POLLIN | POLLOUT;
|
| + if (emitter)
|
| + events = emitter->GetEventStatus();
|
| +
|
| + if (events & fd_info->events) {
|
| + fd_info->revents = events & fd_info->events;
|
| event_cnt++;
|
| continue;
|
| }
|
|
|
| - // Otherwise try to track it.
|
| - err = listener->Track(info->fd, handle->node(), info->events, index);
|
| - if (err != 0) {
|
| - info->revents = POLLNVAL;
|
| + if (NULL == emitter) {
|
| + fd_info->revents = POLLNVAL;
|
| event_cnt++;
|
| continue;
|
| }
|
| - event_track++;
|
| +
|
| + // Otherwise try to track it.
|
| + PollInfo* info = &event_map[emitter.get()];
|
| + if (info->index == -1) {
|
| + EventRequest request;
|
| + request.emitter = emitter;
|
| + request.filter = fd_info->events;
|
| + request.events = 0;
|
| +
|
| + info->index = requests.size();
|
| + requests.push_back(request);
|
| + }
|
| + info->fds.push_back(fd_info);
|
| + requests[info->index].filter |= fd_info->events;
|
| }
|
|
|
| - // If nothing is signaled, then we must wait.
|
| + // If nothing is signaled, then we must wait on the event map
|
| if (0 == event_cnt) {
|
| - std::vector<EventData> events;
|
| - int ready_cnt;
|
| -
|
| - bool interrupted = false;
|
| - events.resize(event_track);
|
| - listener->Wait(events.data(), event_track, timeout, &ready_cnt);
|
| - for (index = 0; index < ready_cnt; index++) {
|
| - struct pollfd* info = &fds[events[index].user_data];
|
| - if (!info) {
|
| - interrupted = true;
|
| - continue;
|
| - }
|
| -
|
| - info->revents = events[index].events;
|
| - event_cnt++;
|
| - }
|
| - if (0 == event_cnt && interrupted) {
|
| - errno = EINTR;
|
| + EventListenerPoll wait;
|
| + Error err = wait.WaitOnAny(&requests[0], requests.size(), timeout);
|
| + if ((err != 0) && (err != ETIMEDOUT)) {
|
| + errno = err;
|
| return -1;
|
| }
|
| +
|
| + for (size_t rindex = 0; rindex < requests.size(); rindex++) {
|
| + EventRequest* request = &requests[rindex];
|
| + if (request->events) {
|
| + PollInfo* poll_info = &event_map[request->emitter.get()];
|
| + for (size_t findex = 0; findex < poll_info->fds.size(); findex++) {
|
| + struct pollfd* fd_info = poll_info->fds[findex];
|
| + uint32_t events = fd_info->events & request->events;
|
| + if (events) {
|
| + fd_info->revents = events;
|
| + event_cnt++;
|
| + }
|
| + }
|
| + }
|
| + }
|
| }
|
|
|
| return event_cnt;
|
| @@ -1337,11 +1302,11 @@ int KernelProxy::socket(int domain, int type, int protocol) {
|
| MountNodeSocket* sock = NULL;
|
| switch (type) {
|
| case SOCK_DGRAM:
|
| - sock = new MountNodeUDP(socket_mount_.get());
|
| + sock = new MountNodeUDP(stream_mount_.get());
|
| break;
|
|
|
| case SOCK_STREAM:
|
| - sock = new MountNodeTCP(socket_mount_.get());
|
| + sock = new MountNodeTCP(stream_mount_.get());
|
| break;
|
|
|
| default:
|
| @@ -1351,7 +1316,7 @@ int KernelProxy::socket(int domain, int type, int protocol) {
|
|
|
| ScopedMountNode node(sock);
|
| if (sock->Init(S_IREAD | S_IWRITE) == 0) {
|
| - ScopedKernelHandle handle(new KernelHandle(socket_mount_, node));
|
| + ScopedKernelHandle handle(new KernelHandle(stream_mount_, node));
|
| return AllocateFD(handle);
|
| }
|
|
|
|
|