OLD | NEW |
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "threadpool.h" | 5 #include "threadpool.h" |
6 | 6 |
7 #include <pthread.h> | 7 #include <pthread.h> |
8 #include <semaphore.h> | 8 #include <semaphore.h> |
9 #include <stdio.h> | 9 #include <stdio.h> |
10 #include <stdlib.h> | 10 #include <stdlib.h> |
11 | 11 |
12 // TODO(nfullagar): Switch DecCounter to use atomic decrement. | 12 #include "utils/auto_lock.h" |
13 | 13 |
14 // Initializes mutex, semaphores and a pool of threads. If 0 is passed for | 14 // Initializes mutex, semaphores and a pool of threads. If 0 is passed for |
15 // num_threads, all work will be performed on the dispatch thread. | 15 // num_threads, all work will be performed on the dispatch thread. |
16 ThreadPool::ThreadPool(int num_threads) | 16 ThreadPool::ThreadPool(int num_threads) |
17 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), | 17 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), |
18 user_data_(NULL), user_work_function_(NULL) { | 18 user_data_(NULL), user_work_function_(NULL) { |
19 if (num_threads_ > 0) { | 19 if (num_threads_ > 0) { |
20 int status; | 20 int status; |
21 status = pthread_mutex_init(&mutex_, NULL); | |
22 if (0 != status) { | |
23 fprintf(stderr, "Failed to initialize mutex!\n"); | |
24 exit(-1); | |
25 } | |
26 status = sem_init(&work_sem_, 0, 0); | 21 status = sem_init(&work_sem_, 0, 0); |
27 if (-1 == status) { | 22 if (-1 == status) { |
28 fprintf(stderr, "Failed to initialize semaphore!\n"); | 23 fprintf(stderr, "Failed to initialize semaphore!\n"); |
29 exit(-1); | 24 exit(-1); |
30 } | 25 } |
31 status = sem_init(&done_sem_, 0, 0); | 26 status = sem_init(&done_sem_, 0, 0); |
32 if (-1 == status) { | 27 if (-1 == status) { |
33 fprintf(stderr, "Failed to initialize semaphore!\n"); | 28 fprintf(stderr, "Failed to initialize semaphore!\n"); |
34 exit(-1); | 29 exit(-1); |
35 } | 30 } |
36 threads_ = new pthread_t[num_threads_]; | 31 threads_ = new pthread_t[num_threads_]; |
37 for (int i = 0; i < num_threads_; i++) { | 32 for (int i = 0; i < num_threads_; i++) { |
38 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); | 33 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); |
39 if (0 != status) { | 34 if (0 != status) { |
40 fprintf(stderr, "Failed to create thread!\n"); | 35 fprintf(stderr, "Failed to create thread!\n"); |
41 exit(-1); | 36 exit(-1); |
42 } | 37 } |
43 } | 38 } |
44 } | 39 } |
45 } | 40 } |
46 | 41 |
47 // Post exit request, wait for all threads to join, and cleanup. | 42 // Post exit request, wait for all threads to join, and cleanup. |
48 ThreadPool::~ThreadPool() { | 43 ThreadPool::~ThreadPool() { |
49 if (num_threads_ > 1) { | 44 if (num_threads_ > 0) { |
50 PostExitAndJoinAll(); | 45 PostExitAndJoinAll(); |
51 delete[] threads_; | 46 delete[] threads_; |
52 sem_destroy(&done_sem_); | 47 sem_destroy(&done_sem_); |
53 sem_destroy(&work_sem_); | 48 sem_destroy(&work_sem_); |
54 pthread_mutex_destroy(&mutex_); | |
55 } | 49 } |
56 } | 50 } |
57 | 51 |
58 // Setup work parameters. This function is called from the dispatch thread, | 52 // Setup work parameters. This function is called from the dispatch thread, |
59 // when all worker threads are sleeping. | 53 // when all worker threads are sleeping. |
60 void ThreadPool::Setup(int counter, WorkFunction work, void *data) { | 54 void ThreadPool::Setup(int counter, WorkFunction work, void *data) { |
61 counter_ = counter; | 55 counter_ = counter; |
62 user_work_function_ = work; | 56 user_work_function_ = work; |
63 user_data_ = data; | 57 user_data_ = data; |
64 } | 58 } |
65 | 59 |
66 // Decrement and get the value of the mutex protected counter. This function | 60 // Return decremented task counter. This function |
67 // can be called from multiple threads at any given time. | 61 // can be called from multiple threads at any given time. |
68 int ThreadPool::DecCounter() { | 62 int ThreadPool::DecCounter() { |
69 int v; | 63 #if defined(__native_client__) |
70 pthread_mutex_lock(&mutex_); | 64 // Use fast atomic sub & fetch. |
71 { | 65 return __sync_sub_and_fetch(&counter_, 1); |
72 v = --counter_; | 66 #else |
73 } | 67 // Fallback to a more platform independent pthread mutex via AutoLock. |
74 pthread_mutex_unlock(&mutex_); | 68 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; |
75 return v; | 69 AutoLock lock(&mutex); |
| 70 return --counter_; |
| 71 #endif |
76 } | 72 } |
77 | 73 |
78 // Set exit flag, post and join all the threads in the pool. This function is | 74 // Set exit flag, post and join all the threads in the pool. This function is |
79 // called only from the dispatch thread, and only when all worker threads are | 75 // called only from the dispatch thread, and only when all worker threads are |
80 // sleeping. | 76 // sleeping. |
81 void ThreadPool::PostExitAndJoinAll() { | 77 void ThreadPool::PostExitAndJoinAll() { |
82 exiting_ = true; | 78 exiting_ = true; |
83 // Wake up all the sleeping worker threads. | 79 // Wake up all the sleeping worker threads. |
84 for (int i = 0; i < num_threads_; ++i) | 80 for (int i = 0; i < num_threads_; ++i) |
85 sem_post(&work_sem_); | 81 sem_post(&work_sem_); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
139 // Dispatch() will invoke the user supplied work function across | 135 // Dispatch() will invoke the user supplied work function across |
140 // one or more threads for each task. | 136 // one or more threads for each task. |
141 // Note: This function will block until all work has completed. | 137 // Note: This function will block until all work has completed. |
142 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { | 138 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { |
143 if (num_threads_ > 0) | 139 if (num_threads_ > 0) |
144 DispatchMany(num_tasks, work, data); | 140 DispatchMany(num_tasks, work, data); |
145 else | 141 else |
146 DispatchHere(num_tasks, work, data); | 142 DispatchHere(num_tasks, work, data); |
147 } | 143 } |
148 | 144 |
OLD | NEW |