Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Unified Diff: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
index a929a6a169a53ec30a4fb115c0f274d610ba101b..4445d397cce192fe66a86b2c11a21bea91e90fc9 100644
--- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
+++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
@@ -29,9 +29,11 @@
#include "nacl_io/mount_http.h"
#include "nacl_io/mount_mem.h"
#include "nacl_io/mount_node.h"
+#include "nacl_io/mount_node_pipe.h"
#include "nacl_io/mount_node_tcp.h"
#include "nacl_io/mount_node_udp.h"
#include "nacl_io/mount_passthrough.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/osmman.h"
#include "nacl_io/ossocket.h"
#include "nacl_io/osstat.h"
@@ -48,30 +50,10 @@
namespace nacl_io {
-class SignalEmitter : public EventEmitter {
- public:
- // From EventEmitter. The SignalEmitter exists in order
- // to inturrupt anything waiting in select()/poll() when kill()
- // is called. It is an edge trigger only and therefore has no
- // persistent readable/wriable/error state.
- uint32_t GetEventStatus() {
- return 0;
- }
-
- int GetType() {
- // For lack of a better type, report socket to signify it can be in an
- // used to signal.
- return S_IFSOCK;
- }
-
- void SignalOccurred() {
- RaiseEvent(POLLERR);
- }
-};
KernelProxy::KernelProxy() : dev_(0), ppapi_(NULL),
sigwinch_handler_(SIG_IGN),
- signal_emitter_(new SignalEmitter) {
+ signal_emitter_(new EventEmitter) {
}
@@ -132,8 +114,8 @@ Error KernelProxy::Init(PepperInterface* ppapi) {
#endif
StringMap_t args;
- socket_mount_.reset(new MountSocket());
- result = socket_mount_->Init(0, args, ppapi);
+ stream_mount_.reset(new MountStream());
+ result = stream_mount_->Init(0, args, ppapi);
if (result != 0) {
assert(false);
rtn = result;
@@ -193,6 +175,29 @@ int KernelProxy::open(const char* path, int oflags) {
return AllocateFD(handle);
}
+int KernelProxy::pipe(int pipefds[2]) {
+ MountNodePipe* pipe = new MountNodePipe(stream_mount_.get());
+ ScopedMountNode node(pipe);
+
+ if (pipe->Init(S_IREAD | S_IWRITE) == 0) {
+ ScopedKernelHandle handle0(new KernelHandle(stream_mount_, node));
+ ScopedKernelHandle handle1(new KernelHandle(stream_mount_, node));
+
+ // Should never fail, but...
+ if (handle0->Init(S_IREAD) || handle1->Init(S_IWRITE)) {
+ errno = EACCES;
+ return -1;
+ }
+
+ pipefds[0] = AllocateFD(handle0);
+ pipefds[1] = AllocateFD(handle1);
+ return 0;
+ }
+
+ errno = ENOSYS;
+ return -1;
+}
+
int KernelProxy::close(int fd) {
ScopedKernelHandle handle;
Error error = AcquireHandle(fd, &handle);
@@ -773,7 +778,8 @@ int KernelProxy::kill(pid_t pid, int sig) {
}
// Raise an event so that select/poll get interrupted.
- signal_emitter_->SignalOccurred();
+ AUTO_LOCK(signal_emitter_->GetLock())
+ signal_emitter_->RaiseEvents_Locked(POLLERR);
switch (sig) {
case SIGWINCH:
if (sigwinch_handler_ != SIG_IGN)
@@ -788,7 +794,6 @@ int KernelProxy::kill(pid_t pid, int sig) {
errno = EINVAL;
return -1;
}
-
return 0;
}
@@ -831,204 +836,166 @@ sighandler_t KernelProxy::sigset(int signum, sighandler_t handler) {
int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds,
fd_set* exceptfds, struct timeval* timeout) {
- ScopedEventListener listener(new EventListener);
-
- std::vector<struct pollfd> fds;
-
- fd_set readout, writeout, exceptout;
-
- FD_ZERO(&readout);
- FD_ZERO(&writeout);
- FD_ZERO(&exceptout);
-
- int fd;
- size_t event_cnt = 0;
- int event_track = 0;
- for (fd = 0; fd < nfds; fd++) {
+ fd_set ignore;
+ std::vector<pollfd> pollfds;
+
+ // Simplify logic, by using an IGNORE set for any undefined set
+ FD_ZERO(&ignore);
+ if (NULL == readfds)
+ readfds = &ignore;
+ if (NULL == writefds)
+ writefds = &ignore;
+ if (NULL == exceptfds)
+ exceptfds = &ignore;
+
+ for (int fd = 0; fd < nfds; fd++) {
int events = 0;
-
- if (readfds != NULL && FD_ISSET(fd, readfds))
+ if (FD_ISSET(fd, readfds))
events |= POLLIN;
- if (writefds != NULL && FD_ISSET(fd, writefds))
+ if (FD_ISSET(fd, writefds))
events |= POLLOUT;
- if (exceptfds != NULL && FD_ISSET(fd, exceptfds))
+ if (FD_ISSET(fd, exceptfds))
events |= POLLERR | POLLHUP;
- // If we are not interested in this FD, skip it
- if (0 == events) continue;
-
- ScopedKernelHandle handle;
- Error err = AcquireHandle(fd, &handle);
-
- // Select will return immediately if there are bad FDs.
- if (err != 0) {
- errno = EBADF;
- return -1;
+ if (events) {
+ pollfd info;
+ info.fd = fd;
+ info.events = events;
+ pollfds.push_back(info);
}
+ }
- int status = handle->node()->GetEventStatus() & events;
- if (status & POLLIN) {
- FD_SET(fd, &readout);
- event_cnt++;
- }
+ FD_ZERO(readfds);
+ FD_ZERO(writefds);
+ FD_ZERO(exceptfds);
- if (status & POLLOUT) {
- FD_SET(fd, &writeout);
- event_cnt++;
- }
+ // NULL timeout signals wait forever.
+ int ms_timeout = -1;
+ if (timeout != NULL) {
+ int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
- if (status & (POLLERR | POLLHUP)) {
- FD_SET(fd, &exceptout);
- event_cnt++;
+ // If the timeout is invalid or too long (larger than signed 32 bit).
+ if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
+ (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
+ (ms < 0) || (ms >= INT_MAX)) {
+ errno = EINVAL;
+ return -1;
}
- // Otherwise track it.
- if (0 == status) {
- err = listener->Track(fd, handle->node(), events, fd);
- if (err != 0) {
- errno = EBADF;
- return -1;
- }
- event_track++;
- }
+ ms_timeout = static_cast<int>(ms);
}
- // If nothing is signaled, then we must wait.
- if (event_cnt == 0) {
- std::vector<EventData> events;
- int ready_cnt;
- int ms_timeout;
-
- // NULL timeout signals wait forever.
- if (timeout == NULL) {
- ms_timeout = -1;
- } else {
- int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
-
- // If the timeout is invalid or too long (larger than signed 32 bit).
- if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
- (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
- (ms < 0) || (ms >= INT_MAX)) {
- errno = EINVAL;
- return -1;
- }
+ int result = poll(&pollfds[0], pollfds.size(), ms_timeout);
+ if (result == -1)
+ return -1;
- ms_timeout = static_cast<int>(ms);
+ int event_cnt = 0;
+ for (size_t index = 0; index < pollfds.size(); index++) {
+ pollfd* info = &pollfds[index];
+ if (info->revents & POLLIN) {
+ FD_SET(info->fd, readfds);
+ event_cnt++;
}
-
- // Add a special node to listen for events
- // coming from the KernelProxy itself (kill will
- // generated a SIGERR event).
- listener->Track(-1, signal_emitter_, POLLERR, -1);
- event_track += 1;
-
- events.resize(event_track);
-
- bool interrupted = false;
- listener->Wait(events.data(), event_track, ms_timeout, &ready_cnt);
- for (fd = 0; static_cast<int>(fd) < ready_cnt; fd++) {
- if (events[fd].user_data == static_cast<uint64_t>(-1)) {
- if (events[fd].events & POLLERR) {
- interrupted = true;
- }
- continue;
- }
-
- if (events[fd].events & POLLIN) {
- FD_SET(events[fd].user_data, &readout);
- event_cnt++;
- }
-
- if (events[fd].events & POLLOUT) {
- FD_SET(events[fd].user_data, &writeout);
- event_cnt++;
- }
-
- if (events[fd].events & (POLLERR | POLLHUP)) {
- FD_SET(events[fd].user_data, &exceptout);
- event_cnt++;
- }
+ if (info->revents & POLLOUT) {
+ FD_SET(info->fd, writefds);
+ event_cnt++;
}
-
- if (0 == event_cnt && interrupted) {
- errno = EINTR;
- return -1;
+ if (info->revents & (POLLHUP | POLLERR)) {
+ FD_SET(info->fd, exceptfds);
+ event_cnt++;
}
}
- // Copy out the results
- if (readfds != NULL)
- *readfds = readout;
+ return event_cnt;
+}
- if (writefds != NULL)
- *writefds = writeout;
+struct PollInfo {
+ PollInfo() : index(-1) {};
- if (exceptfds != NULL)
- *exceptfds = exceptout;
+ std::vector<struct pollfd*> fds;
+ int index;
+};
- return event_cnt;
-}
+typedef std::map<EventEmitter*, PollInfo> EventPollMap_t;
int KernelProxy::poll(struct pollfd *fds, nfds_t nfds, int timeout) {
- ScopedEventListener listener(new EventListener);
- listener->Track(-1, signal_emitter_, POLLERR, 0);
+ EventPollMap_t event_map;
- int index;
+ std::vector<EventRequest> requests;
size_t event_cnt = 0;
- size_t event_track = 1;
- for (index = 0; static_cast<nfds_t>(index) < nfds; index++) {
+
+ for (int index = 0; static_cast<nfds_t>(index) < nfds; index++) {
ScopedKernelHandle handle;
- struct pollfd* info = &fds[index];
- Error err = AcquireHandle(info->fd, &handle);
+ struct pollfd* fd_info = &fds[index];
+ Error err = AcquireHandle(fd_info->fd, &handle);
+
+ fd_info->revents = 0;
// If the node isn't open, or somehow invalid, mark it so.
if (err != 0) {
- info->revents = POLLNVAL;
+ fd_info->revents = POLLNVAL;
event_cnt++;
continue;
}
// If it's already signaled, then just capture the event
- if (handle->node()->GetEventStatus() & info->events) {
- info->revents = info->events & handle->node()->GetEventStatus();
+ ScopedEventEmitter emitter(handle->node()->GetEventEmitter());
+ int events = POLLIN | POLLOUT;
+ if (emitter)
+ events = emitter->GetEventStatus();
+
+ if (events & fd_info->events) {
+ fd_info->revents = events & fd_info->events;
event_cnt++;
continue;
}
- // Otherwise try to track it.
- err = listener->Track(info->fd, handle->node(), info->events, index);
- if (err != 0) {
- info->revents = POLLNVAL;
+ if (NULL == emitter) {
+ fd_info->revents = POLLNVAL;
event_cnt++;
continue;
}
- event_track++;
+
+ // Otherwise try to track it.
+ PollInfo* info = &event_map[emitter.get()];
+ if (info->index == -1) {
+ EventRequest request;
+ request.emitter = emitter;
+ request.filter = fd_info->events;
+ request.events = 0;
+
+ info->index = requests.size();
+ requests.push_back(request);
+ }
+ info->fds.push_back(fd_info);
+ requests[info->index].filter |= fd_info->events;
}
- // If nothing is signaled, then we must wait.
+ // If nothing is signaled, then we must wait on the event map
if (0 == event_cnt) {
- std::vector<EventData> events;
- int ready_cnt;
-
- bool interrupted = false;
- events.resize(event_track);
- listener->Wait(events.data(), event_track, timeout, &ready_cnt);
- for (index = 0; index < ready_cnt; index++) {
- struct pollfd* info = &fds[events[index].user_data];
- if (!info) {
- interrupted = true;
- continue;
- }
-
- info->revents = events[index].events;
- event_cnt++;
- }
- if (0 == event_cnt && interrupted) {
- errno = EINTR;
+ EventListenerPoll wait;
+ Error err = wait.WaitOnAny(&requests[0], requests.size(), timeout);
+ if ((err != 0) && (err != ETIMEDOUT)) {
+ errno = err;
return -1;
}
+
+ for (size_t rindex = 0; rindex < requests.size(); rindex++) {
+ EventRequest* request = &requests[rindex];
+ if (request->events) {
+ PollInfo* poll_info = &event_map[request->emitter.get()];
+ for (size_t findex = 0; findex < poll_info->fds.size(); findex++) {
+ struct pollfd* fd_info = poll_info->fds[findex];
+ uint32_t events = fd_info->events & request->events;
+ if (events) {
+ fd_info->revents = events;
+ event_cnt++;
+ }
+ }
+ }
+ }
}
return event_cnt;
@@ -1337,11 +1304,11 @@ int KernelProxy::socket(int domain, int type, int protocol) {
MountNodeSocket* sock = NULL;
switch (type) {
case SOCK_DGRAM:
- sock = new MountNodeUDP(socket_mount_.get());
+ sock = new MountNodeUDP(stream_mount_.get());
break;
case SOCK_STREAM:
- sock = new MountNodeTCP(socket_mount_.get());
+ sock = new MountNodeTCP(stream_mount_.get());
break;
default:
@@ -1351,7 +1318,7 @@ int KernelProxy::socket(int domain, int type, int protocol) {
ScopedMountNode node(sock);
if (sock->Init(S_IREAD | S_IWRITE) == 0) {
- ScopedKernelHandle handle(new KernelHandle(socket_mount_, node));
+ ScopedKernelHandle handle(new KernelHandle(stream_mount_, node));
return AllocateFD(handle);
}
« no previous file with comments | « native_client_sdk/src/libraries/nacl_io/kernel_proxy.h ('k') | native_client_sdk/src/libraries/nacl_io/library.dsc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698