OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/task_scheduler_impl.h" | 5 #include "base/task_scheduler/task_scheduler_impl.h" |
6 | 6 |
7 #include <utility> | 7 #include <utility> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/bind_helpers.h" | 10 #include "base/bind_helpers.h" |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
47 size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) { | 47 size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) { |
48 const bool is_background = | 48 const bool is_background = |
49 traits.priority() == base::TaskPriority::BACKGROUND; | 49 traits.priority() == base::TaskPriority::BACKGROUND; |
50 if (traits.may_block() || traits.with_base_sync_primitives()) | 50 if (traits.may_block() || traits.with_base_sync_primitives()) |
51 return is_background ? BACKGROUND_BLOCKING : FOREGROUND_BLOCKING; | 51 return is_background ? BACKGROUND_BLOCKING : FOREGROUND_BLOCKING; |
52 return is_background ? BACKGROUND : FOREGROUND; | 52 return is_background ? BACKGROUND : FOREGROUND; |
53 } | 53 } |
54 | 54 |
55 } // namespace | 55 } // namespace |
56 | 56 |
57 // static | 57 TaskSchedulerImpl::TaskSchedulerImpl(StringPiece name) |
58 std::unique_ptr<TaskSchedulerImpl> TaskSchedulerImpl::Create( | 58 : name_(name), |
59 StringPiece name, | 59 service_thread_("TaskSchedulerServiceThread"), |
60 const TaskScheduler::InitParams& init_params) { | 60 single_thread_task_runner_manager_(&task_tracker_, |
61 auto task_scheduler = WrapUnique(new TaskSchedulerImpl(name)); | 61 &delayed_task_manager_) { |
62 task_scheduler->Initialize(init_params); | 62 static_assert(arraysize(worker_pools_) == ENVIRONMENT_COUNT, |
63 return task_scheduler; | 63 "The size of |worker_pools_| must match ENVIRONMENT_COUNT."); |
| 64 static_assert( |
| 65 arraysize(kEnvironmentParams) == ENVIRONMENT_COUNT, |
| 66 "The size of |kEnvironmentParams| must match ENVIRONMENT_COUNT."); |
| 67 |
| 68 // Callback invoked by workers to re-enqueue a sequence in the appropriate |
| 69 // PriorityQueue. |
| 70 const auto reenqueue_sequence_callback = BindRepeating( |
| 71 &TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this)); |
| 72 |
| 73 for (int environment_type = 0; environment_type < ENVIRONMENT_COUNT; |
| 74 ++environment_type) { |
| 75 worker_pools_[environment_type] = MakeUnique<SchedulerWorkerPoolImpl>( |
| 76 name_ + kEnvironmentParams[environment_type].name_suffix, |
| 77 kEnvironmentParams[environment_type].priority_hint, |
| 78 reenqueue_sequence_callback, &task_tracker_, &delayed_task_manager_); |
| 79 } |
64 } | 80 } |
65 | 81 |
66 TaskSchedulerImpl::~TaskSchedulerImpl() { | 82 TaskSchedulerImpl::~TaskSchedulerImpl() { |
67 #if DCHECK_IS_ON() | 83 #if DCHECK_IS_ON() |
68 DCHECK(join_for_testing_returned_.IsSet()); | 84 DCHECK(join_for_testing_returned_.IsSet()); |
69 #endif | 85 #endif |
70 } | 86 } |
71 | 87 |
| 88 void TaskSchedulerImpl::Start(const TaskScheduler::InitParams& init_params) { |
| 89 // Start the service thread. On platforms that support it (POSIX except NaCL |
| 90 // SFI), the service thread runs a MessageLoopForIO which is used to support |
| 91 // FileDescriptorWatcher in the scope in which tasks run. |
| 92 Thread::Options service_thread_options; |
| 93 service_thread_options.message_loop_type = |
| 94 #if defined(OS_POSIX) && !defined(OS_NACL_SFI) |
| 95 MessageLoop::TYPE_IO; |
| 96 #else |
| 97 MessageLoop::TYPE_DEFAULT; |
| 98 #endif |
| 99 service_thread_options.timer_slack = TIMER_SLACK_MAXIMUM; |
| 100 CHECK(service_thread_.StartWithOptions(service_thread_options)); |
| 101 |
| 102 #if defined(OS_POSIX) && !defined(OS_NACL_SFI) |
| 103 // Needs to happen after starting the service thread to get its |
| 104 // message_loop(). |
| 105 task_tracker_.set_watch_file_descriptor_message_loop( |
| 106 static_cast<MessageLoopForIO*>(service_thread_.message_loop())); |
| 107 #endif |
| 108 |
| 109 // Needs to happen after starting the service thread to get its task_runner(). |
| 110 delayed_task_manager_.Start(service_thread_.task_runner()); |
| 111 |
| 112 single_thread_task_runner_manager_.Start(); |
| 113 |
| 114 worker_pools_[BACKGROUND]->Start(init_params.background_worker_pool_params); |
| 115 worker_pools_[BACKGROUND_BLOCKING]->Start( |
| 116 init_params.background_blocking_worker_pool_params); |
| 117 worker_pools_[FOREGROUND]->Start(init_params.foreground_worker_pool_params); |
| 118 worker_pools_[FOREGROUND_BLOCKING]->Start( |
| 119 init_params.foreground_blocking_worker_pool_params); |
| 120 } |
| 121 |
72 void TaskSchedulerImpl::PostDelayedTaskWithTraits( | 122 void TaskSchedulerImpl::PostDelayedTaskWithTraits( |
73 const tracked_objects::Location& from_here, | 123 const tracked_objects::Location& from_here, |
74 const TaskTraits& traits, | 124 const TaskTraits& traits, |
75 OnceClosure task, | 125 OnceClosure task, |
76 TimeDelta delay) { | 126 TimeDelta delay) { |
77 // Post |task| as part of a one-off single-task Sequence. | 127 // Post |task| as part of a one-off single-task Sequence. |
78 GetWorkerPoolForTraits(traits)->PostTaskWithSequence( | 128 GetWorkerPoolForTraits(traits)->PostTaskWithSequence( |
79 MakeUnique<Task>(from_here, std::move(task), traits, delay), | 129 MakeUnique<Task>(from_here, std::move(task), traits, delay), |
80 make_scoped_refptr(new Sequence)); | 130 make_scoped_refptr(new Sequence)); |
81 } | 131 } |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
143 for (const auto& worker_pool : worker_pools_) | 193 for (const auto& worker_pool : worker_pools_) |
144 worker_pool->DisallowWorkerDetachmentForTesting(); | 194 worker_pool->DisallowWorkerDetachmentForTesting(); |
145 for (const auto& worker_pool : worker_pools_) | 195 for (const auto& worker_pool : worker_pools_) |
146 worker_pool->JoinForTesting(); | 196 worker_pool->JoinForTesting(); |
147 service_thread_.Stop(); | 197 service_thread_.Stop(); |
148 #if DCHECK_IS_ON() | 198 #if DCHECK_IS_ON() |
149 join_for_testing_returned_.Set(); | 199 join_for_testing_returned_.Set(); |
150 #endif | 200 #endif |
151 } | 201 } |
152 | 202 |
153 TaskSchedulerImpl::TaskSchedulerImpl(StringPiece name) | |
154 : name_(name), | |
155 service_thread_("TaskSchedulerServiceThread"), | |
156 single_thread_task_runner_manager_(&task_tracker_, | |
157 &delayed_task_manager_) {} | |
158 | |
159 void TaskSchedulerImpl::Initialize( | |
160 const TaskScheduler::InitParams& init_params) { | |
161 // Start the service thread. On platforms that support it (POSIX except NaCL | |
162 // SFI), the service thread runs a MessageLoopForIO which is used to support | |
163 // FileDescriptorWatcher in the scope in which tasks run. | |
164 Thread::Options service_thread_options; | |
165 service_thread_options.message_loop_type = | |
166 #if defined(OS_POSIX) && !defined(OS_NACL_SFI) | |
167 MessageLoop::TYPE_IO; | |
168 #else | |
169 MessageLoop::TYPE_DEFAULT; | |
170 #endif | |
171 service_thread_options.timer_slack = TIMER_SLACK_MAXIMUM; | |
172 CHECK(service_thread_.StartWithOptions(service_thread_options)); | |
173 | |
174 #if defined(OS_POSIX) && !defined(OS_NACL_SFI) | |
175 // Needs to happen after starting the service thread to get its | |
176 // message_loop(). | |
177 task_tracker_.set_watch_file_descriptor_message_loop( | |
178 static_cast<MessageLoopForIO*>(service_thread_.message_loop())); | |
179 #endif | |
180 | |
181 // Needs to happen after starting the service thread to get its task_runner(). | |
182 delayed_task_manager_.Start(service_thread_.task_runner()); | |
183 | |
184 single_thread_task_runner_manager_.Start(); | |
185 | |
186 // Callback invoked by workers to re-enqueue a sequence in the appropriate | |
187 // PriorityQueue. | |
188 const SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback | |
189 re_enqueue_sequence_callback = | |
190 Bind(&TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this)); | |
191 | |
192 // Order must match the EnvironmentType enum. | |
193 const SchedulerWorkerPoolParams* worker_pool_params[] = { | |
194 &init_params.background_worker_pool_params, | |
195 &init_params.background_blocking_worker_pool_params, | |
196 &init_params.foreground_worker_pool_params, | |
197 &init_params.foreground_blocking_worker_pool_params}; | |
198 | |
199 static_assert(arraysize(worker_pools_) == ENVIRONMENT_COUNT, | |
200 "The size of |worker_pools_| must match ENVIRONMENT_COUNT."); | |
201 static_assert( | |
202 arraysize(kEnvironmentParams) == ENVIRONMENT_COUNT, | |
203 "The size of |kEnvironmentParams| must match ENVIRONMENT_COUNT."); | |
204 static_assert( | |
205 arraysize(worker_pool_params) == ENVIRONMENT_COUNT, | |
206 "The size of |worker_pool_params| must match ENVIRONMENT_COUNT."); | |
207 | |
208 // Start worker pools. | |
209 for (int environment_type = 0; environment_type < ENVIRONMENT_COUNT; | |
210 ++environment_type) { | |
211 // Passing pointers to objects owned by |this| to the constructor of | |
212 // SchedulerWorkerPoolImpl is safe because a TaskSchedulerImpl can't be | |
213 // deleted before all its worker pools have been joined. | |
214 worker_pools_[environment_type] = MakeUnique<SchedulerWorkerPoolImpl>( | |
215 name_ + kEnvironmentParams[environment_type].name_suffix, | |
216 kEnvironmentParams[environment_type].priority_hint, | |
217 re_enqueue_sequence_callback, &task_tracker_, &delayed_task_manager_); | |
218 worker_pools_[environment_type]->Start( | |
219 *worker_pool_params[environment_type]); | |
220 } | |
221 } | |
222 | |
223 SchedulerWorkerPoolImpl* TaskSchedulerImpl::GetWorkerPoolForTraits( | 203 SchedulerWorkerPoolImpl* TaskSchedulerImpl::GetWorkerPoolForTraits( |
224 const TaskTraits& traits) const { | 204 const TaskTraits& traits) const { |
225 return worker_pools_[GetEnvironmentIndexForTraits(traits)].get(); | 205 return worker_pools_[GetEnvironmentIndexForTraits(traits)].get(); |
226 } | 206 } |
227 | 207 |
228 void TaskSchedulerImpl::ReEnqueueSequenceCallback( | 208 void TaskSchedulerImpl::ReEnqueueSequenceCallback( |
229 scoped_refptr<Sequence> sequence) { | 209 scoped_refptr<Sequence> sequence) { |
230 DCHECK(sequence); | 210 DCHECK(sequence); |
231 | 211 |
232 const SequenceSortKey sort_key = sequence->GetSortKey(); | 212 const SequenceSortKey sort_key = sequence->GetSortKey(); |
233 | 213 |
234 // The next task in |sequence| should run in a worker pool suited for its | 214 // The next task in |sequence| should run in a worker pool suited for its |
235 // traits, except for the priority which is adjusted to the highest priority | 215 // traits, except for the priority which is adjusted to the highest priority |
236 // in |sequence|. | 216 // in |sequence|. |
237 const TaskTraits traits = | 217 const TaskTraits traits = |
238 sequence->PeekTaskTraits().WithPriority(sort_key.priority()); | 218 sequence->PeekTaskTraits().WithPriority(sort_key.priority()); |
239 | 219 |
240 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence), | 220 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence), |
241 sort_key); | 221 sort_key); |
242 } | 222 } |
243 | 223 |
244 } // namespace internal | 224 } // namespace internal |
245 } // namespace base | 225 } // namespace base |
OLD | NEW |