Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(215)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl_unittest.cc

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <memory> 9 #include <memory>
10 #include <unordered_set> 10 #include <unordered_set>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/atomicops.h"
13 #include "base/bind.h" 14 #include "base/bind.h"
14 #include "base/bind_helpers.h" 15 #include "base/bind_helpers.h"
15 #include "base/callback.h" 16 #include "base/callback.h"
16 #include "base/macros.h" 17 #include "base/macros.h"
17 #include "base/memory/ptr_util.h" 18 #include "base/memory/ptr_util.h"
18 #include "base/memory/ref_counted.h" 19 #include "base/memory/ref_counted.h"
19 #include "base/synchronization/condition_variable.h" 20 #include "base/synchronization/condition_variable.h"
20 #include "base/synchronization/lock.h" 21 #include "base/synchronization/lock.h"
21 #include "base/synchronization/waitable_event.h" 22 #include "base/synchronization/waitable_event.h"
22 #include "base/task_runner.h" 23 #include "base/task_runner.h"
23 #include "base/task_scheduler/delayed_task_manager.h" 24 #include "base/task_scheduler/delayed_task_manager.h"
24 #include "base/task_scheduler/sequence.h" 25 #include "base/task_scheduler/sequence.h"
25 #include "base/task_scheduler/sequence_sort_key.h" 26 #include "base/task_scheduler/sequence_sort_key.h"
26 #include "base/task_scheduler/task_tracker.h" 27 #include "base/task_scheduler/task_tracker.h"
27 #include "base/task_scheduler/test_task_factory.h" 28 #include "base/task_scheduler/test_task_factory.h"
28 #include "base/task_scheduler/test_utils.h" 29 #include "base/task_scheduler/test_utils.h"
29 #include "base/threading/platform_thread.h" 30 #include "base/threading/platform_thread.h"
30 #include "base/threading/simple_thread.h" 31 #include "base/threading/simple_thread.h"
32 #include "base/threading/thread_local_storage.h"
31 #include "base/threading/thread_restrictions.h" 33 #include "base/threading/thread_restrictions.h"
34 #include "base/time/time.h"
32 #include "testing/gtest/include/gtest/gtest.h" 35 #include "testing/gtest/include/gtest/gtest.h"
33 36
34 namespace base { 37 namespace base {
35 namespace internal { 38 namespace internal {
36 namespace { 39 namespace {
37 40
38 const size_t kNumWorkersInWorkerPool = 4; 41 const size_t kNumWorkersInWorkerPool = 4;
39 const size_t kNumThreadsPostingTasks = 4; 42 const size_t kNumThreadsPostingTasks = 4;
40 const size_t kNumTasksPostedPerThread = 150; 43 const size_t kNumTasksPostedPerThread = 150;
44 const TimeDelta kSuggestedReclaimTime = TimeDelta::FromMilliseconds(10);
41 45
42 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; 46 using IORestriction = SchedulerWorkerPoolImpl::IORestriction;
43 47
44 class TestDelayedTaskManager : public DelayedTaskManager { 48 class TestDelayedTaskManager : public DelayedTaskManager {
45 public: 49 public:
46 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {} 50 TestDelayedTaskManager() : DelayedTaskManager(Bind(&DoNothing)) {}
47 51
48 void SetCurrentTime(TimeTicks now) { now_ = now; } 52 void SetCurrentTime(TimeTicks now) { now_ = now; }
49 53
50 // DelayedTaskManager: 54 // DelayedTaskManager:
51 TimeTicks Now() const override { return now_; } 55 TimeTicks Now() const override { return now_; }
52 56
53 private: 57 private:
54 TimeTicks now_ = TimeTicks::Now(); 58 TimeTicks now_ = TimeTicks::Now();
55 59
56 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager); 60 DISALLOW_COPY_AND_ASSIGN(TestDelayedTaskManager);
57 }; 61 };
58 62
59 class TaskSchedulerWorkerPoolImplTest 63 class TaskSchedulerWorkerPoolImplTest
60 : public testing::TestWithParam<ExecutionMode> { 64 : public testing::TestWithParam<ExecutionMode> {
61 protected: 65 protected:
62 TaskSchedulerWorkerPoolImplTest() = default; 66 TaskSchedulerWorkerPoolImplTest() = default;
63 67
64 void SetUp() override { 68 void SetUp() override {
65 worker_pool_ = SchedulerWorkerPoolImpl::Create( 69 worker_pool_ = SchedulerWorkerPoolImpl::Create(
66 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL, 70 "TestWorkerPoolWithFileIO", ThreadPriority::NORMAL,
67 kNumWorkersInWorkerPool, IORestriction::ALLOWED, 71 kNumWorkersInWorkerPool, IORestriction::ALLOWED,
72 kSuggestedReclaimTime,
68 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback, 73 Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback,
69 Unretained(this)), 74 Unretained(this)),
70 &task_tracker_, &delayed_task_manager_); 75 &task_tracker_, &delayed_task_manager_);
71 ASSERT_TRUE(worker_pool_); 76 ASSERT_TRUE(worker_pool_);
72 } 77 }
73 78
74 void TearDown() override { 79 void TearDown() override {
75 worker_pool_->WaitForAllWorkersIdleForTesting(); 80 worker_pool_->WaitForAllWorkersIdleForTesting();
76 worker_pool_->JoinForTesting(); 81 worker_pool_->JoinForTesting();
77 } 82 }
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after
357 }; 362 };
358 363
359 } // namespace 364 } // namespace
360 365
361 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) { 366 TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) {
362 TaskTracker task_tracker; 367 TaskTracker task_tracker;
363 DelayedTaskManager delayed_task_manager(Bind(&DoNothing)); 368 DelayedTaskManager delayed_task_manager(Bind(&DoNothing));
364 369
365 auto worker_pool = SchedulerWorkerPoolImpl::Create( 370 auto worker_pool = SchedulerWorkerPoolImpl::Create(
366 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(), 371 "TestWorkerPoolWithParam", ThreadPriority::NORMAL, 1U, GetParam(),
367 Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker, 372 TimeDelta::Max(), Bind(&NotReachedReEnqueueSequenceCallback),
368 &delayed_task_manager); 373 &task_tracker, &delayed_task_manager);
369 ASSERT_TRUE(worker_pool); 374 ASSERT_TRUE(worker_pool);
370 375
371 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, 376 WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
372 WaitableEvent::InitialState::NOT_SIGNALED); 377 WaitableEvent::InitialState::NOT_SIGNALED);
373 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL) 378 worker_pool->CreateTaskRunnerWithTraits(TaskTraits(), ExecutionMode::PARALLEL)
374 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran)); 379 ->PostTask(FROM_HERE, Bind(&ExpectIORestriction, GetParam(), &task_ran));
375 task_ran.Wait(); 380 task_ran.Wait();
376 381
377 worker_pool->JoinForTesting(); 382 worker_pool->JoinForTesting();
378 } 383 }
379 384
380 INSTANTIATE_TEST_CASE_P(IOAllowed, 385 INSTANTIATE_TEST_CASE_P(IOAllowed,
381 TaskSchedulerWorkerPoolImplIORestrictionTest, 386 TaskSchedulerWorkerPoolImplIORestrictionTest,
382 ::testing::Values(IORestriction::ALLOWED)); 387 ::testing::Values(IORestriction::ALLOWED));
383 INSTANTIATE_TEST_CASE_P(IODisallowed, 388 INSTANTIATE_TEST_CASE_P(IODisallowed,
384 TaskSchedulerWorkerPoolImplIORestrictionTest, 389 TaskSchedulerWorkerPoolImplIORestrictionTest,
385 ::testing::Values(IORestriction::DISALLOWED)); 390 ::testing::Values(IORestriction::DISALLOWED));
386 391
392 namespace {
393
394 constexpr size_t kMagicTlsValue = 42;
395
396 class TaskSchedulerWorkerPoolSingleThreadedTest
397 : public TaskSchedulerWorkerPoolImplTest {
398 public:
399 void SetTlsValue() {
400 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
401 }
402
403 void CheckTlsValue() {
404 EXPECT_EQ(kMagicTlsValue, reinterpret_cast<size_t>(slot_.Get()));
405 }
406
407 protected:
408 TaskSchedulerWorkerPoolSingleThreadedTest() = default;
409
410 private:
411 ThreadLocalStorage::Slot slot_;
412
413 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest);
414 };
415
416 } // namespace
417
418 // Verify that thread resources for a single thread remain.
419 TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) {
420 auto single_thread_task_runner =
421 worker_pool_->CreateTaskRunnerWithTraits(
422 TaskTraits().
423 WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN),
424 ExecutionMode::SINGLE_THREADED);
425 single_thread_task_runner->PostTask(
426 FROM_HERE,
427 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::SetTlsValue,
428 Unretained(this)));
429 WaitableEvent task_waiter(WaitableEvent::ResetPolicy::MANUAL,
430 WaitableEvent::InitialState::NOT_SIGNALED);
431 single_thread_task_runner->PostTask(
432 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter)));
433 task_waiter.Wait();
434 task_waiter.Reset();
435 worker_pool_->WaitForAllWorkersIdleForTesting();
436
437 // Give the worker pool a chance to reclaim its threads.
438 PlatformThread::Sleep(
439 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200));
440
441 single_thread_task_runner->PostTask(
442 FROM_HERE,
443 Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckTlsValue,
444 Unretained(this)));
445 single_thread_task_runner->PostTask(
446 FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter)));
447 task_waiter.Wait();
448 }
449
fdoray 2016/07/04 19:41:33 Test that no detachment occurs when suggested recl
robliao 2016/07/07 17:49:16 If we could test this, this would be better handle
fdoray 2016/07/07 20:45:57 Acknowledged.
450 namespace {
451
452 class TaskSchedulerWorkerPoolCheckTlsReuse
453 : public TaskSchedulerWorkerPoolImplTest {
454 public:
455 void SetTlsValueAndWait() {
456 slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
457 waiter_.Wait();
458 }
459
460 void CountZeroTlsValues() {
461 if (!slot_.Get())
462 subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);
463 }
464
465 protected:
466 TaskSchedulerWorkerPoolCheckTlsReuse() :
467 waiter_(WaitableEvent::ResetPolicy::MANUAL,
468 WaitableEvent::InitialState::NOT_SIGNALED) {}
469 subtle::Atomic32 zero_tls_values_ = 0;
470
471 WaitableEvent waiter_;
472
473 private:
474 ThreadLocalStorage::Slot slot_;
475
476 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse);
477 };
478
479 } // namespace
480
481 // Checks that at least one thread has detached by checking the TLS.
482 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) {
483 // Saturate the threads and mark each thread with a magic TLS value.
484 std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
485 for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
486 factories.push_back(WrapUnique(new test::TestTaskFactory(
487 worker_pool_->CreateTaskRunnerWithTraits(
488 TaskTraits(), ExecutionMode::PARALLEL),
489 ExecutionMode::PARALLEL)));
490 ASSERT_TRUE(factories.back()->PostTask(
491 PostNestedTask::NO,
492 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait,
493 Unretained(this))));
494 factories.back()->WaitForAllTasksToRun();
495 }
496
497 // Release tasks waiting on |waiter_|.
498 waiter_.Signal();
fdoray 2016/07/04 19:41:33 Should the Reset() be after WaitForAllWorkersIdleF
robliao 2016/07/07 17:49:16 The tricky bit here is that we can't wait for all
fdoray 2016/07/07 20:45:57 If Signal() stays before WaitForAllWorkersIdleForT
robliao 2016/07/08 17:25:49 I initially misread this. Yup, let's move Reset af
499 waiter_.Reset();
500
501 worker_pool_->WaitForAllWorkersIdleForTesting();
502
503 // Give the worker pool a chance to detach its threads.
504 PlatformThread::Sleep(
505 kSuggestedReclaimTime + TimeDelta::FromMilliseconds(200));
506
507 // Saturate and count the threads that do not have the magic TLS value. If the
508 // value is not there, that means we're at a new thread.
509 std::vector<std::unique_ptr<WaitableEvent>> count_waiters;
510 for (auto& factory : factories) {
511 ASSERT_TRUE(factory->PostTask(
512 PostNestedTask::NO,
513 Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValues,
fdoray 2016/07/04 19:41:32 Nothing prevents all CountZeroTlsValue tasks from
robliao 2016/07/07 17:49:16 Doesn't WaitForAllTasksToRun with a waiting task a
fdoray 2016/07/07 20:45:57 Line 486: You create factories with PARALLEL TaskR
robliao 2016/07/08 17:25:49 Ah yes, indeed. I've now moved both waitable event
514 Unretained(this))));
515 count_waiters.push_back(WrapUnique(new WaitableEvent(
516 WaitableEvent::ResetPolicy::MANUAL,
517 WaitableEvent::InitialState::NOT_SIGNALED)));
518 ASSERT_TRUE(factory->PostTask(
519 PostNestedTask::NO,
520 Bind(&WaitableEvent::Signal,
521 Unretained(count_waiters.back().get()))));
522 ASSERT_TRUE(factory->PostTask(PostNestedTask::NO,
523 Bind(&WaitableEvent::Wait,
524 Unretained(&waiter_))));
525 factory->WaitForAllTasksToRun();
526 }
527
528 // Wait for all counters to complete.
529 for (auto& count_waiter : count_waiters)
530 count_waiter->Wait();
531
532 EXPECT_GT(zero_tls_values_, 0);
533
534 // Release tasks waiting on |waiter_|.
535 waiter_.Signal();
536 worker_pool_->WaitForAllWorkersIdleForTesting();
537 }
538
387 } // namespace internal 539 } // namespace internal
388 } // namespace base 540 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698