 Chromium Code Reviews
 Chromium Code Reviews Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src
    
  
    Issue 23498015:
  [NaCl SDK] Support non blocking TCP/UDP  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src| Index: native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.cc b/native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| index 11297b6c05debcb02c7939590a18eb23ee65dd96..545078cb3f2f45b5ba9f3e87c21b00c4004da0df 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| @@ -27,43 +27,6 @@ EventListener::~EventListener() { | 
| pthread_cond_destroy(&signal_cond_); | 
| } | 
| -// Before we can destroy ourselves, we must first unregister all the | 
| -// EventInfo objects from the various EventListeners | 
| -void EventListener::Destroy() { | 
| - EventInfoMap_t::iterator it; | 
| - | 
| - // We do not take the lock since this is the only reference to this object. | 
| - for (it = event_info_map_.begin(); it != event_info_map_.end(); it++) { | 
| - if (it->second->emitter) { | 
| - it->second->emitter->UnregisterEventInfo(it->second); | 
| - } | 
| - } | 
| - | 
| - EventEmitter::Destroy(); | 
| -} | 
| - | 
| -uint32_t EventListener::GetEventStatus() { | 
| - // Always writable, but we can only assume it to be readable if there | 
| - // is an event waiting. | 
| - return signaled_.empty() ? POLLOUT : POLLIN | POLLOUT; | 
| -} | 
| - | 
| -int EventListener::GetType() { | 
| - // For lack of a better type, report socket to signify it can be in an | 
| - // used to signal. | 
| - return S_IFSOCK; | 
| -} | 
| - | 
| -// Called by EventEmitter, wakes up any blocking threads to verify if the wait | 
| -// conditions have been met. | 
| -void EventListener::Signal(const ScopedEventInfo& info) { | 
| - AUTO_LOCK(signal_lock_); | 
| - if (waiting_) { | 
| - signaled_.insert(info); | 
| - pthread_cond_broadcast(&signal_cond_); | 
| - } | 
| -} | 
| - | 
| static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) { | 
| if (ms_timeout >= 0) { | 
| uint64_t usec = usec_since_epoch(); | 
| @@ -77,168 +40,124 @@ static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) { | 
| } | 
| } | 
| -Error EventListener::Wait(EventData* events, | 
| - int max, | 
| - int ms_timeout, | 
| - int* out_count) { | 
| - *out_count = 0; | 
| - | 
| - if (max <= 0) | 
| - return EINVAL; | 
| +void EventListenerSingle::ReceiveEvents(EventEmitter* emitter, | 
| + uint32_t events) { | 
| + // We are using the emitter's mutex, which is already locked. | 
| + pthread_cond_signal(&signal_cond_); | 
| +} | 
| - if (NULL == events) | 
| - return EFAULT; | 
| +Error EventListenerSingle::WaitOnLock(EventEmitter* emitter, | 
| + uint32_t events, | 
| + int ms_timeout, | 
| + sdk_util::AutoUnlock* unlock) { | 
| - { | 
| - AUTO_LOCK(info_lock_); | 
| - | 
| - // Go through the "live" event infos and see if they are in a signaled state | 
| - EventInfoMap_t::iterator it = event_info_map_.begin(); | 
| - while ((it != event_info_map_.end()) && (*out_count < max)) { | 
| - ScopedEventInfo& info = it->second; | 
| - uint32_t event_bits = info->emitter->GetEventStatus() & info->filter; | 
| - | 
| - if (event_bits) { | 
| - events[*out_count].events = event_bits; | 
| - events[*out_count].user_data = info->user_data; | 
| - (*out_count)++; | 
| - } | 
| + // We are using the emitter's mutex for signalling, so lock it | 
| + unlock->Give(); | 
| + emitter->GetLock().Lock(); | 
| + unlock->Take(emitter->GetLock()); | 
| - it++; | 
| - } | 
| - } // End of info_lock scope. | 
| + emitter->RegisterListener_Locked(this, events); | 
| - // We are done if we have a signal or no timeout specified. | 
| - if ((*out_count > 0) || (0 == ms_timeout)) | 
| - return 0; | 
| - | 
| - // Compute the absolute time we can wait until. | 
| struct timespec timeout; | 
| AbsoluteFromDeltaMS(&timeout, ms_timeout); | 
| - // Keep looking if until we receive something. | 
| - while (0 == *out_count) { | 
| - // We are now officially waiting. | 
| - AUTO_LOCK(signal_lock_); | 
| - waiting_++; | 
| - | 
| - // If we don't have any signals yet, wait for any Emitter to Signal. | 
| - while (signaled_.empty()) { | 
| - int return_code; | 
| - if (ms_timeout >= 0) { | 
| - return_code = pthread_cond_timedwait(&signal_cond_, | 
| - signal_lock_.mutex(), | 
| - &timeout); | 
| - } else { | 
| - return_code = pthread_cond_wait(&signal_cond_, signal_lock_.mutex()); | 
| - } | 
| - | 
| - Error error(return_code); | 
| - | 
| - // If there is no error, then we may have been signaled. | 
| - if (0 == error) | 
| - break; | 
| - | 
| - // For any error case: | 
| - if (ETIMEDOUT == error) { | 
| - // A "TIMEOUT" is not an error. | 
| - error = 0; | 
| - } else { | 
| - // Otherwise this has gone bad, so return EBADF. | 
| - error = EBADF; | 
| - } | 
| - | 
| - waiting_--; | 
| - return error; | 
| + while ((emitter->GetEventStatus() & events) == 0) { | 
| + int return_code; | 
| + if (ms_timeout >= 0) { | 
| + return_code = pthread_cond_timedwait(&signal_cond_, | 
| + emitter->GetLock().mutex(), | 
| + &timeout); | 
| + } else { | 
| + return_code = pthread_cond_wait(&signal_cond_, | 
| + emitter->GetLock().mutex()); | 
| } | 
| - // Copy signals over as long as we have room | 
| - while (!signaled_.empty() && (*out_count < max)) { | 
| - EventInfoSet_t::iterator it = signaled_.begin(); | 
| + if (emitter->GetEventStatus() & POLLERR) | 
| + return_code = EINTR; | 
| - events[*out_count].events = (*it)->events; | 
| - events[*out_count].user_data = (*it)->user_data; | 
| - (*out_count)++; | 
| - | 
| - signaled_.erase(it); | 
| + // Return the failure, unlocked | 
| 
binji
2013/09/12 01:47:56
comment is incorrect, still locked here, right?
 | 
| + if (return_code != 0) { | 
| + emitter->UnregisterListener_Locked(this); | 
| + return Error(return_code); | 
| } | 
| - | 
| - // If we are the last thread waiting, clear out the signalled set | 
| - if (1 == waiting_) | 
| - signaled_.clear(); | 
| - | 
| - // We are done waiting. | 
| - waiting_--; | 
| } | 
| + // Exiting with emitter locked | 
| + emitter->UnregisterListener_Locked(this); | 
| return 0; | 
| } | 
| -Error EventListener::Track(int id, | 
| - const ScopedEventEmitter& emitter, | 
| - uint32_t filter, | 
| - uint64_t user_data) { | 
| - AUTO_LOCK(info_lock_); | 
| - EventInfoMap_t::iterator it = event_info_map_.find(id); | 
| - | 
| - // If it's not a streaming type, then it can not be added. | 
| - if ((emitter->GetType() & (S_IFIFO | S_IFSOCK | S_IFCHR)) == 0) | 
| - return EPERM; | 
| - | 
| - if (it != event_info_map_.end()) | 
| - return EEXIST; | 
| - | 
| - if (emitter.get() == this) | 
| - return EINVAL; | 
| - | 
| - ScopedEventInfo info(new EventInfo); | 
| - info->emitter = emitter.get(); | 
| - info->listener = this; | 
| - info->id = id; | 
| - info->filter = filter; | 
| - info->user_data = user_data; | 
| - info->events = 0; | 
| - | 
| - emitter->RegisterEventInfo(info); | 
| - event_info_map_[id] = info; | 
| - return 0; | 
| + | 
| +void EventListenerGroup::ReceiveEvents(EventEmitter* emitter, | 
| + uint32_t events) { | 
| 
binji
2013/09/12 01:47:56
nit: line up params
 | 
| + AUTO_LOCK(signal_lock_); | 
| + emitters_[emitter]->events |= events; | 
| + signaled_++; | 
| + pthread_cond_signal(&signal_cond_); | 
| } | 
| -Error EventListener::Update(int id, uint32_t filter, uint64_t user_data) { | 
| - AUTO_LOCK(info_lock_); | 
| - EventInfoMap_t::iterator it = event_info_map_.find(id); | 
| - if (it == event_info_map_.end()) | 
| - return ENOENT; | 
| +Error EventListenerGroup::WaitOnAny(EventRequest* requests, | 
| + size_t cnt, | 
| 
binji
2013/09/12 01:47:56
nit: line up params
 | 
| + int ms_timeout) { | 
| - ScopedEventInfo& info = it->second; | 
| - info->filter = filter; | 
| - info->user_data = user_data; | 
| - return 0; | 
| -} | 
| + signaled_ = 0; | 
| -Error EventListener::Free(int id) { | 
| - AUTO_LOCK(info_lock_); | 
| - EventInfoMap_t::iterator it = event_info_map_.find(id); | 
| - if (event_info_map_.end() == it) | 
| - return ENOENT; | 
| + // Build a map of request emitters to request data before | 
| + // emitters can access them. | 
| + for (size_t index = 0; index < cnt; index++) { | 
| + EventRequest* request = requests + index; | 
| + emitters_[request->emitter.get()] = request; | 
| + request->events = 0; | 
| + } | 
| - it->second->emitter->UnregisterEventInfo(it->second); | 
| - event_info_map_.erase(it); | 
| - return 0; | 
| -} | 
| + // Emitters can now accessed the unlocked set, since each emitter is | 
| + // responsible for it's own request. | 
| + for (size_t index = 0; index < cnt; index++) { | 
| + EventRequest* request = requests + index; | 
| + requests->emitter->RegisterListener(this, request->filter); | 
| + uint32_t events = requests->emitter->GetEventStatus() & request->filter; | 
| + | 
| + if (events) { | 
| + AUTO_LOCK(signal_lock_); | 
| + request->events |= events; | 
| + signaled_++; | 
| + } | 
| + } | 
| + | 
| + struct timespec timeout; | 
| + AbsoluteFromDeltaMS(&timeout, ms_timeout); | 
| + int return_code = 0; | 
| -void EventListener::AbandonedEventInfo(const ScopedEventInfo& event) { | 
| { | 
| - AUTO_LOCK(info_lock_); | 
| + AUTO_LOCK(signal_lock_) | 
| + while (0 == signaled_) { | 
| + if (ms_timeout >= 0) { | 
| + return_code = pthread_cond_timedwait(&signal_cond_, | 
| + signal_lock_.mutex(), | 
| + &timeout); | 
| + } else { | 
| + return_code = pthread_cond_wait(&signal_cond_, | 
| + signal_lock_.mutex()); | 
| + } | 
| + | 
| + if (return_code != 0) | 
| + signaled_++; | 
| + } | 
| + } | 
| + | 
| + // Unregister first to prevent emitters from modifying the set any further | 
| + for (size_t index = 0; index < cnt; index++) { | 
| + EventRequest* request = requests + index; | 
| + request->emitter->UnregisterListener(this); | 
| - event->emitter = NULL; | 
| - event_info_map_.erase(event->id); | 
| + if (request->events & POLLERR) | 
| + return_code = EINTR; | 
| } | 
| - // EventInfos abandoned by the destroyed emitter must still be kept in | 
| - // signaled_ set for POLLHUP. | 
| - event->events = POLLHUP; | 
| - Signal(event); | 
| + // We can now release the map. | 
| + emitters_.clear(); | 
| + | 
| + return Error(return_code); | 
| } | 
| } // namespace nacl_io |