Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: base/task_scheduler/task_scheduler_impl.cc

Issue 2064073003: TaskScheduler: Make the worker pools of TaskSchedulerImpl configurable (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: remove unused "using" Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/task_scheduler_impl.h" 5 #include "base/task_scheduler/task_scheduler_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
11 #include "base/memory/ptr_util.h" 11 #include "base/memory/ptr_util.h"
12 #include "base/task_scheduler/scheduler_service_thread.h" 12 #include "base/task_scheduler/scheduler_service_thread.h"
13 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
14 #include "base/task_scheduler/sequence_sort_key.h" 13 #include "base/task_scheduler/sequence_sort_key.h"
15 #include "base/task_scheduler/task.h" 14 #include "base/task_scheduler/task.h"
16 #include "base/time/time.h" 15 #include "base/time/time.h"
17 16
18 namespace base { 17 namespace base {
19 namespace internal { 18 namespace internal {
20 19
21 // static 20 // static
22 std::unique_ptr<TaskSchedulerImpl> TaskSchedulerImpl::Create() { 21 std::unique_ptr<TaskSchedulerImpl> TaskSchedulerImpl::Create(
23 std::unique_ptr<TaskSchedulerImpl> scheduler(new TaskSchedulerImpl); 22 const std::vector<WorkerPoolCreationArgs>& worker_pools,
24 scheduler->Initialize(); 23 const WorkerPoolIndexForTraitsCallback&
24 worker_pool_index_for_traits_callback) {
25 std::unique_ptr<TaskSchedulerImpl> scheduler(
26 new TaskSchedulerImpl(worker_pool_index_for_traits_callback));
27 scheduler->Initialize(worker_pools);
25 return scheduler; 28 return scheduler;
26 } 29 }
27 30
28 TaskSchedulerImpl::~TaskSchedulerImpl() { 31 TaskSchedulerImpl::~TaskSchedulerImpl() {
29 #if DCHECK_IS_ON() 32 #if DCHECK_IS_ON()
30 DCHECK(join_for_testing_returned_.IsSignaled()); 33 DCHECK(join_for_testing_returned_.IsSignaled());
31 #endif 34 #endif
32 } 35 }
33 36
34 void TaskSchedulerImpl::PostTaskWithTraits( 37 void TaskSchedulerImpl::PostTaskWithTraits(
(...skipping 15 matching lines...) Expand all
50 53
51 void TaskSchedulerImpl::Shutdown() { 54 void TaskSchedulerImpl::Shutdown() {
52 // TODO(fdoray): Increase the priority of BACKGROUND tasks blocking shutdown. 55 // TODO(fdoray): Increase the priority of BACKGROUND tasks blocking shutdown.
53 task_tracker_.Shutdown(); 56 task_tracker_.Shutdown();
54 } 57 }
55 58
56 void TaskSchedulerImpl::JoinForTesting() { 59 void TaskSchedulerImpl::JoinForTesting() {
57 #if DCHECK_IS_ON() 60 #if DCHECK_IS_ON()
58 DCHECK(!join_for_testing_returned_.IsSignaled()); 61 DCHECK(!join_for_testing_returned_.IsSignaled());
59 #endif 62 #endif
60 background_worker_pool_->JoinForTesting(); 63 for (const auto& worker_pool : worker_pools_)
61 background_file_io_worker_pool_->JoinForTesting(); 64 worker_pool->JoinForTesting();
62 normal_worker_pool_->JoinForTesting();
63 normal_file_io_worker_pool_->JoinForTesting();
64 service_thread_->JoinForTesting(); 65 service_thread_->JoinForTesting();
65 #if DCHECK_IS_ON() 66 #if DCHECK_IS_ON()
66 join_for_testing_returned_.Signal(); 67 join_for_testing_returned_.Signal();
67 #endif 68 #endif
68 } 69 }
69 70
70 TaskSchedulerImpl::TaskSchedulerImpl() 71 TaskSchedulerImpl::TaskSchedulerImpl(const WorkerPoolIndexForTraitsCallback&
72 worker_pool_index_for_traits_callback)
71 : delayed_task_manager_( 73 : delayed_task_manager_(
72 Bind(&TaskSchedulerImpl::OnDelayedRunTimeUpdated, Unretained(this))) 74 Bind(&TaskSchedulerImpl::OnDelayedRunTimeUpdated, Unretained(this))),
75 worker_pool_index_for_traits_callback_(
76 worker_pool_index_for_traits_callback)
73 #if DCHECK_IS_ON() 77 #if DCHECK_IS_ON()
74 , 78 ,
75 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, 79 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
76 WaitableEvent::InitialState::NOT_SIGNALED) 80 WaitableEvent::InitialState::NOT_SIGNALED)
77 #endif 81 #endif
78 { 82 {
83 DCHECK(!worker_pool_index_for_traits_callback_.is_null());
79 } 84 }
80 85
81 void TaskSchedulerImpl::Initialize() { 86 void TaskSchedulerImpl::Initialize(
82 using IORestriction = SchedulerWorkerPoolImpl::IORestriction; 87 const std::vector<WorkerPoolCreationArgs>& worker_pools) {
88 DCHECK(!worker_pools.empty());
83 89
84 const SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback 90 const SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback
85 re_enqueue_sequence_callback = 91 re_enqueue_sequence_callback =
86 Bind(&TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this)); 92 Bind(&TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this));
87 93
88 // TODO(fdoray): Derive the number of threads per pool from hardware 94 for (const auto& worker_pool : worker_pools) {
89 // characteristics rather than using hard-coded constants. 95 // Passing pointers to objects owned by |this| to
90 96 // SchedulerWorkerPoolImpl::Create() is safe because a TaskSchedulerImpl
91 // Passing pointers to objects owned by |this| to 97 // can't be deleted before all its worker pools have been joined.
92 // SchedulerWorkerPoolImpl::Create() is safe because a TaskSchedulerImpl can't 98 worker_pools_.push_back(SchedulerWorkerPoolImpl::Create(
93 // be deleted before all its worker pools have been joined. 99 worker_pool.name, worker_pool.thread_priority, worker_pool.max_threads,
94 background_worker_pool_ = SchedulerWorkerPoolImpl::Create( 100 worker_pool.io_restriction, re_enqueue_sequence_callback,
95 "TaskSchedulerBackground", ThreadPriority::BACKGROUND, 1U, 101 &task_tracker_, &delayed_task_manager_));
96 IORestriction::DISALLOWED, re_enqueue_sequence_callback, &task_tracker_, 102 CHECK(worker_pools_.back());
97 &delayed_task_manager_); 103 }
98 CHECK(background_worker_pool_);
99
100 background_file_io_worker_pool_ = SchedulerWorkerPoolImpl::Create(
101 "TaskSchedulerBackgroundFileIO", ThreadPriority::BACKGROUND, 1U,
102 IORestriction::ALLOWED, re_enqueue_sequence_callback, &task_tracker_,
103 &delayed_task_manager_);
104 CHECK(background_file_io_worker_pool_);
105
106 normal_worker_pool_ = SchedulerWorkerPoolImpl::Create(
107 "TaskSchedulerForeground", ThreadPriority::NORMAL, 4U,
108 IORestriction::DISALLOWED, re_enqueue_sequence_callback, &task_tracker_,
109 &delayed_task_manager_);
110 CHECK(normal_worker_pool_);
111
112 normal_file_io_worker_pool_ = SchedulerWorkerPoolImpl::Create(
113 "TaskSchedulerForegroundFileIO", ThreadPriority::NORMAL, 12U,
114 IORestriction::ALLOWED, re_enqueue_sequence_callback, &task_tracker_,
115 &delayed_task_manager_);
116 CHECK(normal_file_io_worker_pool_);
117 104
118 service_thread_ = SchedulerServiceThread::Create(&task_tracker_, 105 service_thread_ = SchedulerServiceThread::Create(&task_tracker_,
119 &delayed_task_manager_); 106 &delayed_task_manager_);
120 CHECK(service_thread_); 107 CHECK(service_thread_);
121 } 108 }
122 109
123 SchedulerWorkerPool* TaskSchedulerImpl::GetWorkerPoolForTraits( 110 SchedulerWorkerPool* TaskSchedulerImpl::GetWorkerPoolForTraits(
124 const TaskTraits& traits) { 111 const TaskTraits& traits) {
125 if (traits.with_file_io()) { 112 const size_t index = worker_pool_index_for_traits_callback_.Run(traits);
126 if (traits.priority() == TaskPriority::BACKGROUND) 113 DCHECK_LT(index, worker_pools_.size());
127 return background_file_io_worker_pool_.get(); 114 return worker_pools_[index].get();
128 return normal_file_io_worker_pool_.get();
129 }
130
131 if (traits.priority() == TaskPriority::BACKGROUND)
132 return background_worker_pool_.get();
133 return normal_worker_pool_.get();
134 } 115 }
135 116
136 void TaskSchedulerImpl::ReEnqueueSequenceCallback( 117 void TaskSchedulerImpl::ReEnqueueSequenceCallback(
137 scoped_refptr<Sequence> sequence) { 118 scoped_refptr<Sequence> sequence) {
138 DCHECK(sequence); 119 DCHECK(sequence);
139 120
140 const SequenceSortKey sort_key = sequence->GetSortKey(); 121 const SequenceSortKey sort_key = sequence->GetSortKey();
141 TaskTraits traits(sequence->PeekTask()->traits); 122 TaskTraits traits(sequence->PeekTask()->traits);
142 123
143 // Update the priority of |traits| so that the next task in |sequence| runs 124 // Update the priority of |traits| so that the next task in |sequence| runs
144 // with the highest priority in |sequence| as opposed to the next task's 125 // with the highest priority in |sequence| as opposed to the next task's
145 // specific priority. 126 // specific priority.
146 traits.WithPriority(sort_key.priority()); 127 traits.WithPriority(sort_key.priority());
147 128
148 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence), 129 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence),
149 sort_key); 130 sort_key);
150 } 131 }
151 132
152 void TaskSchedulerImpl::OnDelayedRunTimeUpdated() { 133 void TaskSchedulerImpl::OnDelayedRunTimeUpdated() {
153 service_thread_->WakeUp(); 134 service_thread_->WakeUp();
154 } 135 }
155 136
156 } // namespace internal 137 } // namespace internal
157 } // namespace base 138 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698