OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "base/task_scheduler/scheduler_thread_pool.h" | |
6 | |
7 #include <stddef.h> | |
8 | |
9 #include <memory> | |
10 #include <unordered_set> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/bind_helpers.h" | |
15 #include "base/macros.h" | |
16 #include "base/memory/ptr_util.h" | |
17 #include "base/synchronization/condition_variable.h" | |
18 #include "base/synchronization/lock.h" | |
19 #include "base/synchronization/waitable_event.h" | |
20 #include "base/task_runner.h" | |
21 #include "base/task_scheduler/task_tracker.h" | |
22 #include "base/threading/platform_thread.h" | |
23 #include "base/threading/simple_thread.h" | |
24 #include "testing/gtest/include/gtest/gtest.h" | |
25 | |
26 namespace base { | |
27 namespace internal { | |
28 namespace { | |
29 | |
30 const size_t kNumThreadsInThreadPool = 4; | |
31 const size_t kNumThreadsPostingTasks = 4; | |
32 const size_t kNumTasksPostedPerThread = 150; | |
33 | |
34 class TaskSchedulerThreadPoolTest : public testing::Test { | |
35 protected: | |
36 TaskSchedulerThreadPoolTest() = default; | |
37 | |
38 void SetUp() override { | |
39 thread_pool_ = SchedulerThreadPool::CreateThreadPool( | |
40 ThreadPriority::NORMAL, kNumThreadsInThreadPool, | |
41 Bind(&TaskSchedulerThreadPoolTest::RanTaskFromSequenceCallback, | |
42 Unretained(this)), | |
43 &task_tracker_); | |
44 ASSERT_TRUE(thread_pool_); | |
45 } | |
46 | |
47 void TearDown() override { | |
48 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
49 thread_pool_->JoinForTesting(); | |
50 } | |
51 | |
52 std::unique_ptr<SchedulerThreadPool> thread_pool_; | |
53 | |
54 private: | |
55 void RanTaskFromSequenceCallback(const SchedulerWorkerThread* worker_thread, | |
56 scoped_refptr<Sequence> sequence) { | |
57 // Reinsert |sequence| in |thread_pool_|'s shared PriorityQueue if it isn't | |
58 // empty after popping one of its Tasks. In production code, this callback | |
59 // would be implemented by the TaskScheduler which would first determine in | |
60 // which PriorityQueue the sequence must be reinserted. | |
61 if (!sequence->PopTask()) { | |
danakj
2016/04/05 00:27:59
document what this return value means with a temp
fdoray
2016/04/06 19:31:02
Done.
| |
62 const SequenceSortKey sort_key(sequence->GetSortKey()); | |
63 thread_pool_->ReinsertSequence(std::move(sequence), sort_key); | |
64 } | |
65 } | |
66 | |
67 TaskTracker task_tracker_; | |
68 | |
69 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolTest); | |
70 }; | |
71 | |
72 class TaskFactory { | |
73 public: | |
74 TaskFactory() : cv_(&lock_) {} | |
75 | |
76 // Posts a task through |task_runner|. If |post_nested_task| is true, the task | |
77 // will post a new task through |task_runner| when it runs. If |event| is set, | |
78 // the task will block until it is signaled. | |
79 void PostTask(scoped_refptr<TaskRunner> task_runner, | |
80 bool post_nested_task, | |
81 WaitableEvent* event) { | |
82 AutoLock auto_lock(lock_); | |
83 task_runner->PostTask( | |
84 FROM_HERE, Bind(&TaskFactory::RunTaskCallback, Unretained(this), | |
85 num_created_tasks_++, task_runner, post_nested_task, | |
86 Unretained(event))); | |
87 } | |
88 | |
89 // Waits for all tasks posted by PostTask() to start running. It is not | |
90 // guaranteed that the tasks have completed their execution when this returns. | |
91 void WaitForAllTasksToRun() const { | |
92 AutoLock auto_lock(lock_); | |
93 while (run_tasks_.size() < num_created_tasks_) | |
94 cv_.Wait(); | |
95 } | |
96 | |
97 size_t NumRunTasks() const { | |
98 AutoLock auto_lock(lock_); | |
99 return run_tasks_.size(); | |
100 } | |
101 | |
102 private: | |
103 void RunTaskCallback(size_t task_index, | |
104 scoped_refptr<TaskRunner> task_runner, | |
105 bool post_nested_task, | |
106 WaitableEvent* event) { | |
107 if (post_nested_task) | |
108 PostTask(task_runner, false, nullptr); | |
109 | |
110 EXPECT_TRUE(task_runner->RunsTasksOnCurrentThread()); | |
111 | |
112 { | |
113 AutoLock auto_lock(lock_); | |
114 | |
115 if (run_tasks_.find(task_index) != run_tasks_.end()) | |
116 ADD_FAILURE() << "A task ran more than once."; | |
117 run_tasks_.insert(task_index); | |
118 | |
119 cv_.Signal(); | |
120 } | |
121 | |
122 if (event) | |
123 event->Wait(); | |
124 } | |
125 | |
126 // Synchronizes access to all members below. | |
127 mutable Lock lock_; | |
128 | |
129 // Condition variable signaled when a task runs. | |
130 mutable ConditionVariable cv_; | |
131 | |
132 // Number of tasks posted by PostTask(). | |
133 size_t num_created_tasks_ = 0; | |
134 | |
135 // Indexes of tasks that ran. | |
136 std::unordered_set<size_t> run_tasks_; | |
137 | |
138 DISALLOW_COPY_AND_ASSIGN(TaskFactory); | |
139 }; | |
140 | |
141 class ThreadPostingTasks : public SimpleThread { | |
142 public: | |
143 // Constructs a thread that posts tasks to |thread_pool| through an | |
144 // |execution_mode| task runner. If |wait_for_all_threads_idle| is true, the | |
145 // thread wait until all worker threads in |thread_pool| are idle before | |
146 // posting a new task. If |post_nested_task| is true, each task posted by this | |
147 // thread posts another task when it runs. | |
148 ThreadPostingTasks(SchedulerThreadPool* thread_pool, | |
149 ExecutionMode execution_mode, | |
150 bool wait_for_all_threads_idle, | |
151 bool post_nested_task) | |
152 : SimpleThread("ThreadPostingTasks"), | |
153 thread_pool_(thread_pool), | |
154 task_runner_(thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), | |
155 execution_mode)), | |
156 wait_for_all_threads_idle_(wait_for_all_threads_idle), | |
157 post_nested_task_(post_nested_task) {} | |
158 | |
159 const TaskFactory* factory() const { return &factory_; } | |
160 | |
161 private: | |
162 void Run() override { | |
163 EXPECT_FALSE(task_runner_->RunsTasksOnCurrentThread()); | |
164 | |
165 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) { | |
166 if (wait_for_all_threads_idle_) | |
167 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
168 factory_.PostTask(task_runner_, post_nested_task_, nullptr); | |
169 } | |
170 } | |
171 | |
172 SchedulerThreadPool* const thread_pool_; | |
173 scoped_refptr<TaskRunner> task_runner_; | |
174 const bool wait_for_all_threads_idle_; | |
175 const bool post_nested_task_; | |
176 TaskFactory factory_; | |
177 | |
178 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks); | |
179 }; | |
180 | |
181 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasks) { | |
182 // Create threads to post tasks to PARALLEL TaskRunners. | |
183 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
184 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
185 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
186 thread_pool_.get(), ExecutionMode::PARALLEL, false, false))); | |
187 threads_posting_tasks.back()->Start(); | |
188 } | |
189 | |
190 // Wait for all tasks to run. | |
191 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
192 thread_posting_tasks->Join(); | |
193 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
194 EXPECT_EQ(kNumTasksPostedPerThread, | |
195 thread_posting_tasks->factory()->NumRunTasks()); | |
196 } | |
197 | |
198 // Wait until all worker threads are idle to be sure that no task accesses | |
199 // its TaskFactory after |thread_posting_tasks| is destroyed. | |
200 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
201 } | |
202 | |
203 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasksWaitAllThreadsIdle) { | |
204 // Create threads to post tasks to PARALLEL TaskRunners. To verify that | |
205 // worker threads can sleep and be woken up when new tasks are posted, wait | |
206 // for all threads to become idle before posting a new task. | |
207 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
208 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
209 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
210 thread_pool_.get(), ExecutionMode::PARALLEL, true, false))); | |
211 threads_posting_tasks.back()->Start(); | |
212 } | |
213 | |
214 // Wait for all tasks to run. | |
215 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
216 thread_posting_tasks->Join(); | |
217 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
218 EXPECT_EQ(kNumTasksPostedPerThread, | |
219 thread_posting_tasks->factory()->NumRunTasks()); | |
220 } | |
221 | |
222 // Wait until all worker threads are idle to be sure that no task accesses | |
223 // its TaskFactory after |thread_posting_tasks| is destroyed. | |
224 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
225 } | |
226 | |
227 TEST_F(TaskSchedulerThreadPoolTest, NestedPostParallelTasks) { | |
228 // Create threads to post tasks to PARALLEL TaskRunners. Each task posted by | |
229 // these threads will post another task when it runs. | |
230 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
231 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
232 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
233 thread_pool_.get(), ExecutionMode::PARALLEL, false, true))); | |
234 threads_posting_tasks.back()->Start(); | |
235 } | |
236 | |
237 // Wait for all tasks to run. | |
238 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
239 thread_posting_tasks->Join(); | |
240 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
241 EXPECT_EQ(2 * kNumTasksPostedPerThread, | |
242 thread_posting_tasks->factory()->NumRunTasks()); | |
243 } | |
244 | |
245 // Wait until all worker threads are idle to be sure that no task accesses | |
246 // its TaskFactory after |thread_posting_tasks| is destroyed. | |
247 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
248 } | |
249 | |
250 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasksWithOneAvailableThread) { | |
251 TaskFactory factory; | |
252 | |
253 // Post tasks to keep all threads busy except one until |event| is signaled. | |
254 WaitableEvent event(true, false); | |
255 auto task_runner = thread_pool_->CreateTaskRunnerWithTraits( | |
256 TaskTraits(), ExecutionMode::PARALLEL); | |
257 for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) | |
258 factory.PostTask(task_runner, false, &event); | |
259 factory.WaitForAllTasksToRun(); | |
260 | |
261 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact | |
262 // that only one thread in |thread_pool_| isn't busy. | |
263 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) | |
264 factory.PostTask(task_runner, false, nullptr); | |
265 factory.WaitForAllTasksToRun(); | |
266 | |
267 // Release tasks waiting on |event|. | |
268 event.Signal(); | |
269 | |
270 // Wait until all worker threads are idle to be sure that no task accesses | |
271 // |factory| after it is destroyed. | |
272 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
273 } | |
274 | |
275 TEST_F(TaskSchedulerThreadPoolTest, Saturate) { | |
276 TaskFactory factory; | |
277 | |
278 // Verify that it is possible to have |kNumThreadsInThreadPool| tasks running | |
279 // simultaneously. | |
280 WaitableEvent event(true, false); | |
281 auto task_runner = thread_pool_->CreateTaskRunnerWithTraits( | |
282 TaskTraits(), ExecutionMode::PARALLEL); | |
283 for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) | |
284 factory.PostTask(task_runner, false, &event); | |
285 factory.WaitForAllTasksToRun(); | |
286 | |
287 // Release tasks waiting on |event|. | |
288 event.Signal(); | |
289 | |
290 // Wait until all worker threads are idle to be sure that no task accesses | |
291 // |factory| after it is destroyed. | |
292 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
293 } | |
294 | |
295 // Checks that when PostTaskHelper is called with an empty sequence, the task | |
296 // is added to the sequence and the sequence is added to the priority queue. | |
297 TEST(TaskSchedulerPostTaskHelperTest, PostTaskInEmptySequence) { | |
298 std::unique_ptr<Task> task( | |
299 new Task(FROM_HERE, Bind(&DoNothing), TaskTraits())); | |
300 const Task* task_raw = task.get(); | |
301 scoped_refptr<Sequence> sequence(new Sequence); | |
302 PriorityQueue priority_queue(Bind(&DoNothing)); | |
303 TaskTracker task_tracker; | |
304 | |
305 // Post |task|. | |
306 PostTaskHelper(std::move(task), sequence, &priority_queue, &task_tracker); | |
307 | |
308 // Expect to find the sequence in the priority queue. | |
309 EXPECT_EQ(sequence, priority_queue.BeginTransaction()->Peek().sequence); | |
310 | |
311 // Expect to find |task| alone in |sequence|. | |
312 EXPECT_EQ(task_raw, sequence->PeekTask()); | |
313 sequence->PopTask(); | |
314 EXPECT_EQ(nullptr, sequence->PeekTask()); | |
315 } | |
316 | |
317 // Checks that when PostTaskHelper is called with a sequence that already | |
318 // contains a task, the task is added to the sequence but the sequence is not | |
319 // added to the priority queue. | |
320 TEST(TaskSchedulerPostTaskHelperTest, PostTaskInNonEmptySequence) { | |
321 std::unique_ptr<Task> task( | |
322 new Task(FROM_HERE, Bind(&DoNothing), TaskTraits())); | |
323 const Task* task_raw = task.get(); | |
324 scoped_refptr<Sequence> sequence(new Sequence); | |
325 PriorityQueue priority_queue(Bind(&DoNothing)); | |
326 TaskTracker task_tracker; | |
327 | |
328 // Add an initial task in |sequence|. | |
329 sequence->PushTask( | |
330 WrapUnique(new Task(FROM_HERE, Bind(&DoNothing), TaskTraits()))); | |
331 | |
332 // Post |task|. | |
333 PostTaskHelper(std::move(task), sequence, &priority_queue, &task_tracker); | |
334 | |
335 // Expect to find the priority queue empty. | |
336 EXPECT_TRUE(priority_queue.BeginTransaction()->Peek().is_null()); | |
337 | |
338 // Expect to find |task| in |sequence| behind the initial task. | |
339 EXPECT_NE(task_raw, sequence->PeekTask()); | |
340 sequence->PopTask(); | |
341 EXPECT_EQ(task_raw, sequence->PeekTask()); | |
342 sequence->PopTask(); | |
343 EXPECT_EQ(nullptr, sequence->PeekTask()); | |
344 } | |
345 | |
346 } // namespace | |
347 } // namespace internal | |
348 } // namespace base | |
OLD | NEW |