Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2807063007: Remove SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback. (Closed)
Patch Set: Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
136 return i.get() == worker; 136 return i.get() == worker;
137 }); 137 });
138 return it != workers.end(); 138 return it != workers.end();
139 } 139 }
140 140
141 } // namespace 141 } // namespace
142 142
143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl 143 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
144 : public SchedulerWorker::Delegate { 144 : public SchedulerWorker::Delegate {
145 public: 145 public:
146 // |outer| owns the worker for which this delegate is constructed. 146 // |outer| owns the worker for which this delegate is constructed. |index|
147 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is 147 // will be appended to the pool name to label the underlying worker threads.
148 // called. |index| will be appended to the pool name to label the underlying
149 // worker threads.
150 SchedulerWorkerDelegateImpl( 148 SchedulerWorkerDelegateImpl(
151 SchedulerWorkerPoolImpl* outer, 149 SchedulerWorkerPoolImpl* outer,
152 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
153 int index); 150 int index);
154 ~SchedulerWorkerDelegateImpl() override; 151 ~SchedulerWorkerDelegateImpl() override;
155 152
156 // SchedulerWorker::Delegate: 153 // SchedulerWorker::Delegate:
157 void OnMainEntry(SchedulerWorker* worker) override; 154 void OnMainEntry(SchedulerWorker* worker) override;
158 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; 155 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
159 void DidRunTask() override; 156 void DidRunTask() override;
160 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; 157 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
161 TimeDelta GetSleepTimeout() override; 158 TimeDelta GetSleepTimeout() override;
162 bool CanDetach(SchedulerWorker* worker) override; 159 bool CanDetach(SchedulerWorker* worker) override;
163 void OnDetach() override; 160 void OnDetach() override;
164 161
165 private: 162 private:
166 SchedulerWorkerPoolImpl* outer_; 163 SchedulerWorkerPoolImpl* outer_;
167 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
168 164
169 // Time of the last detach. 165 // Time of the last detach.
170 TimeTicks last_detach_time_; 166 TimeTicks last_detach_time_;
171 167
172 // Time when GetWork() first returned nullptr. 168 // Time when GetWork() first returned nullptr.
173 TimeTicks idle_start_time_; 169 TimeTicks idle_start_time_;
174 170
175 // Indicates whether the last call to GetWork() returned nullptr. 171 // Indicates whether the last call to GetWork() returned nullptr.
176 bool last_get_work_returned_nullptr_ = false; 172 bool last_get_work_returned_nullptr_ = false;
177 173
(...skipping 10 matching lines...) Expand all
188 size_t num_tasks_since_last_detach_ = 0; 184 size_t num_tasks_since_last_detach_ = 0;
189 185
190 const int index_; 186 const int index_;
191 187
192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); 188 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
193 }; 189 };
194 190
195 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( 191 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
196 const std::string& name, 192 const std::string& name,
197 ThreadPriority priority_hint, 193 ThreadPriority priority_hint,
198 ReEnqueueSequenceCallback re_enqueue_sequence_callback,
199 TaskTracker* task_tracker, 194 TaskTracker* task_tracker,
200 DelayedTaskManager* delayed_task_manager) 195 DelayedTaskManager* delayed_task_manager)
201 : name_(name), 196 : name_(name),
202 priority_hint_(priority_hint), 197 priority_hint_(priority_hint),
203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)),
204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), 198 idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
205 idle_workers_stack_cv_for_testing_( 199 idle_workers_stack_cv_for_testing_(
206 idle_workers_stack_lock_.CreateConditionVariable()), 200 idle_workers_stack_lock_.CreateConditionVariable()),
207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, 201 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
208 WaitableEvent::InitialState::NOT_SIGNALED), 202 WaitableEvent::InitialState::NOT_SIGNALED),
209 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. 203 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
210 detach_duration_histogram_(Histogram::FactoryTimeGet( 204 detach_duration_histogram_(Histogram::FactoryTimeGet(
211 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, 205 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix,
212 TimeDelta::FromMilliseconds(1), 206 TimeDelta::FromMilliseconds(1),
213 TimeDelta::FromHours(1), 207 TimeDelta::FromHours(1),
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
247 #if DCHECK_IS_ON() 241 #if DCHECK_IS_ON()
248 DCHECK(!workers_created_.IsSet()); 242 DCHECK(!workers_created_.IsSet());
249 #endif 243 #endif
250 244
251 DCHECK(workers_.empty()); 245 DCHECK(workers_.empty());
252 workers_.resize(params.max_threads()); 246 workers_.resize(params.max_threads());
253 247
254 // Create workers in reverse order of index so that the worker with the 248 // Create workers in reverse order of index so that the worker with the
255 // highest index is at the bottom of the idle stack. 249 // highest index is at the bottom of the idle stack.
256 for (int index = params.max_threads() - 1; index >= 0; --index) { 250 for (int index = params.max_threads() - 1; index >= 0; --index) {
257 workers_[index] = make_scoped_refptr( 251 workers_[index] = make_scoped_refptr(new SchedulerWorker(
258 new SchedulerWorker(priority_hint_, 252 priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index),
259 MakeUnique<SchedulerWorkerDelegateImpl>( 253 task_tracker_, params.backward_compatibility()));
260 this, re_enqueue_sequence_callback_, index),
261 task_tracker_, params.backward_compatibility()));
262
263 if (index >= num_wake_ups_before_start_) 254 if (index >= num_wake_ups_before_start_)
264 idle_workers_stack_.Push(workers_[index].get()); 255 idle_workers_stack_.Push(workers_[index].get());
265 } 256 }
266 257
267 #if DCHECK_IS_ON() 258 #if DCHECK_IS_ON()
268 workers_created_.Set(); 259 workers_created_.Set();
269 #endif 260 #endif
270 } 261 }
271 262
272 // The number of workers created alive is |num_wake_ups_before_start_|, plus 263 // The number of workers created alive is |num_wake_ups_before_start_|, plus
(...skipping 26 matching lines...) Expand all
299 const TaskTraits& traits) { 290 const TaskTraits& traits) {
300 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); 291 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
301 } 292 }
302 293
303 scoped_refptr<SequencedTaskRunner> 294 scoped_refptr<SequencedTaskRunner>
304 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( 295 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits(
305 const TaskTraits& traits) { 296 const TaskTraits& traits) {
306 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); 297 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
307 } 298 }
308 299
309 void SchedulerWorkerPoolImpl::ReEnqueueSequence(
310 scoped_refptr<Sequence> sequence,
311 const SequenceSortKey& sequence_sort_key) {
312 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
313 sequence_sort_key);
314
315 // The thread calling this method just ran a Task from |sequence| and will
316 // soon try to get another Sequence from which to run a Task. If the thread
317 // belongs to this pool, it will get that Sequence from
318 // |shared_priority_queue_|. When that's the case, there is no need to wake up
319 // another worker after |sequence| is inserted in |shared_priority_queue_|. If
320 // we did wake up another worker, we would waste resources by having more
321 // workers trying to get a Sequence from |shared_priority_queue_| than the
322 // number of Sequences in it.
323 if (tls_current_worker_pool.Get().Get() != this)
324 WakeUpOneWorker();
325 }
326
327 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( 300 bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
328 std::unique_ptr<Task> task, 301 std::unique_ptr<Task> task,
329 scoped_refptr<Sequence> sequence) { 302 scoped_refptr<Sequence> sequence) {
330 DCHECK(task); 303 DCHECK(task);
331 DCHECK(sequence); 304 DCHECK(sequence);
332 305
333 if (!task_tracker_->WillPostTask(task.get())) 306 if (!task_tracker_->WillPostTask(task.get()))
334 return false; 307 return false;
335 308
336 if (task->delayed_run_time.is_null()) { 309 if (task->delayed_run_time.is_null()) {
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
421 for (const auto& worker : workers_) { 394 for (const auto& worker : workers_) {
422 if (worker->ThreadAliveForTesting()) 395 if (worker->ThreadAliveForTesting())
423 ++num_alive_workers; 396 ++num_alive_workers;
424 } 397 }
425 return num_alive_workers; 398 return num_alive_workers;
426 } 399 }
427 400
428 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 401 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
429 SchedulerWorkerDelegateImpl( 402 SchedulerWorkerDelegateImpl(
430 SchedulerWorkerPoolImpl* outer, 403 SchedulerWorkerPoolImpl* outer,
431 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
432 int index) 404 int index)
433 : outer_(outer), 405 : outer_(outer),
434 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
435 index_(index) {} 406 index_(index) {}
436 407
437 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 408 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
438 ~SchedulerWorkerDelegateImpl() = default; 409 ~SchedulerWorkerDelegateImpl() = default;
439 410
440 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( 411 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
441 SchedulerWorker* worker) { 412 SchedulerWorker* worker) {
442 #if DCHECK_IS_ON() 413 #if DCHECK_IS_ON()
443 DCHECK(outer_->workers_created_.IsSet()); 414 DCHECK(outer_->workers_created_.IsSet());
444 DCHECK(ContainsWorker(outer_->workers_, worker)); 415 DCHECK(ContainsWorker(outer_->workers_, worker));
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
520 return sequence; 491 return sequence;
521 } 492 }
522 493
523 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { 494 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
524 ++num_tasks_since_last_wait_; 495 ++num_tasks_since_last_wait_;
525 ++num_tasks_since_last_detach_; 496 ++num_tasks_since_last_detach_;
526 } 497 }
527 498
528 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 499 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
529 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { 500 ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
530 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue 501 const SequenceSortKey sort_key = sequence->GetSortKey();
531 // |sequence| must be enqueued. 502 outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
532 re_enqueue_sequence_callback_.Run(std::move(sequence)); 503 sort_key);
504 // No need to wake up a worker. The current worker will soon call GetWork().
533 } 505 }
534 506
535 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 507 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
536 GetSleepTimeout() { 508 GetSleepTimeout() {
537 return outer_->suggested_reclaim_time_; 509 return outer_->suggested_reclaim_time_;
538 } 510 }
539 511
540 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( 512 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
541 SchedulerWorker* worker) { 513 SchedulerWorker* worker) {
542 const bool can_detach = 514 const bool can_detach =
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
602 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 574 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
603 idle_workers_stack_.Remove(worker); 575 idle_workers_stack_.Remove(worker);
604 } 576 }
605 577
606 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { 578 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
607 return !worker_detachment_disallowed_.IsSet(); 579 return !worker_detachment_disallowed_.IsSet();
608 } 580 }
609 581
610 } // namespace internal 582 } // namespace internal
611 } // namespace base 583 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_worker_pool_impl.h ('k') | base/task_scheduler/scheduler_worker_pool_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698