Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Unified Diff: native_client_sdk/src/libraries/nacl_io/event_listener.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: native_client_sdk/src/libraries/nacl_io/event_listener.cc
diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.cc b/native_client_sdk/src/libraries/nacl_io/event_listener.cc
index 11297b6c05debcb02c7939590a18eb23ee65dd96..ce679f8030418c6a173228e41732c1805d53409f 100644
--- a/native_client_sdk/src/libraries/nacl_io/event_listener.cc
+++ b/native_client_sdk/src/libraries/nacl_io/event_listener.cc
@@ -27,43 +27,6 @@ EventListener::~EventListener() {
pthread_cond_destroy(&signal_cond_);
}
-// Before we can destroy ourselves, we must first unregister all the
-// EventInfo objects from the various EventListeners
-void EventListener::Destroy() {
- EventInfoMap_t::iterator it;
-
- // We do not take the lock since this is the only reference to this object.
- for (it = event_info_map_.begin(); it != event_info_map_.end(); it++) {
- if (it->second->emitter) {
- it->second->emitter->UnregisterEventInfo(it->second);
- }
- }
-
- EventEmitter::Destroy();
-}
-
-uint32_t EventListener::GetEventStatus() {
- // Always writable, but we can only assume it to be readable if there
- // is an event waiting.
- return signaled_.empty() ? POLLOUT : POLLIN | POLLOUT;
-}
-
-int EventListener::GetType() {
- // For lack of a better type, report socket to signify it can be in an
- // used to signal.
- return S_IFSOCK;
-}
-
-// Called by EventEmitter, wakes up any blocking threads to verify if the wait
-// conditions have been met.
-void EventListener::Signal(const ScopedEventInfo& info) {
- AUTO_LOCK(signal_lock_);
- if (waiting_) {
- signaled_.insert(info);
- pthread_cond_broadcast(&signal_cond_);
- }
-}
-
static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) {
if (ms_timeout >= 0) {
uint64_t usec = usec_since_epoch();
@@ -77,168 +40,124 @@ static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) {
}
}
-Error EventListener::Wait(EventData* events,
- int max,
- int ms_timeout,
- int* out_count) {
- *out_count = 0;
-
- if (max <= 0)
- return EINVAL;
- if (NULL == events)
- return EFAULT;
-
- {
- AUTO_LOCK(info_lock_);
-
- // Go through the "live" event infos and see if they are in a signaled state
- EventInfoMap_t::iterator it = event_info_map_.begin();
- while ((it != event_info_map_.end()) && (*out_count < max)) {
- ScopedEventInfo& info = it->second;
- uint32_t event_bits = info->emitter->GetEventStatus() & info->filter;
-
- if (event_bits) {
- events[*out_count].events = event_bits;
- events[*out_count].user_data = info->user_data;
- (*out_count)++;
- }
+EventListenerLock::EventListenerLock(EventEmitter* emitter)
+ : EventListener(),
+ emitter_(emitter),
+ lock_(new sdk_util::AutoLock(emitter->GetLock())),
+ events_(0) {
+}
- it++;
- }
- } // End of info_lock scope.
+EventListenerLock::~EventListenerLock() {
+ delete lock_;
+}
- // We are done if we have a signal or no timeout specified.
- if ((*out_count > 0) || (0 == ms_timeout))
- return 0;
+void EventListenerLock::ReceiveEvents(EventEmitter* emitter,
+ uint32_t events) {
+ // We are using the emitter's mutex, which is already locked.
+ pthread_cond_signal(&signal_cond_);
+}
- // Compute the absolute time we can wait until.
+Error EventListenerLock::WaitOnEvent(uint32_t events, int ms_timeout) {
struct timespec timeout;
AbsoluteFromDeltaMS(&timeout, ms_timeout);
- // Keep looking if until we receive something.
- while (0 == *out_count) {
- // We are now officially waiting.
- AUTO_LOCK(signal_lock_);
- waiting_++;
-
- // If we don't have any signals yet, wait for any Emitter to Signal.
- while (signaled_.empty()) {
- int return_code;
- if (ms_timeout >= 0) {
- return_code = pthread_cond_timedwait(&signal_cond_,
- signal_lock_.mutex(),
- &timeout);
- } else {
- return_code = pthread_cond_wait(&signal_cond_, signal_lock_.mutex());
- }
-
- Error error(return_code);
-
- // If there is no error, then we may have been signaled.
- if (0 == error)
- break;
-
- // For any error case:
- if (ETIMEDOUT == error) {
- // A "TIMEOUT" is not an error.
- error = 0;
- } else {
- // Otherwise this has gone bad, so return EBADF.
- error = EBADF;
- }
-
- waiting_--;
- return error;
+ emitter_->RegisterListener_Locked(this, events);
+ while ((emitter_->GetEventStatus() & events) == 0) {
+ int return_code;
+ if (ms_timeout >= 0) {
+ return_code = pthread_cond_timedwait(&signal_cond_,
+ emitter_->GetLock().mutex(),
+ &timeout);
+ } else {
+ return_code = pthread_cond_wait(&signal_cond_,
+ emitter_->GetLock().mutex());
}
- // Copy signals over as long as we have room
- while (!signaled_.empty() && (*out_count < max)) {
- EventInfoSet_t::iterator it = signaled_.begin();
+ if (emitter_->GetEventStatus() & POLLERR)
+ return_code = EINTR;
- events[*out_count].events = (*it)->events;
- events[*out_count].user_data = (*it)->user_data;
- (*out_count)++;
-
- signaled_.erase(it);
+ // Return the failure, unlocked
+ if (return_code != 0) {
+ emitter_->UnregisterListener_Locked(this);
+ return Error(return_code);
}
-
- // If we are the last thread waiting, clear out the signalled set
- if (1 == waiting_)
- signaled_.clear();
-
- // We are done waiting.
- waiting_--;
}
+ emitter_->UnregisterListener_Locked(this);
return 0;
}
-Error EventListener::Track(int id,
- const ScopedEventEmitter& emitter,
- uint32_t filter,
- uint64_t user_data) {
- AUTO_LOCK(info_lock_);
- EventInfoMap_t::iterator it = event_info_map_.find(id);
-
- // If it's not a streaming type, then it can not be added.
- if ((emitter->GetType() & (S_IFIFO | S_IFSOCK | S_IFCHR)) == 0)
- return EPERM;
-
- if (it != event_info_map_.end())
- return EEXIST;
-
- if (emitter.get() == this)
- return EINVAL;
-
- ScopedEventInfo info(new EventInfo);
- info->emitter = emitter.get();
- info->listener = this;
- info->id = id;
- info->filter = filter;
- info->user_data = user_data;
- info->events = 0;
-
- emitter->RegisterEventInfo(info);
- event_info_map_[id] = info;
- return 0;
+void EventListenerPoll::ReceiveEvents(EventEmitter* emitter,
+ uint32_t events) {
+ AUTO_LOCK(signal_lock_);
+ emitters_[emitter]->events |= events;
+ signaled_++;
+ pthread_cond_signal(&signal_cond_);
}
-Error EventListener::Update(int id, uint32_t filter, uint64_t user_data) {
- AUTO_LOCK(info_lock_);
- EventInfoMap_t::iterator it = event_info_map_.find(id);
- if (it == event_info_map_.end())
- return ENOENT;
+Error EventListenerPoll::WaitOnAny(EventRequest* requests,
+ size_t cnt,
+ int ms_timeout) {
- ScopedEventInfo& info = it->second;
- info->filter = filter;
- info->user_data = user_data;
- return 0;
-}
+ signaled_ = 0;
-Error EventListener::Free(int id) {
- AUTO_LOCK(info_lock_);
- EventInfoMap_t::iterator it = event_info_map_.find(id);
- if (event_info_map_.end() == it)
- return ENOENT;
+ // Build a map of request emitters to request data before
+ // emitters can access them.
+ for (size_t index = 0; index < cnt; index++) {
+ EventRequest* request = requests + index;
+ emitters_[request->emitter.get()] = request;
+ request->events = 0;
+ }
- it->second->emitter->UnregisterEventInfo(it->second);
- event_info_map_.erase(it);
- return 0;
-}
+ // Emitters can now accessed the unlocked set, since each emitter is
+ // responsible for it's own request.
+ for (size_t index = 0; index < cnt; index++) {
+ EventRequest* request = requests + index;
+ requests->emitter->RegisterListener(this, request->filter);
+ uint32_t events = requests->emitter->GetEventStatus() & request->filter;
+
+ if (events) {
+ AUTO_LOCK(signal_lock_);
+ request->events |= events;
+ signaled_++;
+ }
+ }
+
+ struct timespec timeout;
+ AbsoluteFromDeltaMS(&timeout, ms_timeout);
+ int return_code = 0;
-void EventListener::AbandonedEventInfo(const ScopedEventInfo& event) {
{
- AUTO_LOCK(info_lock_);
+ AUTO_LOCK(signal_lock_)
+ while (0 == signaled_) {
+ if (ms_timeout >= 0) {
+ return_code = pthread_cond_timedwait(&signal_cond_,
+ signal_lock_.mutex(),
+ &timeout);
+ } else {
+ return_code = pthread_cond_wait(&signal_cond_,
+ signal_lock_.mutex());
+ }
- event->emitter = NULL;
- event_info_map_.erase(event->id);
+ if (return_code != 0)
+ signaled_++;
+ }
}
- // EventInfos abandoned by the destroyed emitter must still be kept in
- // signaled_ set for POLLHUP.
- event->events = POLLHUP;
- Signal(event);
+ // Unregister first to prevent emitters from modifying the set any further
+ for (size_t index = 0; index < cnt; index++) {
+ EventRequest* request = requests + index;
+ request->emitter->UnregisterListener(this);
+
+ if (request->events & POLLERR)
+ return_code = EINTR;
+ }
+
+ // We can now release the map.
+ emitters_.clear();
+
+ return Error(return_code);
}
} // namespace nacl_io
« no previous file with comments | « native_client_sdk/src/libraries/nacl_io/event_listener.h ('k') | native_client_sdk/src/libraries/nacl_io/fifo_char.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698