OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "base/task_scheduler/scheduler_thread_pool.h" | |
6 | |
7 #include <stddef.h> | |
8 | |
9 #include <memory> | |
10 #include <unordered_set> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/bind_helpers.h" | |
15 #include "base/macros.h" | |
16 #include "base/memory/ptr_util.h" | |
17 #include "base/memory/ref_counted.h" | |
18 #include "base/synchronization/condition_variable.h" | |
19 #include "base/synchronization/lock.h" | |
20 #include "base/synchronization/waitable_event.h" | |
21 #include "base/task_runner.h" | |
22 #include "base/task_scheduler/sequence.h" | |
23 #include "base/task_scheduler/sequence_sort_key.h" | |
24 #include "base/task_scheduler/task_tracker.h" | |
25 #include "base/threading/platform_thread.h" | |
26 #include "base/threading/simple_thread.h" | |
27 #include "testing/gtest/include/gtest/gtest.h" | |
28 | |
29 namespace base { | |
30 namespace internal { | |
31 namespace { | |
32 | |
33 const size_t kNumThreadsInThreadPool = 4; | |
34 const size_t kNumThreadsPostingTasks = 4; | |
35 const size_t kNumTasksPostedPerThread = 150; | |
36 | |
37 class TaskSchedulerThreadPoolTest : public testing::Test { | |
38 protected: | |
39 TaskSchedulerThreadPoolTest() = default; | |
40 | |
41 void SetUp() override { | |
42 thread_pool_ = SchedulerThreadPool::CreateThreadPool( | |
43 ThreadPriority::NORMAL, kNumThreadsInThreadPool, | |
44 Bind(&TaskSchedulerThreadPoolTest::EnqueueSequenceCallback, | |
45 Unretained(this)), | |
46 &task_tracker_); | |
47 ASSERT_TRUE(thread_pool_); | |
48 } | |
49 | |
50 void TearDown() override { | |
51 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
52 thread_pool_->JoinForTesting(); | |
53 } | |
54 | |
55 std::unique_ptr<SchedulerThreadPool> thread_pool_; | |
56 | |
57 private: | |
58 void EnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | |
59 // In production code, this callback would be implemented by the | |
60 // TaskScheduler which would first determine in which PriorityQueue the | |
robliao
2016/04/13 20:50:55
Nit: Remove "in"
fdoray
2016/04/13 23:13:13
Done.
| |
61 // sequence must be reinserted. | |
62 const SequenceSortKey sort_key(sequence->GetSortKey()); | |
63 thread_pool_->EnqueueSequence(std::move(sequence), sort_key); | |
64 } | |
65 | |
66 TaskTracker task_tracker_; | |
67 | |
68 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolTest); | |
69 }; | |
70 | |
71 class TaskFactory { | |
72 public: | |
73 TaskFactory() : cv_(&lock_) {} | |
74 | |
75 // Posts a task through |task_runner|. If |post_nested_task| is true, the task | |
76 // will post a new task through |task_runner| when it runs. If |event| is set, | |
77 // the task will block until it is signaled. | |
78 void PostTestTask(scoped_refptr<TaskRunner> task_runner, | |
79 bool post_nested_task, | |
80 WaitableEvent* event) { | |
81 AutoLock auto_lock(lock_); | |
82 EXPECT_TRUE(task_runner->PostTask( | |
83 FROM_HERE, Bind(&TaskFactory::RunTaskCallback, Unretained(this), | |
84 num_created_tasks_++, task_runner, post_nested_task, | |
85 Unretained(event)))); | |
86 } | |
87 | |
88 // Waits for all tasks posted by PostTestTask() to start running. It is not | |
89 // guaranteed that the tasks have completed their execution when this returns. | |
90 void WaitForAllTasksToRun() const { | |
91 AutoLock auto_lock(lock_); | |
92 while (ran_tasks_.size() < num_created_tasks_) | |
93 cv_.Wait(); | |
94 } | |
95 | |
96 size_t NumRunTasks() const { | |
97 AutoLock auto_lock(lock_); | |
98 return ran_tasks_.size(); | |
99 } | |
100 | |
101 private: | |
102 void RunTaskCallback(size_t task_index, | |
103 scoped_refptr<TaskRunner> task_runner, | |
104 bool post_nested_task, | |
105 WaitableEvent* event) { | |
106 if (post_nested_task) | |
107 PostTestTask(task_runner, false, nullptr); | |
108 | |
109 EXPECT_TRUE(task_runner->RunsTasksOnCurrentThread()); | |
110 | |
111 { | |
112 AutoLock auto_lock(lock_); | |
113 | |
114 if (ran_tasks_.find(task_index) != ran_tasks_.end()) | |
115 ADD_FAILURE() << "A task ran more than once."; | |
116 ran_tasks_.insert(task_index); | |
117 | |
118 cv_.Signal(); | |
119 } | |
120 | |
121 if (event) | |
122 event->Wait(); | |
123 } | |
124 | |
125 // Synchronizes access to all members below. | |
126 mutable Lock lock_; | |
127 | |
128 // Condition variable signaled when a task runs. | |
129 mutable ConditionVariable cv_; | |
130 | |
131 // Number of tasks posted by PostTestTask(). | |
132 size_t num_created_tasks_ = 0; | |
133 | |
134 // Indexes of tasks that ran. | |
135 std::unordered_set<size_t> ran_tasks_; | |
136 | |
137 DISALLOW_COPY_AND_ASSIGN(TaskFactory); | |
138 }; | |
139 | |
140 class ThreadPostingTasks : public SimpleThread { | |
141 public: | |
142 // Constructs a thread that posts tasks to |thread_pool| through an | |
143 // |execution_mode| task runner. If |wait_for_all_threads_idle| is true, the | |
144 // thread wait until all worker threads in |thread_pool| are idle before | |
145 // posting a new task. If |post_nested_task| is true, each task posted by this | |
146 // thread posts another task when it runs. | |
147 ThreadPostingTasks(SchedulerThreadPool* thread_pool, | |
148 ExecutionMode execution_mode, | |
149 bool wait_for_all_threads_idle, | |
150 bool post_nested_task) | |
151 : SimpleThread("ThreadPostingTasks"), | |
152 thread_pool_(thread_pool), | |
153 task_runner_(thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), | |
154 execution_mode)), | |
155 wait_for_all_threads_idle_(wait_for_all_threads_idle), | |
156 post_nested_task_(post_nested_task) { | |
157 DCHECK(thread_pool_); | |
158 } | |
159 | |
160 const TaskFactory* factory() const { return &factory_; } | |
161 | |
162 private: | |
163 void Run() override { | |
164 EXPECT_FALSE(task_runner_->RunsTasksOnCurrentThread()); | |
165 | |
166 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) { | |
167 if (wait_for_all_threads_idle_) | |
168 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
169 factory_.PostTestTask(task_runner_, post_nested_task_, nullptr); | |
170 } | |
171 } | |
172 | |
173 SchedulerThreadPool* const thread_pool_; | |
174 const scoped_refptr<TaskRunner> task_runner_; | |
175 const bool wait_for_all_threads_idle_; | |
176 const bool post_nested_task_; | |
177 TaskFactory factory_; | |
178 | |
179 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks); | |
180 }; | |
181 | |
182 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasks) { | |
183 // Create threads to post tasks to PARALLEL TaskRunners. | |
184 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
185 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
robliao
2016/04/13 20:50:55
Nit: size_t i, here and below.
fdoray
2016/04/13 23:13:13
Done.
| |
186 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
187 thread_pool_.get(), ExecutionMode::PARALLEL, false, false))); | |
robliao
2016/04/13 20:50:55
Optional: To make "false, false" more readable wit
fdoray
2016/04/13 23:13:12
Done.
| |
188 threads_posting_tasks.back()->Start(); | |
189 } | |
190 | |
191 // Wait for all tasks to run. | |
192 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
193 thread_posting_tasks->Join(); | |
194 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
195 EXPECT_EQ(kNumTasksPostedPerThread, | |
196 thread_posting_tasks->factory()->NumRunTasks()); | |
197 } | |
198 | |
199 // Wait until all worker threads are idle to be sure that no task accesses | |
200 // its TaskFactory after |thread_posting_tasks| is destroyed. | |
201 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
202 } | |
203 | |
204 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasksWaitAllThreadsIdle) { | |
205 // Create threads to post tasks to PARALLEL TaskRunners. To verify that | |
206 // worker threads can sleep and be woken up when new tasks are posted, wait | |
207 // for all threads to become idle before posting a new task. | |
208 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
209 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
210 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
211 thread_pool_.get(), ExecutionMode::PARALLEL, true, false))); | |
212 threads_posting_tasks.back()->Start(); | |
213 } | |
214 | |
215 // Wait for all tasks to run. | |
216 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
217 thread_posting_tasks->Join(); | |
218 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
219 EXPECT_EQ(kNumTasksPostedPerThread, | |
220 thread_posting_tasks->factory()->NumRunTasks()); | |
221 } | |
222 | |
223 // Wait until all worker threads are idle to be sure that no task accesses | |
224 // its TaskFactory after |thread_posting_tasks| is destroyed. | |
225 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
226 } | |
227 | |
228 TEST_F(TaskSchedulerThreadPoolTest, NestedPostParallelTasks) { | |
229 // Create threads to post tasks to PARALLEL TaskRunners. Each task posted by | |
230 // these threads will post another task when it runs. | |
231 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
232 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
233 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
234 thread_pool_.get(), ExecutionMode::PARALLEL, false, true))); | |
235 threads_posting_tasks.back()->Start(); | |
236 } | |
237 | |
238 // Wait for all tasks to run. | |
239 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
240 thread_posting_tasks->Join(); | |
241 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
242 EXPECT_EQ(2 * kNumTasksPostedPerThread, | |
243 thread_posting_tasks->factory()->NumRunTasks()); | |
244 } | |
245 | |
246 // Wait until all worker threads are idle to be sure that no task accesses | |
247 // its TaskFactory after |thread_posting_tasks| is destroyed. | |
248 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
249 } | |
250 | |
251 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasksWithOneAvailableThread) { | |
252 TaskFactory factory; | |
253 | |
254 // Post tasks to keep all threads busy except one until |event| is signaled. | |
255 WaitableEvent event(true, false); | |
256 auto task_runner = thread_pool_->CreateTaskRunnerWithTraits( | |
257 TaskTraits(), ExecutionMode::PARALLEL); | |
258 for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) | |
259 factory.PostTestTask(task_runner, false, &event); | |
260 factory.WaitForAllTasksToRun(); | |
261 | |
262 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact | |
263 // that only one thread in |thread_pool_| isn't busy. | |
264 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) | |
265 factory.PostTestTask(task_runner, false, nullptr); | |
266 factory.WaitForAllTasksToRun(); | |
267 | |
268 // Release tasks waiting on |event|. | |
269 event.Signal(); | |
270 | |
271 // Wait until all worker threads are idle to be sure that no task accesses | |
272 // |factory| after it is destroyed. | |
273 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
274 } | |
275 | |
276 TEST_F(TaskSchedulerThreadPoolTest, Saturate) { | |
277 TaskFactory factory; | |
278 | |
279 // Verify that it is possible to have |kNumThreadsInThreadPool| tasks running | |
280 // simultaneously. | |
281 WaitableEvent event(true, false); | |
282 auto task_runner = thread_pool_->CreateTaskRunnerWithTraits( | |
283 TaskTraits(), ExecutionMode::PARALLEL); | |
284 for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) | |
285 factory.PostTestTask(task_runner, false, &event); | |
286 factory.WaitForAllTasksToRun(); | |
287 | |
288 // Release tasks waiting on |event|. | |
289 event.Signal(); | |
290 | |
291 // Wait until all worker threads are idle to be sure that no task accesses | |
292 // |factory| after it is destroyed. | |
293 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
294 } | |
295 | |
296 } // namespace | |
297 } // namespace internal | |
298 } // namespace base | |
OLD | NEW |