Chromium Code Reviews

Unified Diff: native_client_sdk/src/libraries/utils/thread_pool.h

Issue 16325024: Move thread_pool.h into utils so it can be shared by more than one example. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 7 years, 6 months ago
Index: native_client_sdk/src/libraries/utils/thread_pool.h
===================================================================
--- native_client_sdk/src/libraries/utils/thread_pool.h (revision 203001)
+++ native_client_sdk/src/libraries/utils/thread_pool.h (working copy)
@@ -4,33 +4,40 @@
// Simple thread pool class
-#ifndef EXAMPLES_DEMO_VORONOI_THREADPOOL_H_
-#define EXAMPLES_DEMO_VORONOI_THREADPOOL_H_
+#ifndef LIBRARY_UTILS_THREAD_POOL_H_
+#define LIBRARY_UTILS_THREAD_POOL_H_
#include <pthread.h>
#include <semaphore.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include "utils/auto_lock.h"
+
// typedef helper for work function
typedef void (*WorkFunction)(int task_index, void* data);
-// ThreadPool is a class to manage num_threads and assign
-// them num_tasks of work at a time. Each call
-// to Dispatch(..) will block until all tasks complete.
-// If 0 is passed in for num_threads, all tasks will be
-// issued on the dispatch thread.
+// ThreadPool manages a pool of num_threads worker threads and assigns them
+// num_tasks of work at a time. Each call to Dispatch(..) blocks until all
+// tasks complete. If 0 is passed for num_threads, all tasks are issued on the
+// dispatch thread.
class ThreadPool {
public:
void Dispatch(int num_tasks, WorkFunction work, void* data);
explicit ThreadPool(int num_threads);
~ThreadPool();
+
private:
int DecCounter();
void Setup(int counter, WorkFunction work, void* data);
void DispatchMany(int num_tasks, WorkFunction work, void* data);
void DispatchHere(int num_tasks, WorkFunction work, void* data);
void WorkLoop();
- static void* WorkerThreadEntry(void* data);
+ static void* WorkerThreadEntry(void* thiz) {
+ static_cast<ThreadPool*>(thiz)->WorkLoop();
+ return NULL;
+ }
void PostExitAndJoinAll();
pthread_t* threads_;
int counter_;
@@ -38,8 +45,131 @@
bool exiting_;
void* user_data_;
WorkFunction user_work_function_;
+ pthread_mutex_t mutex_;
sem_t work_sem_;
sem_t done_sem_;
};
-#endif // EXAMPLES_DEMO_VORONOI_THREADPOOL_H_
+// Initializes mutex, semaphores and a pool of threads. If 0 is passed for
+// num_threads, all work will be performed on the dispatch thread.
+ThreadPool::ThreadPool(int num_threads)
binji 2013/06/04 23:30:28 should this all be inline...?
nfullagar1 2013/06/04 23:47:09 I'm on the fence - it's not that much code (compar
+ : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
+ user_data_(NULL), user_work_function_(NULL) {
+ if (num_threads_ > 0) {
+ int status;
+ status = sem_init(&work_sem_, 0, 0);
+ if (-1 == status) {
+ fprintf(stderr, "Failed to initialize semaphore!\n");
+ exit(-1);
+ }
+ status = sem_init(&done_sem_, 0, 0);
+ if (-1 == status) {
+ fprintf(stderr, "Failed to initialize semaphore!\n");
+ exit(-1);
+ }
+ threads_ = new pthread_t[num_threads_];
+ for (int i = 0; i < num_threads_; i++) {
+ status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
+ if (0 != status) {
+ fprintf(stderr, "Failed to create thread!\n");
+ exit(-1);
+ }
+ }
+ }
+}
+
+// Post exit request, wait for all threads to join, and cleanup.
+ThreadPool::~ThreadPool() {
+ if (num_threads_ > 0) {
+ PostExitAndJoinAll();
+ delete[] threads_;
+ sem_destroy(&done_sem_);
+ sem_destroy(&work_sem_);
+ }
+}
+
+// Dispatch() will invoke the user supplied work function across
+// one or more threads for each task.
+// Note: This function will block until all work has completed.
+inline void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
+ if (num_threads_ > 0)
+ DispatchMany(num_tasks, work, data);
+ else
+ DispatchHere(num_tasks, work, data);
+}
+
+// Decrement and get the value of the mutex protected counter. This function
+// can be called from multiple threads at any given time.
+inline int ThreadPool::DecCounter() {
+#if defined(__native_client__)
+ return __sync_sub_and_fetch(&counter_, 1);
+#else
+ // Slightly more overhead, but more portable mutex guard.
+ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+  AutoLock lock(&mutex);  // Named so the guard spans the full scope.
+ return --counter_;
+#endif
+}
+
+// Setup work parameters. This function is called from the dispatch thread,
+// when all worker threads are sleeping.
+inline void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
+ counter_ = counter;
+ user_work_function_ = work;
+ user_data_ = data;
+}
+
+// DispatchMany() will dispatch a set of tasks across worker threads.
+// Note: This function will block until all work has completed.
+inline void ThreadPool::DispatchMany(int tasks, WorkFunction work, void* data) {
+ // On entry, all worker threads are sleeping.
+ Setup(tasks, work, data);
+ // Wake up the worker threads & have them process tasks.
+ for (int i = 0; i < num_threads_; i++)
+ sem_post(&work_sem_);
+ // Worker threads are now awake and busy.
+ // This dispatch thread will now sleep-wait for the worker threads to finish.
+ for (int i = 0; i < num_threads_; i++)
+ sem_wait(&done_sem_);
+ // On exit, all tasks are done and all worker threads are sleeping again.
+}
+
+// DispatchHere will dispatch all tasks on this thread.
+inline void ThreadPool::DispatchHere(int tasks, WorkFunction work, void* data) {
+ for (int i = 0; i < tasks; i++)
+ work(i, data);
+}
+
+// Set exit flag, post and join all the threads in the pool. This function is
+// called only from the dispatch thread, and only when all worker threads are
+// sleeping.
+inline void ThreadPool::PostExitAndJoinAll() {
+ exiting_ = true;
+ // Wake up all the sleeping worker threads.
+ for (int i = 0; i < num_threads_; ++i)
+ sem_post(&work_sem_);
+ void* retval;
+ for (int i = 0; i < num_threads_; ++i)
+ pthread_join(threads_[i], &retval);
+}
+
+// Main work loop - one for each worker thread.
+inline void ThreadPool::WorkLoop() {
+ while (true) {
+    // Wait for work. If no work is available, this thread will sleep here.
+ sem_wait(&work_sem_);
+ if (exiting_) break;
+ while (true) {
+ // Grab a task index to work on from the counter.
+ int task_index = DecCounter();
+ if (task_index < 0)
+ break;
+ user_work_function_(task_index, user_data_);
+ }
+    // Tell the dispatch thread that this worker's tasks are done.
+ sem_post(&done_sem_);
+ }
+}
+
+#endif // LIBRARY_UTILS_THREAD_POOL_H_
+
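
For readers of this review, a minimal usage sketch of the API added above (not part of the patch). The include path "utils/thread_pool.h" matches the new header guard; the work function and task count are hypothetical.

#include <cstdio>
#include <vector>

#include "utils/thread_pool.h"

// Hypothetical work function: squares one element of the shared array.
// Dispatch() invokes it once for each task_index in [0, num_tasks).
static void SquareTask(int task_index, void* data) {
  int* values = static_cast<int*>(data);
  values[task_index] *= values[task_index];
}

int main() {
  std::vector<int> values;
  for (int i = 0; i < 16; ++i)
    values.push_back(i);

  ThreadPool pool(4);  // Four worker threads; 0 would run tasks inline.
  pool.Dispatch(static_cast<int>(values.size()), SquareTask, &values[0]);

  // Dispatch() blocks until every task has run, so the results can be read
  // immediately without extra synchronization.
  for (size_t i = 0; i < values.size(); ++i)
    printf("%d ", values[i]);
  printf("\n");
  return 0;
}

The non-NaCl branch of DecCounter() relies on a scoped guard from "utils/auto_lock.h", which is not part of this diff. A rough sketch of the kind of guard it assumes (the real class may differ):

#include <pthread.h>

// Assumed shape of the AutoLock guard used by DecCounter(): lock the given
// pthread mutex on construction, unlock on destruction. The guard must be a
// named local so it stays alive until the end of the enclosing scope.
class AutoLock {
 public:
  explicit AutoLock(pthread_mutex_t* mutex) : mutex_(mutex) {
    pthread_mutex_lock(mutex_);
  }
  ~AutoLock() { pthread_mutex_unlock(mutex_); }

 private:
  pthread_mutex_t* mutex_;
};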
