OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <memory> | 9 #include <memory> |
10 #include <unordered_set> | 10 #include <unordered_set> |
11 #include <vector> | 11 #include <vector> |
12 | 12 |
| 13 #include "base/atomicops.h" |
13 #include "base/bind.h" | 14 #include "base/bind.h" |
14 #include "base/bind_helpers.h" | 15 #include "base/bind_helpers.h" |
15 #include "base/callback.h" | 16 #include "base/callback.h" |
16 #include "base/macros.h" | 17 #include "base/macros.h" |
17 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
18 #include "base/memory/ref_counted.h" | 19 #include "base/memory/ref_counted.h" |
19 #include "base/synchronization/condition_variable.h" | 20 #include "base/synchronization/condition_variable.h" |
20 #include "base/synchronization/lock.h" | 21 #include "base/synchronization/lock.h" |
21 #include "base/synchronization/waitable_event.h" | 22 #include "base/synchronization/waitable_event.h" |
22 #include "base/task_runner.h" | 23 #include "base/task_runner.h" |
23 #include "base/task_scheduler/delayed_task_manager.h" | 24 #include "base/task_scheduler/delayed_task_manager.h" |
24 #include "base/task_scheduler/sequence.h" | 25 #include "base/task_scheduler/sequence.h" |
25 #include "base/task_scheduler/sequence_sort_key.h" | 26 #include "base/task_scheduler/sequence_sort_key.h" |
26 #include "base/task_scheduler/task_tracker.h" | 27 #include "base/task_scheduler/task_tracker.h" |
27 #include "base/task_scheduler/test_task_factory.h" | 28 #include "base/task_scheduler/test_task_factory.h" |
28 #include "base/task_scheduler/test_utils.h" | 29 #include "base/task_scheduler/test_utils.h" |
29 #include "base/threading/platform_thread.h" | 30 #include "base/threading/platform_thread.h" |
30 #include "base/threading/simple_thread.h" | 31 #include "base/threading/simple_thread.h" |
| 32 #include "base/threading/thread_local_storage.h" |
31 #include "base/threading/thread_restrictions.h" | 33 #include "base/threading/thread_restrictions.h" |
| 34 #include "base/time/time.h" |
32 #include "testing/gtest/include/gtest/gtest.h" | 35 #include "testing/gtest/include/gtest/gtest.h" |
33 | 36 |
34 namespace base { | 37 namespace base { |
35 namespace internal { | 38 namespace internal { |
36 namespace { | 39 namespace { |
37 | 40 |
38 const size_t kNumWorkersInWorkerPool = 4; | 41 const size_t kNumWorkersInWorkerPool = 4; |
39 const size_t kNumThreadsPostingTasks = 4; | 42 const size_t kNumThreadsPostingTasks = 4; |
40 const size_t kNumTasksPostedPerThread = 150; | 43 const size_t kNumTasksPostedPerThread = 150; |
| 44 const TimeDelta kSuggestedReclaimTime = TimeDelta::FromMilliseconds(10); |
41 | 45 |
42 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; | 46 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; |
43 | 47 |
44 class TestDelayedTaskManager : public DelayedTaskManager { | 48 class TestDelayedTaskManager : public DelayedTaskManager { |
45 public: | 49 public: |
46 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} | 50 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} |
47 | 51 |
48 void SetCurrentTime(TimeTicks now) { now_ = now; } | 52 void SetCurrentTime(TimeTicks now) { now_ = now; } |
49 | 53 |
50 // DelayedTaskManager: | 54 // DelayedTaskManager: |
51 TimeTicks Now() const override { return now_; } | 55 TimeTicks Now() const override { return now_; } |
52 | 56 |
53 private: | 57 private: |
54 TimeTicks now_ = TimeTicks::Now(); | 58 TimeTicks now_ = TimeTicks::Now(); |
55 | 59 |
56 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); | 60 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); |
57 }; | 61 }; |
58 | 62 |
59 class TaskSchedulerWorkerPoolImplTest | 63 class TaskSchedulerWorkerPoolImplTest |
60 : public testing::TestWithParam<ExecutionMode> { | 64 : public testing::TestWithParam<ExecutionMode> { |
61 protected: | 65 protected: |
62 TaskSchedulerWorkerPoolImplTest() = default; | 66 TaskSchedulerWorkerPoolImplTest() = default; |
63 | 67 |
64 void SetUp() override { | 68 void SetUp() override { |
65 worker_pool_ = SchedulerWorkerPoolImpl::Create( | 69 worker_pool_ = SchedulerWorkerPoolImpl::Create( |
66 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL, | 70 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL, |
67 kNumWorkersInWorkerPool, IORestriction::ALLOWED, | 71 kNumWorkersInWorkerPool, IORestriction::ALLOWED, |
| 72 kSuggestedReclaimTime, |
68 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, | 73 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, |
69 Unretained(this)), | 74 Unretained(this)), |
70 &task_tracker_, &delayed_task_manager_); | 75 &task_tracker_, &delayed_task_manager_); |
71 ASSERT_TRUE(worker_pool_); | 76 ASSERT_TRUE(worker_pool_); |
72 } | 77 } |
73 | 78 |
74 void TearDown() override { | 79 void TearDown() override { |
75 worker_pool_->WaitForAllWorkersIdleForTesting(); | 80 worker_pool_->WaitForAllWorkersIdleForTesting(); |
76 worker_pool_->JoinForTesting(); | 81 worker_pool_->JoinForTesting(); |
77 } | 82 } |
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
357 }; | 362 }; |
358 | 363 |
359 } // namespace | 364 } // namespace |
360 | 365 |
361 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { | 366 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { |
362 TaskTracker task_tracker; | 367 TaskTracker task_tracker; |
363 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); | 368 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); |
364 | 369 |
365 auto worker_pool = SchedulerWorkerPoolImpl::Create( | 370 auto worker_pool = SchedulerWorkerPoolImpl::Create( |
366 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), | 371 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), |
367 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, | 372 TimeDelta::Max(), Bind(&NotReachedReEnqueueSequenceCallback), |
368 &delayed_task_manager); | 373 &task_tracker, &delayed_task_manager); |
369 ASSERT_TRUE(worker_pool); | 374 ASSERT_TRUE(worker_pool); |
370 | 375 |
371 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | 376 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, |
372 WaitableEvent::InitialState::NOT_SIGNALED); | 377 WaitableEvent::InitialState::NOT_SIGNALED); |
373 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) | 378 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) |
374 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); | 379 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); |
375 task_ran.Wait(); | 380 task_ran.Wait(); |
376 | 381 |
377 worker_pool->JoinForTesting(); | 382 worker_pool->JoinForTesting(); |
378 } | 383 } |
379 | 384 |
380 INSTANTIATE_TEST_CASE_P(IOAllowed, | 385 INSTANTIATE_TEST_CASE_P(IOAllowed, |
381 TaskSchedulerWorkerPoolImplIORestrictionTest, | 386 TaskSchedulerWorkerPoolImplIORestrictionTest, |
382 ::testing::Values(IORestriction::ALLOWED)); | 387 ::testing::Values(IORestriction::ALLOWED)); |
383 INSTANTIATE_TEST_CASE_P(IODisallowed, | 388 INSTANTIATE_TEST_CASE_P(IODisallowed, |
384 TaskSchedulerWorkerPoolImplIORestrictionTest, | 389 TaskSchedulerWorkerPoolImplIORestrictionTest, |
385 ::testing::Values(IORestriction::DISALLOWED)); | 390 ::testing::Values(IORestriction::DISALLOWED)); |
386 | 391 |
| 392 namespace { |
| 393 |
| 394 constexpr size_t kMagicTlsValue = 42; |
| 395 |
| 396 class TaskSchedulerWorkerPoolSingleThreadedTest |
| 397 : public TaskSchedulerWorkerPoolImplTest { |
| 398 public: |
| 399 void SetTlsValue() { |
| 400 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); |
| 401 } |
| 402 |
| 403 void CheckTlsValue() { |
| 404 EXPECT_EQ(kMagicTlsValue, reinterpret_cast<size_t>(slot_.Get())); |
| 405 } |
| 406 |
| 407 protected: |
| 408 TaskSchedulerWorkerPoolSingleThreadedTest() = default; |
| 409 |
| 410 private: |
| 411 ThreadLocalStorage::Slot slot_; |
| 412 |
| 413 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest); |
| 414 }; |
| 415 |
| 416 } // namespace |
| 417 |
| 418 // Verify that thread resources for a single thread remain. |
| 419 TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) { |
| 420 auto single_thread_task_runner = |
| 421 worker_pool_->CreateTaskRunnerWithTraits( |
| 422 TaskTraits(). |
| 423 WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN), |
| 424 ExecutionMode::SINGLE_THREADED); |
| 425 single_thread_task_runner->PostTask( |
| 426 FROM_HERE, |
| 427 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::SetTlsValue, |
| 428 Unretained(this))); |
| 429 WaitableEvent task_waiter(WaitableEvent::ResetPolicy::MANUAL, |
| 430 WaitableEvent::InitialState::NOT_SIGNALED); |
| 431 single_thread_task_runner->PostTask( |
| 432 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); |
| 433 task_waiter.Wait(); |
| 434 task_waiter.Reset(); |
| 435 worker_pool_->WaitForAllWorkersIdleForTesting(); |
| 436 |
| 437 // Give the worker pool a chance to reclaim its threads. |
| 438 PlatformThread::Sleep( |
| 439 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); |
| 440 |
| 441 single_thread_task_runner->PostTask( |
| 442 FROM_HERE, |
| 443 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckTlsValue, |
| 444 Unretained(this))); |
| 445 single_thread_task_runner->PostTask( |
| 446 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter))); |
| 447 task_waiter.Wait(); |
| 448 } |
| 449 |
| 450 namespace { |
| 451 |
| 452 class TaskSchedulerWorkerPoolCheckTlsReuse |
| 453 : public TaskSchedulerWorkerPoolImplTest { |
| 454 public: |
| 455 void SetTlsValueAndWait() { |
| 456 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue)); |
| 457 waiter_.Wait(); |
| 458 } |
| 459 |
| 460 void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) { |
| 461 if (!slot_.Get()) |
| 462 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1); |
| 463 |
| 464 count_waiter->Signal(); |
| 465 waiter_.Wait(); |
| 466 } |
| 467 |
| 468 protected: |
| 469 TaskSchedulerWorkerPoolCheckTlsReuse() : |
| 470 waiter_(WaitableEvent::ResetPolicy::MANUAL, |
| 471 WaitableEvent::InitialState::NOT_SIGNALED) {} |
| 472 subtle::Atomic32 zero_tls_values_ = 0; |
| 473 |
| 474 WaitableEvent waiter_; |
| 475 |
| 476 private: |
| 477 ThreadLocalStorage::Slot slot_; |
| 478 |
| 479 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse); |
| 480 }; |
| 481 |
| 482 } // namespace |
| 483 |
| 484 // Checks that at least one thread has detached by checking the TLS. |
| 485 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) { |
| 486 // Saturate the threads and mark each thread with a magic TLS value. |
| 487 std::vector<std::unique_ptr<test::TestTaskFactory>> factories; |
| 488 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) { |
| 489 factories.push_back(WrapUnique(new test::TestTaskFactory( |
| 490 worker_pool_->CreateTaskRunnerWithTraits( |
| 491 TaskTraits(), ExecutionMode::PARALLEL), |
| 492 ExecutionMode::PARALLEL))); |
| 493 ASSERT_TRUE(factories.back()->PostTask( |
| 494 PostNestedTask::NO, |
| 495 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait, |
| 496 Unretained(this)))); |
| 497 factories.back()->WaitForAllTasksToRun(); |
| 498 } |
| 499 |
| 500 // Release tasks waiting on |waiter_|. |
| 501 waiter_.Signal(); |
| 502 worker_pool_->WaitForAllWorkersIdleForTesting(); |
| 503 |
| 504 // All threads should be done running by now, so reset for the next phase. |
| 505 waiter_.Reset(); |
| 506 |
| 507 // Give the worker pool a chance to detach its threads. |
| 508 PlatformThread::Sleep( |
| 509 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200)); |
| 510 |
| 511 // Saturate and count the threads that do not have the magic TLS value. If the |
| 512 // value is not there, that means we're at a new thread. |
| 513 std::vector<std::unique_ptr<WaitableEvent>> count_waiters; |
| 514 for (auto& factory : factories) { |
| 515 count_waiters.push_back(WrapUnique(new WaitableEvent( |
| 516 WaitableEvent::ResetPolicy::MANUAL, |
| 517 WaitableEvent::InitialState::NOT_SIGNALED))); |
| 518 ASSERT_TRUE(factory->PostTask( |
| 519 PostNestedTask::NO, |
| 520 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait, |
| 521 Unretained(this), |
| 522 count_waiters.back().get()))); |
| 523 factory->WaitForAllTasksToRun(); |
| 524 } |
| 525 |
| 526 // Wait for all counters to complete. |
| 527 for (auto& count_waiter : count_waiters) |
| 528 count_waiter->Wait(); |
| 529 |
| 530 EXPECT_GT(zero_tls_values_, 0); |
| 531 |
| 532 // Release tasks waiting on |waiter_|. |
| 533 waiter_.Signal(); |
| 534 worker_pool_->WaitForAllWorkersIdleForTesting(); |
| 535 } |
| 536 |
387 } // namespace internal | 537 } // namespace internal |
388 } // namespace base | 538 } // namespace base |
OLD | NEW |