OLD | NEW |
| (Empty) |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" | |
6 | |
7 #include <stddef.h> | |
8 | |
9 #include <memory> | |
10 #include <unordered_set> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/bind_helpers.h" | |
15 #include "base/callback.h" | |
16 #include "base/macros.h" | |
17 #include "base/memory/ptr_util.h" | |
18 #include "base/memory/ref_counted.h" | |
19 #include "base/synchronization/condition_variable.h" | |
20 #include "base/synchronization/lock.h" | |
21 #include "base/synchronization/waitable_event.h" | |
22 #include "base/task_runner.h" | |
23 #include "base/task_scheduler/delayed_task_manager.h" | |
24 #include "base/task_scheduler/sequence.h" | |
25 #include "base/task_scheduler/sequence_sort_key.h" | |
26 #include "base/task_scheduler/task_tracker.h" | |
27 #include "base/task_scheduler/test_task_factory.h" | |
28 #include "base/task_scheduler/test_utils.h" | |
29 #include "base/threading/platform_thread.h" | |
30 #include "base/threading/simple_thread.h" | |
31 #include "base/threading/thread_restrictions.h" | |
32 #include "testing/gtest/include/gtest/gtest.h" | |
33 | |
34 namespace base { | |
35 namespace internal { | |
36 namespace { | |
37 | |
38 const size_t kNumThreadsInThreadPool = 4; | |
39 const size_t kNumThreadsPostingTasks = 4; | |
40 const size_t kNumTasksPostedPerThread = 150; | |
41 | |
42 using IORestriction = SchedulerThreadPoolImpl::IORestriction; | |
43 | |
44 class TestDelayedTaskManager : public DelayedTaskManager { | |
45 public: | |
46 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} | |
47 | |
48 void SetCurrentTime(TimeTicks now) { now_ = now; } | |
49 | |
50 // DelayedTaskManager: | |
51 TimeTicks Now() const override { return now_; } | |
52 | |
53 private: | |
54 TimeTicks now_ = TimeTicks::Now(); | |
55 | |
56 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); | |
57 }; | |
58 | |
59 class TaskSchedulerThreadPoolImplTest | |
60 : public testing::TestWithParam<ExecutionMode> { | |
61 protected: | |
62 TaskSchedulerThreadPoolImplTest() = default; | |
63 | |
64 void SetUp() override { | |
65 thread_pool_ = SchedulerThreadPoolImpl::Create( | |
66 "TestThreadPoolWithFileIO", ThreadPriority::NORMAL, | |
67 kNumThreadsInThreadPool, IORestriction::ALLOWED, | |
68 Bind(&TaskSchedulerThreadPoolImplTest::ReEnqueueSequenceCallback, | |
69 Unretained(this)), | |
70 &task_tracker_, &delayed_task_manager_); | |
71 ASSERT_TRUE(thread_pool_); | |
72 } | |
73 | |
74 void TearDown() override { | |
75 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
76 thread_pool_->JoinForTesting(); | |
77 } | |
78 | |
79 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool_; | |
80 | |
81 TaskTracker task_tracker_; | |
82 TestDelayedTaskManager delayed_task_manager_; | |
83 | |
84 private: | |
85 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | |
86 // In production code, this callback would be implemented by the | |
87 // TaskScheduler which would first determine which PriorityQueue the | |
88 // sequence must be re-enqueued. | |
89 const SequenceSortKey sort_key(sequence->GetSortKey()); | |
90 thread_pool_->ReEnqueueSequence(std::move(sequence), sort_key); | |
91 } | |
92 | |
93 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolImplTest); | |
94 }; | |
95 | |
96 using PostNestedTask = test::TestTaskFactory::PostNestedTask; | |
97 | |
98 class ThreadPostingTasks : public SimpleThread { | |
99 public: | |
100 enum class WaitBeforePostTask { | |
101 NO_WAIT, | |
102 WAIT_FOR_ALL_THREADS_IDLE, | |
103 }; | |
104 | |
105 // Constructs a thread that posts tasks to |thread_pool| through an | |
106 // |execution_mode| task runner. If |wait_before_post_task| is | |
107 // WAIT_FOR_ALL_THREADS_IDLE, the thread waits until all worker threads in | |
108 // |thread_pool| are idle before posting a new task. If |post_nested_task| is | |
109 // YES, each task posted by this thread posts another task when it runs. | |
110 ThreadPostingTasks(SchedulerThreadPoolImpl* thread_pool, | |
111 ExecutionMode execution_mode, | |
112 WaitBeforePostTask wait_before_post_task, | |
113 PostNestedTask post_nested_task) | |
114 : SimpleThread("ThreadPostingTasks"), | |
115 thread_pool_(thread_pool), | |
116 wait_before_post_task_(wait_before_post_task), | |
117 post_nested_task_(post_nested_task), | |
118 factory_(thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), | |
119 execution_mode), | |
120 execution_mode) { | |
121 DCHECK(thread_pool_); | |
122 } | |
123 | |
124 const test::TestTaskFactory* factory() const { return &factory_; } | |
125 | |
126 private: | |
127 void Run() override { | |
128 EXPECT_FALSE(factory_.task_runner()->RunsTasksOnCurrentThread()); | |
129 | |
130 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) { | |
131 if (wait_before_post_task_ == | |
132 WaitBeforePostTask::WAIT_FOR_ALL_THREADS_IDLE) { | |
133 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
134 } | |
135 EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure())); | |
136 } | |
137 } | |
138 | |
139 SchedulerThreadPoolImpl* const thread_pool_; | |
140 const scoped_refptr<TaskRunner> task_runner_; | |
141 const WaitBeforePostTask wait_before_post_task_; | |
142 const PostNestedTask post_nested_task_; | |
143 test::TestTaskFactory factory_; | |
144 | |
145 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks); | |
146 }; | |
147 | |
148 using WaitBeforePostTask = ThreadPostingTasks::WaitBeforePostTask; | |
149 | |
150 void ShouldNotRunCallback() { | |
151 ADD_FAILURE() << "Ran a task that shouldn't run."; | |
152 } | |
153 | |
154 } // namespace | |
155 | |
156 TEST_P(TaskSchedulerThreadPoolImplTest, PostTasks) { | |
157 // Create threads to post tasks. | |
158 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
159 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) { | |
160 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
161 thread_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT, | |
162 PostNestedTask::NO))); | |
163 threads_posting_tasks.back()->Start(); | |
164 } | |
165 | |
166 // Wait for all tasks to run. | |
167 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
168 thread_posting_tasks->Join(); | |
169 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
170 } | |
171 | |
172 // Wait until all worker threads are idle to be sure that no task accesses | |
173 // its TestTaskFactory after |thread_posting_tasks| is destroyed. | |
174 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
175 } | |
176 | |
177 TEST_P(TaskSchedulerThreadPoolImplTest, PostTasksWaitAllThreadsIdle) { | |
178 // Create threads to post tasks. To verify that worker threads can sleep and | |
179 // be woken up when new tasks are posted, wait for all threads to become idle | |
180 // before posting a new task. | |
181 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
182 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) { | |
183 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
184 thread_pool_.get(), GetParam(), | |
185 WaitBeforePostTask::WAIT_FOR_ALL_THREADS_IDLE, PostNestedTask::NO))); | |
186 threads_posting_tasks.back()->Start(); | |
187 } | |
188 | |
189 // Wait for all tasks to run. | |
190 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
191 thread_posting_tasks->Join(); | |
192 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
193 } | |
194 | |
195 // Wait until all worker threads are idle to be sure that no task accesses | |
196 // its TestTaskFactory after |thread_posting_tasks| is destroyed. | |
197 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
198 } | |
199 | |
200 TEST_P(TaskSchedulerThreadPoolImplTest, NestedPostTasks) { | |
201 // Create threads to post tasks. Each task posted by these threads will post | |
202 // another task when it runs. | |
203 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | |
204 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) { | |
205 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks( | |
206 thread_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT, | |
207 PostNestedTask::YES))); | |
208 threads_posting_tasks.back()->Start(); | |
209 } | |
210 | |
211 // Wait for all tasks to run. | |
212 for (const auto& thread_posting_tasks : threads_posting_tasks) { | |
213 thread_posting_tasks->Join(); | |
214 thread_posting_tasks->factory()->WaitForAllTasksToRun(); | |
215 } | |
216 | |
217 // Wait until all worker threads are idle to be sure that no task accesses | |
218 // its TestTaskFactory after |thread_posting_tasks| is destroyed. | |
219 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
220 } | |
221 | |
222 TEST_P(TaskSchedulerThreadPoolImplTest, PostTasksWithOneAvailableThread) { | |
223 // Post blocking tasks to keep all threads busy except one until |event| is | |
224 // signaled. Use different factories so that tasks are added to different | |
225 // sequences and can run simultaneously when the execution mode is SEQUENCED. | |
226 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, | |
227 WaitableEvent::InitialState::NOT_SIGNALED); | |
228 std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories; | |
229 for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) { | |
230 blocked_task_factories.push_back(WrapUnique(new test::TestTaskFactory( | |
231 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()), | |
232 GetParam()))); | |
233 EXPECT_TRUE(blocked_task_factories.back()->PostTask( | |
234 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event)))); | |
235 blocked_task_factories.back()->WaitForAllTasksToRun(); | |
236 } | |
237 | |
238 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact | |
239 // that only one thread in |thread_pool_| isn't busy. | |
240 test::TestTaskFactory short_task_factory( | |
241 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()), | |
242 GetParam()); | |
243 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) | |
244 EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, Closure())); | |
245 short_task_factory.WaitForAllTasksToRun(); | |
246 | |
247 // Release tasks waiting on |event|. | |
248 event.Signal(); | |
249 | |
250 // Wait until all worker threads are idle to be sure that no task accesses | |
251 // its TestTaskFactory after it is destroyed. | |
252 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
253 } | |
254 | |
255 TEST_P(TaskSchedulerThreadPoolImplTest, Saturate) { | |
256 // Verify that it is possible to have |kNumThreadsInThreadPool| | |
257 // tasks/sequences running simultaneously. Use different factories so that the | |
258 // blocking tasks are added to different sequences and can run simultaneously | |
259 // when the execution mode is SEQUENCED. | |
260 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, | |
261 WaitableEvent::InitialState::NOT_SIGNALED); | |
262 std::vector<std::unique_ptr<test::TestTaskFactory>> factories; | |
263 for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) { | |
264 factories.push_back(WrapUnique(new test::TestTaskFactory( | |
265 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()), | |
266 GetParam()))); | |
267 EXPECT_TRUE(factories.back()->PostTask( | |
268 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event)))); | |
269 factories.back()->WaitForAllTasksToRun(); | |
270 } | |
271 | |
272 // Release tasks waiting on |event|. | |
273 event.Signal(); | |
274 | |
275 // Wait until all worker threads are idle to be sure that no task accesses | |
276 // its TestTaskFactory after it is destroyed. | |
277 thread_pool_->WaitForAllWorkerThreadsIdleForTesting(); | |
278 } | |
279 | |
280 // Verify that a Task can't be posted after shutdown. | |
281 TEST_P(TaskSchedulerThreadPoolImplTest, PostTaskAfterShutdown) { | |
282 auto task_runner = | |
283 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()); | |
284 task_tracker_.Shutdown(); | |
285 EXPECT_FALSE(task_runner->PostTask(FROM_HERE, Bind(&ShouldNotRunCallback))); | |
286 } | |
287 | |
288 // Verify that a Task posted with a delay is added to the DelayedTaskManager and | |
289 // doesn't run before its delay expires. | |
290 TEST_P(TaskSchedulerThreadPoolImplTest, PostDelayedTask) { | |
291 EXPECT_TRUE(delayed_task_manager_.GetDelayedRunTime().is_null()); | |
292 | |
293 // Post a delayed task. | |
294 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | |
295 WaitableEvent::InitialState::NOT_SIGNALED); | |
296 EXPECT_TRUE(thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()) | |
297 ->PostDelayedTask(FROM_HERE, Bind(&WaitableEvent::Signal, | |
298 Unretained(&task_ran)), | |
299 TimeDelta::FromSeconds(10))); | |
300 | |
301 // The task should have been added to the DelayedTaskManager. | |
302 EXPECT_FALSE(delayed_task_manager_.GetDelayedRunTime().is_null()); | |
303 | |
304 // The task shouldn't run. | |
305 EXPECT_FALSE(task_ran.IsSignaled()); | |
306 | |
307 // Fast-forward time and post tasks that are ripe for execution. | |
308 delayed_task_manager_.SetCurrentTime( | |
309 delayed_task_manager_.GetDelayedRunTime()); | |
310 delayed_task_manager_.PostReadyTasks(); | |
311 | |
312 // The task should run. | |
313 task_ran.Wait(); | |
314 } | |
315 | |
316 INSTANTIATE_TEST_CASE_P(Parallel, | |
317 TaskSchedulerThreadPoolImplTest, | |
318 ::testing::Values(ExecutionMode::PARALLEL)); | |
319 INSTANTIATE_TEST_CASE_P(Sequenced, | |
320 TaskSchedulerThreadPoolImplTest, | |
321 ::testing::Values(ExecutionMode::SEQUENCED)); | |
322 INSTANTIATE_TEST_CASE_P(SingleThreaded, | |
323 TaskSchedulerThreadPoolImplTest, | |
324 ::testing::Values(ExecutionMode::SINGLE_THREADED)); | |
325 | |
326 namespace { | |
327 | |
328 void NotReachedReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | |
329 ADD_FAILURE() | |
330 << "Unexpected invocation of NotReachedReEnqueueSequenceCallback."; | |
331 } | |
332 | |
333 // Verifies that the current thread allows I/O if |io_restriction| is ALLOWED | |
334 // and disallows it otherwise. Signals |event| before returning. | |
335 void ExpectIORestriction(IORestriction io_restriction, WaitableEvent* event) { | |
336 DCHECK(event); | |
337 | |
338 if (io_restriction == IORestriction::ALLOWED) { | |
339 ThreadRestrictions::AssertIOAllowed(); | |
340 } else { | |
341 static_assert( | |
342 ENABLE_THREAD_RESTRICTIONS == DCHECK_IS_ON(), | |
343 "ENABLE_THREAD_RESTRICTIONS and DCHECK_IS_ON() have diverged."); | |
344 EXPECT_DCHECK_DEATH({ ThreadRestrictions::AssertIOAllowed(); }, ""); | |
345 } | |
346 | |
347 event->Signal(); | |
348 } | |
349 | |
350 class TaskSchedulerThreadPoolImplIORestrictionTest | |
351 : public testing::TestWithParam<IORestriction> { | |
352 public: | |
353 TaskSchedulerThreadPoolImplIORestrictionTest() = default; | |
354 | |
355 private: | |
356 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolImplIORestrictionTest); | |
357 }; | |
358 | |
359 } // namespace | |
360 | |
361 TEST_P(TaskSchedulerThreadPoolImplIORestrictionTest, IORestriction) { | |
362 TaskTracker task_tracker; | |
363 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); | |
364 | |
365 auto thread_pool = SchedulerThreadPoolImpl::Create( | |
366 "TestThreadPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), | |
367 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | |
368 &delayed_task_manager); | |
369 ASSERT_TRUE(thread_pool); | |
370 | |
371 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | |
372 WaitableEvent::InitialState::NOT_SIGNALED); | |
373 thread_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) | |
374 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); | |
375 task_ran.Wait(); | |
376 | |
377 thread_pool->JoinForTesting(); | |
378 } | |
379 | |
380 INSTANTIATE_TEST_CASE_P(IOAllowed, | |
381 TaskSchedulerThreadPoolImplIORestrictionTest, | |
382 ::testing::Values(IORestriction::ALLOWED)); | |
383 INSTANTIATE_TEST_CASE_P(IODisallowed, | |
384 TaskSchedulerThreadPoolImplIORestrictionTest, | |
385 ::testing::Values(IORestriction::DISALLOWED)); | |
386 | |
387 } // namespace internal | |
388 } // namespace base | |
OLD | NEW |