OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #if !defined(DISABLE_THREADS) | |
6 | |
7 #include "threadpool.h" | |
8 | |
9 #include <pthread.h> | |
10 #include <semaphore.h> | |
11 #include <stdio.h> | |
12 #include <stdlib.h> | |
13 | |
14 // TODO(nfullagar): Switch DecCounter to use atomic decrement. | |
15 // TODO(nfullagar): Large sets of fine grained tasks may benefit from | |
16 // pthread_condvar_* over sem_* | |
17 | |
18 // Initializes mutex, semaphores and a pool of threads. | |
19 ThreadPool::ThreadPool(const int num_threads) | |
noelallen1
2013/05/25 00:21:11
const? Wasn't this non-const in your header? Ma
nfullagar1
2013/05/28 23:11:50
Done.
| |
20 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), | |
21 user_data_(NULL), user_work_function_(NULL) { | |
22 if (num_threads_ > 1) { | |
noelallen1
2013/05/25 00:21:11
Shouldn't a thread pool of 1 work?
nfullagar1
2013/05/28 23:11:50
changed so num_threads_ of 0 does all tasks on dis
| |
23 int status; | |
24 status = pthread_mutex_init(&mutex_, NULL); | |
25 if (0 != status) { | |
26 fprintf(stderr, "Failed to initialize mutex!\n"); | |
27 exit(-1); | |
28 } | |
29 status = sem_init(&work_tasks_, 0, 0); | |
30 if (-1 == status) { | |
31 fprintf(stderr, "Failed to initialize semaphore!\n"); | |
32 exit(-1); | |
33 } | |
34 status = sem_init(&done_tasks_, 0, 0); | |
35 if (-1 == status) { | |
36 fprintf(stderr, "Failed to initialize semaphore!\n"); | |
37 exit(-1); | |
38 } | |
39 threads_ = new pthread_t[num_threads_]; | |
40 for (int i = 0; i < num_threads_; i++) { | |
41 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); | |
42 if (0 != status) { | |
43 fprintf(stderr, "Failed to create thread!\n"); | |
44 exit(-1); | |
45 } | |
46 } | |
47 } | |
48 } | |
49 | |
50 // Post exit request, wait for all threads to join, and cleanup. | |
51 ThreadPool::~ThreadPool() { | |
52 if (num_threads_ > 1) { | |
53 PostExitAndJoinAll(); | |
54 delete[] threads_; | |
55 sem_destroy(&done_tasks_); | |
56 sem_destroy(&work_tasks_); | |
57 pthread_mutex_destroy(&mutex_); | |
58 } | |
59 } | |
60 | |
61 // Setup work parameters. This function is called from the dispatch thread, | |
62 // when all worker threads are sleeping. | |
63 void ThreadPool::Setup(int counter, WorkFunction work, void *data) { | |
64 counter_ = counter; | |
65 user_work_function_ = work; | |
66 user_data_ = data; | |
67 } | |
68 | |
69 // Decrement and get the value of the mutex protected counter. This function | |
70 // can be called from multiple threads at any given time. | |
71 int ThreadPool::DecCounter() { | |
72 int v; | |
73 pthread_mutex_lock(&mutex_); | |
74 { | |
75 v = --counter_; | |
76 } | |
77 pthread_mutex_unlock(&mutex_); | |
78 return v; | |
79 } | |
80 | |
81 // Set exit flag, post and join all the threads in the pool. This function is | |
82 // called only from the dispatch thread, and only when all worker threads are | |
83 // sleeping. | |
84 void ThreadPool::PostExitAndJoinAll() { | |
85 exiting_ = true; | |
86 // Wake up all the sleeping worker threads. | |
87 for (int i = 0; i < num_threads_; ++i) | |
88 sem_post(&work_tasks_); | |
89 void *retval; | |
90 for (int i = 0; i < num_threads_; ++i) | |
91 pthread_join(threads_[i], &retval); | |
92 } | |
93 | |
94 // Main work loop - one for each worker thread. | |
95 void ThreadPool::WorkLoop() { | |
96 while (true) { | |
97 // Wait for work. If no work is availble, this thread will sleep. | |
98 sem_wait(&work_tasks_); | |
99 // Workers wake up from PostWork() issued from the dispatch thread. | |
100 if (exiting_) break; | |
101 // Grab a task index to work on from the counter. | |
102 int task_index = DecCounter(); | |
103 if (task_index < 0) { | |
104 // This indicates we're not sync'ing properly. | |
105 fprintf(stderr, "Task index went negative!\n"); | |
106 exit(-1); | |
107 } | |
108 user_work_function_(task_index, user_data_); | |
109 // Post to dispatch thread that a task completed. | |
110 sem_post(&done_tasks_); | |
111 } | |
112 } | |
113 | |
114 // pthread entry point for a worker thread. | |
115 void* ThreadPool::WorkerThreadEntry(void *thiz) { | |
116 static_cast<ThreadPool *>(thiz)->WorkLoop(); | |
117 return NULL; | |
118 } | |
119 | |
120 // MultiThread() will dispatch a set of tasks across multiple worker threads. | |
121 // Note: This function will block until all work has completed. | |
122 void ThreadPool::MultiThread(int num_tasks, WorkFunction work, void *data) { | |
123 // On entry, all worker threads are sleeping. | |
124 Setup(num_tasks, work, data); | |
125 | |
126 // Wake up the worker threads & have them process the tasks. | |
127 for (int i = 0; i < num_tasks; i++) | |
128 sem_post(&work_tasks_); | |
129 | |
130 // Worker threads are now awake and busy. | |
131 | |
132 // This dispatch thread will now sleep-wait on the done_tasks_ semaphore, | |
133 // waiting for the worker threads to finish all tasks. | |
134 for (int i = 0; i < num_tasks; i++) | |
135 sem_wait(&done_tasks_); | |
136 | |
137 // Make sure the counter is where we expect. | |
138 int c = DecCounter(); | |
139 if (-1 != c) { | |
140 fprintf(stderr, "We're not syncing correctly! (%d)\n", c); | |
141 exit(-1); | |
142 } | |
143 // On exit, all tasks are done and all worker threads are sleeping again. | |
144 } | |
145 | |
146 // SingleThread() will dispatch all work on this thread. | |
noelallen1
2013/05/25 00:21:11
You already check for >1 elsewhere, why not make t
nfullagar1
2013/05/28 23:11:50
see other changes
| |
147 void ThreadPool::SingleThread(int num_tasks, WorkFunction work, void *data) { | |
148 for (int i = 0; i < num_tasks; i++) | |
noelallen1
2013/05/25 00:21:11
Use of "single thread" is confusing. In the case
nfullagar1
2013/05/28 23:11:50
changed so 0 = dispatch thread, 1 = one thread off
| |
149 work(i, data); | |
150 } | |
151 | |
152 // Dispatch() will invoke the user supplied work function across | |
153 // one or more threads for each task. | |
154 // Note: This function will block until all work has completed. | |
155 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void *data) { | |
156 if (num_threads_ > 1) | |
157 MultiThread(num_tasks, work, data); | |
158 else | |
159 SingleThread(num_tasks, work, data); | |
160 } | |
161 #endif | |
OLD | NEW |