Chromium Code Reviews| Index: native_client_sdk/src/libraries/nacl_io/event_listener.cc |
| diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.cc b/native_client_sdk/src/libraries/nacl_io/event_listener.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a2533ce8c47d58decf87c08008b4a8367deb8a02 |
| --- /dev/null |
| +++ b/native_client_sdk/src/libraries/nacl_io/event_listener.cc |
| @@ -0,0 +1,243 @@ |
| +/* Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| + * Use of this source code is governed by a BSD-style license that can be |
| + * found in the LICENSE file. |
| + */ |
| + |
| +#include <errno.h> |
| +#include <pthread.h> |
| +#include <stdio.h> |
| +#include <sys/stat.h> |
| +#include <sys/time.h> |
| +#include <time.h> |
| +#include <unistd.h> |
| + |
| +#include "nacl_io/error.h" |
| +#include "nacl_io/event_listener.h" |
| +#include "sdk_util/auto_lock.h" |
| + |
| + |
| +EventListener::EventListener() { |
| + pthread_cond_init(&signal_cond_, NULL); |
| +} |
| + |
| +EventListener::~EventListener() { |
| + pthread_cond_destroy(&signal_cond_); |
| +} |
| + |
| +// Before we can destroy ourselves, we must first unregister all the |
| +// EventInfo objects from the various EventListeners |
| +void EventListener::Destroy() { |
| + EventInfoMap_t::iterator it; |
| + |
| + // We do not take the lock since this is the only refence to this object. |
|
binji
2013/07/19 20:47:18
sp: reference
noelallen1
2013/07/19 21:35:43
Done.
|
| + for (it = event_info_map_.begin(); it != event_info_map_.end(); it++) { |
| + if (it->second->emitter) { |
| + it->second->emitter->UnregisterEventInfo(it->second); |
| + } |
| + } |
| + |
| + EventEmitter::Destroy(); |
| +} |
| + |
| +uint32_t EventListener::GetEventStatus() { |
| + // Always writable, but we can only assume it to be readable if there |
| + // is an event waiting. |
| + return signaled_.empty() ? KE_WRITE_READY : KE_WRITE_READY | KE_READ_READY; |
| +} |
| + |
| +int EventListener::GetType() { |
| + // For lack of a better type, report socket to signify it can be in an |
| + // used to signal. |
| + return S_IFSOCK; |
| +} |
| + |
| +// Called by EventEmitter, wakes up any blocking threads to verify if the wait |
| +// conditions have been met. |
| +void EventListener::Signal(const ScopedEventInfo& info) { |
| + AutoLock lock(signal_lock_); |
| + if (waiting_) { |
| + signaled_.insert(info); |
| + pthread_cond_broadcast(&signal_cond_); |
| + } |
| +} |
| + |
| +static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) { |
| + struct timeval current_time; |
| + if (ms_timeout >= 0) { |
| + gettimeofday(¤t_time, NULL); |
| + timeout->tv_sec = current_time.tv_sec; |
| + timeout->tv_nsec = current_time.tv_usec * 1000; |
| + timeout->tv_nsec += ms_timeout * 1000000; |
| + while (timeout->tv_nsec > 1000000000) { |
| + timeout->tv_nsec -= 1000000000; |
| + timeout->tv_sec += 1; |
| + } |
| + } else { |
| + timeout->tv_sec = 0; |
| + timeout->tv_nsec = 0; |
| + } |
| +} |
| + |
| +Error EventListener::Wait(EventData* events, |
| + int max, |
| + int ms_timeout, |
| + int* out_count) { |
| + *out_count = 0; |
| + |
| + if (max <= 0) |
| + return EINVAL; |
| + |
| + if (NULL == events) |
| + return EFAULT; |
| + |
| + { |
| + AutoLock info_lock(info_lock_); |
| + |
| + // Go through the "live" event infos and see if they are in a signaled state |
| + EventInfoMap_t::iterator it = event_info_map_.begin(); |
| + while ((it != event_info_map_.end()) && (*out_count < max)) { |
| + ScopedEventInfo& info = it->second; |
| + uint32_t event_bits = info->emitter->GetEventStatus() & info->filter; |
| + |
| + if (event_bits) { |
| + events[*out_count].events = event_bits; |
| + events[*out_count].user_data = info->user_data; |
| + (*out_count)++; |
| + } |
| + |
| + it++; |
| + } |
| + } // End of info_lock scope. |
| + |
| + // We are done if we have a signal or no timeout specified. |
| + if ((*out_count > 0) || (0 == ms_timeout)) |
| + return 0; |
| + |
| + // Compute the absolute time we can wait until. |
| + struct timespec timeout; |
| + AbsoluteFromDeltaMS(&timeout, ms_timeout); |
| + |
| + // Keep looking if until we receive something. |
| + while (0 == *out_count) { |
| + // We are now officially waiting. |
| + AutoLock signal_lock(signal_lock_); |
| + waiting_++; |
| + |
| + // If we don't have any signals yet, wait for any Emitter to Signal. |
| + while (signaled_.empty()) { |
| + int return_code; |
| + if (ms_timeout >= 0) { |
| + return_code = pthread_cond_timedwait(&signal_cond_, |
| + signal_lock_.mutex(), |
| + &timeout); |
| + } else { |
| + return_code = pthread_cond_wait(&signal_cond_, signal_lock_.mutex()); |
| + } |
| + |
| + Error error(return_code); |
| + |
| + // If there is no error, then we may have been signaled. |
| + if (0 == error) |
| + break; |
| + |
| + // For any error case: |
| + if (ETIMEDOUT == error) { |
| + // A "TIMEOUT" is not an error. |
| + error = 0; |
| + } else { |
| + // Otherwise this has gone bad, so return EBADF. |
| + error = EBADF; |
| + } |
| + |
| + waiting_--; |
| + return error; |
| + } |
| + |
| + // Copy signals over as long as we have room |
| + while (!signaled_.empty() && (*out_count < max)) { |
| + EventInfoSet_t::iterator it = signaled_.begin(); |
| + |
| + events[*out_count].events = (*it)->events; |
| + events[*out_count].user_data = (*it)->user_data; |
| + (*out_count)++; |
| + |
| + signaled_.erase(it); |
| + } |
| + |
| + // If we are the last thread waiting, clear out the signalled set |
| + if (1 == waiting_) |
| + signaled_.clear(); |
| + |
| + // We are done waiting. |
| + waiting_--; |
| + } |
| + |
| + return 0; |
| +} |
| + |
| +Error EventListener::Track(int id, |
| + const ScopedRef<EventEmitter>& emitter, |
| + uint32_t filter, |
| + uint64_t user_data) { |
| + AutoLock lock(info_lock_); |
| + EventInfoMap_t::iterator it = event_info_map_.find(id); |
| + |
| + // If it's not a streaming type, then it can not be added. |
| + if ((emitter->GetType() & (S_IFIFO | S_IFSOCK)) == 0) |
| + return EPERM; |
| + |
| + if (it != event_info_map_.end()) |
| + return EEXIST; |
| + |
| + if (emitter.get() == this) |
| + return EINVAL; |
| + |
| + ScopedEventInfo info(new EventInfo); |
| + info->emitter = emitter.get(); |
| + info->listener = this; |
| + info->id = id; |
| + info->filter = filter; |
| + info->user_data = user_data; |
| + info->events = 0; |
| + |
| + emitter->RegisterEventInfo(info); |
| + event_info_map_[id] = info; |
| + return 0; |
| +} |
| + |
| +Error EventListener::Update(int id, uint32_t filter, uint64_t user_data) { |
| + AutoLock lock(info_lock_); |
| + EventInfoMap_t::iterator it = event_info_map_.find(id); |
| + if (it == event_info_map_.end()) |
| + return ENOENT; |
| + |
| + ScopedEventInfo& info = it->second; |
| + info->filter = filter; |
| + info->user_data = user_data; |
| + return 0; |
| +} |
| + |
| +Error EventListener::Free(int id) { |
| + AutoLock lock(info_lock_); |
| + EventInfoMap_t::iterator it = event_info_map_.find(id); |
| + if (event_info_map_.end() == it) |
| + return ENOENT; |
| + |
| + event_info_map_.erase(it); |
| + return 0; |
| +} |
| + |
| +// Notify the EventEmitter has abandon this event. |
|
binji
2013/07/19 20:47:18
comment is unnecessary, it is documented in the he
noelallen1
2013/07/19 21:35:43
Done.
|
| +void EventListener::AbandonedEventInfo(const ScopedEventInfo& event) { |
| + { |
| + AutoLock lock(info_lock_); |
| + |
| + // Remove abandoned events from the event_info_map_ for abandoned |
|
binji
2013/07/19 20:47:18
Is this comment necessary? It just describes what
noelallen1
2013/07/19 21:35:43
Done.
|
| + event->emitter = NULL; |
| + event_info_map_.erase(event->id); |
| + } |
| + |
| + // However they still must be kept in signaled_ to produce KE_SHUTDOWN. |
| + event->events = KE_SHUTDOWN; |
| + Signal(event); |
| +} |