OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <memory> | 9 #include <memory> |
10 #include <unordered_set> | 10 #include <unordered_set> |
11 #include <vector> | 11 #include <vector> |
12 | 12 |
| 13 #include "base/atomicops.h" |
13 #include "base/bind.h" | 14 #include "base/bind.h" |
14 #include "base/bind_helpers.h" | 15 #include "base/bind_helpers.h" |
15 #include "base/callback.h" | 16 #include "base/callback.h" |
16 #include "base/macros.h" | 17 #include "base/macros.h" |
17 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
18 #include "base/memory/ref_counted.h" | 19 #include "base/memory/ref_counted.h" |
19 #include "base/synchronization/condition_variable.h" | 20 #include "base/synchronization/condition_variable.h" |
20 #include "base/synchronization/lock.h" | 21 #include "base/synchronization/lock.h" |
21 #include "base/synchronization/waitable_event.h" | 22 #include "base/synchronization/waitable_event.h" |
22 #include "base/task_runner.h" | 23 #include "base/task_runner.h" |
23 #include "base/task_scheduler/delayed_task_manager.h" | 24 #include "base/task_scheduler/delayed_task_manager.h" |
24 #include "base/task_scheduler/scheduler_worker_pool_params.h" | 25 #include "base/task_scheduler/scheduler_worker_pool_params.h" |
25 #include "base/task_scheduler/sequence.h" | 26 #include "base/task_scheduler/sequence.h" |
26 #include "base/task_scheduler/sequence_sort_key.h" | 27 #include "base/task_scheduler/sequence_sort_key.h" |
27 #include "base/task_scheduler/task_tracker.h" | 28 #include "base/task_scheduler/task_tracker.h" |
28 #include "base/task_scheduler/test_task_factory.h" | 29 #include "base/task_scheduler/test_task_factory.h" |
29 #include "base/task_scheduler/test_utils.h" | 30 #include "base/task_scheduler/test_utils.h" |
30 #include "base/threading/platform_thread.h" | 31 #include "base/threading/platform_thread.h" |
31 #include "base/threading/simple_thread.h" | 32 #include "base/threading/simple_thread.h" |
| 33 #include "base/threading/thread_checker_impl.h" |
| 34 #include "base/threading/thread_local_storage.h" |
32 #include "base/threading/thread_restrictions.h" | 35 #include "base/threading/thread_restrictions.h" |
| 36 #include "base/time/time.h" |
33 #include "testing/gtest/include/gtest/gtest.h" | 37 #include "testing/gtest/include/gtest/gtest.h" |
34 | 38 |
35 namespace base { | 39 namespace base { |
36 namespace internal { | 40 namespace internal { |
37 namespace { | 41 namespace { |
38 | 42 |
39 const size_t kNumWorkersInWorkerPool = 4; | 43 const size_t kNumWorkersInWorkerPool = 4; |
40 const size_t kNumThreadsPostingTasks = 4; | 44 const size_t kNumThreadsPostingTasks = 4; |
41 const size_t kNumTasksPostedPerThread = 150; | 45 const size_t kNumTasksPostedPerThread = 150; |
| 46 constexpr TimeDelta kReclaimTimeForDetachTests = |
| 47 TimeDelta::FromMilliseconds(10); |
42 | 48 |
43 using IORestriction = SchedulerWorkerPoolParams::IORestriction; | 49 using IORestriction = SchedulerWorkerPoolParams::IORestriction; |
44 | 50 |
45 class TestDelayedTaskManager : public DelayedTaskManager { | 51 class TestDelayedTaskManager : public DelayedTaskManager { |
46 public: | 52 public: |
47 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} | 53 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} |
48 | 54 |
49 void SetCurrentTime(TimeTicks now) { now_ = now; } | 55 void SetCurrentTime(TimeTicks now) { now_ = now; } |
50 | 56 |
51 // DelayedTaskManager: | 57 // DelayedTaskManager: |
52 TimeTicks Now() const override { return now_; } | 58 TimeTicks Now() const override { return now_; } |
53 | 59 |
54 private: | 60 private: |
55 TimeTicks now_ = TimeTicks::Now(); | 61 TimeTicks now_ = TimeTicks::Now(); |
56 | 62 |
57 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); | 63 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); |
58 }; | 64 }; |
59 | 65 |
60 class TaskSchedulerWorkerPoolImplTest | 66 class TaskSchedulerWorkerPoolImplTest |
61 : public testing::TestWithParam<ExecutionMode> { | 67 : public testing::TestWithParam<ExecutionMode> { |
62 protected: | 68 protected: |
63 TaskSchedulerWorkerPoolImplTest() = default; | 69 TaskSchedulerWorkerPoolImplTest() = default; |
64 | 70 |
65 void SetUp() override { | 71 void SetUp() override { |
66 worker_pool_ = SchedulerWorkerPoolImpl::Create( | 72 InitializeWorkerPool(TimeDelta::Max()); |
67 SchedulerWorkerPoolParams("TestWorkerPoolWithFileIO", | |
68 ThreadPriority::NORMAL, | |
69 IORestriction::ALLOWED, | |
70 kNumWorkersInWorkerPool), | |
71 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | |
72 Unretained(this)), | |
73 &task_tracker_, &delayed_task_manager_); | |
74 ASSERT_TRUE(worker_pool_); | |
75 } | 73 } |
76 | 74 |
77 void TearDown() override { | 75 void TearDown() override { |
78 worker_pool_->WaitForAllWorkersIdleForTesting(); | 76 worker_pool_->WaitForAllWorkersIdleForTesting(); |
79 worker_pool_->JoinForTesting(); | 77 worker_pool_->JoinForTesting(); |
80 } | 78 } |
81 | 79 |
| 80 void InitializeWorkerPool(const TimeDelta& suggested_reclaim_time) { |
| 81 worker_pool_ = SchedulerWorkerPoolImpl::Create( |
| 82 SchedulerWorkerPoolParams("TestWorkerPoolWithFileIO", |
| 83 ThreadPriority::NORMAL, |
| 84 IORestriction::ALLOWED, |
| 85 kNumWorkersInWorkerPool, |
| 86 suggested_reclaim_time), |
| 87 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, |
| 88 Unretained(this)), |
| 89 &task_tracker_, &delayed_task_manager_); |
| 90 ASSERT_TRUE(worker_pool_); |
| 91 } |
| 92 |
82 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; | 93 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_; |
83 | 94 |
84 TaskTracker task_tracker_; | 95 TaskTracker task_tracker_; |
85 TestDelayedTaskManager delayed_task_manager_; | 96 TestDelayedTaskManager delayed_task_manager_; |
86 | 97 |
87 private: | 98 private: |
88 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { | 99 void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) { |
89 // In production code, this callback would be implemented by the | 100 // In production code, this callback would be implemented by the |
90 // TaskScheduler which would first determine which PriorityQueue the | 101 // TaskScheduler which would first determine which PriorityQueue the |
91 // sequence must be re-enqueued. | 102 // sequence must be re-enqueued. |
(...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
360 }; | 371 }; |
361 | 372 |
362 } // namespace | 373 } // namespace |
363 | 374 |
364 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { | 375 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { |
365 TaskTracker task_tracker; | 376 TaskTracker task_tracker; |
366 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); | 377 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); |
367 | 378 |
368 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 379 auto worker_pool = SchedulerWorkerPoolImpl::Create( |
369 SchedulerWorkerPoolParams("TestWorkerPoolWithParam", | 380 SchedulerWorkerPoolParams("TestWorkerPoolWithParam", |
370 ThreadPriority::NORMAL, GetParam(), 1U), | 381 ThreadPriority::NORMAL, GetParam(), 1U, |
| 382 TimeDelta::Max()), |
371 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 383 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, |
372 &delayed_task_manager); | 384 &delayed_task_manager); |
373 ASSERT_TRUE(worker_pool); | 385 ASSERT_TRUE(worker_pool); |
374 | 386 |
375 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | 387 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, |
376 WaitableEvent::InitialState::NOT_SIGNALED); | 388 WaitableEvent::InitialState::NOT_SIGNALED); |
377 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) | 389 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) |
378 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); | 390 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); |
379 task_ran.Wait(); | 391 task_ran.Wait(); |
380 | 392 |
381 worker_pool->JoinForTesting(); | 393 worker_pool->JoinForTesting(); |
382 } | 394 } |
383 | 395 |
384 INSTANTIATE_TEST_CASE_P(IOAllowed, | 396 INSTANTIATE_TEST_CASE_P(IOAllowed, |
385 TaskSchedulerWorkerPoolImplIORestrictionTest, | 397 TaskSchedulerWorkerPoolImplIORestrictionTest, |
386 ::testing::Values(IORestriction::ALLOWED)); | 398 ::testing::Values(IORestriction::ALLOWED)); |
387 INSTANTIATE_TEST_CASE_P(IODisallowed, | 399 INSTANTIATE_TEST_CASE_P(IODisallowed, |
388 TaskSchedulerWorkerPoolImplIORestrictionTest, | 400 TaskSchedulerWorkerPoolImplIORestrictionTest, |
389 ::testing::Values(IORestriction::DISALLOWED)); | 401 ::testing::Values(IORestriction::DISALLOWED)); |
390 | 402 |
| 403 namespace { |
| 404 |
| 405 class TaskSchedulerWorkerPoolSingleThreadedTest |
| 406 : public TaskSchedulerWorkerPoolImplTest { |
| 407 public: |
| 408 void InitializeThreadChecker() { |
| 409 thread_checker_.reset(new ThreadCheckerImpl()); |
| 410 } |
| 411 |
| 412 void CheckValidThread() { |
| 413 EXPECT_TRUE(thread_checker_->CalledOnValidThread()); |
| 414 } |
| 415 |
| 416 protected: |
| 417 void SetUp() override { |
| 418 InitializeWorkerPool(kReclaimTimeForDetachTests); |
| 419 } |
| 420 |
| 421 TaskSchedulerWorkerPoolSingleThreadedTest() = default; |
| 422 |
| 423 private: |
| 424 std::unique_ptr<ThreadCheckerImpl> thread_checker_; |
| 425 |
| 426 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest); |
| 427 }; |
| 428 |
| 429 } // namespace |
| 430 |
| 431 // Verify that thread resources for a single thread remain. |
| 432 TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) { |
| 433 auto single_thread_task_runner = |
| 434 worker_pool_->CreateTaskRunnerWithTraits( |
| 435 TaskTraits(). |
| 436 WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN), |
| 437 ExecutionMode::SINGLE_THREADED); |
| 438 single_thread_task_runner->PostTask( |
| 439 FROM_HERE, |
| 440 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::InitializeThreadChecker, |
| 441 Unretained(this))); |
| 442 WaitableEvent task_waiter(WaitableEvent::ResetPolicy::AUTOMATIC, |
| 443 WaitableEvent::InitialState::NOT_SIGNALED); |
| 444 single_thread_task_runner->PostTask( |
| 445 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); |
| 446 task_waiter.Wait(); |
| 447 worker_pool_->WaitForAllWorkersIdleForTesting(); |
| 448 |
| 449 // Give the worker pool a chance to reclaim its threads. |
| 450 PlatformThread::Sleep( |
| 451 kReclaimTimeForDetachTests + TimeDelta::FromMilliseconds(200)); |
| 452 |
| 453 worker_pool_->DisallowWorkerDetachmentForTesting(); |
| 454 |
| 455 single_thread_task_runner->PostTask( |
| 456 FROM_HERE, |
| 457 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckValidThread, |
| 458 Unretained(this))); |
| 459 single_thread_task_runner->PostTask( |
| 460 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); |
| 461 task_waiter.Wait(); |
| 462 } |
| 463 |
| 464 namespace { |
| 465 |
| 466 constexpr size_t kMagicTlsValue = 42; |
| 467 |
| 468 class TaskSchedulerWorkerPoolCheckTlsReuse |
| 469 : public TaskSchedulerWorkerPoolImplTest { |
| 470 public: |
| 471 void SetTlsValueAndWait() { |
| 472 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); |
| 473 waiter_.Wait(); |
| 474 } |
| 475 |
| 476 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { |
| 477 if (!slot_.Get()) |
| 478 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); |
| 479 |
| 480 count_waiter->Signal(); |
| 481 waiter_.Wait(); |
| 482 } |
| 483 |
| 484 protected: |
| 485 TaskSchedulerWorkerPoolCheckTlsReuse() : |
| 486 waiter_(WaitableEvent::ResetPolicy::MANUAL, |
| 487 WaitableEvent::InitialState::NOT_SIGNALED) {} |
| 488 |
| 489 void SetUp() override { |
| 490 InitializeWorkerPool(kReclaimTimeForDetachTests); |
| 491 } |
| 492 |
| 493 subtle::Atomic32 zero_tls_values_ = 0; |
| 494 |
| 495 WaitableEvent waiter_; |
| 496 |
| 497 private: |
| 498 ThreadLocalStorage::Slot slot_; |
| 499 |
| 500 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); |
| 501 }; |
| 502 |
| 503 } // namespace |
| 504 |
| 505 // Checks that at least one thread has detached by checking the TLS. |
| 506 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) { |
| 507 // Saturate the threads and mark each thread with a magic TLS value. |
| 508 std::vector<std::unique_ptr<test::TestTaskFactory>> factories; |
| 509 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { |
| 510 factories.push_back(WrapUnique(new test::TestTaskFactory( |
| 511 worker_pool_->CreateTaskRunnerWithTraits( |
| 512 TaskTraits(), ExecutionMode::PARALLEL), |
| 513 ExecutionMode::PARALLEL))); |
| 514 ASSERT_TRUE(factories.back()->PostTask( |
| 515 PostNestedTask::NO, |
| 516 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait, |
| 517 Unretained(this)))); |
| 518 factories.back()->WaitForAllTasksToRun(); |
| 519 } |
| 520 |
| 521 // Release tasks waiting on |waiter_|. |
| 522 waiter_.Signal(); |
| 523 worker_pool_->WaitForAllWorkersIdleForTesting(); |
| 524 |
| 525 // All threads should be done running by now, so reset for the next phase. |
| 526 waiter_.Reset(); |
| 527 |
| 528 // Give the worker pool a chance to detach its threads. |
| 529 PlatformThread::Sleep( |
| 530 kReclaimTimeForDetachTests + TimeDelta::FromMilliseconds(200)); |
| 531 |
| 532 worker_pool_->DisallowWorkerDetachmentForTesting(); |
| 533 |
| 534 // Saturate and count the threads that do not have the magic TLS value. If the |
| 535 // value is not there, that means we're at a new thread. |
| 536 std::vector<std::unique_ptr<WaitableEvent>> count_waiters; |
| 537 for (auto& factory : factories) { |
| 538 count_waiters.push_back(WrapUnique(new WaitableEvent( |
| 539 WaitableEvent::ResetPolicy::MANUAL, |
| 540 WaitableEvent::InitialState::NOT_SIGNALED))); |
| 541 ASSERT_TRUE(factory->PostTask( |
| 542 PostNestedTask::NO, |
| 543 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait, |
| 544 Unretained(this), |
| 545 count_waiters.back().get()))); |
| 546 factory->WaitForAllTasksToRun(); |
| 547 } |
| 548 |
| 549 // Wait for all counters to complete. |
| 550 for (auto& count_waiter : count_waiters) |
| 551 count_waiter->Wait(); |
| 552 |
| 553 EXPECT_GT(subtle::NoBarrier_Load(&zero_tls_values_), 0); |
| 554 |
| 555 // Release tasks waiting on |waiter_|. |
| 556 waiter_.Signal(); |
| 557 } |
| 558 |
391 } // namespace internal | 559 } // namespace internal |
392 } // namespace base | 560 } // namespace base |
OLD | NEW |