| Index: native_client_sdk/src/libraries/nacl_io/event_listener.cc
|
| diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.cc b/native_client_sdk/src/libraries/nacl_io/event_listener.cc
|
| index 11297b6c05debcb02c7939590a18eb23ee65dd96..ce679f8030418c6a173228e41732c1805d53409f 100644
|
| --- a/native_client_sdk/src/libraries/nacl_io/event_listener.cc
|
| +++ b/native_client_sdk/src/libraries/nacl_io/event_listener.cc
|
| @@ -27,43 +27,6 @@ EventListener::~EventListener() {
|
| pthread_cond_destroy(&signal_cond_);
|
| }
|
|
|
| -// Before we can destroy ourselves, we must first unregister all the
|
| -// EventInfo objects from the various EventListeners
|
| -void EventListener::Destroy() {
|
| - EventInfoMap_t::iterator it;
|
| -
|
| - // We do not take the lock since this is the only reference to this object.
|
| - for (it = event_info_map_.begin(); it != event_info_map_.end(); it++) {
|
| - if (it->second->emitter) {
|
| - it->second->emitter->UnregisterEventInfo(it->second);
|
| - }
|
| - }
|
| -
|
| - EventEmitter::Destroy();
|
| -}
|
| -
|
| -uint32_t EventListener::GetEventStatus() {
|
| - // Always writable, but we can only assume it to be readable if there
|
| - // is an event waiting.
|
| - return signaled_.empty() ? POLLOUT : POLLIN | POLLOUT;
|
| -}
|
| -
|
| -int EventListener::GetType() {
|
| - // For lack of a better type, report socket to signify it can be in an
|
| - // used to signal.
|
| - return S_IFSOCK;
|
| -}
|
| -
|
| -// Called by EventEmitter, wakes up any blocking threads to verify if the wait
|
| -// conditions have been met.
|
| -void EventListener::Signal(const ScopedEventInfo& info) {
|
| - AUTO_LOCK(signal_lock_);
|
| - if (waiting_) {
|
| - signaled_.insert(info);
|
| - pthread_cond_broadcast(&signal_cond_);
|
| - }
|
| -}
|
| -
|
| static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) {
|
| if (ms_timeout >= 0) {
|
| uint64_t usec = usec_since_epoch();
|
| @@ -77,168 +40,124 @@ static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) {
|
| }
|
| }
|
|
|
| -Error EventListener::Wait(EventData* events,
|
| - int max,
|
| - int ms_timeout,
|
| - int* out_count) {
|
| - *out_count = 0;
|
| -
|
| - if (max <= 0)
|
| - return EINVAL;
|
|
|
| - if (NULL == events)
|
| - return EFAULT;
|
| -
|
| - {
|
| - AUTO_LOCK(info_lock_);
|
| -
|
| - // Go through the "live" event infos and see if they are in a signaled state
|
| - EventInfoMap_t::iterator it = event_info_map_.begin();
|
| - while ((it != event_info_map_.end()) && (*out_count < max)) {
|
| - ScopedEventInfo& info = it->second;
|
| - uint32_t event_bits = info->emitter->GetEventStatus() & info->filter;
|
| -
|
| - if (event_bits) {
|
| - events[*out_count].events = event_bits;
|
| - events[*out_count].user_data = info->user_data;
|
| - (*out_count)++;
|
| - }
|
| +EventListenerLock::EventListenerLock(EventEmitter* emitter)
|
| + : EventListener(),
|
| + emitter_(emitter),
|
| + lock_(new sdk_util::AutoLock(emitter->GetLock())),
|
| + events_(0) {
|
| +}
|
|
|
| - it++;
|
| - }
|
| - } // End of info_lock scope.
|
| +EventListenerLock::~EventListenerLock() {
|
| + delete lock_;
|
| +}
|
|
|
| - // We are done if we have a signal or no timeout specified.
|
| - if ((*out_count > 0) || (0 == ms_timeout))
|
| - return 0;
|
| +void EventListenerLock::ReceiveEvents(EventEmitter* emitter,
|
| + uint32_t events) {
|
| + // We are using the emitter's mutex, which is already locked.
|
| + pthread_cond_signal(&signal_cond_);
|
| +}
|
|
|
| - // Compute the absolute time we can wait until.
|
| +Error EventListenerLock::WaitOnEvent(uint32_t events, int ms_timeout) {
|
| struct timespec timeout;
|
| AbsoluteFromDeltaMS(&timeout, ms_timeout);
|
|
|
| - // Keep looking if until we receive something.
|
| - while (0 == *out_count) {
|
| - // We are now officially waiting.
|
| - AUTO_LOCK(signal_lock_);
|
| - waiting_++;
|
| -
|
| - // If we don't have any signals yet, wait for any Emitter to Signal.
|
| - while (signaled_.empty()) {
|
| - int return_code;
|
| - if (ms_timeout >= 0) {
|
| - return_code = pthread_cond_timedwait(&signal_cond_,
|
| - signal_lock_.mutex(),
|
| - &timeout);
|
| - } else {
|
| - return_code = pthread_cond_wait(&signal_cond_, signal_lock_.mutex());
|
| - }
|
| -
|
| - Error error(return_code);
|
| -
|
| - // If there is no error, then we may have been signaled.
|
| - if (0 == error)
|
| - break;
|
| -
|
| - // For any error case:
|
| - if (ETIMEDOUT == error) {
|
| - // A "TIMEOUT" is not an error.
|
| - error = 0;
|
| - } else {
|
| - // Otherwise this has gone bad, so return EBADF.
|
| - error = EBADF;
|
| - }
|
| -
|
| - waiting_--;
|
| - return error;
|
| + emitter_->RegisterListener_Locked(this, events);
|
| + while ((emitter_->GetEventStatus() & events) == 0) {
|
| + int return_code;
|
| + if (ms_timeout >= 0) {
|
| + return_code = pthread_cond_timedwait(&signal_cond_,
|
| + emitter_->GetLock().mutex(),
|
| + &timeout);
|
| + } else {
|
| + return_code = pthread_cond_wait(&signal_cond_,
|
| + emitter_->GetLock().mutex());
|
| }
|
|
|
| - // Copy signals over as long as we have room
|
| - while (!signaled_.empty() && (*out_count < max)) {
|
| - EventInfoSet_t::iterator it = signaled_.begin();
|
| + if (emitter_->GetEventStatus() & POLLERR)
|
| + return_code = EINTR;
|
|
|
| - events[*out_count].events = (*it)->events;
|
| - events[*out_count].user_data = (*it)->user_data;
|
| - (*out_count)++;
|
| -
|
| - signaled_.erase(it);
|
| + // Return the failure, unlocked
|
| + if (return_code != 0) {
|
| + emitter_->UnregisterListener_Locked(this);
|
| + return Error(return_code);
|
| }
|
| -
|
| - // If we are the last thread waiting, clear out the signalled set
|
| - if (1 == waiting_)
|
| - signaled_.clear();
|
| -
|
| - // We are done waiting.
|
| - waiting_--;
|
| }
|
|
|
| + emitter_->UnregisterListener_Locked(this);
|
| return 0;
|
| }
|
|
|
| -Error EventListener::Track(int id,
|
| - const ScopedEventEmitter& emitter,
|
| - uint32_t filter,
|
| - uint64_t user_data) {
|
| - AUTO_LOCK(info_lock_);
|
| - EventInfoMap_t::iterator it = event_info_map_.find(id);
|
| -
|
| - // If it's not a streaming type, then it can not be added.
|
| - if ((emitter->GetType() & (S_IFIFO | S_IFSOCK | S_IFCHR)) == 0)
|
| - return EPERM;
|
| -
|
| - if (it != event_info_map_.end())
|
| - return EEXIST;
|
| -
|
| - if (emitter.get() == this)
|
| - return EINVAL;
|
| -
|
| - ScopedEventInfo info(new EventInfo);
|
| - info->emitter = emitter.get();
|
| - info->listener = this;
|
| - info->id = id;
|
| - info->filter = filter;
|
| - info->user_data = user_data;
|
| - info->events = 0;
|
| -
|
| - emitter->RegisterEventInfo(info);
|
| - event_info_map_[id] = info;
|
| - return 0;
|
| +void EventListenerPoll::ReceiveEvents(EventEmitter* emitter,
|
| + uint32_t events) {
|
| + AUTO_LOCK(signal_lock_);
|
| + emitters_[emitter]->events |= events;
|
| + signaled_++;
|
| + pthread_cond_signal(&signal_cond_);
|
| }
|
|
|
| -Error EventListener::Update(int id, uint32_t filter, uint64_t user_data) {
|
| - AUTO_LOCK(info_lock_);
|
| - EventInfoMap_t::iterator it = event_info_map_.find(id);
|
| - if (it == event_info_map_.end())
|
| - return ENOENT;
|
| +Error EventListenerPoll::WaitOnAny(EventRequest* requests,
|
| + size_t cnt,
|
| + int ms_timeout) {
|
|
|
| - ScopedEventInfo& info = it->second;
|
| - info->filter = filter;
|
| - info->user_data = user_data;
|
| - return 0;
|
| -}
|
| + signaled_ = 0;
|
|
|
| -Error EventListener::Free(int id) {
|
| - AUTO_LOCK(info_lock_);
|
| - EventInfoMap_t::iterator it = event_info_map_.find(id);
|
| - if (event_info_map_.end() == it)
|
| - return ENOENT;
|
| + // Build a map of request emitters to request data before
|
| + // emitters can access them.
|
| + for (size_t index = 0; index < cnt; index++) {
|
| + EventRequest* request = requests + index;
|
| + emitters_[request->emitter.get()] = request;
|
| + request->events = 0;
|
| + }
|
|
|
| - it->second->emitter->UnregisterEventInfo(it->second);
|
| - event_info_map_.erase(it);
|
| - return 0;
|
| -}
|
| + // Emitters can now accessed the unlocked set, since each emitter is
|
| + // responsible for it's own request.
|
| + for (size_t index = 0; index < cnt; index++) {
|
| + EventRequest* request = requests + index;
|
| + requests->emitter->RegisterListener(this, request->filter);
|
| + uint32_t events = requests->emitter->GetEventStatus() & request->filter;
|
| +
|
| + if (events) {
|
| + AUTO_LOCK(signal_lock_);
|
| + request->events |= events;
|
| + signaled_++;
|
| + }
|
| + }
|
| +
|
| + struct timespec timeout;
|
| + AbsoluteFromDeltaMS(&timeout, ms_timeout);
|
| + int return_code = 0;
|
|
|
| -void EventListener::AbandonedEventInfo(const ScopedEventInfo& event) {
|
| {
|
| - AUTO_LOCK(info_lock_);
|
| + AUTO_LOCK(signal_lock_)
|
| + while (0 == signaled_) {
|
| + if (ms_timeout >= 0) {
|
| + return_code = pthread_cond_timedwait(&signal_cond_,
|
| + signal_lock_.mutex(),
|
| + &timeout);
|
| + } else {
|
| + return_code = pthread_cond_wait(&signal_cond_,
|
| + signal_lock_.mutex());
|
| + }
|
|
|
| - event->emitter = NULL;
|
| - event_info_map_.erase(event->id);
|
| + if (return_code != 0)
|
| + signaled_++;
|
| + }
|
| }
|
|
|
| - // EventInfos abandoned by the destroyed emitter must still be kept in
|
| - // signaled_ set for POLLHUP.
|
| - event->events = POLLHUP;
|
| - Signal(event);
|
| + // Unregister first to prevent emitters from modifying the set any further
|
| + for (size_t index = 0; index < cnt; index++) {
|
| + EventRequest* request = requests + index;
|
| + request->emitter->UnregisterListener(this);
|
| +
|
| + if (request->events & POLLERR)
|
| + return_code = EINTR;
|
| + }
|
| +
|
| + // We can now release the map.
|
| + emitters_.clear();
|
| +
|
| + return Error(return_code);
|
| }
|
|
|
| } // namespace nacl_io
|
|
|