Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(147)

Side by Side Diff: native_client_sdk/src/examples/demo/voronoi/threadpool.cc

Issue 15732012: Revive voronoi multi-threaded demo. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #if !defined(DISABLE_THREADS)
6
7 #include "threadpool.h"
8
9 #include <pthread.h>
10 #include <semaphore.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13
14 // TODO(nfullagar): Switch DecCounter to use atomic decrement.
15 // TODO(nfullagar): Large sets of fine grained tasks may benefit from
16 // pthread_condvar_* over sem_*
17
18 // Initializes mutex, semaphores and a pool of threads.
19 ThreadPool::ThreadPool(const int num_threads)
20 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
21 user_data_(NULL), user_work_function_(NULL) {
22 if (num_threads_ > 1) {
23 int status;
24 status = pthread_mutex_init(&mutex_, NULL);
25 if (0 != status) {
26 fprintf(stderr, "Failed to initialize mutex!\n");
27 exit(-1);
28 }
29 status = sem_init(&work_tasks_, 0, 0);
30 if (-1 == status) {
31 fprintf(stderr, "Failed to initialize semaphore!\n");
32 exit(-1);
33 }
34 status = sem_init(&done_tasks_, 0, 0);
35 if (-1 == status) {
36 fprintf(stderr, "Failed to initialize semaphore!\n");
37 exit(-1);
38 }
39 threads_ = new pthread_t[num_threads_];
40 for (int i = 0; i < num_threads_; i++) {
41 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
42 if (0 != status) {
43 fprintf(stderr, "Failed to create thread!\n");
44 exit(-1);
45 }
46 }
47 }
48 }
49
50 // Post exit request, wait for all threads to join, and cleanup.
51 ThreadPool::~ThreadPool() {
52 if (num_threads_ > 1) {
53 PostExitAndJoinAll();
54 delete[] threads_;
55 sem_destroy(&done_tasks_);
56 sem_destroy(&work_tasks_);
57 pthread_mutex_destroy(&mutex_);
58 }
59 }
60
61 // Setup work parameters. This function is called from the dispatch thread,
62 // when all worker threads are sleeping.
63 void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
64 counter_ = counter;
65 user_work_function_ = work;
66 user_data_ = data;
67 }
68
69 // Decrement and get the value of the mutex protected counter. This function
70 // can be called from multiple threads at any given time.
71 int ThreadPool::DecCounter() {
72 int v;
73 pthread_mutex_lock(&mutex_);
74 {
75 v = --counter_;
76 }
77 pthread_mutex_unlock(&mutex_);
78 return v;
79 }
80
81 // Set exit flag, post and join all the threads in the pool. This function is
82 // called only from the dispatch thread, and only when all worker threads are
83 // sleeping.
84 void ThreadPool::PostExitAndJoinAll() {
85 exiting_ = true;
86 // Wake up all the sleeping worker threads.
87 for (int i = 0; i < num_threads_; ++i)
88 sem_post(&work_tasks_);
89 void *retval;
90 for (int i = 0; i < num_threads_; ++i)
91 pthread_join(threads_[i], &retval);
92 }
93
94 // Main work loop - one for each worker thread.
95 void ThreadPool::WorkLoop() {
96 while (true) {
97 // Wait for work. If no work is availble, this thread will sleep.
98 sem_wait(&work_tasks_);
99 // Workers wake up from PostWork() issued from the dispatch thread.
100 if (exiting_) break;
101 // Grab a task index to work on from the counter.
102 int task_index = DecCounter();
103 if (task_index < 0) {
104 // This indicates we're not sync'ing properly.
105 fprintf(stderr, "Task index went negative!\n");
106 exit(-1);
107 }
108 user_work_function_(task_index, user_data_);
109 // Post to dispatch thread that a task completed.
110 sem_post(&done_tasks_);
111 }
112 }
113
114 // pthread entry point for a worker thread.
115 void* ThreadPool::WorkerThreadEntry(void *thiz) {
116 static_cast<ThreadPool *>(thiz)->WorkLoop();
117 return NULL;
118 }
119
120 // MultiThread() will dispatch a set of tasks across multiple worker threads.
121 // Note: This function will block until all work has completed.
122 void ThreadPool::MultiThread(int num_tasks, WorkFunction work, void *data) {
123 // On entry, all worker threads are sleeping.
124 Setup(num_tasks, work, data);
125
126 // Wake up the worker threads & have them process the tasks.
127 for (int i = 0; i < num_tasks; i++)
128 sem_post(&work_tasks_);
129
130 // Worker threads are now awake and busy.
131
132 // This dispatch thread will now sleep-wait on the done_tasks_ semaphore,
133 // waiting for the worker threads to finish all tasks.
134 for (int i = 0; i < num_tasks; i++)
135 sem_wait(&done_tasks_);
136
137 // Make sure the counter is where we expect.
138 int c = DecCounter();
139 if (-1 != c) {
140 fprintf(stderr, "We're not syncing correctly! (%d)\n", c);
141 exit(-1);
142 }
143 // On exit, all tasks are done and all worker threads are sleeping again.
144 }
145
146 // SingleThread() will dispatch all work on this thread.
147 void ThreadPool::SingleThread(int num_tasks, WorkFunction work, void *data) {
148 for (int i = 0; i < num_tasks; i++)
149 work(i, data);
150 }
151
152 // Dispatch() will invoke the user supplied work function across
153 // one or more threads for each task.
154 // Note: This function will block until all work has completed.
155 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void *data) {
156 if (num_threads_ > 1)
157 MultiThread(num_tasks, work, data);
158 else
159 SingleThread(num_tasks, work, data);
160 }
161 #endif
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698