| Index: native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.cc b/native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| index 11297b6c05debcb02c7939590a18eb23ee65dd96..ce679f8030418c6a173228e41732c1805d53409f 100644 | 
| --- a/native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| +++ b/native_client_sdk/src/libraries/nacl_io/event_listener.cc | 
| @@ -27,43 +27,6 @@ EventListener::~EventListener() { | 
| pthread_cond_destroy(&signal_cond_); | 
| } | 
|  | 
| -// Before we can destroy ourselves, we must first unregister all the | 
| -// EventInfo objects from the various EventListeners | 
| -void EventListener::Destroy() { | 
| -  EventInfoMap_t::iterator it; | 
| - | 
| -  // We do not take the lock since this is the only reference to this object. | 
| -  for (it = event_info_map_.begin(); it != event_info_map_.end(); it++) { | 
| -    if (it->second->emitter) { | 
| -      it->second->emitter->UnregisterEventInfo(it->second); | 
| -    } | 
| -  } | 
| - | 
| -  EventEmitter::Destroy(); | 
| -} | 
| - | 
| -uint32_t EventListener::GetEventStatus() { | 
| -  // Always writable, but we can only assume it to be readable if there | 
| -  // is an event waiting. | 
| -  return signaled_.empty() ? POLLOUT : POLLIN | POLLOUT; | 
| -} | 
| - | 
| -int EventListener::GetType() { | 
| -  // For lack of a better type, report socket to signify it can be in an | 
| -  // used to signal. | 
| -  return S_IFSOCK; | 
| -} | 
| - | 
| -// Called by EventEmitter, wakes up any blocking threads to verify if the wait | 
| -// conditions have been met. | 
| -void EventListener::Signal(const ScopedEventInfo& info) { | 
| -  AUTO_LOCK(signal_lock_); | 
| -  if (waiting_) { | 
| -    signaled_.insert(info); | 
| -    pthread_cond_broadcast(&signal_cond_); | 
| -  } | 
| -} | 
| - | 
| static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) { | 
| if (ms_timeout >= 0) { | 
| uint64_t usec = usec_since_epoch(); | 
| @@ -77,168 +40,124 @@ static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) { | 
| } | 
| } | 
|  | 
| -Error EventListener::Wait(EventData* events, | 
| -                          int max, | 
| -                          int ms_timeout, | 
| -                          int* out_count) { | 
| -  *out_count = 0; | 
| - | 
| -  if (max <= 0) | 
| -    return EINVAL; | 
|  | 
| -  if (NULL == events) | 
| -    return EFAULT; | 
| - | 
| -  { | 
| -    AUTO_LOCK(info_lock_); | 
| - | 
| -    // Go through the "live" event infos and see if they are in a signaled state | 
| -    EventInfoMap_t::iterator it = event_info_map_.begin(); | 
| -    while ((it != event_info_map_.end()) && (*out_count < max)) { | 
| -      ScopedEventInfo& info = it->second; | 
| -      uint32_t event_bits = info->emitter->GetEventStatus() & info->filter; | 
| - | 
| -      if (event_bits) { | 
| -        events[*out_count].events = event_bits; | 
| -        events[*out_count].user_data = info->user_data; | 
| -        (*out_count)++; | 
| -      } | 
| +EventListenerLock::EventListenerLock(EventEmitter* emitter) | 
| +    : EventListener(), | 
| +      emitter_(emitter), | 
| +      lock_(new sdk_util::AutoLock(emitter->GetLock())), | 
| +      events_(0) { | 
| +} | 
|  | 
| -      it++; | 
| -    } | 
| -  } // End of info_lock scope. | 
| +EventListenerLock::~EventListenerLock() { | 
| +  delete lock_; | 
| +} | 
|  | 
| -  // We are done if we have a signal or no timeout specified. | 
| -  if ((*out_count > 0) || (0 == ms_timeout)) | 
| -    return 0; | 
| +void EventListenerLock::ReceiveEvents(EventEmitter* emitter, | 
| +                                        uint32_t events) { | 
| +  // We are using the emitter's mutex, which is already locked. | 
| +  pthread_cond_signal(&signal_cond_); | 
| +} | 
|  | 
| -  // Compute the absolute time we can wait until. | 
| +Error EventListenerLock::WaitOnEvent(uint32_t events, int ms_timeout) { | 
| struct timespec timeout; | 
| AbsoluteFromDeltaMS(&timeout, ms_timeout); | 
|  | 
| -  // Keep looking if until we receive something. | 
| -  while (0 == *out_count) { | 
| -    // We are now officially waiting. | 
| -    AUTO_LOCK(signal_lock_); | 
| -    waiting_++; | 
| - | 
| -    // If we don't have any signals yet, wait for any Emitter to Signal. | 
| -    while (signaled_.empty()) { | 
| -      int return_code; | 
| -      if (ms_timeout >= 0) { | 
| -        return_code = pthread_cond_timedwait(&signal_cond_, | 
| -                                             signal_lock_.mutex(), | 
| -                                             &timeout); | 
| -      } else { | 
| -        return_code = pthread_cond_wait(&signal_cond_, signal_lock_.mutex()); | 
| -      } | 
| - | 
| -      Error error(return_code); | 
| - | 
| -      // If there is no error, then we may have been signaled. | 
| -      if (0 == error) | 
| -        break; | 
| - | 
| -      // For any error case: | 
| -      if (ETIMEDOUT == error) { | 
| -        // A "TIMEOUT" is not an error. | 
| -        error = 0; | 
| -      } else { | 
| -        // Otherwise this has gone bad, so return EBADF. | 
| -        error = EBADF; | 
| -      } | 
| - | 
| -      waiting_--; | 
| -      return error; | 
| +  emitter_->RegisterListener_Locked(this, events); | 
| +  while ((emitter_->GetEventStatus() & events) == 0) { | 
| +    int return_code; | 
| +    if (ms_timeout >= 0) { | 
| +      return_code = pthread_cond_timedwait(&signal_cond_, | 
| +                                            emitter_->GetLock().mutex(), | 
| +                                            &timeout); | 
| +    } else { | 
| +      return_code = pthread_cond_wait(&signal_cond_, | 
| +                                      emitter_->GetLock().mutex()); | 
| } | 
|  | 
| -    // Copy signals over as long as we have room | 
| -    while (!signaled_.empty() && (*out_count < max)) { | 
| -      EventInfoSet_t::iterator it = signaled_.begin(); | 
| +    if (emitter_->GetEventStatus() & POLLERR) | 
| +      return_code = EINTR; | 
|  | 
| -      events[*out_count].events = (*it)->events; | 
| -      events[*out_count].user_data = (*it)->user_data; | 
| -      (*out_count)++; | 
| - | 
| -      signaled_.erase(it); | 
| +    // Return the failure, unlocked | 
| +    if (return_code != 0) { | 
| +      emitter_->UnregisterListener_Locked(this); | 
| +      return Error(return_code); | 
| } | 
| - | 
| -    // If we are the last thread waiting, clear out the signalled set | 
| -    if (1 == waiting_) | 
| -      signaled_.clear(); | 
| - | 
| -    // We are done waiting. | 
| -    waiting_--; | 
| } | 
|  | 
| +  emitter_->UnregisterListener_Locked(this); | 
| return 0; | 
| } | 
|  | 
| -Error EventListener::Track(int id, | 
| -                           const ScopedEventEmitter& emitter, | 
| -                           uint32_t filter, | 
| -                           uint64_t user_data) { | 
| -  AUTO_LOCK(info_lock_); | 
| -  EventInfoMap_t::iterator it = event_info_map_.find(id); | 
| - | 
| -  // If it's not a streaming type, then it can not be added. | 
| -  if ((emitter->GetType() & (S_IFIFO | S_IFSOCK | S_IFCHR)) == 0) | 
| -    return EPERM; | 
| - | 
| -  if (it != event_info_map_.end()) | 
| -    return EEXIST; | 
| - | 
| -  if (emitter.get() == this) | 
| -    return EINVAL; | 
| - | 
| -  ScopedEventInfo info(new EventInfo); | 
| -  info->emitter = emitter.get(); | 
| -  info->listener = this; | 
| -  info->id = id; | 
| -  info->filter = filter; | 
| -  info->user_data = user_data; | 
| -  info->events = 0; | 
| - | 
| -  emitter->RegisterEventInfo(info); | 
| -  event_info_map_[id] = info; | 
| -  return 0; | 
| +void EventListenerPoll::ReceiveEvents(EventEmitter* emitter, | 
| +                                        uint32_t events) { | 
| +  AUTO_LOCK(signal_lock_); | 
| +  emitters_[emitter]->events |= events; | 
| +  signaled_++; | 
| +  pthread_cond_signal(&signal_cond_); | 
| } | 
|  | 
| -Error EventListener::Update(int id, uint32_t filter, uint64_t user_data) { | 
| -  AUTO_LOCK(info_lock_); | 
| -  EventInfoMap_t::iterator it = event_info_map_.find(id); | 
| -  if (it == event_info_map_.end()) | 
| -    return ENOENT; | 
| +Error EventListenerPoll::WaitOnAny(EventRequest* requests, | 
| +                                     size_t cnt, | 
| +                                     int ms_timeout) { | 
|  | 
| -  ScopedEventInfo& info = it->second; | 
| -  info->filter = filter; | 
| -  info->user_data = user_data; | 
| -  return 0; | 
| -} | 
| +  signaled_ = 0; | 
|  | 
| -Error EventListener::Free(int id) { | 
| -  AUTO_LOCK(info_lock_); | 
| -  EventInfoMap_t::iterator it = event_info_map_.find(id); | 
| -  if (event_info_map_.end() == it) | 
| -    return ENOENT; | 
| +  // Build a map of request emitters to request data before | 
| +  // emitters can access them. | 
| +  for (size_t index = 0; index < cnt; index++) { | 
| +    EventRequest* request = requests + index; | 
| +    emitters_[request->emitter.get()] = request; | 
| +    request->events = 0; | 
| +  } | 
|  | 
| -  it->second->emitter->UnregisterEventInfo(it->second); | 
| -  event_info_map_.erase(it); | 
| -  return 0; | 
| -} | 
| +  // Emitters can now accessed the unlocked set, since each emitter is | 
| +  // responsible for it's own request. | 
| +  for (size_t index = 0; index < cnt; index++) { | 
| +    EventRequest* request = requests + index; | 
| +    requests->emitter->RegisterListener(this, request->filter); | 
| +    uint32_t events = requests->emitter->GetEventStatus() & request->filter; | 
| + | 
| +    if (events) { | 
| +      AUTO_LOCK(signal_lock_); | 
| +      request->events |= events; | 
| +      signaled_++; | 
| +    } | 
| +  } | 
| + | 
| +  struct timespec timeout; | 
| +  AbsoluteFromDeltaMS(&timeout, ms_timeout); | 
| +  int return_code = 0; | 
|  | 
| -void EventListener::AbandonedEventInfo(const ScopedEventInfo& event) { | 
| { | 
| -    AUTO_LOCK(info_lock_); | 
| +    AUTO_LOCK(signal_lock_) | 
| +    while (0 == signaled_) { | 
| +      if (ms_timeout >= 0) { | 
| +        return_code = pthread_cond_timedwait(&signal_cond_, | 
| +                                              signal_lock_.mutex(), | 
| +                                              &timeout); | 
| +      } else { | 
| +        return_code = pthread_cond_wait(&signal_cond_, | 
| +                                        signal_lock_.mutex()); | 
| +      } | 
|  | 
| -    event->emitter = NULL; | 
| -    event_info_map_.erase(event->id); | 
| +      if (return_code != 0) | 
| +        signaled_++; | 
| +    } | 
| } | 
|  | 
| -  // EventInfos abandoned by the destroyed emitter must still be kept in | 
| -  // signaled_ set for POLLHUP. | 
| -  event->events = POLLHUP; | 
| -  Signal(event); | 
| +  // Unregister first to prevent emitters from modifying the set any further | 
| +  for (size_t index = 0; index < cnt; index++) { | 
| +    EventRequest* request = requests + index; | 
| +    request->emitter->UnregisterListener(this); | 
| + | 
| +    if (request->events & POLLERR) | 
| +      return_code = EINTR; | 
| +  } | 
| + | 
| +  // We can now release the map. | 
| +  emitters_.clear(); | 
| + | 
| +  return Error(return_code); | 
| } | 
|  | 
| }  // namespace nacl_io | 
|  |