Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(480)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool.cc

Issue 1895513002: TaskScheduler [12] Support SINGLE_THREADED in SchedulerThreadPool DO NOT SUBMIT (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@10_superstack
Patch Set: rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool.h" 5 #include "base/task_scheduler/scheduler_thread_pool.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after
167 ExecutionMode execution_mode) { 167 ExecutionMode execution_mode) {
168 switch (execution_mode) { 168 switch (execution_mode) {
169 case ExecutionMode::PARALLEL: 169 case ExecutionMode::PARALLEL:
170 return make_scoped_refptr(new SchedulerParallelTaskRunner( 170 return make_scoped_refptr(new SchedulerParallelTaskRunner(
171 traits, this, task_tracker_, delayed_task_manager_)); 171 traits, this, task_tracker_, delayed_task_manager_));
172 172
173 case ExecutionMode::SEQUENCED: 173 case ExecutionMode::SEQUENCED:
174 return make_scoped_refptr(new SchedulerSequencedTaskRunner( 174 return make_scoped_refptr(new SchedulerSequencedTaskRunner(
175 traits, this, task_tracker_, delayed_task_manager_)); 175 traits, this, task_tracker_, delayed_task_manager_));
176 176
177 case ExecutionMode::SINGLE_THREADED: 177 case ExecutionMode::SINGLE_THREADED: {
178 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. 178 // Acquire a lock to ensure that the number of single-thread TaskRunners
179 NOTREACHED(); 179 // for a given SchedulerWorkerThread doesn't increase between the calls to
180 return nullptr; 180 // GetNumSingleThreadTaskRunners() and CreateTaskRunnerWithTraits() below.
181 // This number can still decrease.
182 AutoSchedulerLock auto_lock(single_thread_task_runner_creation_lock_);
183
184 SchedulerWorkerThread* worker_thread = nullptr;
185 size_t min_num_single_thread_task_runners = -1;
186 for (const auto& current_worker_thread : worker_threads_) {
187 const size_t current_num_single_thread_task_runners =
188 current_worker_thread->GetNumSingleThreadTaskRunners();
189 if (current_num_single_thread_task_runners <
190 min_num_single_thread_task_runners) {
191 worker_thread = current_worker_thread.get();
192 min_num_single_thread_task_runners =
193 current_num_single_thread_task_runners;
194 if (min_num_single_thread_task_runners == 0)
195 break;
196 }
197 }
198
199 // |worker_thread| is the SchedulerWorkerThread with the fewer single-
200 // thread TaskRunners.
201 DCHECK(worker_thread);
202 return worker_thread->CreateTaskRunnerWithTraits(traits);
203 }
181 } 204 }
182 205
183 NOTREACHED(); 206 NOTREACHED();
184 return nullptr; 207 return nullptr;
185 } 208 }
186 209
187 void SchedulerThreadPool::EnqueueSequence( 210 void SchedulerThreadPool::EnqueueSequence(
188 scoped_refptr<Sequence> sequence, 211 scoped_refptr<Sequence> sequence,
189 const SequenceSortKey& sequence_sort_key) { 212 const SequenceSortKey& sequence_sort_key) {
190 shared_priority_queue_.BeginTransaction()->Push( 213 shared_priority_queue_.BeginTransaction()->Push(
191 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), 214 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
192 sequence_sort_key))); 215 sequence_sort_key)));
193 216
194 // The thread calling this method just ran a Task from |sequence| and will 217 // The thread calling this method just ran a Task from |sequence| and will
195 // soon try to get another Sequence from which to run a Task. If the thread 218 // soon try to get another Sequence from which to run a Task. If the thread
196 // belongs to this pool, it will get that Sequence from 219 // belongs to this pool, it will get that Sequence from
197 // |shared_priority_queue_|. When that's the case, there is no need to wake up 220 // |shared_priority_queue_|. When that's the case, there is no need to wake up
198 // another thread after |sequence| is inserted in |shared_priority_queue_|. If 221 // another thread after |sequence| is inserted in |shared_priority_queue_|. If
199 // we did wake up another thread, we would waste resources by having more 222 // we did wake up another thread, we would waste resources by having more
200 // threads trying to get a Sequence from |shared_priority_queue_| than the 223 // threads trying to get a Sequence from |shared_priority_queue_| than the
201 // number of Sequences in it. 224 // number of Sequences in it.
202 if (tls_current_thread_pool.Get().Get() != this) 225 if (tls_current_thread_pool.Get().Get() != this)
203 WakeUpOneThread(); 226 WakeUpOneThread();
204 } 227 }
205 228
206 void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() { 229 void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
207 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 230 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
208 while (idle_worker_threads_stack_.size() < worker_threads_.size()) 231 while (idle_worker_threads_stack_.Size() < worker_threads_.size())
209 idle_worker_threads_stack_cv_for_testing_->Wait(); 232 idle_worker_threads_stack_cv_for_testing_->Wait();
210 } 233 }
211 234
212 void SchedulerThreadPool::JoinForTesting() { 235 void SchedulerThreadPool::JoinForTesting() {
213 for (const auto& worker_thread : worker_threads_) 236 for (const auto& worker_thread : worker_threads_)
214 worker_thread->JoinForTesting(); 237 worker_thread->JoinForTesting();
215 238
216 DCHECK(!join_for_testing_returned_.IsSignaled()); 239 DCHECK(!join_for_testing_returned_.IsSignaled());
217 join_for_testing_returned_.Signal(); 240 join_for_testing_returned_.Signal();
218 } 241 }
(...skipping 25 matching lines...) Expand all
244 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() { 267 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
245 DCHECK(!tls_current_thread_pool.Get().Get()); 268 DCHECK(!tls_current_thread_pool.Get().Get());
246 tls_current_thread_pool.Get().Set(outer_); 269 tls_current_thread_pool.Get().Set(outer_);
247 } 270 }
248 271
249 scoped_refptr<Sequence> 272 scoped_refptr<Sequence>
250 SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( 273 SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
251 SchedulerWorkerThread* worker_thread, 274 SchedulerWorkerThread* worker_thread,
252 PriorityQueue* alternate_priority_queue, 275 PriorityQueue* alternate_priority_queue,
253 bool* alternate_priority_queue_used) { 276 bool* alternate_priority_queue_used) {
254 // TODO(fdoray): Return a Sequence from |alternate_priority_queue| when 277 DCHECK(worker_thread);
255 // appropriate. 278 DCHECK(alternate_priority_queue);
279 DCHECK(alternate_priority_queue_used);
256 280
257 std::unique_ptr<PriorityQueue::Transaction> transaction( 281 *is_single_threaded_sequence = false;
282
283 std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
258 outer_->shared_priority_queue_.BeginTransaction()); 284 outer_->shared_priority_queue_.BeginTransaction());
259 const auto sequence_and_sort_key = transaction->Peek(); 285 const auto shared_sequence_and_sort_key = shared_transaction->Peek();
260 286
261 if (sequence_and_sort_key.is_null()) { 287 std::unique_ptr<PriorityQueue::Transaction> alternate_transaction(
262 // |transaction| is kept alive while |worker_thread| is added to 288 alternate_priority_queue->BeginTransaction());
289 const auto alternate_sequence_and_sort_key =
290 alternate_transaction->Peek();
291
292 if (shared_sequence_and_sort_key.is_null() &&
293 alternate_sequence_and_sort_key.is_null()) {
294 alternate_transaction.reset();
295
296 // |shared_transaction| is kept alive while |worker_thread| is added to
263 // |idle_worker_threads_stack_| to avoid this race: 297 // |idle_worker_threads_stack_| to avoid this race:
264 // 1. This thread creates a Transaction, finds |shared_priority_queue_| 298 // 1. This thread creates a Transaction, finds |shared_priority_queue_|
265 // empty and ends the Transaction. 299 // empty and ends the Transaction.
266 // 2. Other thread creates a Transaction, inserts a Sequence into 300 // 2. Other thread creates a Transaction, inserts a Sequence into
267 // |shared_priority_queue_| and ends the Transaction. This can't happen 301 // |shared_priority_queue_| and ends the Transaction. This can't happen
268 // if the Transaction of step 1 is still active because because there can 302 // if the Transaction of step 1 is still active because because there can
269 // only be one active Transaction per PriorityQueue at a time. 303 // only be one active Transaction per PriorityQueue at a time.
270 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because 304 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
271 // |idle_worker_threads_stack_| is empty. 305 // |idle_worker_threads_stack_| is empty.
272 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to 306 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
273 // sleep. No thread runs the Sequence inserted in step 2. 307 // sleep. No thread runs the Sequence inserted in step 2.
274 outer_->AddToIdleWorkerThreadsStack(worker_thread); 308 outer_->AddToIdleWorkerThreadsStack(worker_thread);
275 return nullptr; 309 return nullptr;
276 } 310 }
277 311
278 transaction->Pop(); 312 scoped_refptr<Sequence> sequence;
279 return sequence_and_sort_key.sequence; 313
314 if (alternate_sequence_and_sort_key.is_null() ||
315 (!shared_sequence_and_sort_key.is_null() &&
316 alternate_sequence_and_sort_key.sort_key <
317 shared_sequence_and_sort_key.sort_key)) {
318 shared_transaction->Pop();
319 sequence = std::move(shared_sequence_and_sort_key.sequence);
320 } else {
321 DCHECK(!alternate_sequence_and_sort_key.is_null());
322 alternate_transaction->Pop();
323 sequence = std::move(alternate_sequence_and_sort_key.sequence);
324 *alternate_priority_queue_used = true;
325 }
326
327 shared_transaction.reset();
328 alternate_transaction.reset();
329 outer_->RemoveFromIdleWorkerThreadsStack(worker_thread);
330
331 return sequence;
280 } 332 }
281 333
282 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( 334 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
283 scoped_refptr<Sequence> sequence) { 335 scoped_refptr<Sequence> sequence) {
284 enqueue_sequence_callback_.Run(std::move(sequence)); 336 enqueue_sequence_callback_.Run(std::move(sequence));
285 } 337 }
286 338
287 SchedulerThreadPool::SchedulerThreadPool( 339 SchedulerThreadPool::SchedulerThreadPool(
288 const EnqueueSequenceCallback& enqueue_sequence_callback, 340 const EnqueueSequenceCallback& enqueue_sequence_callback,
289 TaskTracker* task_tracker, 341 TaskTracker* task_tracker,
(...skipping 17 matching lines...) Expand all
307 359
308 DCHECK(worker_threads_.empty()); 360 DCHECK(worker_threads_.empty());
309 361
310 for (size_t i = 0; i < max_threads; ++i) { 362 for (size_t i = 0; i < max_threads; ++i) {
311 std::unique_ptr<SchedulerWorkerThread> worker_thread = 363 std::unique_ptr<SchedulerWorkerThread> worker_thread =
312 SchedulerWorkerThread::CreateSchedulerWorkerThread( 364 SchedulerWorkerThread::CreateSchedulerWorkerThread(
313 thread_priority, worker_thread_delegate_.get(), task_tracker_, 365 thread_priority, worker_thread_delegate_.get(), task_tracker_,
314 delayed_task_manager_, &shared_priority_queue_); 366 delayed_task_manager_, &shared_priority_queue_);
315 if (!worker_thread) 367 if (!worker_thread)
316 break; 368 break;
317 idle_worker_threads_stack_.push(worker_thread.get()); 369 idle_worker_threads_stack_.Push(worker_thread.get());
318 worker_threads_.push_back(std::move(worker_thread)); 370 worker_threads_.push_back(std::move(worker_thread));
319 } 371 }
320 372
321 return !worker_threads_.empty(); 373 return !worker_threads_.empty();
322 } 374 }
323 375
324 void SchedulerThreadPool::WakeUpOneThread() { 376 void SchedulerThreadPool::WakeUpOneThread() {
325 SchedulerWorkerThread* worker_thread = PopOneIdleWorkerThread(); 377 SchedulerWorkerThread* worker_thread = nullptr;
378 {
379 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
380 if (!idle_worker_threads_stack_.Empty())
381 worker_thread = idle_worker_threads_stack_.Pop();
382 }
383
326 if (worker_thread) 384 if (worker_thread)
327 worker_thread->WakeUp(); 385 worker_thread->WakeUp();
328 } 386 }
329 387
330 void SchedulerThreadPool::AddToIdleWorkerThreadsStack( 388 void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
331 SchedulerWorkerThread* worker_thread) { 389 SchedulerWorkerThread* worker_thread) {
332 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 390 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
333 idle_worker_threads_stack_.push(worker_thread); 391 idle_worker_threads_stack_.Push(worker_thread);
334 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); 392 DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size());
335 393
336 if (idle_worker_threads_stack_.size() == worker_threads_.size()) 394 if (idle_worker_threads_stack_.Size() == worker_threads_.size())
337 idle_worker_threads_stack_cv_for_testing_->Broadcast(); 395 idle_worker_threads_stack_cv_for_testing_->Broadcast();
338 } 396 }
339 397
340 SchedulerWorkerThread* SchedulerThreadPool::PopOneIdleWorkerThread() { 398 void SchedulerThreadPool::RemoveFromIdleWorkerThreadsStack(
399 SchedulerWorkerThread* worker_thread) {
341 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 400 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
342 401 idle_worker_threads_stack_.Remove(worker_thread);
343 if (idle_worker_threads_stack_.empty())
344 return nullptr;
345
346 auto worker_thread = idle_worker_threads_stack_.top();
347 idle_worker_threads_stack_.pop();
348 return worker_thread;
349 } 402 }
350 403
351 } // namespace internal 404 } // namespace internal
352 } // namespace base 405 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_thread_pool.h ('k') | base/task_scheduler/scheduler_thread_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698