Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(213)

Unified Diff: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Remove event friends, rename EventListenerPoll Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
index a929a6a169a53ec30a4fb115c0f274d610ba101b..9818f83dd0b46437682fad3d3c088cb1614acbf4 100644
--- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
+++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
@@ -29,9 +29,11 @@
#include "nacl_io/mount_http.h"
#include "nacl_io/mount_mem.h"
#include "nacl_io/mount_node.h"
+#include "nacl_io/mount_node_pipe.h"
#include "nacl_io/mount_node_tcp.h"
#include "nacl_io/mount_node_udp.h"
#include "nacl_io/mount_passthrough.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/osmman.h"
#include "nacl_io/ossocket.h"
#include "nacl_io/osstat.h"
@@ -48,30 +50,10 @@
namespace nacl_io {
-class SignalEmitter : public EventEmitter {
- public:
- // From EventEmitter. The SignalEmitter exists in order
- // to inturrupt anything waiting in select()/poll() when kill()
- // is called. It is an edge trigger only and therefore has no
- // persistent readable/wriable/error state.
- uint32_t GetEventStatus() {
- return 0;
- }
-
- int GetType() {
- // For lack of a better type, report socket to signify it can be in an
- // used to signal.
- return S_IFSOCK;
- }
-
- void SignalOccurred() {
- RaiseEvent(POLLERR);
- }
-};
KernelProxy::KernelProxy() : dev_(0), ppapi_(NULL),
sigwinch_handler_(SIG_IGN),
- signal_emitter_(new SignalEmitter) {
+ signal_emitter_(new EventEmitterSignal) {
}
@@ -132,8 +114,8 @@ Error KernelProxy::Init(PepperInterface* ppapi) {
#endif
StringMap_t args;
- socket_mount_.reset(new MountSocket());
- result = socket_mount_->Init(0, args, ppapi);
+ stream_mount_.reset(new MountStream());
+ result = stream_mount_->Init(0, args, ppapi);
if (result != 0) {
assert(false);
rtn = result;
@@ -193,6 +175,33 @@ int KernelProxy::open(const char* path, int oflags) {
return AllocateFD(handle);
}
+int KernelProxy::pipe(int pipefds[2]) {
+ MountNodePipe* pipe = new MountNodePipe(stream_mount_.get());
+ ScopedMountNode node(pipe);
+
+ if (pipe->Init(S_IREAD | S_IWRITE) == 0) {
+ ScopedKernelHandle handle0(new KernelHandle(stream_mount_, node));
+ ScopedKernelHandle handle1(new KernelHandle(stream_mount_, node));
+
+ handle0->Init(S_IREAD);
binji 2013/09/12 01:47:57 check return from Init?
noelallen1 2013/09/12 23:19:03 Done.
+ handle1->Init(S_IWRITE);
+
+ pipefds[0] = AllocateFD(handle0);
+ if (pipefds[0] == 0)
binji 2013/09/12 01:47:57 AllocateFD can never return a zero FD.
noelallen1 2013/09/12 23:19:03 Done.
+ return -1;
+
+ pipefds[1] = AllocateFD(handle1);
+ if (pipefds[1] == 0) {
+ close(pipefds[0]);
+ return -1;
+ }
+
+ return 0;
+ }
+
+ return -1;
binji 2013/09/12 01:47:57 set errno?
noelallen1 2013/09/12 23:19:03 Done.
+}
+
int KernelProxy::close(int fd) {
ScopedKernelHandle handle;
Error error = AcquireHandle(fd, &handle);
@@ -773,7 +782,7 @@ int KernelProxy::kill(pid_t pid, int sig) {
}
// Raise an event so that select/poll get interrupted.
- signal_emitter_->SignalOccurred();
+ signal_emitter_->Raise(POLLERR);
switch (sig) {
case SIGWINCH:
if (sigwinch_handler_ != SIG_IGN)
@@ -788,7 +797,6 @@ int KernelProxy::kill(pid_t pid, int sig) {
errno = EINVAL;
return -1;
}
-
return 0;
}
@@ -831,22 +839,10 @@ sighandler_t KernelProxy::sigset(int signum, sighandler_t handler) {
int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds,
fd_set* exceptfds, struct timeval* timeout) {
- ScopedEventListener listener(new EventListener);
-
- std::vector<struct pollfd> fds;
+ std::vector<pollfd> pollfds;
- fd_set readout, writeout, exceptout;
-
- FD_ZERO(&readout);
- FD_ZERO(&writeout);
- FD_ZERO(&exceptout);
-
- int fd;
- size_t event_cnt = 0;
- int event_track = 0;
- for (fd = 0; fd < nfds; fd++) {
+ for (int fd = 0; fd < nfds; fd++) {
int events = 0;
-
if (readfds != NULL && FD_ISSET(fd, readfds))
events |= POLLIN;
@@ -856,179 +852,141 @@ int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds,
if (exceptfds != NULL && FD_ISSET(fd, exceptfds))
events |= POLLERR | POLLHUP;
- // If we are not interested in this FD, skip it
- if (0 == events) continue;
-
- ScopedKernelHandle handle;
- Error err = AcquireHandle(fd, &handle);
-
- // Select will return immediately if there are bad FDs.
- if (err != 0) {
- errno = EBADF;
- return -1;
- }
-
- int status = handle->node()->GetEventStatus() & events;
- if (status & POLLIN) {
- FD_SET(fd, &readout);
- event_cnt++;
- }
-
- if (status & POLLOUT) {
- FD_SET(fd, &writeout);
- event_cnt++;
- }
-
- if (status & (POLLERR | POLLHUP)) {
- FD_SET(fd, &exceptout);
- event_cnt++;
- }
-
- // Otherwise track it.
- if (0 == status) {
- err = listener->Track(fd, handle->node(), events, fd);
- if (err != 0) {
- errno = EBADF;
- return -1;
- }
- event_track++;
+ if (events) {
+ pollfd info;
+ info.fd = fd;
+ info.events = events;
+ pollfds.push_back(info);
}
}
- // If nothing is signaled, then we must wait.
- if (event_cnt == 0) {
- std::vector<EventData> events;
- int ready_cnt;
- int ms_timeout;
-
- // NULL timeout signals wait forever.
- if (timeout == NULL) {
- ms_timeout = -1;
- } else {
- int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
-
- // If the timeout is invalid or too long (larger than signed 32 bit).
- if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
- (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
- (ms < 0) || (ms >= INT_MAX)) {
- errno = EINVAL;
- return -1;
- }
+ // NULL timeout signals wait forever.
+ int ms_timeout = -1;
+ if (timeout != NULL) {
+ int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
- ms_timeout = static_cast<int>(ms);
+ // If the timeout is invalid or too long (larger than signed 32 bit).
+ if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
+ (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
+ (ms < 0) || (ms >= INT_MAX)) {
+ errno = EINVAL;
+ return -1;
}
- // Add a special node to listen for events
- // coming from the KernelProxy itself (kill will
- // generated a SIGERR event).
- listener->Track(-1, signal_emitter_, POLLERR, -1);
- event_track += 1;
-
- events.resize(event_track);
+ ms_timeout = static_cast<int>(ms);
+ }
- bool interrupted = false;
- listener->Wait(events.data(), event_track, ms_timeout, &ready_cnt);
- for (fd = 0; static_cast<int>(fd) < ready_cnt; fd++) {
- if (events[fd].user_data == static_cast<uint64_t>(-1)) {
- if (events[fd].events & POLLERR) {
- interrupted = true;
- }
- continue;
- }
+ int result = poll(&pollfds[0], pollfds.size(), ms_timeout);
+ if (result == -1)
+ return -1;
- if (events[fd].events & POLLIN) {
- FD_SET(events[fd].user_data, &readout);
- event_cnt++;
- }
+ FD_ZERO(readfds);
+ FD_ZERO(writefds);
+ FD_ZERO(exceptfds);
- if (events[fd].events & POLLOUT) {
- FD_SET(events[fd].user_data, &writeout);
- event_cnt++;
- }
-
- if (events[fd].events & (POLLERR | POLLHUP)) {
- FD_SET(events[fd].user_data, &exceptout);
- event_cnt++;
- }
+ int event_cnt = 0;
+ for (size_t index = 0; index < pollfds.size(); index++) {
+ pollfd* info = &pollfds[index];
+ if (info->revents & POLLIN) {
+ FD_SET(info->fd, readfds);
+ event_cnt++;
}
-
- if (0 == event_cnt && interrupted) {
- errno = EINTR;
- return -1;
+ if (info->revents & POLLOUT) {
+ FD_SET(info->fd, writefds);
+ event_cnt++;
+ }
+ if (info->revents & POLLHUP) {
binji 2013/09/12 01:47:57 not POLLERR? You request it above.
noelallen1 2013/09/12 23:19:03 Done.
+ FD_SET(info->fd, exceptfds);
+ event_cnt++;
}
}
- // Copy out the results
- if (readfds != NULL)
- *readfds = readout;
+ return event_cnt;
+}
- if (writefds != NULL)
- *writefds = writeout;
+struct PollInfo {
+ PollInfo() : index(-1) {};
- if (exceptfds != NULL)
- *exceptfds = exceptout;
+ std::vector<struct pollfd*> fds;
+ int index;
+};
- return event_cnt;
-}
+typedef std::map<EventEmitter*, PollInfo> EventPollMap_t;
int KernelProxy::poll(struct pollfd *fds, nfds_t nfds, int timeout) {
- ScopedEventListener listener(new EventListener);
- listener->Track(-1, signal_emitter_, POLLERR, 0);
+ EventPollMap_t event_map;
- int index;
+ std::vector<EventRequest> requests;
size_t event_cnt = 0;
- size_t event_track = 1;
- for (index = 0; static_cast<nfds_t>(index) < nfds; index++) {
+
+ for (int index = 0; static_cast<nfds_t>(index) < nfds; index++) {
ScopedKernelHandle handle;
- struct pollfd* info = &fds[index];
- Error err = AcquireHandle(info->fd, &handle);
+ struct pollfd* fd_info = &fds[index];
+ Error err = AcquireHandle(fd_info->fd, &handle);
// If the node isn't open, or somehow invalid, mark it so.
if (err != 0) {
- info->revents = POLLNVAL;
+ fd_info->revents = POLLNVAL;
event_cnt++;
continue;
}
// If it's already signaled, then just capture the event
- if (handle->node()->GetEventStatus() & info->events) {
- info->revents = info->events & handle->node()->GetEventStatus();
+ ScopedEventEmitter emitter(handle->node()->GetEventEmitter());
+ int events = POLLIN | POLLOUT;
+ if (emitter)
+ events = emitter->GetEventStatus();
+
+ if (events & fd_info->events) {
+ fd_info->revents = fd_info->events & events;
binji 2013/09/12 01:47:57 change to "events & fd_info->events" like above?
noelallen1 2013/09/12 23:19:03 Done.
event_cnt++;
continue;
}
- // Otherwise try to track it.
- err = listener->Track(info->fd, handle->node(), info->events, index);
- if (err != 0) {
- info->revents = POLLNVAL;
+ if (NULL == emitter) {
+ fd_info->revents = POLLNVAL;
event_cnt++;
continue;
}
- event_track++;
+
+ // Otherwise try to track it.
+ PollInfo* info = &event_map[emitter.get()];
binji 2013/09/12 01:47:57 this is a lot of extra work to prevent sending the
noelallen1 2013/09/12 23:19:03 Having a one-to-one mapping between Listener and E
+ if (info->index == -1) {
+ EventRequest request;
+ request.emitter = emitter;
+ request.filter = fd_info->events;
+ request.events = 0;
+
+ info->index = requests.size();
+ requests.push_back(request);
+ }
+ info->fds.push_back(fd_info);
+ requests[info->index].filter |= fd_info->events;
}
- // If nothing is signaled, then we must wait.
+ // If nothing is signaled, then we must wait on the event map
if (0 == event_cnt) {
- std::vector<EventData> events;
- int ready_cnt;
-
- bool interrupted = false;
- events.resize(event_track);
- listener->Wait(events.data(), event_track, timeout, &ready_cnt);
- for (index = 0; index < ready_cnt; index++) {
- struct pollfd* info = &fds[events[index].user_data];
- if (!info) {
- interrupted = true;
- continue;
- }
-
- info->revents = events[index].events;
- event_cnt++;
- }
- if (0 == event_cnt && interrupted) {
- errno = EINTR;
+ EventListenerPoll wait;
+ Error err = wait.WaitOnAny(&requests[0], requests.size(), timeout);
+ if (err != 0) {
+ errno = err;
return -1;
}
+
+ for (size_t rindex = 0; rindex < requests.size(); rindex++) {
+ EventRequest* request = &requests[rindex];
+ if (request->events) {
+ PollInfo* poll_info = &event_map[request->emitter.get()];
+ for (size_t findex = 0; findex < poll_info->fds.size(); findex++) {
+ struct pollfd* fd_info = poll_info->fds[findex];
+ uint32_t events = fd_info->events & request->events;
+ if (events) {
+ fd_info->revents = events;
+ event_cnt++;
+ }
+ }
+ }
+ }
}
return event_cnt;
@@ -1337,11 +1295,11 @@ int KernelProxy::socket(int domain, int type, int protocol) {
MountNodeSocket* sock = NULL;
switch (type) {
case SOCK_DGRAM:
- sock = new MountNodeUDP(socket_mount_.get());
+ sock = new MountNodeUDP(stream_mount_.get());
break;
case SOCK_STREAM:
- sock = new MountNodeTCP(socket_mount_.get());
+ sock = new MountNodeTCP(stream_mount_.get());
break;
default:
@@ -1351,7 +1309,7 @@ int KernelProxy::socket(int domain, int type, int protocol) {
ScopedMountNode node(sock);
if (sock->Init(S_IREAD | S_IWRITE) == 0) {
- ScopedKernelHandle handle(new KernelHandle(socket_mount_, node));
+ ScopedKernelHandle handle(new KernelHandle(stream_mount_, node));
return AllocateFD(handle);
}

Powered by Google App Engine
This is Rietveld 408576698