Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
index e2cc7aa718523cad5dd96279174215c317b6a28f..a991cb78cf308fc8d60845c51eeda19234009661 100644 |
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
@@ -27,14 +27,14 @@ namespace internal { |
namespace { |
+// SchedulerWorker that owns the current thread, if any. |
+LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky |
+ tls_current_worker = LAZY_INSTANCE_INITIALIZER; |
+ |
// SchedulerWorkerPool that owns the current thread, if any. |
LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky |
tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; |
-// SchedulerWorkerThread that owns the current thread, if any. |
-LazyInstance<ThreadLocalPointer<const SchedulerWorkerThread>>::Leaky |
- tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; |
- |
// A task runner that runs tasks with the PARALLEL ExecutionMode. |
class SchedulerParallelTaskRunner : public TaskRunner { |
public: |
@@ -117,15 +117,15 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
public: |
// Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
- // tasks so long as |worker_pool| and |worker_thread| are alive. |
+ // tasks so long as |worker_pool| and |worker| are alive. |
// TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
- // and |worker_thread|. |
+ // and |worker|. |
SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool, |
- SchedulerWorkerThread* worker_thread) |
+ SchedulerWorker* worker) |
: traits_(traits), |
worker_pool_(worker_pool), |
- worker_thread_(worker_thread) {} |
+ worker_(worker) {} |
// SingleThreadTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -134,9 +134,9 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
task->single_thread_task_runner_ref = this; |
- // Post the task to be executed by |worker_thread_| as part of |sequence_|. |
+ // Post the task to be executed by |worker_| as part of |sequence_|. |
return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
- worker_thread_); |
+ worker_); |
} |
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
@@ -147,7 +147,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
} |
bool RunsTasksOnCurrentThread() const override { |
- return tls_current_worker_thread.Get().Get() == worker_thread_; |
+ return tls_current_worker.Get().Get() == worker_; |
} |
private: |
@@ -158,49 +158,47 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
const TaskTraits traits_; |
SchedulerWorkerPool* const worker_pool_; |
- SchedulerWorkerThread* const worker_thread_; |
+ SchedulerWorker* const worker_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
}; |
// Only used in DCHECKs. |
-bool ContainsWorkerThread( |
- const std::vector<std::unique_ptr<SchedulerWorkerThread>>& worker_threads, |
- const SchedulerWorkerThread* worker_thread) { |
- auto it = std::find_if( |
- worker_threads.begin(), worker_threads.end(), |
- [worker_thread](const std::unique_ptr<SchedulerWorkerThread>& i) { |
- return i.get() == worker_thread; |
+bool ContainsWorker( |
+ const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
+ const SchedulerWorker* worker) { |
+ auto it = std::find_if(workers.begin(), workers.end(), |
+ [worker](const std::unique_ptr<SchedulerWorker>& i) { |
+ return i.get() == worker; |
}); |
- return it != worker_threads.end(); |
+ return it != workers.end(); |
} |
} // namespace |
-class SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl |
- : public SchedulerWorkerThread::Delegate { |
+class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
+ : public SchedulerWorker::Delegate { |
public: |
- // |outer| owns the worker thread for which this delegate is constructed. |
+ // |outer| owns the worker for which this delegate is constructed. |
// |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
// called with a non-single-threaded Sequence. |shared_priority_queue| is a |
- // PriorityQueue whose transactions may overlap with the worker thread's |
+ // PriorityQueue whose transactions may overlap with the worker's |
// single-threaded PriorityQueue's transactions. |index| will be appended to |
fdoray
2016/06/20 15:08:39
|index| will be appended to the pool name to label
robliao
2016/06/20 17:50:09
Took a different approach for this comment. Update
|
// this thread's name to uniquely identify it. |
- SchedulerWorkerThreadDelegateImpl( |
+ SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
const PriorityQueue* shared_priority_queue, |
int index); |
- ~SchedulerWorkerThreadDelegateImpl() override; |
+ ~SchedulerWorkerDelegateImpl() override; |
PriorityQueue* single_threaded_priority_queue() { |
return &single_threaded_priority_queue_; |
} |
- // SchedulerWorkerThread::Delegate: |
- void OnMainEntry(SchedulerWorkerThread* worker_thread) override; |
- scoped_refptr<Sequence> GetWork( |
- SchedulerWorkerThread* worker_thread) override; |
+ // SchedulerWorker::Delegate: |
+ void OnMainEntry(SchedulerWorker* worker) override; |
+ scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
TimeDelta GetSleepTimeout() override; |
@@ -208,7 +206,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl |
SchedulerWorkerPoolImpl* outer_; |
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
- // Single-threaded PriorityQueue for the worker thread. |
+ // Single-threaded PriorityQueue for the worker. |
PriorityQueue single_threaded_priority_queue_; |
// True if the last Sequence returned by GetWork() was extracted from |
@@ -217,13 +215,13 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl |
const int index_; |
- DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); |
+ DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
}; |
SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
// SchedulerWorkerPool should never be deleted in production unless its |
// initialization failed. |
- DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
+ DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
} |
// static |
@@ -245,15 +243,15 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
return nullptr; |
} |
-void SchedulerWorkerPoolImpl::WaitForAllWorkerWorkersIdleForTesting() { |
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- while (idle_worker_threads_stack_.Size() < worker_threads_.size()) |
- idle_worker_threads_stack_cv_for_testing_->Wait(); |
+void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
+ while (idle_workers_stack_.Size() < workers_.size()) |
+ idle_workers_stack_cv_for_testing_->Wait(); |
} |
void SchedulerWorkerPoolImpl::JoinForTesting() { |
- for (const auto& worker_thread : worker_threads_) |
- worker_thread->JoinForTesting(); |
+ for (const auto& worker : workers_) |
+ worker->JoinForTesting(); |
DCHECK(!join_for_testing_returned_.IsSignaled()); |
join_for_testing_returned_.Signal(); |
@@ -271,18 +269,17 @@ scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
case ExecutionMode::SINGLE_THREADED: { |
// TODO(fdoray): Find a way to take load into account when assigning a |
- // SchedulerWorkerThread to a SingleThreadTaskRunner. Also, this code |
- // assumes that all SchedulerWorkerThreads are alive. Eventually, we might |
+ // SchedulerWorker to a SingleThreadTaskRunner. Also, this code |
+ // assumes that all SchedulerWorkers are alive. Eventually, we might |
// decide to tear down threads that haven't run tasks for a long time. |
- size_t worker_thread_index; |
+ size_t worker_index; |
{ |
- AutoSchedulerLock auto_lock(next_worker_thread_index_lock_); |
- worker_thread_index = next_worker_thread_index_; |
- next_worker_thread_index_ = |
- (next_worker_thread_index_ + 1) % worker_threads_.size(); |
+ AutoSchedulerLock auto_lock(next_worker_index_lock_); |
+ worker_index = next_worker_index_; |
+ next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); |
} |
return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
- traits, this, worker_threads_[worker_thread_index].get())); |
+ traits, this, workers_[worker_index].get())); |
} |
} |
@@ -311,21 +308,19 @@ void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
std::unique_ptr<Task> task, |
scoped_refptr<Sequence> sequence, |
- SchedulerWorkerThread* worker_thread) { |
+ SchedulerWorker* worker) { |
DCHECK(task); |
DCHECK(sequence); |
- DCHECK(!worker_thread || |
- ContainsWorkerThread(worker_threads_, worker_thread)); |
+ DCHECK(!worker || ContainsWorker(workers_, worker)); |
if (!task_tracker_->WillPostTask(task.get())) |
return false; |
if (task->delayed_run_time.is_null()) { |
- PostTaskWithSequenceNow(std::move(task), std::move(sequence), |
- worker_thread); |
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); |
} else { |
delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), |
- worker_thread, this); |
+ worker, this); |
} |
return true; |
@@ -334,22 +329,20 @@ bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
std::unique_ptr<Task> task, |
scoped_refptr<Sequence> sequence, |
- SchedulerWorkerThread* worker_thread) { |
+ SchedulerWorker* worker) { |
DCHECK(task); |
DCHECK(sequence); |
- DCHECK(!worker_thread || |
- ContainsWorkerThread(worker_threads_, worker_thread)); |
+ DCHECK(!worker || ContainsWorker(workers_, worker)); |
// Confirm that |task| is ready to run (its delayed run time is either null or |
// in the past). |
DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now()); |
- // Because |worker_thread| belongs to this worker pool, we know that the type |
- // of its delegate is SchedulerWorkerThreadDelegateImpl. |
+ // Because |worker| belongs to this worker pool, we know that the type |
+ // of its delegate is SchedulerWorkerDelegateImpl. |
PriorityQueue* const priority_queue = |
- worker_thread |
- ? static_cast<SchedulerWorkerThreadDelegateImpl*>( |
- worker_thread->delegate()) |
+ worker |
+ ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) |
->single_threaded_priority_queue() |
: &shared_priority_queue_; |
DCHECK(priority_queue); |
@@ -360,22 +353,22 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
// inserted into it. Otherwise, one of these must be true: |
// - |sequence| is already in a PriorityQueue (not necessarily |
// |shared_priority_queue_|), or, |
- // - A worker thread is running a Task from |sequence|. It will insert |
- // |sequence| in a PriorityQueue once it's done running the Task. |
+ // - A worker is running a Task from |sequence|. It will insert |sequence| |
+ // in a PriorityQueue once it's done running the Task. |
const auto sequence_sort_key = sequence->GetSortKey(); |
priority_queue->BeginTransaction()->Push(std::move(sequence), |
sequence_sort_key); |
- // Wake up a worker thread to process |sequence|. |
- if (worker_thread) |
- worker_thread->WakeUp(); |
+ // Wake up a worker to process |sequence|. |
+ if (worker) |
+ worker->WakeUp(); |
else |
WakeUpOneWorker(); |
} |
} |
-SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
- SchedulerWorkerThreadDelegateImpl( |
+SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
+ SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
const PriorityQueue* shared_priority_queue, |
@@ -385,24 +378,24 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
single_threaded_priority_queue_(shared_priority_queue), |
index_(index) {} |
-SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
- ~SchedulerWorkerThreadDelegateImpl() = default; |
+SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
+ ~SchedulerWorkerDelegateImpl() = default; |
-void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( |
- SchedulerWorkerThread* worker_thread) { |
+void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
+ SchedulerWorker* worker) { |
#if DCHECK_IS_ON() |
// Wait for |outer_->threads_created_| to avoid traversing |
fdoray
2016/06/20 15:08:39
workers_created_ (if you change the variable's nam
robliao
2016/06/20 17:50:09
Done.
|
- // |outer_->worker_threads_| while it is being filled by Initialize(). |
+ // |outer_->workers_| while it is being filled by Initialize(). |
outer_->threads_created_.Wait(); |
- DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
+ DCHECK(ContainsWorker(outer_->workers_, worker)); |
#endif |
PlatformThread::SetName( |
StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); |
- DCHECK(!tls_current_worker_thread.Get().Get()); |
+ DCHECK(!tls_current_worker.Get().Get()); |
DCHECK(!tls_current_worker_pool.Get().Get()); |
- tls_current_worker_thread.Get().Set(worker_thread); |
+ tls_current_worker.Get().Set(worker); |
tls_current_worker_pool.Get().Set(outer_); |
ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
@@ -410,9 +403,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry( |
} |
scoped_refptr<Sequence> |
-SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
- SchedulerWorkerThread* worker_thread) { |
- DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
+SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
+ SchedulerWorker* worker) { |
+ DCHECK(ContainsWorker(outer_->workers_, worker)); |
scoped_refptr<Sequence> sequence; |
{ |
@@ -425,8 +418,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
single_threaded_transaction->IsEmpty()) { |
single_threaded_transaction.reset(); |
- // |shared_transaction| is kept alive while |worker_thread| is added to |
- // |idle_worker_threads_stack_| to avoid this race: |
+ // |shared_transaction| is kept alive while |worker| is added to |
+ // |idle_workers_stack_| to avoid this race: |
// 1. This thread creates a Transaction, finds |shared_priority_queue_| |
// empty and ends the Transaction. |
// 2. Other thread creates a Transaction, inserts a Sequence into |
@@ -434,10 +427,10 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
// if the Transaction of step 1 is still active because because there |
// can only be one active Transaction per PriorityQueue at a time. |
// 3. Other thread calls WakeUpOneWorker(). No thread is woken up because |
- // |idle_worker_threads_stack_| is empty. |
- // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to |
- // sleep. No thread runs the Sequence inserted in step 2. |
- outer_->AddToIdleWorkerThreadsStack(worker_thread); |
+ // |idle_workers_stack_| is empty. |
+ // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. |
+ // No thread runs the Sequence inserted in step 2. |
+ outer_->AddToIdleWorkersStack(worker); |
return nullptr; |
} |
@@ -461,11 +454,11 @@ SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
} |
DCHECK(sequence); |
- outer_->RemoveFromIdleWorkerThreadsStack(worker_thread); |
+ outer_->RemoveFromIdleWorkersStack(worker); |
return sequence; |
} |
-void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
+void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
if (last_sequence_is_single_threaded_) { |
// A single-threaded Sequence is always re-enqueued in the single-threaded |
@@ -480,7 +473,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
} |
} |
-TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
+TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
GetSleepTimeout() { |
return TimeDelta::Max(); |
} |
@@ -492,9 +485,9 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
DelayedTaskManager* delayed_task_manager) |
: name_(name.as_string()), |
io_restriction_(io_restriction), |
- idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), |
- idle_worker_threads_stack_cv_for_testing_( |
- idle_worker_threads_stack_lock_.CreateConditionVariable()), |
+ idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
+ idle_workers_stack_cv_for_testing_( |
+ idle_workers_stack_lock_.CreateConditionVariable()), |
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
WaitableEvent::InitialState::NOT_SIGNALED), |
#if DCHECK_IS_ON() |
@@ -511,54 +504,54 @@ bool SchedulerWorkerPoolImpl::Initialize( |
ThreadPriority thread_priority, |
size_t max_threads, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
- DCHECK(worker_threads_.empty()); |
+ DCHECK(workers_.empty()); |
for (size_t i = 0; i < max_threads; ++i) { |
- std::unique_ptr<SchedulerWorkerThread> worker_thread = |
- SchedulerWorkerThread::Create( |
- thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl( |
+ std::unique_ptr<SchedulerWorker> worker = |
+ SchedulerWorker::Create( |
+ thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
this, re_enqueue_sequence_callback, |
&shared_priority_queue_, static_cast<int>(i))), |
task_tracker_); |
- if (!worker_thread) |
+ if (!worker) |
break; |
- idle_worker_threads_stack_.Push(worker_thread.get()); |
- worker_threads_.push_back(std::move(worker_thread)); |
+ idle_workers_stack_.Push(worker.get()); |
+ workers_.push_back(std::move(worker)); |
} |
#if DCHECK_IS_ON() |
threads_created_.Signal(); |
#endif |
- return !worker_threads_.empty(); |
+ return !workers_.empty(); |
} |
void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
- SchedulerWorkerThread* worker_thread; |
+ SchedulerWorker* worker; |
{ |
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- worker_thread = idle_worker_threads_stack_.Pop(); |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
+ worker = idle_workers_stack_.Pop(); |
} |
- if (worker_thread) |
- worker_thread->WakeUp(); |
+ if (worker) |
+ worker->WakeUp(); |
} |
-void SchedulerWorkerPoolImpl::AddToIdleWorkerThreadsStack( |
- SchedulerWorkerThread* worker_thread) { |
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- idle_worker_threads_stack_.Push(worker_thread); |
- DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size()); |
+void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
+ SchedulerWorker* worker) { |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
+ idle_workers_stack_.Push(worker); |
+ DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
- if (idle_worker_threads_stack_.Size() == worker_threads_.size()) |
- idle_worker_threads_stack_cv_for_testing_->Broadcast(); |
+ if (idle_workers_stack_.Size() == workers_.size()) |
+ idle_workers_stack_cv_for_testing_->Broadcast(); |
} |
-void SchedulerWorkerPoolImpl::RemoveFromIdleWorkerThreadsStack( |
- SchedulerWorkerThread* worker_thread) { |
- AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
- idle_worker_threads_stack_.Remove(worker_thread); |
+void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
+ SchedulerWorker* worker) { |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
+ idle_workers_stack_.Remove(worker); |
} |
} // namespace internal |