Chromium Code Reviews
| Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| index 9a6a20b741097adf42e8ff7a5ebb6921ceaaeecd..56e1f7b54eb5e55522e94f047dddf0a7333a62ee 100644 |
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| @@ -17,7 +17,6 @@ |
| #include "base/metrics/histogram.h" |
| #include "base/sequence_token.h" |
| #include "base/sequenced_task_runner.h" |
| -#include "base/single_thread_task_runner.h" |
| #include "base/strings/stringprintf.h" |
| #include "base/task_runner.h" |
| #include "base/task_scheduler/delayed_task_manager.h" |
| @@ -64,7 +63,7 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
| // Post the task as part of a one-off single-task Sequence. |
| return worker_pool_->PostTaskWithSequence( |
| MakeUnique<Task>(from_here, closure, traits_, delay), |
| - make_scoped_refptr(new Sequence), nullptr); |
| + make_scoped_refptr(new Sequence)); |
| } |
| bool RunsTasksOnCurrentThread() const override { |
| @@ -100,8 +99,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| task->sequenced_task_runner_ref = this; |
| // Post the task as part of |sequence_|. |
| - return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| - nullptr); |
| + return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); |
| } |
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| @@ -141,78 +139,19 @@ bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers, |
| } // namespace |
| -// TODO(http://crbug.com/694823): Remove this and supporting framework. |
| -// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| -class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| - public SingleThreadTaskRunner { |
| - public: |
| - // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| - // tasks so long as |worker_pool| and |worker| are alive. |
| - // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| - // and |worker|. |
| - SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| - SchedulerWorkerPool* worker_pool, |
| - SchedulerWorker* worker); |
| - |
| - // SingleThreadTaskRunner: |
| - bool PostDelayedTask(const tracked_objects::Location& from_here, |
| - const Closure& closure, |
| - TimeDelta delay) override { |
| - std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| - task->single_thread_task_runner_ref = this; |
| - |
| - // Post the task to be executed by |worker_| as part of |sequence_|. |
| - return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| - worker_); |
| - } |
| - |
| - bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| - const Closure& closure, |
| - base::TimeDelta delay) override { |
| - // Tasks are never nested within the task scheduler. |
| - return PostDelayedTask(from_here, closure, delay); |
| - } |
| - |
| - bool RunsTasksOnCurrentThread() const override { |
| - // Even though this is a SingleThreadTaskRunner, test the actual sequence |
| - // instead of the assigned worker so that another task randomly assigned |
| - // to the same worker doesn't return true by happenstance. |
| - return sequence_->token() == SequenceToken::GetForCurrentThread(); |
| - } |
| - |
| - private: |
| - ~SchedulerSingleThreadTaskRunner() override; |
| - |
| - // Sequence for all Tasks posted through this TaskRunner. |
| - const scoped_refptr<Sequence> sequence_ = new Sequence; |
| - |
| - const TaskTraits traits_; |
| - SchedulerWorkerPool* const worker_pool_; |
| - SchedulerWorker* const worker_; |
| - |
| - DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| -}; |
| - |
| class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| : public SchedulerWorker::Delegate { |
| public: |
| // |outer| owns the worker for which this delegate is constructed. |
| // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
| - // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
| - // PriorityQueue whose transactions may overlap with the worker's |
| - // single-threaded PriorityQueue's transactions. |index| will be appended to |
| - // the pool name to label the underlying worker threads. |
| + // called. |index| will be appended to the pool name to label the underlying |
| + // worker threads. |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| - const PriorityQueue* shared_priority_queue, |
| int index); |
| ~SchedulerWorkerDelegateImpl() override; |
| - PriorityQueue* single_threaded_priority_queue() { |
| - return &single_threaded_priority_queue_; |
| - } |
| - |
| // SchedulerWorker::Delegate: |
| void OnMainEntry(SchedulerWorker* worker) override; |
| scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
| @@ -222,28 +161,10 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| bool CanDetach(SchedulerWorker* worker) override; |
| void OnDetach() override; |
| - void RegisterSingleThreadTaskRunner() { |
| - // No barrier as barriers only affect sequential consistency which is |
| - // irrelevant in a single variable use case (they don't force an immediate |
| - // flush anymore than atomics do by default). |
| - subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
| - } |
| - |
| - void UnregisterSingleThreadTaskRunner() { |
| - subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| - } |
| - |
| private: |
| SchedulerWorkerPoolImpl* outer_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| - // Single-threaded PriorityQueue for the worker. |
| - PriorityQueue single_threaded_priority_queue_; |
| - |
| - // True if the last Sequence returned by GetWork() was extracted from |
| - // |single_threaded_priority_queue_|. |
| - bool last_sequence_is_single_threaded_ = false; |
| - |
| // Time of the last detach. |
| TimeTicks last_detach_time_; |
| @@ -265,8 +186,6 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // TaskScheduler.NumTasksBeforeDetach histogram was recorded. |
| size_t num_tasks_since_last_detach_ = 0; |
| - subtle::Atomic32 num_single_threaded_runners_ = 0; |
| - |
| const int index_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| @@ -302,21 +221,6 @@ SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
| return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| } |
| -scoped_refptr<SingleThreadTaskRunner> |
| -SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( |
| - const TaskTraits& traits) { |
| - // TODO(fdoray): Find a way to take load into account when assigning a |
| - // SchedulerWorker to a SingleThreadTaskRunner. |
| - size_t worker_index; |
| - { |
| - AutoSchedulerLock auto_lock(next_worker_index_lock_); |
| - worker_index = next_worker_index_; |
| - next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); |
| - } |
| - return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
| - traits, this, workers_[worker_index].get())); |
| -} |
| - |
| void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
| scoped_refptr<Sequence> sequence, |
| const SequenceSortKey& sequence_sort_key) { |
| @@ -337,27 +241,25 @@ void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
| bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
| std::unique_ptr<Task> task, |
| - scoped_refptr<Sequence> sequence, |
| - SchedulerWorker* worker) { |
| + scoped_refptr<Sequence> sequence) { |
| DCHECK(task); |
| DCHECK(sequence); |
| - DCHECK(!worker || ContainsWorker(workers_, worker)); |
| if (!task_tracker_->WillPostTask(task.get())) |
| return false; |
| if (task->delayed_run_time.is_null()) { |
| - PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); |
| + PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
| } else { |
| delayed_task_manager_->AddDelayedTask( |
| std::move(task), |
| Bind( |
| - [](scoped_refptr<Sequence> sequence, SchedulerWorker* worker, |
| + [](scoped_refptr<Sequence> sequence, |
| SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) { |
| worker_pool->PostTaskWithSequenceNow(std::move(task), |
| - std::move(sequence), worker); |
| + std::move(sequence)); |
| }, |
| - std::move(sequence), Unretained(worker), Unretained(this))); |
| + std::move(sequence), Unretained(this))); |
| } |
| return true; |
| @@ -365,42 +267,27 @@ bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
| void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| std::unique_ptr<Task> task, |
| - scoped_refptr<Sequence> sequence, |
| - SchedulerWorker* worker) { |
| + scoped_refptr<Sequence> sequence) { |
| DCHECK(task); |
| DCHECK(sequence); |
| - DCHECK(!worker || ContainsWorker(workers_, worker)); |
| // Confirm that |task| is ready to run (its delayed run time is either null or |
| // in the past). |
| DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); |
| - // Because |worker| belongs to this worker pool, we know that the type |
| - // of its delegate is SchedulerWorkerDelegateImpl. |
| - PriorityQueue* const priority_queue = |
| - worker |
| - ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) |
| - ->single_threaded_priority_queue() |
| - : &shared_priority_queue_; |
| - DCHECK(priority_queue); |
| - |
| const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
| if (sequence_was_empty) { |
| - // Insert |sequence| in |priority_queue| if it was empty before |task| was |
| - // inserted into it. Otherwise, one of these must be true: |
| - // - |sequence| is already in a PriorityQueue (not necessarily |
| - // |shared_priority_queue_|), or, |
| + // Insert |sequence| in |shared_priority_queue_| if it was empty before |
| + // |task| was inserted into it. Otherwise, one of these must be true: |
| + // - |sequence| is already in a PriorityQueue, or, |
| // - A worker is running a Task from |sequence|. It will insert |sequence| |
| // in a PriorityQueue once it's done running the Task. |
| const auto sequence_sort_key = sequence->GetSortKey(); |
| - priority_queue->BeginTransaction()->Push(std::move(sequence), |
| - sequence_sort_key); |
| + shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
| + sequence_sort_key); |
| // Wake up a worker to process |sequence|. |
|
gab
2017/03/15 20:20:58
rm now redundant comment
robliao
2017/03/15 20:46:44
This comment is consistent with the current style
gab
2017/03/16 15:34:46
Hmm ok, we at least shouldn't add more (and I'm ha… [comment truncated in extraction]
robliao
2017/03/16 22:49:03
While I agree with your opinion, the style guide is… [comment truncated in extraction]
|
| - if (worker) |
| - WakeUpWorker(worker); |
| - else |
| - WakeUpOneWorker(); |
| + WakeUpOneWorker(); |
| } |
| } |
| @@ -443,34 +330,13 @@ size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { |
| return num_alive_workers; |
| } |
| -SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| - SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| - SchedulerWorkerPool* worker_pool, |
| - SchedulerWorker* worker) |
| - : traits_(traits), |
| - worker_pool_(worker_pool), |
| - worker_(worker) { |
| - DCHECK(worker_pool_); |
| - DCHECK(worker_); |
| - static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| - RegisterSingleThreadTaskRunner(); |
| -} |
| - |
| -SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| - ~SchedulerSingleThreadTaskRunner() { |
| - static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| - UnregisterSingleThreadTaskRunner(); |
| -} |
| - |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| - const PriorityQueue* shared_priority_queue, |
| int index) |
| : outer_(outer), |
| re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| - single_threaded_priority_queue_(shared_priority_queue), |
| index_(index) {} |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| @@ -528,13 +394,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| { |
| std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
| outer_->shared_priority_queue_.BeginTransaction()); |
|
gab
2017/03/15 20:20:58
This was the only use case of transaction, I believe… [comment truncated in extraction]
robliao
2017/03/15 20:46:44
Correct. This is focused on just SchedulerWorkerPoolImpl… [comment truncated in extraction]
gab
2017/03/16 15:34:45
Unless you have a CL doing this now I still prefer… [comment truncated in extraction]
robliao
2017/03/16 22:49:03
In progress at the moment!
gab
2017/03/20 16:40:58
Ok but for future reference, a comment in code referencing… [comment truncated in extraction]
|
| - std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
| - single_threaded_priority_queue_.BeginTransaction()); |
| - |
| - if (shared_transaction->IsEmpty() && |
| - single_threaded_transaction->IsEmpty()) { |
| - single_threaded_transaction.reset(); |
| + if (shared_transaction->IsEmpty()) { |
| // |shared_transaction| is kept alive while |worker| is added to |
| // |idle_workers_stack_| to avoid this race: |
| // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
| @@ -555,23 +416,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| return nullptr; |
| } |
| - // True if both PriorityQueues have Sequences and the Sequence at the top of |
| - // the shared PriorityQueue is more important. |
| - const bool shared_sequence_is_more_important = |
| - !shared_transaction->IsEmpty() && |
| - !single_threaded_transaction->IsEmpty() && |
| - shared_transaction->PeekSortKey() > |
| - single_threaded_transaction->PeekSortKey(); |
| - |
| - if (single_threaded_transaction->IsEmpty() || |
| - shared_sequence_is_more_important) { |
| - sequence = shared_transaction->PopSequence(); |
| - last_sequence_is_single_threaded_ = false; |
| - } else { |
| - DCHECK(!single_threaded_transaction->IsEmpty()); |
| - sequence = single_threaded_transaction->PopSequence(); |
| - last_sequence_is_single_threaded_ = true; |
| - } |
| + sequence = shared_transaction->PopSequence(); |
| } |
| DCHECK(sequence); |
| @@ -590,17 +435,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { |
| void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
| - if (last_sequence_is_single_threaded_) { |
| - // A single-threaded Sequence is always re-enqueued in the single-threaded |
| - // PriorityQueue from which it was extracted. |
| - const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
| - single_threaded_priority_queue_.BeginTransaction()->Push( |
| - std::move(sequence), sequence_sort_key); |
| - } else { |
| - // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
| - // |sequence| must be enqueued. |
| - re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| - } |
| + // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
| + // |sequence| must be enqueued. |
| + re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| } |
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| @@ -610,15 +447,10 @@ TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| SchedulerWorker* worker) { |
| - // It's not an issue if |num_single_threaded_runners_| is incremented after |
| - // this because the newly created SingleThreadTaskRunner (from which no task |
| - // has run yet) will simply run all its tasks on the next physical thread |
| - // created by the worker. |
| const bool can_detach = |
| !idle_start_time_.is_null() && |
| (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
| worker != outer_->PeekAtIdleWorkersStack() && |
| - !subtle::NoBarrier_Load(&num_single_threaded_runners_) && |
| outer_->CanWorkerDetachForTesting(); |
| return can_detach; |
| } |
| @@ -700,7 +532,7 @@ bool SchedulerWorkerPoolImpl::Initialize( |
| scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( |
| params.priority_hint(), |
| MakeUnique<SchedulerWorkerDelegateImpl>( |
| - this, re_enqueue_sequence_callback, &shared_priority_queue_, index), |
| + this, re_enqueue_sequence_callback, index), |
| task_tracker_, initial_state, params.backward_compatibility()); |
| if (!worker) |
| break; |
| @@ -715,14 +547,6 @@ bool SchedulerWorkerPoolImpl::Initialize( |
| return !workers_.empty(); |
| } |
| -void SchedulerWorkerPoolImpl::WakeUpWorker(SchedulerWorker* worker) { |
| - DCHECK(worker); |
| - RemoveFromIdleWorkersStack(worker); |
| - worker->WakeUp(); |
| - // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding |
| - // hysteresis to the CanDetach check. See https://crbug.com/666041. |
| -} |
| - |
| void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| SchedulerWorker* worker; |
| { |
| @@ -731,6 +555,8 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| } |
| if (worker) |
| worker->WakeUp(); |
| + // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding |
|
gab
2017/03/15 20:20:58
Shouldn't this TODO be in CanDetach()?
robliao
2017/03/15 20:46:44
This was the best place to put it that respected the… [comment truncated in extraction]
|
| + // hysteresis to the CanDetach check. See https://crbug.com/666041. |
| } |
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |