OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #if !defined(DISABLE_THREADS) |
| 6 |
| 7 #include "threadpool.h" |
| 8 |
| 9 #include <pthread.h> |
| 10 #include <semaphore.h> |
| 11 #include <stdio.h> |
| 12 #include <stdlib.h> |
| 13 |
| 14 // TODO(nfullagar): Switch DecCounter to use atomic decrement. |
| 15 // TODO(nfullagar): Large sets of fine grained tasks may benefit from |
| 16 // pthread_condvar_* over sem_* |
| 17 |
| 18 // Initializes mutex, semaphores and a pool of threads. |
| 19 ThreadPool::ThreadPool(const int num_threads) |
| 20 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), |
| 21 user_data_(NULL), user_work_function_(NULL) { |
| 22 if (num_threads_ > 1) { |
| 23 int status; |
| 24 status = pthread_mutex_init(&mutex_, NULL); |
| 25 if (0 != status) { |
| 26 fprintf(stderr, "Failed to initialize mutex!\n"); |
| 27 exit(-1); |
| 28 } |
| 29 status = sem_init(&work_tasks_, 0, 0); |
| 30 if (-1 == status) { |
| 31 fprintf(stderr, "Failed to initialize semaphore!\n"); |
| 32 exit(-1); |
| 33 } |
| 34 status = sem_init(&done_tasks_, 0, 0); |
| 35 if (-1 == status) { |
| 36 fprintf(stderr, "Failed to initialize semaphore!\n"); |
| 37 exit(-1); |
| 38 } |
| 39 threads_ = new pthread_t[num_threads_]; |
| 40 for (int i = 0; i < num_threads_; i++) { |
| 41 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); |
| 42 if (0 != status) { |
| 43 fprintf(stderr, "Failed to create thread!\n"); |
| 44 exit(-1); |
| 45 } |
| 46 } |
| 47 } |
| 48 } |
| 49 |
| 50 // Post exit request, wait for all threads to join, and cleanup. |
| 51 ThreadPool::~ThreadPool() { |
| 52 if (num_threads_ > 1) { |
| 53 PostExitAndJoinAll(); |
| 54 delete[] threads_; |
| 55 sem_destroy(&done_tasks_); |
| 56 sem_destroy(&work_tasks_); |
| 57 pthread_mutex_destroy(&mutex_); |
| 58 } |
| 59 } |
| 60 |
| 61 // Setup work parameters. This function is called from the dispatch thread, |
| 62 // when all worker threads are sleeping. |
| 63 void ThreadPool::Setup(int counter, WorkFunction work, void *data) { |
| 64 counter_ = counter; |
| 65 user_work_function_ = work; |
| 66 user_data_ = data; |
| 67 } |
| 68 |
| 69 // Decrement and get the value of the mutex protected counter. This function |
| 70 // can be called from multiple threads at any given time. |
| 71 int ThreadPool::DecCounter() { |
| 72 int v; |
| 73 pthread_mutex_lock(&mutex_); |
| 74 { |
| 75 v = --counter_; |
| 76 } |
| 77 pthread_mutex_unlock(&mutex_); |
| 78 return v; |
| 79 } |
| 80 |
| 81 // Set exit flag, post and join all the threads in the pool. This function is |
| 82 // called only from the dispatch thread, and only when all worker threads are |
| 83 // sleeping. |
| 84 void ThreadPool::PostExitAndJoinAll() { |
| 85 exiting_ = true; |
| 86 // Wake up all the sleeping worker threads. |
| 87 for (int i = 0; i < num_threads_; ++i) |
| 88 sem_post(&work_tasks_); |
| 89 void *retval; |
| 90 for (int i = 0; i < num_threads_; ++i) |
| 91 pthread_join(threads_[i], &retval); |
| 92 } |
| 93 |
| 94 // Main work loop - one for each worker thread. |
| 95 void ThreadPool::WorkLoop() { |
| 96 while (true) { |
| 97 // Wait for work. If no work is availble, this thread will sleep. |
| 98 sem_wait(&work_tasks_); |
| 99 // Workers wake up from PostWork() issued from the dispatch thread. |
| 100 if (exiting_) break; |
| 101 // Grab a task index to work on from the counter. |
| 102 int task_index = DecCounter(); |
| 103 if (task_index < 0) { |
| 104 // This indicates we're not sync'ing properly. |
| 105 fprintf(stderr, "Task index went negative!\n"); |
| 106 exit(-1); |
| 107 } |
| 108 user_work_function_(task_index, user_data_); |
| 109 // Post to dispatch thread that a task completed. |
| 110 sem_post(&done_tasks_); |
| 111 } |
| 112 } |
| 113 |
| 114 // pthread entry point for a worker thread. |
| 115 void* ThreadPool::WorkerThreadEntry(void *thiz) { |
| 116 static_cast<ThreadPool *>(thiz)->WorkLoop(); |
| 117 return NULL; |
| 118 } |
| 119 |
| 120 // MultiThread() will dispatch a set of tasks across multiple worker threads. |
| 121 // Note: This function will block until all work has completed. |
| 122 void ThreadPool::MultiThread(int num_tasks, WorkFunction work, void *data) { |
| 123 // On entry, all worker threads are sleeping. |
| 124 Setup(num_tasks, work, data); |
| 125 |
| 126 // Wake up the worker threads & have them process the tasks. |
| 127 for (int i = 0; i < num_tasks; i++) |
| 128 sem_post(&work_tasks_); |
| 129 |
| 130 // Worker threads are now awake and busy. |
| 131 |
| 132 // This dispatch thread will now sleep-wait on the done_tasks_ semaphore, |
| 133 // waiting for the worker threads to finish all tasks. |
| 134 for (int i = 0; i < num_tasks; i++) |
| 135 sem_wait(&done_tasks_); |
| 136 |
| 137 // Make sure the counter is where we expect. |
| 138 int c = DecCounter(); |
| 139 if (-1 != c) { |
| 140 fprintf(stderr, "We're not syncing correctly! (%d)\n", c); |
| 141 exit(-1); |
| 142 } |
| 143 // On exit, all tasks are done and all worker threads are sleeping again. |
| 144 } |
| 145 |
| 146 // SingleThread() will dispatch all work on this thread. |
| 147 void ThreadPool::SingleThread(int num_tasks, WorkFunction work, void *data) { |
| 148 for (int i = 0; i < num_tasks; i++) |
| 149 work(i, data); |
| 150 } |
| 151 |
| 152 // Dispatch() will invoke the user supplied work function across |
| 153 // one or more threads for each task. |
| 154 // Note: This function will block until all work has completed. |
| 155 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void *data) { |
| 156 if (num_threads_ > 1) |
| 157 MultiThread(num_tasks, work, data); |
| 158 else |
| 159 SingleThread(num_tasks, work, data); |
| 160 } |
| 161 #endif |
OLD | NEW |