Index: native_client_sdk/src/libraries/sdk_util/thread_pool.cc |
=================================================================== |
--- native_client_sdk/src/libraries/sdk_util/thread_pool.cc (revision 0) |
+++ native_client_sdk/src/libraries/sdk_util/thread_pool.cc (revision 0) |
@@ -0,0 +1,144 @@ |
+// Copyright (c) 2013 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "sdk_util/thread_pool.h" |
+ |
+#include <pthread.h> |
+#include <semaphore.h> |
+#include <stdio.h> |
+#include <stdlib.h> |
+ |
+#include "sdk_util/auto_lock.h" |
+ |
+// Initializes mutex, semaphores and a pool of threads. If 0 is passed for |
+// num_threads, all work will be performed on the dispatch thread. |
+ThreadPool::ThreadPool(int num_threads) |
+ : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), |
+ user_data_(NULL), user_work_function_(NULL) { |
+ if (num_threads_ > 0) { |
+ int status; |
+ status = sem_init(&work_sem_, 0, 0); |
+ if (-1 == status) { |
+ fprintf(stderr, "Failed to initialize semaphore!\n"); |
+ exit(-1); |
+ } |
+ status = sem_init(&done_sem_, 0, 0); |
+ if (-1 == status) { |
+ fprintf(stderr, "Failed to initialize semaphore!\n"); |
+ exit(-1); |
+ } |
+ threads_ = new pthread_t[num_threads_]; |
+ for (int i = 0; i < num_threads_; i++) { |
+ status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); |
+ if (0 != status) { |
+ fprintf(stderr, "Failed to create thread!\n"); |
+ exit(-1); |
+ } |
+ } |
+ } |
+} |
+ |
+// Post exit request, wait for all threads to join, and cleanup. |
+ThreadPool::~ThreadPool() { |
+ if (num_threads_ > 0) { |
+ PostExitAndJoinAll(); |
+ delete[] threads_; |
+ sem_destroy(&done_sem_); |
+ sem_destroy(&work_sem_); |
+ } |
+} |
+ |
+// Setup work parameters. This function is called from the dispatch thread, |
+// when all worker threads are sleeping. |
+void ThreadPool::Setup(int counter, WorkFunction work, void *data) { |
+ counter_ = counter; |
+ user_work_function_ = work; |
+ user_data_ = data; |
+} |
+ |
+// Return decremented task counter. This function |
+// can be called from multiple threads at any given time. |
+int ThreadPool::DecCounter() { |
+#if defined(__native_client__) |
+ // Use fast atomic sub & fetch. |
+ return __sync_sub_and_fetch(&counter_, 1); |
+#else |
+ // Fallback to a more platform independent pthread mutex via AutoLock. |
+ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; |
+ AutoLock lock(&mutex); |
+ return --counter_; |
+#endif |
+} |
+ |
+// Set exit flag, post and join all the threads in the pool. This function is |
+// called only from the dispatch thread, and only when all worker threads are |
+// sleeping. |
+void ThreadPool::PostExitAndJoinAll() { |
+ exiting_ = true; |
+ // Wake up all the sleeping worker threads. |
+ for (int i = 0; i < num_threads_; ++i) |
+ sem_post(&work_sem_); |
+ void* retval; |
+ for (int i = 0; i < num_threads_; ++i) |
+ pthread_join(threads_[i], &retval); |
+} |
+ |
+// Main work loop - one for each worker thread. |
+void ThreadPool::WorkLoop() { |
+ while (true) { |
+ // Wait for work. If no work is availble, this thread will sleep here. |
+ sem_wait(&work_sem_); |
+ if (exiting_) break; |
+ while (true) { |
+ // Grab a task index to work on from the counter. |
+ int task_index = DecCounter(); |
+ if (task_index < 0) |
+ break; |
+ user_work_function_(task_index, user_data_); |
+ } |
+ // Post to dispatch thread work is done. |
+ sem_post(&done_sem_); |
+ } |
+} |
+ |
+// pthread entry point for a worker thread. |
+void* ThreadPool::WorkerThreadEntry(void* thiz) { |
+ static_cast<ThreadPool*>(thiz)->WorkLoop(); |
+ return NULL; |
+} |
+ |
+// DispatchMany() will dispatch a set of tasks across worker threads. |
+// Note: This function will block until all work has completed. |
+void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) { |
+ // On entry, all worker threads are sleeping. |
+ Setup(num_tasks, work, data); |
+ |
+ // Wake up the worker threads & have them process tasks. |
+ for (int i = 0; i < num_threads_; i++) |
+ sem_post(&work_sem_); |
+ |
+ // Worker threads are now awake and busy. |
+ |
+ // This dispatch thread will now sleep-wait for the worker threads to finish. |
+ for (int i = 0; i < num_threads_; i++) |
+ sem_wait(&done_sem_); |
+ // On exit, all tasks are done and all worker threads are sleeping again. |
+} |
+ |
+// DispatchHere will dispatch all tasks on this thread. |
+void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) { |
+ for (int i = 0; i < num_tasks; i++) |
+ work(i, data); |
+} |
+ |
+// Dispatch() will invoke the user supplied work function across |
+// one or more threads for each task. |
+// Note: This function will block until all work has completed. |
+void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { |
+ if (num_threads_ > 0) |
+ DispatchMany(num_tasks, work, data); |
+ else |
+ DispatchHere(num_tasks, work, data); |
+} |
+ |
Property changes on: native_client_sdk/src/libraries/sdk_util/thread_pool.cc |
___________________________________________________________________ |
Added: svn:eol-style |
+ LF |