Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(358)

Side by Side Diff: native_client_sdk/src/libraries/utils/thread_pool.h

Issue 16325024: Move thread_pool.h into utils so it can be shared by more than one example. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Simple thread pool class 5 // Simple thread pool class
6 6
7 #ifndef EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ 7 #ifndef LIBRARY_UTILS_THREAD_POOL_H_
8 #define EXAMPLES_DEMO_VORONOI_THREADPOOL_H_ 8 #define LIBRARY_UTILS_THREAD_POOL_H_
9 9
10 #include <pthread.h> 10 #include <pthread.h>
11 #include <semaphore.h> 11 #include <semaphore.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14
15 #include "utils/auto_lock.h"
12 16
13 // typdef helper for work function 17 // typdef helper for work function
14 typedef void (*WorkFunction)(int task_index, void* data); 18 typedef void (*WorkFunction)(int task_index, void* data);
15 19
16 // ThreadPool is a class to manage num_threads and assign 20 // ThreadPool is a class to manage num_threads and assign them num_tasks of
17 // them num_tasks of work at a time. Each call 21 // work at a time. Each call to Dispatch(..) will block until all tasks
18 // to Dispatch(..) will block until all tasks complete. 22 // complete. If 0 is passed in for num_threads, all tasks will be issued on the
19 // If 0 is passed in for num_threads, all tasks will be 23 // dispatch thread.
20 // issued on the dispatch thread.
21 24
22 class ThreadPool { 25 class ThreadPool {
23 public: 26 public:
24 void Dispatch(int num_tasks, WorkFunction work, void* data); 27 void Dispatch(int num_tasks, WorkFunction work, void* data);
25 explicit ThreadPool(int num_threads); 28 explicit ThreadPool(int num_threads);
26 ~ThreadPool(); 29 ~ThreadPool();
30
27 private: 31 private:
28 int DecCounter(); 32 int DecCounter();
29 void Setup(int counter, WorkFunction work, void* data); 33 void Setup(int counter, WorkFunction work, void* data);
30 void DispatchMany(int num_tasks, WorkFunction work, void* data); 34 void DispatchMany(int num_tasks, WorkFunction work, void* data);
31 void DispatchHere(int num_tasks, WorkFunction work, void* data); 35 void DispatchHere(int num_tasks, WorkFunction work, void* data);
32 void WorkLoop(); 36 void WorkLoop();
33 static void* WorkerThreadEntry(void* data); 37 static void* WorkerThreadEntry(void* thiz) {
38 static_cast<ThreadPool*>(thiz)->WorkLoop();
39 return NULL;
40 }
34 void PostExitAndJoinAll(); 41 void PostExitAndJoinAll();
35 pthread_t* threads_; 42 pthread_t* threads_;
36 int counter_; 43 int counter_;
37 const int num_threads_; 44 const int num_threads_;
38 bool exiting_; 45 bool exiting_;
39 void* user_data_; 46 void* user_data_;
40 WorkFunction user_work_function_; 47 WorkFunction user_work_function_;
48 pthread_mutex_t mutex_;
41 sem_t work_sem_; 49 sem_t work_sem_;
42 sem_t done_sem_; 50 sem_t done_sem_;
43 }; 51 };
44 #endif // EXAMPLES_DEMO_VORONOI_THREADPOOL_H_
45 52
53 // Initializes mutex, semaphores and a pool of threads. If 0 is passed for
54 // num_threads, all work will be performed on the dispatch thread.
55 ThreadPool::ThreadPool(int num_threads)
binji 2013/06/04 23:30:28 should this all be inline...?
nfullagar1 2013/06/04 23:47:09 I'm on the fence - it's not that much code (compar
56 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
57 user_data_(NULL), user_work_function_(NULL) {
58 if (num_threads_ > 0) {
59 int status;
60 status = sem_init(&work_sem_, 0, 0);
61 if (-1 == status) {
62 fprintf(stderr, "Failed to initialize semaphore!\n");
63 exit(-1);
64 }
65 status = sem_init(&done_sem_, 0, 0);
66 if (-1 == status) {
67 fprintf(stderr, "Failed to initialize semaphore!\n");
68 exit(-1);
69 }
70 threads_ = new pthread_t[num_threads_];
71 for (int i = 0; i < num_threads_; i++) {
72 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
73 if (0 != status) {
74 fprintf(stderr, "Failed to create thread!\n");
75 exit(-1);
76 }
77 }
78 }
79 }
80
81 // Post exit request, wait for all threads to join, and cleanup.
82 ThreadPool::~ThreadPool() {
83 if (num_threads_ > 0) {
84 PostExitAndJoinAll();
85 delete[] threads_;
86 sem_destroy(&done_sem_);
87 sem_destroy(&work_sem_);
88 }
89 }
90
91 // Dispatch() will invoke the user supplied work function across
92 // one or more threads for each task.
93 // Note: This function will block until all work has completed.
94 inline void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
95 if (num_threads_ > 0)
96 DispatchMany(num_tasks, work, data);
97 else
98 DispatchHere(num_tasks, work, data);
99 }
100
101 // Decrement and get the value of the mutex protected counter. This function
102 // can be called from multiple threads at any given time.
103 inline int ThreadPool::DecCounter() {
104 #if defined(__native_client__)
105 return __sync_sub_and_fetch(&counter_, 1);
106 #else
107 // Slightly more overhead, but more portable mutex guard.
108 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
109 AutoLock(&mutex);
110 return --counter_;
111 #endif
112 }
113
114 // Setup work parameters. This function is called from the dispatch thread,
115 // when all worker threads are sleeping.
116 inline void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
117 counter_ = counter;
118 user_work_function_ = work;
119 user_data_ = data;
120 }
121
122 // DispatchMany() will dispatch a set of tasks across worker threads.
123 // Note: This function will block until all work has completed.
124 inline void ThreadPool::DispatchMany(int tasks, WorkFunction work, void* data) {
125 // On entry, all worker threads are sleeping.
126 Setup(tasks, work, data);
127 // Wake up the worker threads & have them process tasks.
128 for (int i = 0; i < num_threads_; i++)
129 sem_post(&work_sem_);
130 // Worker threads are now awake and busy.
131 // This dispatch thread will now sleep-wait for the worker threads to finish.
132 for (int i = 0; i < num_threads_; i++)
133 sem_wait(&done_sem_);
134 // On exit, all tasks are done and all worker threads are sleeping again.
135 }
136
137 // DispatchHere will dispatch all tasks on this thread.
138 inline void ThreadPool::DispatchHere(int tasks, WorkFunction work, void* data) {
139 for (int i = 0; i < tasks; i++)
140 work(i, data);
141 }
142
143 // Set exit flag, post and join all the threads in the pool. This function is
144 // called only from the dispatch thread, and only when all worker threads are
145 // sleeping.
146 inline void ThreadPool::PostExitAndJoinAll() {
147 exiting_ = true;
148 // Wake up all the sleeping worker threads.
149 for (int i = 0; i < num_threads_; ++i)
150 sem_post(&work_sem_);
151 void* retval;
152 for (int i = 0; i < num_threads_; ++i)
153 pthread_join(threads_[i], &retval);
154 }
155
156 // Main work loop - one for each worker thread.
157 inline void ThreadPool::WorkLoop() {
158 while (true) {
159 // Wait for work. If no work is availble, this thread will sleep here.
160 sem_wait(&work_sem_);
161 if (exiting_) break;
162 while (true) {
163 // Grab a task index to work on from the counter.
164 int task_index = DecCounter();
165 if (task_index < 0)
166 break;
167 user_work_function_(task_index, user_data_);
168 }
169 // Post to dispatch thread work is done.
170 sem_post(&done_sem_);
171 }
172 }
173
174 #endif // LIBRARY_UTILS_THREAD_POOL_H_
175
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698