Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: native_client_sdk/src/libraries/sdk_util/thread_pool.cc

Issue 16325024: Move thread_pool.h into utils so it can be shared by more than one example. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sdk_util/thread_pool.h"
6
7 #include <pthread.h>
8 #include <semaphore.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11
12 #include "sdk_util/auto_lock.h"
13
14 // Initializes mutex, semaphores and a pool of threads. If 0 is passed for
15 // num_threads, all work will be performed on the dispatch thread.
16 ThreadPool::ThreadPool(int num_threads)
17 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
18 user_data_(NULL), user_work_function_(NULL) {
19 if (num_threads_ > 0) {
20 int status;
21 status = sem_init(&work_sem_, 0, 0);
22 if (-1 == status) {
23 fprintf(stderr, "Failed to initialize semaphore!\n");
24 exit(-1);
25 }
26 status = sem_init(&done_sem_, 0, 0);
27 if (-1 == status) {
28 fprintf(stderr, "Failed to initialize semaphore!\n");
29 exit(-1);
30 }
31 threads_ = new pthread_t[num_threads_];
32 for (int i = 0; i < num_threads_; i++) {
33 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
34 if (0 != status) {
35 fprintf(stderr, "Failed to create thread!\n");
36 exit(-1);
37 }
38 }
39 }
40 }
41
42 // Post exit request, wait for all threads to join, and cleanup.
43 ThreadPool::~ThreadPool() {
44 if (num_threads_ > 0) {
45 PostExitAndJoinAll();
46 delete[] threads_;
47 sem_destroy(&done_sem_);
48 sem_destroy(&work_sem_);
49 }
50 }
51
52 // Setup work parameters. This function is called from the dispatch thread,
53 // when all worker threads are sleeping.
54 void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
55 counter_ = counter;
56 user_work_function_ = work;
57 user_data_ = data;
58 }
59
60 // Return decremented task counter. This function
61 // can be called from multiple threads at any given time.
62 int ThreadPool::DecCounter() {
63 #if defined(__native_client__)
64 // Use fast atomic sub & fetch.
65 return __sync_sub_and_fetch(&counter_, 1);
66 #else
67 // Fallback to a more platform independent pthread mutex via AutoLock.
68 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
69 AutoLock lock(&mutex);
70 return --counter_;
71 #endif
72 }
73
74 // Set exit flag, post and join all the threads in the pool. This function is
75 // called only from the dispatch thread, and only when all worker threads are
76 // sleeping.
77 void ThreadPool::PostExitAndJoinAll() {
78 exiting_ = true;
79 // Wake up all the sleeping worker threads.
80 for (int i = 0; i < num_threads_; ++i)
81 sem_post(&work_sem_);
82 void* retval;
83 for (int i = 0; i < num_threads_; ++i)
84 pthread_join(threads_[i], &retval);
85 }
86
87 // Main work loop - one for each worker thread.
88 void ThreadPool::WorkLoop() {
89 while (true) {
90 // Wait for work. If no work is availble, this thread will sleep here.
91 sem_wait(&work_sem_);
92 if (exiting_) break;
93 while (true) {
94 // Grab a task index to work on from the counter.
95 int task_index = DecCounter();
96 if (task_index < 0)
97 break;
98 user_work_function_(task_index, user_data_);
99 }
100 // Post to dispatch thread work is done.
101 sem_post(&done_sem_);
102 }
103 }
104
105 // pthread entry point for a worker thread.
106 void* ThreadPool::WorkerThreadEntry(void* thiz) {
107 static_cast<ThreadPool*>(thiz)->WorkLoop();
108 return NULL;
109 }
110
111 // DispatchMany() will dispatch a set of tasks across worker threads.
112 // Note: This function will block until all work has completed.
113 void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) {
114 // On entry, all worker threads are sleeping.
115 Setup(num_tasks, work, data);
116
117 // Wake up the worker threads & have them process tasks.
118 for (int i = 0; i < num_threads_; i++)
119 sem_post(&work_sem_);
120
121 // Worker threads are now awake and busy.
122
123 // This dispatch thread will now sleep-wait for the worker threads to finish.
124 for (int i = 0; i < num_threads_; i++)
125 sem_wait(&done_sem_);
126 // On exit, all tasks are done and all worker threads are sleeping again.
127 }
128
129 // DispatchHere will dispatch all tasks on this thread.
130 void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) {
131 for (int i = 0; i < num_tasks; i++)
132 work(i, data);
133 }
134
135 // Dispatch() will invoke the user supplied work function across
136 // one or more threads for each task.
137 // Note: This function will block until all work has completed.
138 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
139 if (num_threads_ > 0)
140 DispatchMany(num_tasks, work, data);
141 else
142 DispatchHere(num_tasks, work, data);
143 }
144
OLDNEW
« no previous file with comments | « native_client_sdk/src/libraries/sdk_util/thread_pool.h ('k') | native_client_sdk/src/libraries/sdk_util/thread_safe_queue.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698