Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Simple thread pool class | 5 // Simple thread pool class |
| 6 | 6 |
| 7 #ifndef EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ | 7 #ifndef LIBRARY_UTILS_THREAD_POOL_H_ |
| 8 #define EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ | 8 #define LIBRARY_UTILS_THREAD_POOL_H_ |
| 9 | 9 |
| 10 #include <pthread.h> | 10 #include <pthread.h> |
| 11 #include <semaphore.h> | 11 #include <semaphore.h> |
| 12 #include <stdio.h> | |
| 13 #include <stdlib.h> | |
| 14 | |
| 15 #include "utils/auto_lock.h" | |
| 12 | 16 |
| 13 // typdef helper for work function | 17 // typdef helper for work function |
| 14 typedef void (*WorkFunction)(int task_index, void* data); | 18 typedef void (*WorkFunction)(int task_index, void* data); |
| 15 | 19 |
| 16 // ThreadPool is a class to manage num_threads and assign | 20 // ThreadPool is a class to manage num_threads and assign them num_tasks of |
| 17 // them num_tasks of work at a time. Each call | 21 // work at a time. Each call to Dispatch(..) will block until all tasks |
| 18 // to Dispatch(..) will block until all tasks complete. | 22 // complete. If 0 is passed in for num_threads, all tasks will be issued on the |
| 19 // If 0 is passed in for num_threads, all tasks will be | 23 // dispatch thread. |
| 20 // issued on the dispatch thread. | |
| 21 | 24 |
| 22 class ThreadPool { | 25 class ThreadPool { |
| 23 public: | 26 public: |
| 24 void Dispatch(int num_tasks, WorkFunction work, void* data); | 27 void Dispatch(int num_tasks, WorkFunction work, void* data); |
| 25 explicit ThreadPool(int num_threads); | 28 explicit ThreadPool(int num_threads); |
| 26 ~ThreadPool(); | 29 ~ThreadPool(); |
| 30 | |
| 27 private: | 31 private: |
| 28 int DecCounter(); | 32 int DecCounter(); |
| 29 void Setup(int counter, WorkFunction work, void* data); | 33 void Setup(int counter, WorkFunction work, void* data); |
| 30 void DispatchMany(int num_tasks, WorkFunction work, void* data); | 34 void DispatchMany(int num_tasks, WorkFunction work, void* data); |
| 31 void DispatchHere(int num_tasks, WorkFunction work, void* data); | 35 void DispatchHere(int num_tasks, WorkFunction work, void* data); |
| 32 void WorkLoop(); | 36 void WorkLoop(); |
| 33 static void* WorkerThreadEntry(void* data); | 37 static void* WorkerThreadEntry(void* thiz) { |
| 38 static_cast<ThreadPool*>(thiz)->WorkLoop(); | |
| 39 return NULL; | |
| 40 } | |
| 34 void PostExitAndJoinAll(); | 41 void PostExitAndJoinAll(); |
| 35 pthread_t* threads_; | 42 pthread_t* threads_; |
| 36 int counter_; | 43 int counter_; |
| 37 const int num_threads_; | 44 const int num_threads_; |
| 38 bool exiting_; | 45 bool exiting_; |
| 39 void* user_data_; | 46 void* user_data_; |
| 40 WorkFunction user_work_function_; | 47 WorkFunction user_work_function_; |
| 48 pthread_mutex_t mutex_; | |
| 41 sem_t work_sem_; | 49 sem_t work_sem_; |
| 42 sem_t done_sem_; | 50 sem_t done_sem_; |
| 43 }; | 51 }; |
| 44 #endif // EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ | |
| 45 | 52 |
| 53 // Initializes mutex, semaphores and a pool of threads. If 0 is passed for | |
| 54 // num_threads, all work will be performed on the dispatch thread. | |
| 55 ThreadPool::ThreadPool(int num_threads) | |
|
binji
2013/06/04 23:30:28
should this all be inline...?
nfullagar1
2013/06/04 23:47:09
I'm on the fence - it's not that much code (compar
| |
| 56 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), | |
| 57 user_data_(NULL), user_work_function_(NULL) { | |
| 58 if (num_threads_ > 0) { | |
| 59 int status; | |
| 60 status = sem_init(&work_sem_, 0, 0); | |
| 61 if (-1 == status) { | |
| 62 fprintf(stderr, "Failed to initialize semaphore!\n"); | |
| 63 exit(-1); | |
| 64 } | |
| 65 status = sem_init(&done_sem_, 0, 0); | |
| 66 if (-1 == status) { | |
| 67 fprintf(stderr, "Failed to initialize semaphore!\n"); | |
| 68 exit(-1); | |
| 69 } | |
| 70 threads_ = new pthread_t[num_threads_]; | |
| 71 for (int i = 0; i < num_threads_; i++) { | |
| 72 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); | |
| 73 if (0 != status) { | |
| 74 fprintf(stderr, "Failed to create thread!\n"); | |
| 75 exit(-1); | |
| 76 } | |
| 77 } | |
| 78 } | |
| 79 } | |
| 80 | |
| 81 // Post exit request, wait for all threads to join, and cleanup. | |
| 82 ThreadPool::~ThreadPool() { | |
| 83 if (num_threads_ > 0) { | |
| 84 PostExitAndJoinAll(); | |
| 85 delete[] threads_; | |
| 86 sem_destroy(&done_sem_); | |
| 87 sem_destroy(&work_sem_); | |
| 88 } | |
| 89 } | |
| 90 | |
| 91 // Dispatch() will invoke the user supplied work function across | |
| 92 // one or more threads for each task. | |
| 93 // Note: This function will block until all work has completed. | |
| 94 inline void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { | |
| 95 if (num_threads_ > 0) | |
| 96 DispatchMany(num_tasks, work, data); | |
| 97 else | |
| 98 DispatchHere(num_tasks, work, data); | |
| 99 } | |
| 100 | |
| 101 // Decrement and get the value of the mutex protected counter. This function | |
| 102 // can be called from multiple threads at any given time. | |
| 103 inline int ThreadPool::DecCounter() { | |
| 104 #if defined(__native_client__) | |
| 105 return __sync_sub_and_fetch(&counter_, 1); | |
| 106 #else | |
| 107 // Slightly more overhead, but more portable mutex guard. | |
| 108 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; | |
| 109 AutoLock(&mutex); | |
| 110 return --counter_; | |
| 111 #endif | |
| 112 } | |
| 113 | |
| 114 // Setup work parameters. This function is called from the dispatch thread, | |
| 115 // when all worker threads are sleeping. | |
| 116 inline void ThreadPool::Setup(int counter, WorkFunction work, void *data) { | |
| 117 counter_ = counter; | |
| 118 user_work_function_ = work; | |
| 119 user_data_ = data; | |
| 120 } | |
| 121 | |
| 122 // DispatchMany() will dispatch a set of tasks across worker threads. | |
| 123 // Note: This function will block until all work has completed. | |
| 124 inline void ThreadPool::DispatchMany(int tasks, WorkFunction work, void* data) { | |
| 125 // On entry, all worker threads are sleeping. | |
| 126 Setup(tasks, work, data); | |
| 127 // Wake up the worker threads & have them process tasks. | |
| 128 for (int i = 0; i < num_threads_; i++) | |
| 129 sem_post(&work_sem_); | |
| 130 // Worker threads are now awake and busy. | |
| 131 // This dispatch thread will now sleep-wait for the worker threads to finish. | |
| 132 for (int i = 0; i < num_threads_; i++) | |
| 133 sem_wait(&done_sem_); | |
| 134 // On exit, all tasks are done and all worker threads are sleeping again. | |
| 135 } | |
| 136 | |
| 137 // DispatchHere will dispatch all tasks on this thread. | |
| 138 inline void ThreadPool::DispatchHere(int tasks, WorkFunction work, void* data) { | |
| 139 for (int i = 0; i < tasks; i++) | |
| 140 work(i, data); | |
| 141 } | |
| 142 | |
| 143 // Set exit flag, post and join all the threads in the pool. This function is | |
| 144 // called only from the dispatch thread, and only when all worker threads are | |
| 145 // sleeping. | |
| 146 inline void ThreadPool::PostExitAndJoinAll() { | |
| 147 exiting_ = true; | |
| 148 // Wake up all the sleeping worker threads. | |
| 149 for (int i = 0; i < num_threads_; ++i) | |
| 150 sem_post(&work_sem_); | |
| 151 void* retval; | |
| 152 for (int i = 0; i < num_threads_; ++i) | |
| 153 pthread_join(threads_[i], &retval); | |
| 154 } | |
| 155 | |
| 156 // Main work loop - one for each worker thread. | |
| 157 inline void ThreadPool::WorkLoop() { | |
| 158 while (true) { | |
| 159 // Wait for work. If no work is availble, this thread will sleep here. | |
| 160 sem_wait(&work_sem_); | |
| 161 if (exiting_) break; | |
| 162 while (true) { | |
| 163 // Grab a task index to work on from the counter. | |
| 164 int task_index = DecCounter(); | |
| 165 if (task_index < 0) | |
| 166 break; | |
| 167 user_work_function_(task_index, user_data_); | |
| 168 } | |
| 169 // Post to dispatch thread work is done. | |
| 170 sem_post(&done_sem_); | |
| 171 } | |
| 172 } | |
| 173 | |
| 174 #endif // LIBRARY_UTILS_THREAD_POOL_H_ | |
| 175 | |
| OLD | NEW |