OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "base/task_scheduler/scheduler_thread_pool.h" | |
6 | |
7 #include <stddef.h> | |
8 | |
9 #include <memory> | |
10 #include <unordered_set> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/bind_helpers.h" | |
15 #include "base/macros.h" | |
16 #include "base/memory/ptr_util.h" | |
17 #include "base/synchronization/condition_variable.h" | |
18 #include "base/synchronization/waitable_event.h" | |
19 #include "base/task_runner.h" | |
20 #include "base/task_scheduler/scheduler_lock.h" | |
21 #include "base/task_scheduler/task_tracker.h" | |
22 #include "base/threading/platform_thread.h" | |
23 #include "base/threading/simple_thread.h" | |
24 #include "testing/gtest/include/gtest/gtest.h" | |
25 | |
26 namespace base { | |
27 namespace internal { | |
28 namespace { | |
29 | |
30 const size_t kNumRepetitions = 4; | |
31 const size_t kNumThreadsInThreadPool = 4; | |
32 const size_t kNumThreadsPostingTasks = 4; | |
33 const size_t kNumTasksPostedPerThread = 150; | |
34 | |
35 class TaskSchedulerThreadPoolTest : public testing::Test { | |
36 protected: | |
37 TaskSchedulerThreadPoolTest() : cv_(lock_.CreateConditionVariable()) {} | |
38 | |
39 void SetUp() override { | |
40 thread_pool_ = SchedulerThreadPool::CreateThreadPool( | |
41 ThreadPriority::NORMAL, kNumThreadsInThreadPool, | |
42 Bind(&TaskSchedulerThreadPoolTest::RanTaskFromSequenceCallback, | |
43 Unretained(this)), | |
44 &task_tracker_); | |
45 ASSERT_TRUE(thread_pool_); | |
46 } | |
47 | |
48 void TearDown() override { thread_pool_->JoinForTesting(); } | |
49 | |
50 // Waits for all tasks returned by CreateTask() to start running. It is not | |
51 // guaranteed that the tasks have completed their execution when this returns. | |
52 void WaitForAllTasksToRun() { | |
53 AutoSchedulerLock auto_lock(lock_); | |
54 while (run_tasks_.size() < num_created_tasks_) | |
55 cv_->Wait(); | |
56 } | |
57 | |
58 size_t NumRunTasks() const { | |
59 AutoSchedulerLock auto_lock(lock_); | |
60 return run_tasks_.size(); | |
61 } | |
62 | |
63 std::unique_ptr<SchedulerThreadPool> thread_pool_; | |
64 | |
65 public: | |
66 // Creates a task. |task_runner| is the TaskRunner through which the task will | |
67 // be posted. If |post_nested_task| is true, the task will post a new task | |
68 // through |task_runner| when it runs. If |event| is set, the task will block | |
69 // until |event| is signaled. | |
70 Closure CreateTask(scoped_refptr<TaskRunner> task_runner, | |
71 bool post_nested_task, | |
72 WaitableEvent* event) { | |
73 AutoSchedulerLock auto_lock(lock_); | |
74 return Bind(&TaskSchedulerThreadPoolTest::RunTaskCallback, Unretained(this), | |
75 num_created_tasks_++, task_runner, post_nested_task, | |
76 Unretained(event)); | |
77 } | |
78 | |
79 private: | |
80 void RanTaskFromSequenceCallback(const SchedulerWorkerThread* worker_thread, | |
81 scoped_refptr<Sequence> sequence) { | |
82 if (!sequence->PopTask()) { | |
83 const SequenceSortKey sort_key(sequence->GetSortKey()); | |
84 thread_pool_->ReinsertSequence(std::move(sequence), sort_key); | |
85 } | |
86 } | |
87 | |
88 void RunTaskCallback(size_t task_index, | |
89 scoped_refptr<TaskRunner> task_runner, | |
90 bool post_nested_task, | |
91 WaitableEvent* event) { | |
92 if (post_nested_task) | |
93 task_runner->PostTask(FROM_HERE, CreateTask(task_runner, false, nullptr)); | |
94 | |
95 EXPECT_TRUE(task_runner->RunsTasksOnCurrentThread()); | |
96 | |
97 { | |
98 AutoSchedulerLock auto_lock(lock_); | |
99 | |
100 if (run_tasks_.find(task_index) != run_tasks_.end()) | |
101 ADD_FAILURE() << "A task ran more than once."; | |
102 run_tasks_.insert(task_index); | |
103 | |
104 cv_->Signal(); | |
105 } | |
106 | |
107 if (event) | |
108 event->Wait(); | |
109 } | |
110 | |
111 TaskTracker task_tracker_; | |
112 | |
113 // Synchronizes access to all members below. | |
114 mutable SchedulerLock lock_; | |
115 | |
116 // Condition variable signaled when a task completes its execution. | |
117 std::unique_ptr<ConditionVariable> cv_; | |
118 | |
119 // Number of tasks returned by CreateTask(). | |
120 size_t num_created_tasks_ = 0; | |
121 | |
122 // Indexes of tasks that ran. | |
123 std::unordered_set<size_t> run_tasks_; | |
124 | |
125 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolTest); | |
126 }; | |
127 | |
128 class ThreadPostingTasks : public SimpleThread { | |
129 public: | |
130 // If |post_nested_task| is true, each task posted by this thread will post | |
131 // another task when it runs. | |
132 ThreadPostingTasks(scoped_refptr<TaskRunner> task_runner, | |
133 bool post_nested_task, | |
134 TaskSchedulerThreadPoolTest* test) | |
135 : SimpleThread("ThreadPostingTasks"), | |
136 task_runner_(std::move(task_runner)), | |
137 post_nested_task_(post_nested_task), | |
138 test_(test) {} | |
139 | |
140 private: | |
141 void Run() override { | |
142 EXPECT_FALSE(task_runner_->RunsTasksOnCurrentThread()); | |
143 | |
144 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) { | |
145 task_runner_->PostTask( | |
146 FROM_HERE, | |
147 test_->CreateTask(task_runner_, post_nested_task_, nullptr)); | |
148 } | |
149 } | |
150 | |
151 scoped_refptr<TaskRunner> task_runner_; | |
152 const bool post_nested_task_; | |
153 TaskSchedulerThreadPoolTest* const test_; | |
154 | |
155 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks); | |
156 }; | |
157 | |
158 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasks) { | |
159 // Repeat the test |kNumRepetitions| times, waiting for all | |
160 // SchedulerWorkerThreads to become idle between each repetition. This ensures | |
161 // that TaskRunners can wake up worker threads that have added themselves to | |
162 // the stack of idle threads of their thread pool. | |
163 for (size_t i = 0; i < kNumRepetitions; ++i) { | |
164 // Create |kNumTaskRunners| * |kNumThreadsPostingTasksPerTaskRunner| threads | |
165 // that will post tasks to |kNumTaskRunners| PARALLEL TaskRunners. | |
166 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
167 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
168 scoped_refptr<TaskRunner> task_runner = | |
169 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), | |
170 ExecutionMode::PARALLEL); | |
171 threads_posting_tasks.push_back( | |
172 WrapUnique(new ThreadPostingTasks(task_runner, false, this))); | |
173 threads_posting_tasks.back()->Start(); | |
174 } | |
175 | |
176 for (const auto& thread_posting_tasks : threads_posting_tasks) | |
177 thread_posting_tasks->Join(); | |
178 | |
179 WaitForAllTasksToRun(); | |
180 EXPECT_EQ((i + 1) * kNumThreadsPostingTasks * kNumTasksPostedPerThread, | |
181 NumRunTasks()); | |
182 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
183 } | |
184 } | |
185 | |
186 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasksWithNestedPostTasks) { | |
187 // Repeat the test |kNumRepetitions| times, waiting for all | |
188 // SchedulerWorkerThreads to become idle between each repetition. This ensures | |
189 // that TaskRunners can wake up worker threads that have added themselves to | |
190 // the stack of idle threads of their thread pool. | |
191 for (size_t i = 0; i < kNumRepetitions; ++i) { | |
192 // Create |kNumTaskRunners| * |kNumThreadsPostingTasksPerTaskRunner| threads | |
193 // that will post tasks to |kNumTaskRunners| PARALLEL TaskRunners. Each task | |
194 // posted by these threads will post another task when it runs. | |
195 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
196 for (size_t j = 0; j < kNumThreadsPostingTasks; ++j) { | |
197 auto task_runner = thread_pool_->CreateTaskRunnerWithTraits( | |
198 TaskTraits(), ExecutionMode::PARALLEL); | |
199 threads_posting_tasks.push_back( | |
200 WrapUnique(new ThreadPostingTasks(task_runner, true, this))); | |
201 threads_posting_tasks.back()->Start(); | |
202 } | |
203 | |
204 for (const auto& thread_posting_tasks : threads_posting_tasks) | |
205 thread_posting_tasks->Join(); | |
206 | |
207 WaitForAllTasksToRun(); | |
208 EXPECT_EQ(2 * (i + 1) * kNumThreadsPostingTasks * kNumTasksPostedPerThread, | |
209 NumRunTasks()); | |
210 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
211 } | |
212 } | |
213 | |
214 TEST_F(TaskSchedulerThreadPoolTest, PostParallelTasksWithOneAvailableThread) { | |
215 // Post tasks to keep all threads busy except one until |event| is signaled. | |
216 WaitableEvent event( | |
217 true, // Manual reset, to wake up multiple waiters at once. | |
robliao
2016/04/01 21:05:51
Nit: I think we can omit these comments. The premi
fdoray
2016/04/01 21:45:30
Done.
| |
218 false); // Not signaled initially. | |
219 auto task_runner = thread_pool_->CreateTaskRunnerWithTraits( | |
220 TaskTraits(), ExecutionMode::PARALLEL); | |
221 for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) | |
222 task_runner->PostTask(FROM_HERE, CreateTask(task_runner, false, &event)); | |
223 WaitForAllTasksToRun(); | |
224 | |
225 // Post |kNumTasksPostedPerThread| tasks that should run despite the fact | |
226 // that only one thread in |thread_pool_| isn't busy. | |
227 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) | |
228 task_runner->PostTask(FROM_HERE, CreateTask(task_runner, false, nullptr)); | |
229 WaitForAllTasksToRun(); | |
230 | |
231 // Release the tasks waiting on |event|. | |
232 event.Signal(); | |
233 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
234 } | |
235 | |
236 TEST_F(TaskSchedulerThreadPoolTest, Saturate) { | |
237 // Verify that it is possible to have |kNumThreadsInThreadPool| tasks running | |
238 // simultaneously. | |
239 WaitableEvent event( | |
240 true, // Manual reset, to wake up multiple waiters at once. | |
241 false); // Not signaled initially. | |
242 auto task_runner = thread_pool_->CreateTaskRunnerWithTraits( | |
243 TaskTraits(), ExecutionMode::PARALLEL); | |
244 for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) | |
245 task_runner->PostTask(FROM_HERE, CreateTask(task_runner, false, &event)); | |
246 WaitForAllTasksToRun(); | |
247 event.Signal(); | |
248 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
249 } | |
250 | |
251 // Checks that when PostTaskHelper is called with an empty sequence, the task | |
252 // is added to the sequence and the sequence is added to the priority queue. | |
253 TEST(TaskSchedulerPostTaskHelperTest, PostTaskInEmptySequence) { | |
254 std::unique_ptr<Task> task( | |
255 new Task(FROM_HERE, Bind(&DoNothing), TaskTraits())); | |
256 const Task* task_raw = task.get(); | |
257 scoped_refptr<Sequence> sequence(new Sequence); | |
258 PriorityQueue priority_queue(Bind(&DoNothing)); | |
259 TaskTracker task_tracker; | |
260 | |
261 // Post |task|. | |
262 PostTaskHelper(std::move(task), sequence, &priority_queue, &task_tracker); | |
263 | |
264 // Expect to find the sequence in the priority queue. | |
265 EXPECT_EQ(sequence, priority_queue.BeginTransaction()->Peek().sequence); | |
266 | |
267 // Expect to find |task| alone in |sequence|. | |
268 EXPECT_EQ(task_raw, sequence->PeekTask()); | |
269 sequence->PopTask(); | |
270 EXPECT_EQ(nullptr, sequence->PeekTask()); | |
271 } | |
272 | |
273 // Checks that when PostTaskHelper is called with a sequence that already | |
274 // contains a task, the task is added to the sequence but the sequence is not | |
275 // added to the priority queue. | |
276 TEST(TaskSchedulerPostTaskHelperTest, PostTaskInNonEmptySequence) { | |
277 std::unique_ptr<Task> task( | |
278 new Task(FROM_HERE, Bind(&DoNothing), TaskTraits())); | |
279 const Task* task_raw = task.get(); | |
280 scoped_refptr<Sequence> sequence(new Sequence); | |
281 PriorityQueue priority_queue(Bind(&DoNothing)); | |
282 TaskTracker task_tracker; | |
283 | |
284 // Add an initial task in |sequence|. | |
285 sequence->PushTask( | |
286 WrapUnique(new Task(FROM_HERE, Bind(&DoNothing), TaskTraits()))); | |
287 | |
288 // Post |task|. | |
289 PostTaskHelper(std::move(task), sequence, &priority_queue, &task_tracker); | |
290 | |
291 // Expect to find the priority queue empty. | |
292 EXPECT_TRUE(priority_queue.BeginTransaction()->Peek().is_null()); | |
293 | |
294 // Expect to find |task| in |sequence| behind the initial task. | |
295 EXPECT_NE(task_raw, sequence->PeekTask()); | |
296 sequence->PopTask(); | |
297 EXPECT_EQ(task_raw, sequence->PeekTask()); | |
298 sequence->PopTask(); | |
299 EXPECT_EQ(nullptr, sequence->PeekTask()); | |
300 } | |
301 | |
302 } // namespace | |
303 } // namespace internal | |
304 } // namespace base | |
OLD | NEW |