| Index: base/task_scheduler/scheduler_worker_pool_impl.cc
|
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
|
| index 9a6a20b741097adf42e8ff7a5ebb6921ceaaeecd..56e1f7b54eb5e55522e94f047dddf0a7333a62ee 100644
|
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc
|
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
|
| @@ -17,7 +17,6 @@
|
| #include "base/metrics/histogram.h"
|
| #include "base/sequence_token.h"
|
| #include "base/sequenced_task_runner.h"
|
| -#include "base/single_thread_task_runner.h"
|
| #include "base/strings/stringprintf.h"
|
| #include "base/task_runner.h"
|
| #include "base/task_scheduler/delayed_task_manager.h"
|
| @@ -64,7 +63,7 @@ class SchedulerParallelTaskRunner : public TaskRunner {
|
| // Post the task as part of a one-off single-task Sequence.
|
| return worker_pool_->PostTaskWithSequence(
|
| MakeUnique<Task>(from_here, closure, traits_, delay),
|
| - make_scoped_refptr(new Sequence), nullptr);
|
| + make_scoped_refptr(new Sequence));
|
| }
|
|
|
| bool RunsTasksOnCurrentThread() const override {
|
| @@ -100,8 +99,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| task->sequenced_task_runner_ref = this;
|
|
|
| // Post the task as part of |sequence_|.
|
| - return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
|
| - nullptr);
|
| + return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
|
| }
|
|
|
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| @@ -141,78 +139,19 @@ bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers,
|
|
|
| } // namespace
|
|
|
| -// TODO(http://crbug.com/694823): Remove this and supporting framework.
|
| -// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
|
| -class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
|
| - public SingleThreadTaskRunner {
|
| - public:
|
| - // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
|
| - // tasks so long as |worker_pool| and |worker| are alive.
|
| - // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
|
| - // and |worker|.
|
| - SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
|
| - SchedulerWorkerPool* worker_pool,
|
| - SchedulerWorker* worker);
|
| -
|
| - // SingleThreadTaskRunner:
|
| - bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| - const Closure& closure,
|
| - TimeDelta delay) override {
|
| - std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
|
| - task->single_thread_task_runner_ref = this;
|
| -
|
| - // Post the task to be executed by |worker_| as part of |sequence_|.
|
| - return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
|
| - worker_);
|
| - }
|
| -
|
| - bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| - const Closure& closure,
|
| - base::TimeDelta delay) override {
|
| - // Tasks are never nested within the task scheduler.
|
| - return PostDelayedTask(from_here, closure, delay);
|
| - }
|
| -
|
| - bool RunsTasksOnCurrentThread() const override {
|
| - // Even though this is a SingleThreadTaskRunner, test the actual sequence
|
| - // instead of the assigned worker so that another task randomly assigned
|
| - // to the same worker doesn't return true by happenstance.
|
| - return sequence_->token() == SequenceToken::GetForCurrentThread();
|
| - }
|
| -
|
| - private:
|
| - ~SchedulerSingleThreadTaskRunner() override;
|
| -
|
| - // Sequence for all Tasks posted through this TaskRunner.
|
| - const scoped_refptr<Sequence> sequence_ = new Sequence;
|
| -
|
| - const TaskTraits traits_;
|
| - SchedulerWorkerPool* const worker_pool_;
|
| - SchedulerWorker* const worker_;
|
| -
|
| - DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
|
| -};
|
| -
|
| class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
|
| : public SchedulerWorker::Delegate {
|
| public:
|
| // |outer| owns the worker for which this delegate is constructed.
|
| // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
|
| - // called with a non-single-threaded Sequence. |shared_priority_queue| is a
|
| - // PriorityQueue whose transactions may overlap with the worker's
|
| - // single-threaded PriorityQueue's transactions. |index| will be appended to
|
| - // the pool name to label the underlying worker threads.
|
| + // called. |index| will be appended to the pool name to label the underlying
|
| + // worker threads.
|
| SchedulerWorkerDelegateImpl(
|
| SchedulerWorkerPoolImpl* outer,
|
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
|
| - const PriorityQueue* shared_priority_queue,
|
| int index);
|
| ~SchedulerWorkerDelegateImpl() override;
|
|
|
| - PriorityQueue* single_threaded_priority_queue() {
|
| - return &single_threaded_priority_queue_;
|
| - }
|
| -
|
| // SchedulerWorker::Delegate:
|
| void OnMainEntry(SchedulerWorker* worker) override;
|
| scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
|
| @@ -222,28 +161,10 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
|
| bool CanDetach(SchedulerWorker* worker) override;
|
| void OnDetach() override;
|
|
|
| - void RegisterSingleThreadTaskRunner() {
|
| - // No barrier as barriers only affect sequential consistency which is
|
| - // irrelevant in a single variable use case (they don't force an immediate
|
| - // flush anymore than atomics do by default).
|
| - subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1);
|
| - }
|
| -
|
| - void UnregisterSingleThreadTaskRunner() {
|
| - subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1);
|
| - }
|
| -
|
| private:
|
| SchedulerWorkerPoolImpl* outer_;
|
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
|
|
|
| - // Single-threaded PriorityQueue for the worker.
|
| - PriorityQueue single_threaded_priority_queue_;
|
| -
|
| - // True if the last Sequence returned by GetWork() was extracted from
|
| - // |single_threaded_priority_queue_|.
|
| - bool last_sequence_is_single_threaded_ = false;
|
| -
|
| // Time of the last detach.
|
| TimeTicks last_detach_time_;
|
|
|
| @@ -265,8 +186,6 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
|
| // TaskScheduler.NumTasksBeforeDetach histogram was recorded.
|
| size_t num_tasks_since_last_detach_ = 0;
|
|
|
| - subtle::Atomic32 num_single_threaded_runners_ = 0;
|
| -
|
| const int index_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
|
| @@ -302,21 +221,6 @@ SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits(
|
| return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
|
| }
|
|
|
| -scoped_refptr<SingleThreadTaskRunner>
|
| -SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits(
|
| - const TaskTraits& traits) {
|
| - // TODO(fdoray): Find a way to take load into account when assigning a
|
| - // SchedulerWorker to a SingleThreadTaskRunner.
|
| - size_t worker_index;
|
| - {
|
| - AutoSchedulerLock auto_lock(next_worker_index_lock_);
|
| - worker_index = next_worker_index_;
|
| - next_worker_index_ = (next_worker_index_ + 1) % workers_.size();
|
| - }
|
| - return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
|
| - traits, this, workers_[worker_index].get()));
|
| -}
|
| -
|
| void SchedulerWorkerPoolImpl::ReEnqueueSequence(
|
| scoped_refptr<Sequence> sequence,
|
| const SequenceSortKey& sequence_sort_key) {
|
| @@ -337,27 +241,25 @@ void SchedulerWorkerPoolImpl::ReEnqueueSequence(
|
|
|
| bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
|
| std::unique_ptr<Task> task,
|
| - scoped_refptr<Sequence> sequence,
|
| - SchedulerWorker* worker) {
|
| + scoped_refptr<Sequence> sequence) {
|
| DCHECK(task);
|
| DCHECK(sequence);
|
| - DCHECK(!worker || ContainsWorker(workers_, worker));
|
|
|
| if (!task_tracker_->WillPostTask(task.get()))
|
| return false;
|
|
|
| if (task->delayed_run_time.is_null()) {
|
| - PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker);
|
| + PostTaskWithSequenceNow(std::move(task), std::move(sequence));
|
| } else {
|
| delayed_task_manager_->AddDelayedTask(
|
| std::move(task),
|
| Bind(
|
| - [](scoped_refptr<Sequence> sequence, SchedulerWorker* worker,
|
| + [](scoped_refptr<Sequence> sequence,
|
| SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) {
|
| worker_pool->PostTaskWithSequenceNow(std::move(task),
|
| - std::move(sequence), worker);
|
| + std::move(sequence));
|
| },
|
| - std::move(sequence), Unretained(worker), Unretained(this)));
|
| + std::move(sequence), Unretained(this)));
|
| }
|
|
|
| return true;
|
| @@ -365,42 +267,27 @@ bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
|
|
|
| void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
|
| std::unique_ptr<Task> task,
|
| - scoped_refptr<Sequence> sequence,
|
| - SchedulerWorker* worker) {
|
| + scoped_refptr<Sequence> sequence) {
|
| DCHECK(task);
|
| DCHECK(sequence);
|
| - DCHECK(!worker || ContainsWorker(workers_, worker));
|
|
|
| // Confirm that |task| is ready to run (its delayed run time is either null or
|
| // in the past).
|
| DCHECK_LE(task->delayed_run_time, TimeTicks::Now());
|
|
|
| - // Because |worker| belongs to this worker pool, we know that the type
|
| - // of its delegate is SchedulerWorkerDelegateImpl.
|
| - PriorityQueue* const priority_queue =
|
| - worker
|
| - ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate())
|
| - ->single_threaded_priority_queue()
|
| - : &shared_priority_queue_;
|
| - DCHECK(priority_queue);
|
| -
|
| const bool sequence_was_empty = sequence->PushTask(std::move(task));
|
| if (sequence_was_empty) {
|
| - // Insert |sequence| in |priority_queue| if it was empty before |task| was
|
| - // inserted into it. Otherwise, one of these must be true:
|
| - // - |sequence| is already in a PriorityQueue (not necessarily
|
| - // |shared_priority_queue_|), or,
|
| + // Insert |sequence| in |shared_priority_queue_| if it was empty before
|
| + // |task| was inserted into it. Otherwise, one of these must be true:
|
| + // - |sequence| is already in a PriorityQueue, or,
|
| // - A worker is running a Task from |sequence|. It will insert |sequence|
|
| // in a PriorityQueue once it's done running the Task.
|
| const auto sequence_sort_key = sequence->GetSortKey();
|
| - priority_queue->BeginTransaction()->Push(std::move(sequence),
|
| - sequence_sort_key);
|
| + shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
|
| + sequence_sort_key);
|
|
|
| // Wake up a worker to process |sequence|.
|
| - if (worker)
|
| - WakeUpWorker(worker);
|
| - else
|
| - WakeUpOneWorker();
|
| + WakeUpOneWorker();
|
| }
|
| }
|
|
|
| @@ -443,34 +330,13 @@ size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
|
| return num_alive_workers;
|
| }
|
|
|
| -SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
|
| - SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
|
| - SchedulerWorkerPool* worker_pool,
|
| - SchedulerWorker* worker)
|
| - : traits_(traits),
|
| - worker_pool_(worker_pool),
|
| - worker_(worker) {
|
| - DCHECK(worker_pool_);
|
| - DCHECK(worker_);
|
| - static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
|
| - RegisterSingleThreadTaskRunner();
|
| -}
|
| -
|
| -SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
|
| - ~SchedulerSingleThreadTaskRunner() {
|
| - static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
|
| - UnregisterSingleThreadTaskRunner();
|
| -}
|
| -
|
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
| SchedulerWorkerDelegateImpl(
|
| SchedulerWorkerPoolImpl* outer,
|
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
|
| - const PriorityQueue* shared_priority_queue,
|
| int index)
|
| : outer_(outer),
|
| re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
|
| - single_threaded_priority_queue_(shared_priority_queue),
|
| index_(index) {}
|
|
|
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
| @@ -528,13 +394,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
|
| {
|
| std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
|
| outer_->shared_priority_queue_.BeginTransaction());
|
| - std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction(
|
| - single_threaded_priority_queue_.BeginTransaction());
|
| -
|
| - if (shared_transaction->IsEmpty() &&
|
| - single_threaded_transaction->IsEmpty()) {
|
| - single_threaded_transaction.reset();
|
|
|
| + if (shared_transaction->IsEmpty()) {
|
| // |shared_transaction| is kept alive while |worker| is added to
|
| // |idle_workers_stack_| to avoid this race:
|
| // 1. This thread creates a Transaction, finds |shared_priority_queue_|
|
| @@ -555,23 +416,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
|
| return nullptr;
|
| }
|
|
|
| - // True if both PriorityQueues have Sequences and the Sequence at the top of
|
| - // the shared PriorityQueue is more important.
|
| - const bool shared_sequence_is_more_important =
|
| - !shared_transaction->IsEmpty() &&
|
| - !single_threaded_transaction->IsEmpty() &&
|
| - shared_transaction->PeekSortKey() >
|
| - single_threaded_transaction->PeekSortKey();
|
| -
|
| - if (single_threaded_transaction->IsEmpty() ||
|
| - shared_sequence_is_more_important) {
|
| - sequence = shared_transaction->PopSequence();
|
| - last_sequence_is_single_threaded_ = false;
|
| - } else {
|
| - DCHECK(!single_threaded_transaction->IsEmpty());
|
| - sequence = single_threaded_transaction->PopSequence();
|
| - last_sequence_is_single_threaded_ = true;
|
| - }
|
| + sequence = shared_transaction->PopSequence();
|
| }
|
| DCHECK(sequence);
|
|
|
| @@ -590,17 +435,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
|
|
|
| void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
| ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
|
| - if (last_sequence_is_single_threaded_) {
|
| - // A single-threaded Sequence is always re-enqueued in the single-threaded
|
| - // PriorityQueue from which it was extracted.
|
| - const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
|
| - single_threaded_priority_queue_.BeginTransaction()->Push(
|
| - std::move(sequence), sequence_sort_key);
|
| - } else {
|
| - // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
|
| - // |sequence| must be enqueued.
|
| - re_enqueue_sequence_callback_.Run(std::move(sequence));
|
| - }
|
| + // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
|
| + // |sequence| must be enqueued.
|
| + re_enqueue_sequence_callback_.Run(std::move(sequence));
|
| }
|
|
|
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
| @@ -610,15 +447,10 @@ TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
|
|
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
|
| SchedulerWorker* worker) {
|
| - // It's not an issue if |num_single_threaded_runners_| is incremented after
|
| - // this because the newly created SingleThreadTaskRunner (from which no task
|
| - // has run yet) will simply run all its tasks on the next physical thread
|
| - // created by the worker.
|
| const bool can_detach =
|
| !idle_start_time_.is_null() &&
|
| (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ &&
|
| worker != outer_->PeekAtIdleWorkersStack() &&
|
| - !subtle::NoBarrier_Load(&num_single_threaded_runners_) &&
|
| outer_->CanWorkerDetachForTesting();
|
| return can_detach;
|
| }
|
| @@ -700,7 +532,7 @@ bool SchedulerWorkerPoolImpl::Initialize(
|
| scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create(
|
| params.priority_hint(),
|
| MakeUnique<SchedulerWorkerDelegateImpl>(
|
| - this, re_enqueue_sequence_callback, &shared_priority_queue_, index),
|
| + this, re_enqueue_sequence_callback, index),
|
| task_tracker_, initial_state, params.backward_compatibility());
|
| if (!worker)
|
| break;
|
| @@ -715,14 +547,6 @@ bool SchedulerWorkerPoolImpl::Initialize(
|
| return !workers_.empty();
|
| }
|
|
|
| -void SchedulerWorkerPoolImpl::WakeUpWorker(SchedulerWorker* worker) {
|
| - DCHECK(worker);
|
| - RemoveFromIdleWorkersStack(worker);
|
| - worker->WakeUp();
|
| - // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding
|
| - // hysteresis to the CanDetach check. See https://crbug.com/666041.
|
| -}
|
| -
|
| void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
|
| SchedulerWorker* worker;
|
| {
|
| @@ -731,6 +555,8 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
|
| }
|
| if (worker)
|
| worker->WakeUp();
|
| + // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding
|
| + // hysteresis to the CanDetach check. See https://crbug.com/666041.
|
| }
|
|
|
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
|
|
|