Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(570)

Side by Side Diff: base/task_scheduler/task_scheduler_impl.cc

Issue 2064073003: TaskScheduler: Make the worker pools of TaskSchedulerImpl configurable (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/task_scheduler_impl.h" 5 #include "base/task_scheduler/task_scheduler_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
11 #include "base/memory/ptr_util.h" 11 #include "base/memory/ptr_util.h"
12 #include "base/task_scheduler/scheduler_service_thread.h" 12 #include "base/task_scheduler/scheduler_service_thread.h"
13 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
14 #include "base/task_scheduler/sequence_sort_key.h" 13 #include "base/task_scheduler/sequence_sort_key.h"
15 #include "base/task_scheduler/task.h" 14 #include "base/task_scheduler/task.h"
16 #include "base/time/time.h" 15 #include "base/time/time.h"
17 16
18 namespace base { 17 namespace base {
19 namespace internal { 18 namespace internal {
20 19
21 // static 20 // static
22 std::unique_ptr<TaskSchedulerImpl> TaskSchedulerImpl::Create() { 21 std::unique_ptr<TaskSchedulerImpl> TaskSchedulerImpl::Create(
23 std::unique_ptr<TaskSchedulerImpl> scheduler(new TaskSchedulerImpl); 22 const std::vector<WorkerPoolCreationArgs>& worker_pools,
24 scheduler->Initialize(); 23 const WorkerPoolIndexForTraitsCallback&
24 worker_pool_index_for_traits_callback) {
25 std::unique_ptr<TaskSchedulerImpl> scheduler(
26 new TaskSchedulerImpl(worker_pool_index_for_traits_callback));
27 scheduler->Initialize(worker_pools);
25 return scheduler; 28 return scheduler;
26 } 29 }
27 30
28 TaskSchedulerImpl::~TaskSchedulerImpl() { 31 TaskSchedulerImpl::~TaskSchedulerImpl() {
29 #if DCHECK_IS_ON() 32 #if DCHECK_IS_ON()
30 DCHECK(join_for_testing_returned_.IsSignaled()); 33 DCHECK(join_for_testing_returned_.IsSignaled());
31 #endif 34 #endif
32 } 35 }
33 36
34 void TaskSchedulerImpl::PostTaskWithTraits( 37 void TaskSchedulerImpl::PostTaskWithTraits(
(...skipping 15 matching lines...) Expand all
50 53
51 void TaskSchedulerImpl::Shutdown() { 54 void TaskSchedulerImpl::Shutdown() {
52 // TODO(fdoray): Increase the priority of BACKGROUND tasks blocking shutdown. 55 // TODO(fdoray): Increase the priority of BACKGROUND tasks blocking shutdown.
53 task_tracker_.Shutdown(); 56 task_tracker_.Shutdown();
54 } 57 }
55 58
56 void TaskSchedulerImpl::JoinForTesting() { 59 void TaskSchedulerImpl::JoinForTesting() {
57 #if DCHECK_IS_ON() 60 #if DCHECK_IS_ON()
58 DCHECK(!join_for_testing_returned_.IsSignaled()); 61 DCHECK(!join_for_testing_returned_.IsSignaled());
59 #endif 62 #endif
60 background_worker_pool_->JoinForTesting(); 63 for (const auto& worker_pool : worker_pools_)
61 background_file_io_worker_pool_->JoinForTesting(); 64 worker_pool->JoinForTesting();
62 normal_worker_pool_->JoinForTesting();
63 normal_file_io_worker_pool_->JoinForTesting();
64 service_thread_->JoinForTesting(); 65 service_thread_->JoinForTesting();
65 #if DCHECK_IS_ON() 66 #if DCHECK_IS_ON()
66 join_for_testing_returned_.Signal(); 67 join_for_testing_returned_.Signal();
67 #endif 68 #endif
68 } 69 }
69 70
70 TaskSchedulerImpl::TaskSchedulerImpl() 71 TaskSchedulerImpl::TaskSchedulerImpl(const WorkerPoolIndexForTraitsCallback&
72 worker_pool_index_for_traits_callback)
71 : delayed_task_manager_( 73 : delayed_task_manager_(
72 Bind(&TaskSchedulerImpl::OnDelayedRunTimeUpdated, Unretained(this))) 74 Bind(&TaskSchedulerImpl::OnDelayedRunTimeUpdated, Unretained(this))),
75 worker_pool_index_for_traits_callback_(
76 worker_pool_index_for_traits_callback)
robliao 2016/06/23 22:37:03 DCHECK(!worker_pool_index_for_traits_callback.is_n
fdoray 2016/06/27 19:45:04 Done.
73 #if DCHECK_IS_ON() 77 #if DCHECK_IS_ON()
74 , 78 ,
75 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, 79 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
76 WaitableEvent::InitialState::NOT_SIGNALED) 80 WaitableEvent::InitialState::NOT_SIGNALED)
77 #endif 81 #endif
78 { 82 {
79 } 83 }
80 84
81 void TaskSchedulerImpl::Initialize() { 85 void TaskSchedulerImpl::Initialize(
86 const std::vector<WorkerPoolCreationArgs>& worker_pools) {
87 DCHECK(!worker_pools.empty());
88
82 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; 89 using IORestriction = SchedulerWorkerPoolImpl::IORestriction;
83 90
84 const SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback 91 const SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback
85 re_enqueue_sequence_callback = 92 re_enqueue_sequence_callback =
86 Bind(&TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this)); 93 Bind(&TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this));
87 94
88 // TODO(fdoray): Derive the number of threads per pool from hardware 95 for (const auto& worker_pool : worker_pools) {
89 // characteristics rather than using hard-coded constants. 96 // Passing pointers to objects owned by |this| to
90 97 // SchedulerWorkerPoolImpl::Create() is safe because a TaskSchedulerImpl
91 // Passing pointers to objects owned by |this| to 98 // can't be deleted before all its worker pools have been joined.
92 // SchedulerWorkerPoolImpl::Create() is safe because a TaskSchedulerImpl can't 99 worker_pools_.push_back(SchedulerWorkerPoolImpl::Create(
93 // be deleted before all its worker pools have been joined. 100 worker_pool.name, worker_pool.thread_priority, worker_pool.max_threads,
94 background_worker_pool_ = SchedulerWorkerPoolImpl::Create( 101 worker_pool.io_restriction, re_enqueue_sequence_callback,
95 "TaskSchedulerBackground", ThreadPriority::BACKGROUND, 1U, 102 &task_tracker_, &delayed_task_manager_));
96 IORestriction::DISALLOWED, re_enqueue_sequence_callback, &task_tracker_, 103 CHECK(worker_pools_.back());
97 &delayed_task_manager_); 104 }
98 CHECK(background_worker_pool_);
99
100 background_file_io_worker_pool_ = SchedulerWorkerPoolImpl::Create(
101 "TaskSchedulerBackgroundFileIO", ThreadPriority::BACKGROUND, 1U,
102 IORestriction::ALLOWED, re_enqueue_sequence_callback, &task_tracker_,
103 &delayed_task_manager_);
104 CHECK(background_file_io_worker_pool_);
105
106 normal_worker_pool_ = SchedulerWorkerPoolImpl::Create(
107 "TaskSchedulerForeground", ThreadPriority::NORMAL, 4U,
108 IORestriction::DISALLOWED, re_enqueue_sequence_callback, &task_tracker_,
109 &delayed_task_manager_);
110 CHECK(normal_worker_pool_);
111
112 normal_file_io_worker_pool_ = SchedulerWorkerPoolImpl::Create(
113 "TaskSchedulerForegroundFileIO", ThreadPriority::NORMAL, 12U,
114 IORestriction::ALLOWED, re_enqueue_sequence_callback, &task_tracker_,
115 &delayed_task_manager_);
116 CHECK(normal_file_io_worker_pool_);
117 105
118 service_thread_ = SchedulerServiceThread::Create(&task_tracker_, 106 service_thread_ = SchedulerServiceThread::Create(&task_tracker_,
119 &delayed_task_manager_); 107 &delayed_task_manager_);
120 CHECK(service_thread_); 108 CHECK(service_thread_);
121 } 109 }
122 110
123 SchedulerWorkerPool* TaskSchedulerImpl::GetWorkerPoolForTraits( 111 SchedulerWorkerPool* TaskSchedulerImpl::GetWorkerPoolForTraits(
124 const TaskTraits& traits) { 112 const TaskTraits& traits) {
125 if (traits.with_file_io()) { 113 const size_t index = worker_pool_index_for_traits_callback_.Run(traits);
126 if (traits.priority() == TaskPriority::BACKGROUND) 114 DCHECK_LT(index, worker_pools_.size());
127 return background_file_io_worker_pool_.get(); 115 return worker_pools_[index].get();
128 return normal_file_io_worker_pool_.get();
129 }
130
131 if (traits.priority() == TaskPriority::BACKGROUND)
132 return background_worker_pool_.get();
133 return normal_worker_pool_.get();
134 } 116 }
135 117
136 void TaskSchedulerImpl::ReEnqueueSequenceCallback( 118 void TaskSchedulerImpl::ReEnqueueSequenceCallback(
137 scoped_refptr<Sequence> sequence) { 119 scoped_refptr<Sequence> sequence) {
138 DCHECK(sequence); 120 DCHECK(sequence);
139 121
140 const SequenceSortKey sort_key = sequence->GetSortKey(); 122 const SequenceSortKey sort_key = sequence->GetSortKey();
141 TaskTraits traits(sequence->PeekTask()->traits); 123 TaskTraits traits(sequence->PeekTask()->traits);
142 124
143 // Update the priority of |traits| so that the next task in |sequence| runs 125 // Update the priority of |traits| so that the next task in |sequence| runs
144 // with the highest priority in |sequence| as opposed to the next task's 126 // with the highest priority in |sequence| as opposed to the next task's
145 // specific priority. 127 // specific priority.
146 traits.WithPriority(sort_key.priority()); 128 traits.WithPriority(sort_key.priority());
147 129
148 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence), 130 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence),
149 sort_key); 131 sort_key);
150 } 132 }
151 133
152 void TaskSchedulerImpl::OnDelayedRunTimeUpdated() { 134 void TaskSchedulerImpl::OnDelayedRunTimeUpdated() {
153 service_thread_->WakeUp(); 135 service_thread_->WakeUp();
154 } 136 }
155 137
156 } // namespace internal 138 } // namespace internal
157 } // namespace base 139 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698