Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool_impl.cc

Issue 1906083002: TaskScheduler: Remove base/task_scheduler/utils.h/.cc (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@sched_2_stack
Patch Set: typos Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool.h" 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
11 #include "base/lazy_instance.h" 11 #include "base/lazy_instance.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
14 #include "base/sequenced_task_runner.h" 14 #include "base/sequenced_task_runner.h"
15 #include "base/task_scheduler/utils.h" 15 #include "base/task_scheduler/delayed_task_manager.h"
16 #include "base/task_scheduler/task_tracker.h"
16 #include "base/threading/thread_local.h" 17 #include "base/threading/thread_local.h"
17 18
18 namespace base { 19 namespace base {
19 namespace internal { 20 namespace internal {
20 21
21 namespace { 22 namespace {
22 23
23 // SchedulerThreadPool that owns the current thread, if any. 24 // SchedulerThreadPool that owns the current thread, if any.
24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky 25 LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; 26 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
26 27
27 // A task runner that runs tasks with the PARALLEL ExecutionMode. 28 // A task runner that runs tasks with the PARALLEL ExecutionMode.
28 class SchedulerParallelTaskRunner : public TaskRunner { 29 class SchedulerParallelTaskRunner : public TaskRunner {
29 public: 30 public:
30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 31 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
31 // long as |executor| is alive. 32 // long as |thread_pool| is alive.
32 // TODO(robliao): Find a concrete way to manage |executor|'s memory. 33 // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory.
33 SchedulerParallelTaskRunner(const TaskTraits& traits, 34 SchedulerParallelTaskRunner(const TaskTraits& traits,
34 SchedulerTaskExecutor* executor, 35 SchedulerThreadPool* thread_pool)
35 TaskTracker* task_tracker, 36 : traits_(traits), thread_pool_(thread_pool) {}
36 DelayedTaskManager* delayed_task_manager)
37 : traits_(traits),
38 executor_(executor),
39 task_tracker_(task_tracker),
40 delayed_task_manager_(delayed_task_manager) {}
41 37
42 // TaskRunner: 38 // TaskRunner:
43 bool PostDelayedTask(const tracked_objects::Location& from_here, 39 bool PostDelayedTask(const tracked_objects::Location& from_here,
44 const Closure& closure, 40 const Closure& closure,
45 TimeDelta delay) override { 41 TimeDelta delay) override {
46 // Post the task as part of a one-off single-task Sequence. 42 // Post the task as part of a one-off single-task Sequence.
47 return PostTaskToExecutor( 43 return thread_pool_->PostTaskWithSequence(
48 WrapUnique( 44 WrapUnique(
49 new Task(from_here, closure, traits_, 45 new Task(from_here, closure, traits_,
50 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), 46 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
51 make_scoped_refptr(new Sequence), executor_, task_tracker_, 47 make_scoped_refptr(new Sequence));
52 delayed_task_manager_);
53 } 48 }
54 49
55 bool RunsTasksOnCurrentThread() const override { 50 bool RunsTasksOnCurrentThread() const override {
56 return tls_current_thread_pool.Get().Get() == executor_; 51 return tls_current_thread_pool.Get().Get() == thread_pool_;
57 } 52 }
58 53
59 private: 54 private:
60 ~SchedulerParallelTaskRunner() override = default; 55 ~SchedulerParallelTaskRunner() override = default;
61 56
62 const TaskTraits traits_; 57 const TaskTraits traits_;
63 SchedulerTaskExecutor* const executor_; 58 SchedulerThreadPool* const thread_pool_;
64 TaskTracker* const task_tracker_;
65 DelayedTaskManager* const delayed_task_manager_;
66 59
67 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); 60 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
68 }; 61 };
69 62
70 // A task runner that runs tasks with the SEQUENCED ExecutionMode. 63 // A task runner that runs tasks with the SEQUENCED ExecutionMode.
71 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { 64 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
72 public: 65 public:
73 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 66 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
74 // long as |executor| is alive. 67 // so long as |thread_pool| is alive.
75 // TODO(robliao): Find a concrete way to manage |executor|'s memory. 68 // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory.
76 SchedulerSequencedTaskRunner(const TaskTraits& traits, 69 SchedulerSequencedTaskRunner(const TaskTraits& traits,
77 SchedulerTaskExecutor* executor, 70 SchedulerThreadPool* thread_pool)
78 TaskTracker* task_tracker, 71 : traits_(traits), thread_pool_(thread_pool) {}
79 DelayedTaskManager* delayed_task_manager)
80 : traits_(traits),
81 executor_(executor),
82 task_tracker_(task_tracker),
83 delayed_task_manager_(delayed_task_manager) {}
84 72
85 // SequencedTaskRunner: 73 // SequencedTaskRunner:
86 bool PostDelayedTask(const tracked_objects::Location& from_here, 74 bool PostDelayedTask(const tracked_objects::Location& from_here,
87 const Closure& closure, 75 const Closure& closure,
88 TimeDelta delay) override { 76 TimeDelta delay) override {
89 // Post the task as part of |sequence|. 77 // Post the task as part of |sequence|.
90 return PostTaskToExecutor( 78 return thread_pool_->PostTaskWithSequence(
91 WrapUnique( 79 WrapUnique(
92 new Task(from_here, closure, traits_, 80 new Task(from_here, closure, traits_,
93 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), 81 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
94 sequence_, executor_, task_tracker_, delayed_task_manager_); 82 sequence_);
95 } 83 }
96 84
97 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 85 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
98 const Closure& closure, 86 const Closure& closure,
99 base::TimeDelta delay) override { 87 base::TimeDelta delay) override {
100 // Tasks are never nested within the task scheduler. 88 // Tasks are never nested within the task scheduler.
101 return PostDelayedTask(from_here, closure, delay); 89 return PostDelayedTask(from_here, closure, delay);
102 } 90 }
103 91
104 bool RunsTasksOnCurrentThread() const override { 92 bool RunsTasksOnCurrentThread() const override {
105 return tls_current_thread_pool.Get().Get() == executor_; 93 return tls_current_thread_pool.Get().Get() == thread_pool_;
106 } 94 }
107 95
108 private: 96 private:
109 ~SchedulerSequencedTaskRunner() override = default; 97 ~SchedulerSequencedTaskRunner() override = default;
110 98
111 // Sequence for all Tasks posted through this TaskRunner. 99 // Sequence for all Tasks posted through this TaskRunner.
112 const scoped_refptr<Sequence> sequence_ = new Sequence; 100 const scoped_refptr<Sequence> sequence_ = new Sequence;
113 101
114 const TaskTraits traits_; 102 const TaskTraits traits_;
115 SchedulerTaskExecutor* const executor_; 103 SchedulerThreadPool* const thread_pool_;
116 TaskTracker* const task_tracker_;
117 DelayedTaskManager* const delayed_task_manager_;
118 104
119 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); 105 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
120 }; 106 };
121 107
122 } // namespace 108 } // namespace
123 109
124 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl 110 class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
125 : public SchedulerWorkerThread::Delegate { 111 : public SchedulerWorkerThread::Delegate {
126 public: 112 public:
127 SchedulerWorkerThreadDelegateImpl( 113 SchedulerWorkerThreadDelegateImpl(
128 SchedulerThreadPool* outer, 114 SchedulerThreadPoolImpl* outer,
129 const EnqueueSequenceCallback& enqueue_sequence_callback); 115 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback);
130 ~SchedulerWorkerThreadDelegateImpl() override; 116 ~SchedulerWorkerThreadDelegateImpl() override;
131 117
132 // SchedulerWorkerThread::Delegate: 118 // SchedulerWorkerThread::Delegate:
133 void OnMainEntry() override; 119 void OnMainEntry() override;
134 scoped_refptr<Sequence> GetWork( 120 scoped_refptr<Sequence> GetWork(
135 SchedulerWorkerThread* worker_thread) override; 121 SchedulerWorkerThread* worker_thread) override;
136 void EnqueueSequence(scoped_refptr<Sequence> sequence) override; 122 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
137 123
138 private: 124 private:
139 SchedulerThreadPool* outer_; 125 SchedulerThreadPoolImpl* outer_;
140 const EnqueueSequenceCallback enqueue_sequence_callback_; 126 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
141 127
142 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); 128 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
143 }; 129 };
144 130
145 SchedulerThreadPool::~SchedulerThreadPool() { 131 SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() {
146 // SchedulerThreadPool should never be deleted in production unless its 132 // SchedulerThreadPool should never be deleted in production unless its
147 // initialization failed. 133 // initialization failed.
148 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); 134 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
149 } 135 }
150 136
151 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( 137 std::unique_ptr<SchedulerThreadPoolImpl>
138 SchedulerThreadPoolImpl::CreateThreadPool(
152 ThreadPriority thread_priority, 139 ThreadPriority thread_priority,
153 size_t max_threads, 140 size_t max_threads,
154 const EnqueueSequenceCallback& enqueue_sequence_callback, 141 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
155 TaskTracker* task_tracker, 142 TaskTracker* task_tracker,
156 DelayedTaskManager* delayed_task_manager) { 143 DelayedTaskManager* delayed_task_manager) {
157 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool( 144 std::unique_ptr<SchedulerThreadPoolImpl> thread_pool(
158 enqueue_sequence_callback, task_tracker, delayed_task_manager)); 145 new SchedulerThreadPoolImpl(re_enqueue_sequence_callback, task_tracker,
146 delayed_task_manager));
159 if (thread_pool->Initialize(thread_priority, max_threads)) 147 if (thread_pool->Initialize(thread_priority, max_threads))
160 return thread_pool; 148 return thread_pool;
161 return nullptr; 149 return nullptr;
162 } 150 }
163 151
164 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( 152 void SchedulerThreadPoolImpl::WaitForAllWorkerThreadsIdleForTesting() {
153 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
154 while (idle_worker_threads_stack_.Size() < worker_threads_.size())
155 idle_worker_threads_stack_cv_for_testing_->Wait();
156 }
157
158 void SchedulerThreadPoolImpl::JoinForTesting() {
159 for (const auto& worker_thread : worker_threads_)
160 worker_thread->JoinForTesting();
161
162 DCHECK(!join_for_testing_returned_.IsSignaled());
163 join_for_testing_returned_.Signal();
164 }
165
166 scoped_refptr<TaskRunner> SchedulerThreadPoolImpl::CreateTaskRunnerWithTraits(
165 const TaskTraits& traits, 167 const TaskTraits& traits,
166 ExecutionMode execution_mode) { 168 ExecutionMode execution_mode) {
167 switch (execution_mode) { 169 switch (execution_mode) {
168 case ExecutionMode::PARALLEL: 170 case ExecutionMode::PARALLEL:
169 return make_scoped_refptr(new SchedulerParallelTaskRunner( 171 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
170 traits, this, task_tracker_, delayed_task_manager_));
171 172
172 case ExecutionMode::SEQUENCED: 173 case ExecutionMode::SEQUENCED:
173 return make_scoped_refptr(new SchedulerSequencedTaskRunner( 174 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
174 traits, this, task_tracker_, delayed_task_manager_));
175 175
176 case ExecutionMode::SINGLE_THREADED: 176 case ExecutionMode::SINGLE_THREADED:
177 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. 177 // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
178 NOTREACHED(); 178 NOTREACHED();
179 return nullptr; 179 return nullptr;
180 } 180 }
181 181
182 NOTREACHED(); 182 NOTREACHED();
183 return nullptr; 183 return nullptr;
184 } 184 }
185 185
186 void SchedulerThreadPool::EnqueueSequence( 186 void SchedulerThreadPoolImpl::ReEnqueueSequence(
187 scoped_refptr<Sequence> sequence, 187 scoped_refptr<Sequence> sequence,
188 const SequenceSortKey& sequence_sort_key) { 188 const SequenceSortKey& sequence_sort_key) {
189 shared_priority_queue_.BeginTransaction()->Push( 189 shared_priority_queue_.BeginTransaction()->Push(
190 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), 190 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
191 sequence_sort_key))); 191 sequence_sort_key)));
192 192
193 // The thread calling this method just ran a Task from |sequence| and will 193 // The thread calling this method just ran a Task from |sequence| and will
194 // soon try to get another Sequence from which to run a Task. If the thread 194 // soon try to get another Sequence from which to run a Task. If the thread
195 // belongs to this pool, it will get that Sequence from 195 // belongs to this pool, it will get that Sequence from
196 // |shared_priority_queue_|. When that's the case, there is no need to wake up 196 // |shared_priority_queue_|. When that's the case, there is no need to wake up
197 // another thread after |sequence| is inserted in |shared_priority_queue_|. If 197 // another thread after |sequence| is inserted in |shared_priority_queue_|. If
198 // we did wake up another thread, we would waste resources by having more 198 // we did wake up another thread, we would waste resources by having more
199 // threads trying to get a Sequence from |shared_priority_queue_| than the 199 // threads trying to get a Sequence from |shared_priority_queue_| than the
200 // number of Sequences in it. 200 // number of Sequences in it.
201 if (tls_current_thread_pool.Get().Get() != this) 201 if (tls_current_thread_pool.Get().Get() != this)
202 WakeUpOneThread(); 202 WakeUpOneThread();
203 } 203 }
204 204
205 void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() { 205 bool SchedulerThreadPoolImpl::PostTaskWithSequence(
206 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
207 while (idle_worker_threads_stack_.Size() < worker_threads_.size())
208 idle_worker_threads_stack_cv_for_testing_->Wait();
209 }
210
211 void SchedulerThreadPool::JoinForTesting() {
212 for (const auto& worker_thread : worker_threads_)
213 worker_thread->JoinForTesting();
214
215 DCHECK(!join_for_testing_returned_.IsSignaled());
216 join_for_testing_returned_.Signal();
217 }
218
219 void SchedulerThreadPool::PostTaskWithSequence(
220 std::unique_ptr<Task> task, 206 std::unique_ptr<Task> task,
221 scoped_refptr<Sequence> sequence) { 207 scoped_refptr<Sequence> sequence) {
222 DCHECK(task); 208 DCHECK(task);
223 DCHECK(sequence); 209 DCHECK(sequence);
224 210
225 const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( 211 if (!task_tracker_->WillPostTask(task.get()))
226 std::move(task), std::move(sequence), &shared_priority_queue_); 212 return false;
227 213
228 // No thread has already been woken up to run Tasks from |sequence| if it was 214 if (task->delayed_run_time.is_null()) {
229 // empty before |task| was inserted into it. 215 PostTaskWithSequenceNow(std::move(task), std::move(sequence));
230 if (sequence_was_empty) 216 } else {
231 WakeUpOneThread(); 217 delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence),
218 this);
219 }
220
221 return true;
232 } 222 }
233 223
234 SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl:: 224 void SchedulerThreadPoolImpl::PostTaskWithSequenceNow(
225 std::unique_ptr<Task> task,
226 scoped_refptr<Sequence> sequence) {
227 DCHECK(task);
228 DCHECK(sequence);
229
230 // Confirm that |task| is ready to run (its delayed run time is either null or
231 // in the past).
232 DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now());
233
234 const bool sequence_was_empty = sequence->PushTask(std::move(task));
235 if (sequence_was_empty) {
236 // Insert |sequence| in |shared_priority_queue_| if it was empty before
237 // |task| was inserted into it. Otherwise, one of these must be true:
238 // - |sequence| is already in a PriorityQueue (not necessarily
239 // |shared_priority_queue_|), or,
240 // - A worker thread is running a Task from |sequence|. It will insert
241 // |sequence| in a PriorityQueue once it's done running the Task.
242 const auto sequence_sort_key = sequence->GetSortKey();
243 shared_priority_queue_.BeginTransaction()->Push(
244 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
245 sequence_sort_key)));
246
247 // Wake up a worker thread to process |sequence|.
248 WakeUpOneThread();
249 }
250 }
251
252 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
235 SchedulerWorkerThreadDelegateImpl( 253 SchedulerWorkerThreadDelegateImpl(
236 SchedulerThreadPool* outer, 254 SchedulerThreadPoolImpl* outer,
237 const EnqueueSequenceCallback& enqueue_sequence_callback) 255 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback)
238 : outer_(outer), enqueue_sequence_callback_(enqueue_sequence_callback) {} 256 : outer_(outer),
257 re_enqueue_sequence_callback_(re_enqueue_sequence_callback) {}
239 258
240 SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl:: 259 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
241 ~SchedulerWorkerThreadDelegateImpl() = default; 260 ~SchedulerWorkerThreadDelegateImpl() = default;
242 261
243 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() { 262 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
244 DCHECK(!tls_current_thread_pool.Get().Get()); 263 DCHECK(!tls_current_thread_pool.Get().Get());
245 tls_current_thread_pool.Get().Set(outer_); 264 tls_current_thread_pool.Get().Set(outer_);
246 } 265 }
247 266
248 scoped_refptr<Sequence> 267 scoped_refptr<Sequence>
249 SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( 268 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
250 SchedulerWorkerThread* worker_thread) { 269 SchedulerWorkerThread* worker_thread) {
251 std::unique_ptr<PriorityQueue::Transaction> transaction( 270 std::unique_ptr<PriorityQueue::Transaction> transaction(
252 outer_->shared_priority_queue_.BeginTransaction()); 271 outer_->shared_priority_queue_.BeginTransaction());
253 const auto& sequence_and_sort_key = transaction->Peek(); 272 const auto& sequence_and_sort_key = transaction->Peek();
254 273
255 if (sequence_and_sort_key.is_null()) { 274 if (sequence_and_sort_key.is_null()) {
256 // |transaction| is kept alive while |worker_thread| is added to 275 // |transaction| is kept alive while |worker_thread| is added to
257 // |idle_worker_threads_stack_| to avoid this race: 276 // |idle_worker_threads_stack_| to avoid this race:
258 // 1. This thread creates a Transaction, finds |shared_priority_queue_| 277 // 1. This thread creates a Transaction, finds |shared_priority_queue_|
259 // empty and ends the Transaction. 278 // empty and ends the Transaction.
260 // 2. Other thread creates a Transaction, inserts a Sequence into 279 // 2. Other thread creates a Transaction, inserts a Sequence into
261 // |shared_priority_queue_| and ends the Transaction. This can't happen 280 // |shared_priority_queue_| and ends the Transaction. This can't happen
262 // if the Transaction of step 1 is still active because because there can 281 // if the Transaction of step 1 is still active because because there can
263 // only be one active Transaction per PriorityQueue at a time. 282 // only be one active Transaction per PriorityQueue at a time.
264 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because 283 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
265 // |idle_worker_threads_stack_| is empty. 284 // |idle_worker_threads_stack_| is empty.
266 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to 285 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
267 // sleep. No thread runs the Sequence inserted in step 2. 286 // sleep. No thread runs the Sequence inserted in step 2.
268 outer_->AddToIdleWorkerThreadsStack(worker_thread); 287 outer_->AddToIdleWorkerThreadsStack(worker_thread);
269 return nullptr; 288 return nullptr;
270 } 289 }
271 290
272 scoped_refptr<Sequence> sequence = sequence_and_sort_key.sequence; 291 scoped_refptr<Sequence> sequence = sequence_and_sort_key.sequence;
273 transaction->Pop(); 292 transaction->Pop();
274 return sequence; 293 return sequence;
275 } 294 }
276 295
277 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( 296 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
278 scoped_refptr<Sequence> sequence) { 297 ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
279 enqueue_sequence_callback_.Run(std::move(sequence)); 298 re_enqueue_sequence_callback_.Run(std::move(sequence));
280 } 299 }
281 300
282 SchedulerThreadPool::SchedulerThreadPool( 301 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
283 const EnqueueSequenceCallback& enqueue_sequence_callback, 302 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
284 TaskTracker* task_tracker, 303 TaskTracker* task_tracker,
285 DelayedTaskManager* delayed_task_manager) 304 DelayedTaskManager* delayed_task_manager)
286 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), 305 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
287 idle_worker_threads_stack_cv_for_testing_( 306 idle_worker_threads_stack_cv_for_testing_(
288 idle_worker_threads_stack_lock_.CreateConditionVariable()), 307 idle_worker_threads_stack_lock_.CreateConditionVariable()),
289 join_for_testing_returned_(true, false), 308 join_for_testing_returned_(true, false),
290 worker_thread_delegate_( 309 worker_thread_delegate_(
291 new SchedulerWorkerThreadDelegateImpl(this, 310 new SchedulerWorkerThreadDelegateImpl(this,
292 enqueue_sequence_callback)), 311 re_enqueue_sequence_callback)),
293 task_tracker_(task_tracker), 312 task_tracker_(task_tracker),
294 delayed_task_manager_(delayed_task_manager) { 313 delayed_task_manager_(delayed_task_manager) {
295 DCHECK(task_tracker_); 314 DCHECK(task_tracker_);
296 DCHECK(delayed_task_manager_); 315 DCHECK(delayed_task_manager_);
297 } 316 }
298 317
299 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, 318 bool SchedulerThreadPoolImpl::Initialize(ThreadPriority thread_priority,
300 size_t max_threads) { 319 size_t max_threads) {
301 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 320 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
302 321
303 DCHECK(worker_threads_.empty()); 322 DCHECK(worker_threads_.empty());
304 323
305 for (size_t i = 0; i < max_threads; ++i) { 324 for (size_t i = 0; i < max_threads; ++i) {
306 std::unique_ptr<SchedulerWorkerThread> worker_thread = 325 std::unique_ptr<SchedulerWorkerThread> worker_thread =
307 SchedulerWorkerThread::CreateSchedulerWorkerThread( 326 SchedulerWorkerThread::CreateSchedulerWorkerThread(
308 thread_priority, worker_thread_delegate_.get(), task_tracker_); 327 thread_priority, worker_thread_delegate_.get(), task_tracker_);
309 if (!worker_thread) 328 if (!worker_thread)
310 break; 329 break;
311 idle_worker_threads_stack_.Push(worker_thread.get()); 330 idle_worker_threads_stack_.Push(worker_thread.get());
312 worker_threads_.push_back(std::move(worker_thread)); 331 worker_threads_.push_back(std::move(worker_thread));
313 } 332 }
314 333
315 return !worker_threads_.empty(); 334 return !worker_threads_.empty();
316 } 335 }
317 336
318 void SchedulerThreadPool::WakeUpOneThread() { 337 void SchedulerThreadPoolImpl::WakeUpOneThread() {
319 SchedulerWorkerThread* worker_thread; 338 SchedulerWorkerThread* worker_thread;
320 { 339 {
321 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 340 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
322 worker_thread = idle_worker_threads_stack_.Pop(); 341 worker_thread = idle_worker_threads_stack_.Pop();
323 } 342 }
324 if (worker_thread) 343 if (worker_thread)
325 worker_thread->WakeUp(); 344 worker_thread->WakeUp();
326 } 345 }
327 346
328 void SchedulerThreadPool::AddToIdleWorkerThreadsStack( 347 void SchedulerThreadPoolImpl::AddToIdleWorkerThreadsStack(
329 SchedulerWorkerThread* worker_thread) { 348 SchedulerWorkerThread* worker_thread) {
330 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 349 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
331 idle_worker_threads_stack_.Push(worker_thread); 350 idle_worker_threads_stack_.Push(worker_thread);
332 DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size()); 351 DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size());
333 352
334 if (idle_worker_threads_stack_.Size() == worker_threads_.size()) 353 if (idle_worker_threads_stack_.Size() == worker_threads_.size())
335 idle_worker_threads_stack_cv_for_testing_->Broadcast(); 354 idle_worker_threads_stack_cv_for_testing_->Broadcast();
336 } 355 }
337 356
338 } // namespace internal 357 } // namespace internal
339 } // namespace base 358 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698