// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "threadpool.h"

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

#include "utils/auto_lock.h"
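
// Overview of how the pool coordinates work: Dispatch() stores the task count
// in counter_ and posts work_sem_ once per worker thread. Each woken worker
// repeatedly claims a task index by atomically decrementing counter_ until the
// result goes negative, then posts done_sem_ so the dispatch thread can return.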

// Initializes the semaphores and a pool of worker threads. If 0 is passed for
// num_threads, all work will be performed on the dispatch thread.
ThreadPool::ThreadPool(int num_threads)
    : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
      user_data_(NULL), user_work_function_(NULL) {
  if (num_threads_ > 0) {
    int status;
    status = sem_init(&work_sem_, 0, 0);
    if (-1 == status) {
      fprintf(stderr, "Failed to initialize semaphore!\n");
      exit(-1);
    }
    status = sem_init(&done_sem_, 0, 0);
    if (-1 == status) {
      fprintf(stderr, "Failed to initialize semaphore!\n");
      exit(-1);
    }
    threads_ = new pthread_t[num_threads_];
    for (int i = 0; i < num_threads_; i++) {
      status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
      if (0 != status) {
        fprintf(stderr, "Failed to create thread!\n");
        exit(-1);
      }
    }
  }
}

// Post an exit request, wait for all threads to join, and clean up.
ThreadPool::~ThreadPool() {
  if (num_threads_ > 0) {
    PostExitAndJoinAll();
    delete[] threads_;
    sem_destroy(&done_sem_);
    sem_destroy(&work_sem_);
  }
}

// Set up the work parameters. This function is called from the dispatch
// thread, while all worker threads are sleeping.
void ThreadPool::Setup(int counter, WorkFunction work, void* data) {
  counter_ = counter;
  user_work_function_ = work;
  user_data_ = data;
}

// Returns the decremented task counter. This function can be called from
// multiple threads at any given time.
int ThreadPool::DecCounter() {
#if defined(__native_client__)
  // Use fast atomic sub & fetch.
  return __sync_sub_and_fetch(&counter_, 1);
#else
  // Fall back to a more platform-independent pthread mutex via AutoLock.
  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  AutoLock lock(&mutex);
  return --counter_;
#endif
}

// Set the exit flag, post and join all the threads in the pool. This function
// is called only from the dispatch thread, and only when all worker threads
// are sleeping.
void ThreadPool::PostExitAndJoinAll() {
  exiting_ = true;
  // Wake up all the sleeping worker threads.
  for (int i = 0; i < num_threads_; ++i)
    sem_post(&work_sem_);
  void* retval;
  for (int i = 0; i < num_threads_; ++i)
    pthread_join(threads_[i], &retval);
}

// Main work loop - one for each worker thread.
void ThreadPool::WorkLoop() {
  while (true) {
    // Wait for work. If no work is available, this thread will sleep here.
    sem_wait(&work_sem_);
    if (exiting_) break;
    while (true) {
      // Grab a task index to work on from the counter.
      int task_index = DecCounter();
      if (task_index < 0)
        break;
      user_work_function_(task_index, user_data_);
    }
    // Signal the dispatch thread that this worker's share of the work is done.
    sem_post(&done_sem_);
  }
}

// pthread entry point for a worker thread.
void* ThreadPool::WorkerThreadEntry(void* thiz) {
  static_cast<ThreadPool*>(thiz)->WorkLoop();
  return NULL;
}

// DispatchMany() will dispatch a set of tasks across worker threads.
// Note: This function will block until all work has completed.
void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) {
  // On entry, all worker threads are sleeping.
  Setup(num_tasks, work, data);

  // Wake up the worker threads & have them process tasks.
  for (int i = 0; i < num_threads_; i++)
    sem_post(&work_sem_);

  // Worker threads are now awake and busy.

  // This dispatch thread will now sleep-wait for the worker threads to finish.
  for (int i = 0; i < num_threads_; i++)
    sem_wait(&done_sem_);
  // On exit, all tasks are done and all worker threads are sleeping again.
}

// DispatchHere() will dispatch all tasks on the calling thread.
void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) {
  for (int i = 0; i < num_tasks; i++)
    work(i, data);
}

// Dispatch() will invoke the user-supplied work function once per task,
// across one or more threads.
// Note: This function will block until all work has completed.
void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
  if (num_threads_ > 0)
    DispatchMany(num_tasks, work, data);
  else
    DispatchHere(num_tasks, work, data);
}
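
// ---------------------------------------------------------------------------
// Illustrative usage sketch. It assumes WorkFunction (declared in
// threadpool.h) is a plain function pointer taking (int task_index,
// void* data), as the calls above suggest; FillRow and the constants below
// are placeholder names, not part of the ThreadPool API.
//
//   static void FillRow(int task_index, void* data) {
//     float* rows = static_cast<float*>(data);
//     rows[task_index] = static_cast<float>(task_index);  // One task's work.
//   }
//
//   void Example() {
//     const int kNumWorkers = 4;   // 0 would run everything on this thread.
//     const int kNumRows = 1024;
//     float rows[1024];
//     ThreadPool pool(kNumWorkers);
//     pool.Dispatch(kNumRows, FillRow, rows);  // Blocks until all rows done.
//   }
// ---------------------------------------------------------------------------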