Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2209)

Unified Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2077093002: Rename SchedulerWorkerThread* to SchedulerWorker* (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@rename2
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_worker_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
index e2cc7aa718523cad5dd96279174215c317b6a28f..a991cb78cf308fc8d60845c51eeda19234009661 100644
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
@@ -27,14 +27,14 @@ namespace internal {
namespace {
+// SchedulerWorker that owns the current thread, if any.
+LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky
+ tls_current_worker = LAZY_INSTANCE_INITIALIZER;
+
// SchedulerWorkerPool that owns the current thread, if any.
LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;
-// SchedulerWorkerThread that owns the current thread, if any.
-LazyInstance<ThreadLocalPointer<const SchedulerWorkerThread>>::Leaky
- tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
-
// A task runner that runs tasks with the PARALLEL ExecutionMode.
class SchedulerParallelTaskRunner : public TaskRunner {
public:
@@ -117,15 +117,15 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
public:
// Constructs a SchedulerSingleThreadTaskRunner which can be used to post
- // tasks so long as |worker_pool| and |worker_thread| are alive.
+ // tasks so long as |worker_pool| and |worker| are alive.
// TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
- // and |worker_thread|.
+ // and |worker|.
SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool,
- SchedulerWorkerThread* worker_thread)
+ SchedulerWorker* worker)
: traits_(traits),
worker_pool_(worker_pool),
- worker_thread_(worker_thread) {}
+ worker_(worker) {}
// SingleThreadTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -134,9 +134,9 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
task->single_thread_task_runner_ref = this;
- // Post the task to be executed by |worker_thread_| as part of |sequence_|.
+ // Post the task to be executed by |worker_| as part of |sequence_|.
return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
- worker_thread_);
+ worker_);
}
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
@@ -147,7 +147,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
}
bool RunsTasksOnCurrentThread() const override {
- return tls_current_worker_thread.Get().Get() == worker_thread_;
+ return tls_current_worker.Get().Get() == worker_;
}
private:
@@ -158,49 +158,47 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
const TaskTraits traits_;
SchedulerWorkerPool* const worker_pool_;
- SchedulerWorkerThread* const worker_thread_;
+ SchedulerWorker* const worker_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
};
// Only used in DCHECKs.
-bool ContainsWorkerThread(
- const std::vector<std::unique_ptr<SchedulerWorkerThread>>& worker_threads,
- const SchedulerWorkerThread* worker_thread) {
- auto it = std::find_if(
- worker_threads.begin(), worker_threads.end(),
- [worker_thread](const std::unique_ptr<SchedulerWorkerThread>& i) {
- return i.get() == worker_thread;
+bool ContainsWorker(
+ const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
+ const SchedulerWorker* worker) {
+ auto it = std::find_if(workers.begin(), workers.end(),
+ [worker](const std::unique_ptr<SchedulerWorker>& i) {
+ return i.get() == worker;
});
- return it != worker_threads.end();
+ return it != workers.end();
}
} // namespace
-class SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl
- : public SchedulerWorkerThread::Delegate {
+class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
+ : public SchedulerWorker::Delegate {
public:
- // |outer| owns the worker thread for which this delegate is constructed.
+ // |outer| owns the worker for which this delegate is constructed.
// |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
// called with a non-single-threaded Sequence. |shared_priority_queue| is a
- // PriorityQueue whose transactions may overlap with the worker thread's
+ // PriorityQueue whose transactions may overlap with the worker's
// single-threaded PriorityQueue's transactions. |index| will be appended to
fdoray 2016/06/20 15:08:39 |index| will be appended to the pool name to label
robliao 2016/06/20 17:50:09 Took a different approach for this comment. Update
// this thread's name to uniquely identify it.
- SchedulerWorkerThreadDelegateImpl(
+ SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
const PriorityQueue* shared_priority_queue,
int index);
- ~SchedulerWorkerThreadDelegateImpl() override;
+ ~SchedulerWorkerDelegateImpl() override;
PriorityQueue* single_threaded_priority_queue() {
return &single_threaded_priority_queue_;
}
- // SchedulerWorkerThread::Delegate:
- void OnMainEntry(SchedulerWorkerThread* worker_thread) override;
- scoped_refptr<Sequence> GetWork(
- SchedulerWorkerThread* worker_thread) override;
+ // SchedulerWorker::Delegate:
+ void OnMainEntry(SchedulerWorker* worker) override;
+ scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
TimeDelta GetSleepTimeout() override;
@@ -208,7 +206,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl
SchedulerWorkerPoolImpl* outer_;
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
- // Single-threaded PriorityQueue for the worker thread.
+ // Single-threaded PriorityQueue for the worker.
PriorityQueue single_threaded_priority_queue_;
// True if the last Sequence returned by GetWork() was extracted from
@@ -217,13 +215,13 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl
const int index_;
- DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
+ DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
};
SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
// SchedulerWorkerPool should never be deleted in production unless its
// initialization failed.
- DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
+ DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
}
// static
@@ -245,15 +243,15 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
return nullptr;
}
-void SchedulerWorkerPoolImpl::WaitForAllWorkerWorkersIdleForTesting() {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- while (idle_worker_threads_stack_.Size() < worker_threads_.size())
- idle_worker_threads_stack_cv_for_testing_->Wait();
+void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ while (idle_workers_stack_.Size() < workers_.size())
+ idle_workers_stack_cv_for_testing_->Wait();
}
void SchedulerWorkerPoolImpl::JoinForTesting() {
- for (const auto& worker_thread : worker_threads_)
- worker_thread->JoinForTesting();
+ for (const auto& worker : workers_)
+ worker->JoinForTesting();
DCHECK(!join_for_testing_returned_.IsSignaled());
join_for_testing_returned_.Signal();
@@ -271,18 +269,17 @@ scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
case ExecutionMode::SINGLE_THREADED: {
// TODO(fdoray): Find a way to take load into account when assigning a
- // SchedulerWorkerThread to a SingleThreadTaskRunner. Also, this code
- // assumes that all SchedulerWorkerThreads are alive. Eventually, we might
+ // SchedulerWorker to a SingleThreadTaskRunner. Also, this code
+ // assumes that all SchedulerWorkers are alive. Eventually, we might
// decide to tear down threads that haven't run tasks for a long time.
- size_t worker_thread_index;
+ size_t worker_index;
{
- AutoSchedulerLock auto_lock(next_worker_thread_index_lock_);
- worker_thread_index = next_worker_thread_index_;
- next_worker_thread_index_ =
- (next_worker_thread_index_ + 1) % worker_threads_.size();
+ AutoSchedulerLock auto_lock(next_worker_index_lock_);
+ worker_index = next_worker_index_;
+ next_worker_index_ = (next_worker_index_ + 1) % workers_.size();
}
return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
- traits, this, worker_threads_[worker_thread_index].get()));
+ traits, this, workers_[worker_index].get()));
}
}
@@ -311,21 +308,19 @@ void SchedulerWorkerPoolImpl::ReEnqueueSequence(
bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
std::unique_ptr<Task> task,
scoped_refptr<Sequence> sequence,
- SchedulerWorkerThread* worker_thread) {
+ SchedulerWorker* worker) {
DCHECK(task);
DCHECK(sequence);
- DCHECK(!worker_thread ||
- ContainsWorkerThread(worker_threads_, worker_thread));
+ DCHECK(!worker || ContainsWorker(workers_, worker));
if (!task_tracker_->WillPostTask(task.get()))
return false;
if (task->delayed_run_time.is_null()) {
- PostTaskWithSequenceNow(std::move(task), std::move(sequence),
- worker_thread);
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker);
} else {
delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence),
- worker_thread, this);
+ worker, this);
}
return true;
@@ -334,22 +329,20 @@ bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
std::unique_ptr<Task> task,
scoped_refptr<Sequence> sequence,
- SchedulerWorkerThread* worker_thread) {
+ SchedulerWorker* worker) {
DCHECK(task);
DCHECK(sequence);
- DCHECK(!worker_thread ||
- ContainsWorkerThread(worker_threads_, worker_thread));
+ DCHECK(!worker || ContainsWorker(workers_, worker));
// Confirm that |task| is ready to run (its delayed run time is either null or
// in the past).
DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now());
- // Because |worker_thread| belongs to this worker pool, we know that the type
- // of its delegate is SchedulerWorkerThreadDelegateImpl.
+ // Because |worker| belongs to this worker pool, we know that the type
+ // of its delegate is SchedulerWorkerDelegateImpl.
PriorityQueue* const priority_queue =
- worker_thread
- ? static_cast<SchedulerWorkerThreadDelegateImpl*>(
- worker_thread->delegate())
+ worker
+ ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate())
->single_threaded_priority_queue()
: &shared_priority_queue_;
DCHECK(priority_queue);
@@ -360,22 +353,22 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
// inserted into it. Otherwise, one of these must be true:
// - |sequence| is already in a PriorityQueue (not necessarily
// |shared_priority_queue_|), or,
- // - A worker thread is running a Task from |sequence|. It will insert
- // |sequence| in a PriorityQueue once it's done running the Task.
+ // - A worker is running a Task from |sequence|. It will insert |sequence|
+ // in a PriorityQueue once it's done running the Task.
const auto sequence_sort_key = sequence->GetSortKey();
priority_queue->BeginTransaction()->Push(std::move(sequence),
sequence_sort_key);
- // Wake up a worker thread to process |sequence|.
- if (worker_thread)
- worker_thread->WakeUp();
+ // Wake up a worker to process |sequence|.
+ if (worker)
+ worker->WakeUp();
else
WakeUpOneWorker();
}
}
-SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::
- SchedulerWorkerThreadDelegateImpl(
+SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
+ SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
const PriorityQueue* shared_priority_queue,
@@ -385,24 +378,24 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::
single_threaded_priority_queue_(shared_priority_queue),
index_(index) {}
-SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::
- ~SchedulerWorkerThreadDelegateImpl() = default;
+SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
+ ~SchedulerWorkerDelegateImpl() = default;
-void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry(
- SchedulerWorkerThread* worker_thread) {
+void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
+ SchedulerWorker* worker) {
#if DCHECK_IS_ON()
// Wait for |outer_->threads_created_| to avoid traversing
fdoray 2016/06/20 15:08:39 workers_created_ (if you change the variable's nam
robliao 2016/06/20 17:50:09 Done.
- // |outer_->worker_threads_| while it is being filled by Initialize().
+ // |outer_->workers_| while it is being filled by Initialize().
outer_->threads_created_.Wait();
- DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread));
+ DCHECK(ContainsWorker(outer_->workers_, worker));
#endif
PlatformThread::SetName(
StringPrintf("%sWorker%d", outer_->name_.c_str(), index_));
- DCHECK(!tls_current_worker_thread.Get().Get());
+ DCHECK(!tls_current_worker.Get().Get());
DCHECK(!tls_current_worker_pool.Get().Get());
- tls_current_worker_thread.Get().Set(worker_thread);
+ tls_current_worker.Get().Set(worker);
tls_current_worker_pool.Get().Set(outer_);
ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ ==
@@ -410,9 +403,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry(
}
scoped_refptr<Sequence>
-SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
- SchedulerWorkerThread* worker_thread) {
- DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread));
+SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
+ SchedulerWorker* worker) {
+ DCHECK(ContainsWorker(outer_->workers_, worker));
scoped_refptr<Sequence> sequence;
{
@@ -425,8 +418,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
single_threaded_transaction->IsEmpty()) {
single_threaded_transaction.reset();
- // |shared_transaction| is kept alive while |worker_thread| is added to
- // |idle_worker_threads_stack_| to avoid this race:
+ // |shared_transaction| is kept alive while |worker| is added to
+ // |idle_workers_stack_| to avoid this race:
// 1. This thread creates a Transaction, finds |shared_priority_queue_|
// empty and ends the Transaction.
// 2. Other thread creates a Transaction, inserts a Sequence into
@@ -434,10 +427,10 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
// if the Transaction of step 1 is still active because because there
// can only be one active Transaction per PriorityQueue at a time.
// 3. Other thread calls WakeUpOneWorker(). No thread is woken up because
- // |idle_worker_threads_stack_| is empty.
- // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
- // sleep. No thread runs the Sequence inserted in step 2.
- outer_->AddToIdleWorkerThreadsStack(worker_thread);
+ // |idle_workers_stack_| is empty.
+ // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep.
+ // No thread runs the Sequence inserted in step 2.
+ outer_->AddToIdleWorkersStack(worker);
return nullptr;
}
@@ -461,11 +454,11 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
}
DCHECK(sequence);
- outer_->RemoveFromIdleWorkerThreadsStack(worker_thread);
+ outer_->RemoveFromIdleWorkersStack(worker);
return sequence;
}
-void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::
+void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
if (last_sequence_is_single_threaded_) {
// A single-threaded Sequence is always re-enqueued in the single-threaded
@@ -480,7 +473,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::
}
}
-TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::
+TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
GetSleepTimeout() {
return TimeDelta::Max();
}
@@ -492,9 +485,9 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
DelayedTaskManager* delayed_task_manager)
: name_(name.as_string()),
io_restriction_(io_restriction),
- idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
- idle_worker_threads_stack_cv_for_testing_(
- idle_worker_threads_stack_lock_.CreateConditionVariable()),
+ idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
+ idle_workers_stack_cv_for_testing_(
+ idle_workers_stack_lock_.CreateConditionVariable()),
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
#if DCHECK_IS_ON()
@@ -511,54 +504,54 @@ bool SchedulerWorkerPoolImpl::Initialize(
ThreadPriority thread_priority,
size_t max_threads,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
- DCHECK(worker_threads_.empty());
+ DCHECK(workers_.empty());
for (size_t i = 0; i < max_threads; ++i) {
- std::unique_ptr<SchedulerWorkerThread> worker_thread =
- SchedulerWorkerThread::Create(
- thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl(
+ std::unique_ptr<SchedulerWorker> worker =
+ SchedulerWorker::Create(
+ thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl(
this, re_enqueue_sequence_callback,
&shared_priority_queue_, static_cast<int>(i))),
task_tracker_);
- if (!worker_thread)
+ if (!worker)
break;
- idle_worker_threads_stack_.Push(worker_thread.get());
- worker_threads_.push_back(std::move(worker_thread));
+ idle_workers_stack_.Push(worker.get());
+ workers_.push_back(std::move(worker));
}
#if DCHECK_IS_ON()
threads_created_.Signal();
#endif
- return !worker_threads_.empty();
+ return !workers_.empty();
}
void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
- SchedulerWorkerThread* worker_thread;
+ SchedulerWorker* worker;
{
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- worker_thread = idle_worker_threads_stack_.Pop();
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ worker = idle_workers_stack_.Pop();
}
- if (worker_thread)
- worker_thread->WakeUp();
+ if (worker)
+ worker->WakeUp();
}
-void SchedulerWorkerPoolImpl::AddToIdleWorkerThreadsStack(
- SchedulerWorkerThread* worker_thread) {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- idle_worker_threads_stack_.Push(worker_thread);
- DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size());
+void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
+ SchedulerWorker* worker) {
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ idle_workers_stack_.Push(worker);
+ DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
- if (idle_worker_threads_stack_.Size() == worker_threads_.size())
- idle_worker_threads_stack_cv_for_testing_->Broadcast();
+ if (idle_workers_stack_.Size() == workers_.size())
+ idle_workers_stack_cv_for_testing_->Broadcast();
}
-void SchedulerWorkerPoolImpl::RemoveFromIdleWorkerThreadsStack(
- SchedulerWorkerThread* worker_thread) {
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
- idle_worker_threads_stack_.Remove(worker_thread);
+void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
+ SchedulerWorker* worker) {
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ idle_workers_stack_.Remove(worker);
}
} // namespace internal

Powered by Google App Engine
This is Rietveld 408576698