Chromium Code Reviews| Index: native_client_sdk/src/libraries/utils/thread_pool.h |
| =================================================================== |
| --- native_client_sdk/src/libraries/utils/thread_pool.h (revision 203001) |
| +++ native_client_sdk/src/libraries/utils/thread_pool.h (working copy) |
| @@ -4,33 +4,40 @@ |
| // Simple thread pool class |
| -#ifndef EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ |
| -#define EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ |
| +#ifndef LIBRARY_UTILS_THREAD_POOL_H_ |
| +#define LIBRARY_UTILS_THREAD_POOL_H_ |
| #include <pthread.h> |
| #include <semaphore.h> |
| +#include <stdio.h> |
| +#include <stdlib.h> |
| +#include "utils/auto_lock.h" |
| + |
| // typdef helper for work function |
| typedef void (*WorkFunction)(int task_index, void* data); |
| -// ThreadPool is a class to manage num_threads and assign |
| -// them num_tasks of work at a time. Each call |
| -// to Dispatch(..) will block until all tasks complete. |
| -// If 0 is passed in for num_threads, all tasks will be |
| -// issued on the dispatch thread. |
| +// ThreadPool is a class to manage num_threads and assign them num_tasks of |
| +// work at a time. Each call to Dispatch(..) will block until all tasks |
| +// complete. If 0 is passed in for num_threads, all tasks will be issued on the |
| +// dispatch thread. |
| class ThreadPool { |
| public: |
| void Dispatch(int num_tasks, WorkFunction work, void* data); |
| explicit ThreadPool(int num_threads); |
| ~ThreadPool(); |
| + |
| private: |
| int DecCounter(); |
| void Setup(int counter, WorkFunction work, void* data); |
| void DispatchMany(int num_tasks, WorkFunction work, void* data); |
| void DispatchHere(int num_tasks, WorkFunction work, void* data); |
| void WorkLoop(); |
| - static void* WorkerThreadEntry(void* data); |
| + static void* WorkerThreadEntry(void* thiz) { |
| + static_cast<ThreadPool*>(thiz)->WorkLoop(); |
| + return NULL; |
| + } |
| void PostExitAndJoinAll(); |
| pthread_t* threads_; |
| int counter_; |
| @@ -38,8 +45,131 @@ |
| bool exiting_; |
| void* user_data_; |
| WorkFunction user_work_function_; |
| + pthread_mutex_t mutex_; |
| sem_t work_sem_; |
| sem_t done_sem_; |
| }; |
| -#endif // EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ |
| +// Initializes mutex, semaphores and a pool of threads. If 0 is passed for |
| +// num_threads, all work will be performed on the dispatch thread. |
| +ThreadPool::ThreadPool(int num_threads) |
|
binji
2013/06/04 23:30:28
should this all be inline...?
nfullagar1
2013/06/04 23:47:09
I'm on the fence - it's not that much code (compar
|
| + : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), |
| + user_data_(NULL), user_work_function_(NULL) { |
| + if (num_threads_ > 0) { |
| + int status; |
| + status = sem_init(&work_sem_, 0, 0); |
| + if (-1 == status) { |
| + fprintf(stderr, "Failed to initialize semaphore!\n"); |
| + exit(-1); |
| + } |
| + status = sem_init(&done_sem_, 0, 0); |
| + if (-1 == status) { |
| + fprintf(stderr, "Failed to initialize semaphore!\n"); |
| + exit(-1); |
| + } |
| + threads_ = new pthread_t[num_threads_]; |
| + for (int i = 0; i < num_threads_; i++) { |
| + status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); |
| + if (0 != status) { |
| + fprintf(stderr, "Failed to create thread!\n"); |
| + exit(-1); |
| + } |
| + } |
| + } |
| +} |
| + |
| +// Post exit request, wait for all threads to join, and cleanup. |
| +ThreadPool::~ThreadPool() { |
| + if (num_threads_ > 0) { |
| + PostExitAndJoinAll(); |
| + delete[] threads_; |
| + sem_destroy(&done_sem_); |
| + sem_destroy(&work_sem_); |
| + } |
| +} |
| + |
| +// Dispatch() will invoke the user supplied work function across |
| +// one or more threads for each task. |
| +// Note: This function will block until all work has completed. |
| +inline void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { |
| + if (num_threads_ > 0) |
| + DispatchMany(num_tasks, work, data); |
| + else |
| + DispatchHere(num_tasks, work, data); |
| +} |
| + |
| +// Decrement and get the value of the mutex protected counter. This function |
| +// can be called from multiple threads at any given time. |
| +inline int ThreadPool::DecCounter() { |
| +#if defined(__native_client__) |
| + return __sync_sub_and_fetch(&counter_, 1); |
| +#else |
| + // Slightly more overhead, but more portable mutex guard. |
| + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; |
| + AutoLock(&mutex); |
| + return --counter_; |
| +#endif |
| +} |
| + |
| +// Setup work parameters. This function is called from the dispatch thread, |
| +// when all worker threads are sleeping. |
| +inline void ThreadPool::Setup(int counter, WorkFunction work, void *data) { |
| + counter_ = counter; |
| + user_work_function_ = work; |
| + user_data_ = data; |
| +} |
| + |
| +// DispatchMany() will dispatch a set of tasks across worker threads. |
| +// Note: This function will block until all work has completed. |
| +inline void ThreadPool::DispatchMany(int tasks, WorkFunction work, void* data) { |
| + // On entry, all worker threads are sleeping. |
| + Setup(tasks, work, data); |
| + // Wake up the worker threads & have them process tasks. |
| + for (int i = 0; i < num_threads_; i++) |
| + sem_post(&work_sem_); |
| + // Worker threads are now awake and busy. |
| + // This dispatch thread will now sleep-wait for the worker threads to finish. |
| + for (int i = 0; i < num_threads_; i++) |
| + sem_wait(&done_sem_); |
| + // On exit, all tasks are done and all worker threads are sleeping again. |
| +} |
| + |
| +// DispatchHere will dispatch all tasks on this thread. |
| +inline void ThreadPool::DispatchHere(int tasks, WorkFunction work, void* data) { |
| + for (int i = 0; i < tasks; i++) |
| + work(i, data); |
| +} |
| + |
| +// Set exit flag, post and join all the threads in the pool. This function is |
| +// called only from the dispatch thread, and only when all worker threads are |
| +// sleeping. |
| +inline void ThreadPool::PostExitAndJoinAll() { |
| + exiting_ = true; |
| + // Wake up all the sleeping worker threads. |
| + for (int i = 0; i < num_threads_; ++i) |
| + sem_post(&work_sem_); |
| + void* retval; |
| + for (int i = 0; i < num_threads_; ++i) |
| + pthread_join(threads_[i], &retval); |
| +} |
| + |
| +// Main work loop - one for each worker thread. |
| +inline void ThreadPool::WorkLoop() { |
| + while (true) { |
| + // Wait for work. If no work is availble, this thread will sleep here. |
| + sem_wait(&work_sem_); |
| + if (exiting_) break; |
| + while (true) { |
| + // Grab a task index to work on from the counter. |
| + int task_index = DecCounter(); |
| + if (task_index < 0) |
| + break; |
| + user_work_function_(task_index, user_data_); |
| + } |
| + // Post to dispatch thread work is done. |
| + sem_post(&done_sem_); |
| + } |
| +} |
| + |
| +#endif // LIBRARY_UTILS_THREAD_POOL_H_ |
| + |