Chromium Code Reviews
| Index: native_client_sdk/src/libraries/nacl_io/event_listener.cc |
| diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.cc b/native_client_sdk/src/libraries/nacl_io/event_listener.cc |
| index 11297b6c05debcb02c7939590a18eb23ee65dd96..c6f4b0170ab67be95bba0246c9ee425f11735932 100644 |
| --- a/native_client_sdk/src/libraries/nacl_io/event_listener.cc |
| +++ b/native_client_sdk/src/libraries/nacl_io/event_listener.cc |
| @@ -27,43 +27,6 @@ EventListener::~EventListener() { |
| pthread_cond_destroy(&signal_cond_); |
| } |
| -// Before we can destroy ourselves, we must first unregister all the |
| -// EventInfo objects from the various EventEmitters. |
| -void EventListener::Destroy() { |
| - EventInfoMap_t::iterator it; |
| - |
| - // We do not take the lock since this is the only reference to this object. |
| - for (it = event_info_map_.begin(); it != event_info_map_.end(); it++) { |
| - if (it->second->emitter) { |
| - it->second->emitter->UnregisterEventInfo(it->second); |
| - } |
| - } |
| - |
| - EventEmitter::Destroy(); |
| -} |
| - |
| -uint32_t EventListener::GetEventStatus() { |
| - // Always writable, but we can only assume it to be readable if there |
| - // is an event waiting. |
| - return signaled_.empty() ? POLLOUT : POLLIN | POLLOUT; |
| -} |
| - |
| -int EventListener::GetType() { |
| -// For lack of a better type, report socket to signify it can be |
| - // used to signal. |
| - return S_IFSOCK; |
| -} |
| - |
| -// Called by EventEmitter, wakes up any blocking threads to verify if the wait |
| -// conditions have been met. |
| -void EventListener::Signal(const ScopedEventInfo& info) { |
| - AUTO_LOCK(signal_lock_); |
| - if (waiting_) { |
| - signaled_.insert(info); |
| - pthread_cond_broadcast(&signal_cond_); |
| - } |
| -} |
| - |
| static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) { |
| if (ms_timeout >= 0) { |
| uint64_t usec = usec_since_epoch(); |
| @@ -77,168 +40,125 @@ static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) { |
| } |
| } |
| -Error EventListener::Wait(EventData* events, |
| - int max, |
| - int ms_timeout, |
| - int* out_count) { |
| - *out_count = 0; |
| - |
| - if (max <= 0) |
| - return EINVAL; |
| - if (NULL == events) |
| - return EFAULT; |
| - |
| - { |
| - AUTO_LOCK(info_lock_); |
| - |
| - // Go through the "live" event infos and see if they are in a signaled state |
| - EventInfoMap_t::iterator it = event_info_map_.begin(); |
| - while ((it != event_info_map_.end()) && (*out_count < max)) { |
| - ScopedEventInfo& info = it->second; |
| - uint32_t event_bits = info->emitter->GetEventStatus() & info->filter; |
| - |
| - if (event_bits) { |
| - events[*out_count].events = event_bits; |
| - events[*out_count].user_data = info->user_data; |
| - (*out_count)++; |
| - } |
| +EventListenerLock::EventListenerLock(EventEmitter* emitter) |
| + : EventListener(), |
| + emitter_(emitter), |
| + lock_(), |
| + events_(0) { |
| + lock_ = new sdk_util::AutoLock(emitter->GetLock()); |
|
binji
2013/09/12 01:47:57
move into initializer list above?
noelallen1
2013/09/12 23:19:03
Done.
|
| +} |
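Per the review exchange above, a minimal sketch of what moving the AutoLock into the initializer list could look like; the later patch set is not shown here, so this is an assumption about the final form rather than code from this CL:

    EventListenerLock::EventListenerLock(EventEmitter* emitter)
        : EventListener(),
          emitter_(emitter),
          // Hold the emitter's lock for the lifetime of this object.
          lock_(new sdk_util::AutoLock(emitter->GetLock())),
          events_(0) {}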
| - it++; |
| - } |
| - } // End of info_lock scope. |
| +EventListenerLock::~EventListenerLock() { |
| + delete lock_; |
| +} |
| - // We are done if we have a signal or no timeout specified. |
| - if ((*out_count > 0) || (0 == ms_timeout)) |
| - return 0; |
| +void EventListenerLock::ReceiveEvents(EventEmitter* emitter, |
| + uint32_t events) { |
| + // We are using the emitter's mutex, which is already locked. |
| + pthread_cond_signal(&signal_cond_); |
| +} |
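EventListenerLock::ReceiveEvents is invoked by an emitter that already holds its own mutex, which is why the condition variable can be signaled without taking signal_lock_. A hypothetical emitter-side sketch of that contract, assuming an EventEmitter helper such as RaiseEvents_Locked that forwards to attached listeners (only GetLock() and the ReceiveEvents() callback are visible in this diff):

    // Sketch only: mark the stream readable and wake any blocked waiters.
    void MyEmitter::SignalReadable() {
      AUTO_LOCK(GetLock());        // ReceiveEvents expects this mutex to be held.
      RaiseEvents_Locked(POLLIN);  // Would end up calling
                                   // listener->ReceiveEvents(this, POLLIN).
    }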
| - // Compute the absolute time we can wait until. |
| +Error EventListenerLock::WaitOnEvent(uint32_t events, int ms_timeout) { |
| struct timespec timeout; |
| AbsoluteFromDeltaMS(&timeout, ms_timeout); |
| - // Keep looking until we receive something. |
| - while (0 == *out_count) { |
| - // We are now officially waiting. |
| - AUTO_LOCK(signal_lock_); |
| - waiting_++; |
| - |
| - // If we don't have any signals yet, wait for any Emitter to Signal. |
| - while (signaled_.empty()) { |
| - int return_code; |
| - if (ms_timeout >= 0) { |
| - return_code = pthread_cond_timedwait(&signal_cond_, |
| - signal_lock_.mutex(), |
| - &timeout); |
| - } else { |
| - return_code = pthread_cond_wait(&signal_cond_, signal_lock_.mutex()); |
| - } |
| - |
| - Error error(return_code); |
| - |
| - // If there is no error, then we may have been signaled. |
| - if (0 == error) |
| - break; |
| - |
| - // For any error case: |
| - if (ETIMEDOUT == error) { |
| - // A "TIMEOUT" is not an error. |
| - error = 0; |
| - } else { |
| - // Otherwise this has gone bad, so return EBADF. |
| - error = EBADF; |
| - } |
| - |
| - waiting_--; |
| - return error; |
| + emitter_->RegisterListener_Locked(this, events); |
| + while ((emitter_->GetEventStatus() & events) == 0) { |
| + int return_code; |
| + if (ms_timeout >= 0) { |
| + return_code = pthread_cond_timedwait(&signal_cond_, |
| + emitter_->GetLock().mutex(), |
| + &timeout); |
| + } else { |
| + return_code = pthread_cond_wait(&signal_cond_, |
| + emitter_->GetLock().mutex()); |
| } |
| - // Copy signals over as long as we have room |
| - while (!signaled_.empty() && (*out_count < max)) { |
| - EventInfoSet_t::iterator it = signaled_.begin(); |
| + if (emitter_->GetEventStatus() & POLLERR) |
| + return_code = EINTR; |
| - events[*out_count].events = (*it)->events; |
| - events[*out_count].user_data = (*it)->user_data; |
| - (*out_count)++; |
| - |
| - signaled_.erase(it); |
| + // On failure, unregister from the emitter and return the error. |
| + if (return_code != 0) { |
| + emitter_->UnregisterListener_Locked(this); |
| + return Error(return_code); |
| } |
| - |
| - // If we are the last thread waiting, clear out the signalled set |
| - if (1 == waiting_) |
| - signaled_.clear(); |
| - |
| - // We are done waiting. |
| - waiting_--; |
| } |
| + emitter_->UnregisterListener_Locked(this); |
| return 0; |
| } |
| -Error EventListener::Track(int id, |
| - const ScopedEventEmitter& emitter, |
| - uint32_t filter, |
| - uint64_t user_data) { |
| - AUTO_LOCK(info_lock_); |
| - EventInfoMap_t::iterator it = event_info_map_.find(id); |
| - |
| - // If it's not a streaming type, then it can not be added. |
| - if ((emitter->GetType() & (S_IFIFO | S_IFSOCK | S_IFCHR)) == 0) |
| - return EPERM; |
| - |
| - if (it != event_info_map_.end()) |
| - return EEXIST; |
| - |
| - if (emitter.get() == this) |
| - return EINVAL; |
| - |
| - ScopedEventInfo info(new EventInfo); |
| - info->emitter = emitter.get(); |
| - info->listener = this; |
| - info->id = id; |
| - info->filter = filter; |
| - info->user_data = user_data; |
| - info->events = 0; |
| - |
| - emitter->RegisterEventInfo(info); |
| - event_info_map_[id] = info; |
| - return 0; |
| +void EventListenerPoll::ReceiveEvents(EventEmitter* emitter, |
| + uint32_t events) { |
| + AUTO_LOCK(signal_lock_); |
| + emitters_[emitter]->events |= events; |
| + signaled_++; |
| + pthread_cond_signal(&signal_cond_); |
| } |
| -Error EventListener::Update(int id, uint32_t filter, uint64_t user_data) { |
| - AUTO_LOCK(info_lock_); |
| - EventInfoMap_t::iterator it = event_info_map_.find(id); |
| - if (it == event_info_map_.end()) |
| - return ENOENT; |
| +Error EventListenerPoll::WaitOnAny(EventRequest* requests, |
| + size_t cnt, |
| + int ms_timeout) { |
| - ScopedEventInfo& info = it->second; |
| - info->filter = filter; |
| - info->user_data = user_data; |
| - return 0; |
| -} |
| + signaled_ = 0; |
| -Error EventListener::Free(int id) { |
| - AUTO_LOCK(info_lock_); |
| - EventInfoMap_t::iterator it = event_info_map_.find(id); |
| - if (event_info_map_.end() == it) |
| - return ENOENT; |
| + // Build a map from each request's emitter to its request data before |
| + // registering, so the map is complete before any emitter can touch it. |
| + for (size_t index = 0; index < cnt; index++) { |
| + EventRequest* request = requests + index; |
| + emitters_[request->emitter.get()] = request; |
| + request->events = 0; |
| + } |
| - it->second->emitter->UnregisterEventInfo(it->second); |
| - event_info_map_.erase(it); |
| - return 0; |
| -} |
| + // Emitters may now access the set without the lock, since each emitter |
| + // is responsible only for its own request. |
| + for (size_t index = 0; index < cnt; index++) { |
| + EventRequest* request = requests + index; |
| + request->emitter->RegisterListener(this, request->filter); |
| + uint32_t events = request->emitter->GetEventStatus() & request->filter; |
| + |
| + if (events) { |
| + AUTO_LOCK(signal_lock_); |
| + request->events |= events; |
| + signaled_++; |
| + } |
| + } |
| + |
| + struct timespec timeout; |
| + AbsoluteFromDeltaMS(&timeout, ms_timeout); |
| + int return_code = 0; |
| -void EventListener::AbandonedEventInfo(const ScopedEventInfo& event) { |
| { |
| - AUTO_LOCK(info_lock_); |
| + AUTO_LOCK(signal_lock_); |
| + while (0 == signaled_) { |
| + if (ms_timeout >= 0) { |
| + return_code = pthread_cond_timedwait(&signal_cond_, |
| + signal_lock_.mutex(), |
| + &timeout); |
| + } else { |
| + return_code = pthread_cond_wait(&signal_cond_, |
| + signal_lock_.mutex()); |
| + } |
| - event->emitter = NULL; |
| - event_info_map_.erase(event->id); |
| + if (return_code != 0) |
| + signaled_++; |
| + } |
| } |
| - // EventInfos abandoned by the destroyed emitter must still be kept in |
| - // signaled_ set for POLLHUP. |
| - event->events = POLLHUP; |
| - Signal(event); |
| + // Unregister first to prevent emitters from modifying the set any further |
| + for (size_t index = 0; index < cnt; index++) { |
| + EventRequest* request = requests + index; |
| + request->emitter->UnregisterListener(this); |
| + |
| + if (request->events & POLLERR) |
| + return_code = EINTR; |
| + } |
| + |
| + // We can now release the map. |
| + emitters_.clear(); |
| + |
| + return Error(return_code); |
| } |
| } // namespace nacl_io |
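A rough usage sketch of the two new wait paths, based only on the signatures visible in this diff; the emitter construction, the exact EventRequest member types, and the Error/ETIMEDOUT handling are assumptions rather than code from this CL:

    // Single-emitter blocking wait (the stream/handle path).
    // EventListenerLock takes the emitter's lock in its constructor and
    // releases it when it goes out of scope.
    {
      EventListenerLock wait(emitter);  // EventEmitter*, assumed to outlive the wait
      Error err = wait.WaitOnEvent(POLLIN, 100 /* ms */);
      if (err == ETIMEDOUT) {
        // Nothing became readable before the timeout.
      }
    }

    // Multi-emitter wait (the poll()/select() path).
    EventRequest requests[2];
    requests[0].emitter = pipe_emitter;    // ScopedEventEmitter, assumed
    requests[0].filter = POLLIN;
    requests[1].emitter = socket_emitter;  // ScopedEventEmitter, assumed
    requests[1].filter = POLLOUT;

    EventListenerPoll listener;
    Error err = listener.WaitOnAny(requests, 2, -1 /* wait indefinitely */);
    // On return, requests[i].events holds the signaled bits for each emitter.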