Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(138)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool_impl_unittest.cc

Issue 2068853002: Rename SchedulerThreadPool* to SchedulerWorkerPool* (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@rename1
Patch Set: CR Feedback fdoray@ Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h"
6
7 #include <stddef.h>
8
9 #include <memory>
10 #include <unordered_set>
11 #include <vector>
12
13 #include "base/bind.h"
14 #include "base/bind_helpers.h"
15 #include "base/callback.h"
16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/synchronization/condition_variable.h"
20 #include "base/synchronization/lock.h"
21 #include "base/synchronization/waitable_event.h"
22 #include "base/task_runner.h"
23 #include "base/task_scheduler/delayed_task_manager.h"
24 #include "base/task_scheduler/sequence.h"
25 #include "base/task_scheduler/sequence_sort_key.h"
26 #include "base/task_scheduler/task_tracker.h"
27 #include "base/task_scheduler/test_task_factory.h"
28 #include "base/task_scheduler/test_utils.h"
29 #include "base/threading/platform_thread.h"
30 #include "base/threading/simple_thread.h"
31 #include "base/threading/thread_restrictions.h"
32 #include "testing/gtest/include/gtest/gtest.h"
33
34 namespace base {
35 namespace internal {
36 namespace {
37
38 const size_t kNumThreadsInThreadPool = 4;
39 const size_t kNumThreadsPostingTasks = 4;
40 const size_t kNumTasksPostedPerThread = 150;
41
42 using IORestriction = SchedulerThreadPoolImpl::IORestriction;
43
44 class TestDelayedTaskManager : public DelayedTaskManager {
45 public:
46 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {}
47
48 void SetCurrentTime(TimeTicks now) { now_ = now; }
49
50 // DelayedTaskManager:
51 TimeTicks Now() const override { return now_; }
52
53 private:
54 TimeTicks now_ = TimeTicks::Now();
55
56 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager);
57 };
58
59 class TaskSchedulerThreadPoolImplTest
60 : public testing::TestWithParam<ExecutionMode> {
61 protected:
62 TaskSchedulerThreadPoolImplTest() = default;
63
64 void SetUp() override {
65 thread_pool_ = SchedulerThreadPoolImpl::Create(
66 "TestThreadPoolWithFileIO", ThreadPriority::NORMAL,
67 kNumThreadsInThreadPool, IORestriction::ALLOWED,
68 Bind(&TaskSchedulerThreadPoolImplTest::ReEnqueueSequenceCallback,
69 Unretained(this)),
70 &task_tracker_, &delayed_task_manager_);
71 ASSERT_TRUE(thread_pool_);
72 }
73
74 void TearDown() override {
75 thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
76 thread_pool_->JoinForTesting();
77 }
78
79 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool_;
80
81 TaskTracker task_tracker_;
82 TestDelayedTaskManager delayed_task_manager_;
83
84 private:
85 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) {
86 // In production code, this callback would be implemented by the
87 // TaskScheduler which would first determine which PriorityQueue the
88 // sequence must be re-enqueued.
89 const SequenceSortKey sort_key(sequence->GetSortKey());
90 thread_pool_->ReEnqueueSequence(std::move(sequence), sort_key);
91 }
92
93 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolImplTest);
94 };
95
96 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
97
98 class ThreadPostingTasks : public SimpleThread {
99 public:
100 enum class WaitBeforePostTask {
101 NO_WAIT,
102 WAIT_FOR_ALL_THREADS_IDLE,
103 };
104
105 // Constructs a thread that posts tasks to |thread_pool| through an
106 // |execution_mode| task runner. If |wait_before_post_task| is
107 // WAIT_FOR_ALL_THREADS_IDLE, the thread waits until all worker threads in
108 // |thread_pool| are idle before posting a new task. If |post_nested_task| is
109 // YES, each task posted by this thread posts another task when it runs.
110 ThreadPostingTasks(SchedulerThreadPoolImpl* thread_pool,
111 ExecutionMode execution_mode,
112 WaitBeforePostTask wait_before_post_task,
113 PostNestedTask post_nested_task)
114 : SimpleThread("ThreadPostingTasks"),
115 thread_pool_(thread_pool),
116 wait_before_post_task_(wait_before_post_task),
117 post_nested_task_(post_nested_task),
118 factory_(thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(),
119 execution_mode),
120 execution_mode) {
121 DCHECK(thread_pool_);
122 }
123
124 const test::TestTaskFactory* factory() const { return &factory_; }
125
126 private:
127 void Run() override {
128 EXPECT_FALSE(factory_.task_runner()->RunsTasksOnCurrentThread());
129
130 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
131 if (wait_before_post_task_ ==
132 WaitBeforePostTask::WAIT_FOR_ALL_THREADS_IDLE) {
133 thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
134 }
135 EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure()));
136 }
137 }
138
139 SchedulerThreadPoolImpl* const thread_pool_;
140 const scoped_refptr<TaskRunner> task_runner_;
141 const WaitBeforePostTask wait_before_post_task_;
142 const PostNestedTask post_nested_task_;
143 test::TestTaskFactory factory_;
144
145 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
146 };
147
148 using WaitBeforePostTask = ThreadPostingTasks::WaitBeforePostTask;
149
150 void ShouldNotRunCallback() {
151 ADD_FAILURE() << "Ran a task that shouldn't run.";
152 }
153
154 } // namespace
155
156 TEST_P(TaskSchedulerThreadPoolImplTest, PostTasks) {
157 // Create threads to post tasks.
158 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
159 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
160 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks(
161 thread_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT,
162 PostNestedTask::NO)));
163 threads_posting_tasks.back()->Start();
164 }
165
166 // Wait for all tasks to run.
167 for (const auto& thread_posting_tasks : threads_posting_tasks) {
168 thread_posting_tasks->Join();
169 thread_posting_tasks->factory()->WaitForAllTasksToRun();
170 }
171
172 // Wait until all worker threads are idle to be sure that no task accesses
173 // its TestTaskFactory after |thread_posting_tasks| is destroyed.
174 thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
175 }
176
177 TEST_P(TaskSchedulerThreadPoolImplTest, PostTasksWaitAllThreadsIdle) {
178 // Create threads to post tasks. To verify that worker threads can sleep and
179 // be woken up when new tasks are posted, wait for all threads to become idle
180 // before posting a new task.
181 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
182 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
183 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks(
184 thread_pool_.get(), GetParam(),
185 WaitBeforePostTask::WAIT_FOR_ALL_THREADS_IDLE, PostNestedTask::NO)));
186 threads_posting_tasks.back()->Start();
187 }
188
189 // Wait for all tasks to run.
190 for (const auto& thread_posting_tasks : threads_posting_tasks) {
191 thread_posting_tasks->Join();
192 thread_posting_tasks->factory()->WaitForAllTasksToRun();
193 }
194
195 // Wait until all worker threads are idle to be sure that no task accesses
196 // its TestTaskFactory after |thread_posting_tasks| is destroyed.
197 thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
198 }
199
200 TEST_P(TaskSchedulerThreadPoolImplTest, NestedPostTasks) {
201 // Create threads to post tasks. Each task posted by these threads will post
202 // another task when it runs.
203 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
204 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
205 threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks(
206 thread_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT,
207 PostNestedTask::YES)));
208 threads_posting_tasks.back()->Start();
209 }
210
211 // Wait for all tasks to run.
212 for (const auto& thread_posting_tasks : threads_posting_tasks) {
213 thread_posting_tasks->Join();
214 thread_posting_tasks->factory()->WaitForAllTasksToRun();
215 }
216
217 // Wait until all worker threads are idle to be sure that no task accesses
218 // its TestTaskFactory after |thread_posting_tasks| is destroyed.
219 thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
220 }
221
222 TEST_P(TaskSchedulerThreadPoolImplTest, PostTasksWithOneAvailableThread) {
223 // Post blocking tasks to keep all threads busy except one until |event| is
224 // signaled. Use different factories so that tasks are added to different
225 // sequences and can run simultaneously when the execution mode is SEQUENCED.
226 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
227 WaitableEvent::InitialState::NOT_SIGNALED);
228 std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
229 for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) {
230 blocked_task_factories.push_back(WrapUnique(new test::TestTaskFactory(
231 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()),
232 GetParam())));
233 EXPECT_TRUE(blocked_task_factories.back()->PostTask(
234 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
235 blocked_task_factories.back()->WaitForAllTasksToRun();
236 }
237
238 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
239 // that only one thread in |thread_pool_| isn't busy.
240 test::TestTaskFactory short_task_factory(
241 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()),
242 GetParam());
243 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
244 EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, Closure()));
245 short_task_factory.WaitForAllTasksToRun();
246
247 // Release tasks waiting on |event|.
248 event.Signal();
249
250 // Wait until all worker threads are idle to be sure that no task accesses
251 // its TestTaskFactory after it is destroyed.
252 thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
253 }
254
255 TEST_P(TaskSchedulerThreadPoolImplTest, Saturate) {
256 // Verify that it is possible to have |kNumThreadsInThreadPool|
257 // tasks/sequences running simultaneously. Use different factories so that the
258 // blocking tasks are added to different sequences and can run simultaneously
259 // when the execution mode is SEQUENCED.
260 WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
261 WaitableEvent::InitialState::NOT_SIGNALED);
262 std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
263 for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) {
264 factories.push_back(WrapUnique(new test::TestTaskFactory(
265 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()),
266 GetParam())));
267 EXPECT_TRUE(factories.back()->PostTask(
268 PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
269 factories.back()->WaitForAllTasksToRun();
270 }
271
272 // Release tasks waiting on |event|.
273 event.Signal();
274
275 // Wait until all worker threads are idle to be sure that no task accesses
276 // its TestTaskFactory after it is destroyed.
277 thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
278 }
279
280 // Verify that a Task can't be posted after shutdown.
281 TEST_P(TaskSchedulerThreadPoolImplTest, PostTaskAfterShutdown) {
282 auto task_runner =
283 thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam());
284 task_tracker_.Shutdown();
285 EXPECT_FALSE(task_runner->PostTask(FROM_HERE, Bind(&ShouldNotRunCallback)));
286 }
287
288 // Verify that a Task posted with a delay is added to the DelayedTaskManager and
289 // doesn't run before its delay expires.
290 TEST_P(TaskSchedulerThreadPoolImplTest, PostDelayedTask) {
291 EXPECT_TRUE(delayed_task_manager_.GetDelayedRunTime().is_null());
292
293 // Post a delayed task.
294 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
295 WaitableEvent::InitialState::NOT_SIGNALED);
296 EXPECT_TRUE(thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam())
297 ->PostDelayedTask(FROM_HERE, Bind(&WaitableEvent::Signal,
298 Unretained(&task_ran)),
299 TimeDelta::FromSeconds(10)));
300
301 // The task should have been added to the DelayedTaskManager.
302 EXPECT_FALSE(delayed_task_manager_.GetDelayedRunTime().is_null());
303
304 // The task shouldn't run.
305 EXPECT_FALSE(task_ran.IsSignaled());
306
307 // Fast-forward time and post tasks that are ripe for execution.
308 delayed_task_manager_.SetCurrentTime(
309 delayed_task_manager_.GetDelayedRunTime());
310 delayed_task_manager_.PostReadyTasks();
311
312 // The task should run.
313 task_ran.Wait();
314 }
315
316 INSTANTIATE_TEST_CASE_P(Parallel,
317 TaskSchedulerThreadPoolImplTest,
318 ::testing::Values(ExecutionMode::PARALLEL));
319 INSTANTIATE_TEST_CASE_P(Sequenced,
320 TaskSchedulerThreadPoolImplTest,
321 ::testing::Values(ExecutionMode::SEQUENCED));
322 INSTANTIATE_TEST_CASE_P(SingleThreaded,
323 TaskSchedulerThreadPoolImplTest,
324 ::testing::Values(ExecutionMode::SINGLE_THREADED));
325
326 namespace {
327
328 void NotReachedReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) {
329 ADD_FAILURE()
330 << "Unexpected invocation of NotReachedReEnqueueSequenceCallback.";
331 }
332
333 // Verifies that the current thread allows I/O if |io_restriction| is ALLOWED
334 // and disallows it otherwise. Signals |event| before returning.
335 void ExpectIORestriction(IORestriction io_restriction, WaitableEvent* event) {
336 DCHECK(event);
337
338 if (io_restriction == IORestriction::ALLOWED) {
339 ThreadRestrictions::AssertIOAllowed();
340 } else {
341 static_assert(
342 ENABLE_THREAD_RESTRICTIONS == DCHECK_IS_ON(),
343 "ENABLE_THREAD_RESTRICTIONS and DCHECK_IS_ON() have diverged.");
344 EXPECT_DCHECK_DEATH({ ThreadRestrictions::AssertIOAllowed(); }, "");
345 }
346
347 event->Signal();
348 }
349
350 class TaskSchedulerThreadPoolImplIORestrictionTest
351 : public testing::TestWithParam<IORestriction> {
352 public:
353 TaskSchedulerThreadPoolImplIORestrictionTest() = default;
354
355 private:
356 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolImplIORestrictionTest);
357 };
358
359 } // namespace
360
361 TEST_P(TaskSchedulerThreadPoolImplIORestrictionTest, IORestriction) {
362 TaskTracker task_tracker;
363 DelayedTaskManager delayed_task_manager(Bind(&DoNothing));
364
365 auto thread_pool = SchedulerThreadPoolImpl::Create(
366 "TestThreadPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(),
367 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker,
368 &delayed_task_manager);
369 ASSERT_TRUE(thread_pool);
370
371 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
372 WaitableEvent::InitialState::NOT_SIGNALED);
373 thread_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL)
374 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran));
375 task_ran.Wait();
376
377 thread_pool->JoinForTesting();
378 }
379
380 INSTANTIATE_TEST_CASE_P(IOAllowed,
381 TaskSchedulerThreadPoolImplIORestrictionTest,
382 ::testing::Values(IORestriction::ALLOWED));
383 INSTANTIATE_TEST_CASE_P(IODisallowed,
384 TaskSchedulerThreadPoolImplIORestrictionTest,
385 ::testing::Values(IORestriction::DISALLOWED));
386
387 } // namespace internal
388 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_thread_pool_impl.cc ('k') | base/task_scheduler/scheduler_worker_pool.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698