Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
index 9a6a20b741097adf42e8ff7a5ebb6921ceaaeecd..56e1f7b54eb5e55522e94f047dddf0a7333a62ee 100644 |
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
@@ -17,7 +17,6 @@ |
#include "base/metrics/histogram.h" |
#include "base/sequence_token.h" |
#include "base/sequenced_task_runner.h" |
-#include "base/single_thread_task_runner.h" |
#include "base/strings/stringprintf.h" |
#include "base/task_runner.h" |
#include "base/task_scheduler/delayed_task_manager.h" |
@@ -64,7 +63,7 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
// Post the task as part of a one-off single-task Sequence. |
return worker_pool_->PostTaskWithSequence( |
MakeUnique<Task>(from_here, closure, traits_, delay), |
- make_scoped_refptr(new Sequence), nullptr); |
+ make_scoped_refptr(new Sequence)); |
} |
bool RunsTasksOnCurrentThread() const override { |
@@ -100,8 +99,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
task->sequenced_task_runner_ref = this; |
// Post the task as part of |sequence_|. |
- return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
- nullptr); |
+ return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); |
} |
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
@@ -141,78 +139,19 @@ bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers, |
} // namespace |
-// TODO(http://crbug.com/694823): Remove this and supporting framework. |
-// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
-class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
- public SingleThreadTaskRunner { |
- public: |
- // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
- // tasks so long as |worker_pool| and |worker| are alive. |
- // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
- // and |worker|. |
- SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
- SchedulerWorkerPool* worker_pool, |
- SchedulerWorker* worker); |
- |
- // SingleThreadTaskRunner: |
- bool PostDelayedTask(const tracked_objects::Location& from_here, |
- const Closure& closure, |
- TimeDelta delay) override { |
- std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
- task->single_thread_task_runner_ref = this; |
- |
- // Post the task to be executed by |worker_| as part of |sequence_|. |
- return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
- worker_); |
- } |
- |
- bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
- const Closure& closure, |
- base::TimeDelta delay) override { |
- // Tasks are never nested within the task scheduler. |
- return PostDelayedTask(from_here, closure, delay); |
- } |
- |
- bool RunsTasksOnCurrentThread() const override { |
- // Even though this is a SingleThreadTaskRunner, test the actual sequence |
- // instead of the assigned worker so that another task randomly assigned |
- // to the same worker doesn't return true by happenstance. |
- return sequence_->token() == SequenceToken::GetForCurrentThread(); |
- } |
- |
- private: |
- ~SchedulerSingleThreadTaskRunner() override; |
- |
- // Sequence for all Tasks posted through this TaskRunner. |
- const scoped_refptr<Sequence> sequence_ = new Sequence; |
- |
- const TaskTraits traits_; |
- SchedulerWorkerPool* const worker_pool_; |
- SchedulerWorker* const worker_; |
- |
- DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
-}; |
- |
class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
: public SchedulerWorker::Delegate { |
public: |
// |outer| owns the worker for which this delegate is constructed. |
// |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
- // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
- // PriorityQueue whose transactions may overlap with the worker's |
- // single-threaded PriorityQueue's transactions. |index| will be appended to |
- // the pool name to label the underlying worker threads. |
+ // called. |index| will be appended to the pool name to label the underlying |
+ // worker threads. |
SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
- const PriorityQueue* shared_priority_queue, |
int index); |
~SchedulerWorkerDelegateImpl() override; |
- PriorityQueue* single_threaded_priority_queue() { |
- return &single_threaded_priority_queue_; |
- } |
- |
// SchedulerWorker::Delegate: |
void OnMainEntry(SchedulerWorker* worker) override; |
scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
@@ -222,28 +161,10 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
bool CanDetach(SchedulerWorker* worker) override; |
void OnDetach() override; |
- void RegisterSingleThreadTaskRunner() { |
- // No barrier as barriers only affect sequential consistency which is |
- // irrelevant in a single variable use case (they don't force an immediate |
- // flush anymore than atomics do by default). |
- subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
- } |
- |
- void UnregisterSingleThreadTaskRunner() { |
- subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
- } |
- |
private: |
SchedulerWorkerPoolImpl* outer_; |
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
- // Single-threaded PriorityQueue for the worker. |
- PriorityQueue single_threaded_priority_queue_; |
- |
- // True if the last Sequence returned by GetWork() was extracted from |
- // |single_threaded_priority_queue_|. |
- bool last_sequence_is_single_threaded_ = false; |
- |
// Time of the last detach. |
TimeTicks last_detach_time_; |
@@ -265,8 +186,6 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
// TaskScheduler.NumTasksBeforeDetach histogram was recorded. |
size_t num_tasks_since_last_detach_ = 0; |
- subtle::Atomic32 num_single_threaded_runners_ = 0; |
- |
const int index_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
@@ -302,21 +221,6 @@ SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
} |
-scoped_refptr<SingleThreadTaskRunner> |
-SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( |
- const TaskTraits& traits) { |
- // TODO(fdoray): Find a way to take load into account when assigning a |
- // SchedulerWorker to a SingleThreadTaskRunner. |
- size_t worker_index; |
- { |
- AutoSchedulerLock auto_lock(next_worker_index_lock_); |
- worker_index = next_worker_index_; |
- next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); |
- } |
- return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
- traits, this, workers_[worker_index].get())); |
-} |
- |
void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
scoped_refptr<Sequence> sequence, |
const SequenceSortKey& sequence_sort_key) { |
@@ -337,27 +241,25 @@ void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
std::unique_ptr<Task> task, |
- scoped_refptr<Sequence> sequence, |
- SchedulerWorker* worker) { |
+ scoped_refptr<Sequence> sequence) { |
DCHECK(task); |
DCHECK(sequence); |
- DCHECK(!worker || ContainsWorker(workers_, worker)); |
if (!task_tracker_->WillPostTask(task.get())) |
return false; |
if (task->delayed_run_time.is_null()) { |
- PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); |
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
} else { |
delayed_task_manager_->AddDelayedTask( |
std::move(task), |
Bind( |
- [](scoped_refptr<Sequence> sequence, SchedulerWorker* worker, |
+ [](scoped_refptr<Sequence> sequence, |
SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) { |
worker_pool->PostTaskWithSequenceNow(std::move(task), |
- std::move(sequence), worker); |
+ std::move(sequence)); |
}, |
- std::move(sequence), Unretained(worker), Unretained(this))); |
+ std::move(sequence), Unretained(this))); |
} |
return true; |
@@ -365,42 +267,27 @@ bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
std::unique_ptr<Task> task, |
- scoped_refptr<Sequence> sequence, |
- SchedulerWorker* worker) { |
+ scoped_refptr<Sequence> sequence) { |
DCHECK(task); |
DCHECK(sequence); |
- DCHECK(!worker || ContainsWorker(workers_, worker)); |
// Confirm that |task| is ready to run (its delayed run time is either null or |
// in the past). |
DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); |
- // Because |worker| belongs to this worker pool, we know that the type |
- // of its delegate is SchedulerWorkerDelegateImpl. |
- PriorityQueue* const priority_queue = |
- worker |
- ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) |
- ->single_threaded_priority_queue() |
- : &shared_priority_queue_; |
- DCHECK(priority_queue); |
- |
const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
if (sequence_was_empty) { |
- // Insert |sequence| in |priority_queue| if it was empty before |task| was |
- // inserted into it. Otherwise, one of these must be true: |
- // - |sequence| is already in a PriorityQueue (not necessarily |
- // |shared_priority_queue_|), or, |
+ // Insert |sequence| in |shared_priority_queue_| if it was empty before |
+ // |task| was inserted into it. Otherwise, one of these must be true: |
+ // - |sequence| is already in a PriorityQueue, or, |
// - A worker is running a Task from |sequence|. It will insert |sequence| |
// in a PriorityQueue once it's done running the Task. |
const auto sequence_sort_key = sequence->GetSortKey(); |
- priority_queue->BeginTransaction()->Push(std::move(sequence), |
- sequence_sort_key); |
+ shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
+ sequence_sort_key); |
// Wake up a worker to process |sequence|. |
gab
2017/03/15 20:20:58
rm now redundant comment
robliao
2017/03/15 20:46:44
This comment is consistent with the current style
gab
2017/03/16 15:34:46
Hmm ok, we at least shouldn't add more (and I'm ha
robliao
2017/03/16 22:49:03
While I agree with your opinion, the style guide i
|
- if (worker) |
- WakeUpWorker(worker); |
- else |
- WakeUpOneWorker(); |
+ WakeUpOneWorker(); |
} |
} |
@@ -443,34 +330,13 @@ size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { |
return num_alive_workers; |
} |
-SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
- SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
- SchedulerWorkerPool* worker_pool, |
- SchedulerWorker* worker) |
- : traits_(traits), |
- worker_pool_(worker_pool), |
- worker_(worker) { |
- DCHECK(worker_pool_); |
- DCHECK(worker_); |
- static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
- RegisterSingleThreadTaskRunner(); |
-} |
- |
-SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
- ~SchedulerSingleThreadTaskRunner() { |
- static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
- UnregisterSingleThreadTaskRunner(); |
-} |
- |
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
- const PriorityQueue* shared_priority_queue, |
int index) |
: outer_(outer), |
re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
- single_threaded_priority_queue_(shared_priority_queue), |
index_(index) {} |
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
@@ -528,13 +394,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
{ |
std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
outer_->shared_priority_queue_.BeginTransaction()); |
gab
2017/03/15 20:20:58
This was the only use case of transaction I believ
robliao
2017/03/15 20:46:44
Correct. This is focused on just SchedulerWorkerPo
gab
2017/03/16 15:34:45
Unless you have a CL doing this now I still prefer
robliao
2017/03/16 22:49:03
In progress at the moment!
gab
2017/03/20 16:40:58
Ok but for future reference a comment in code refe
|
- std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
- single_threaded_priority_queue_.BeginTransaction()); |
- |
- if (shared_transaction->IsEmpty() && |
- single_threaded_transaction->IsEmpty()) { |
- single_threaded_transaction.reset(); |
+ if (shared_transaction->IsEmpty()) { |
// |shared_transaction| is kept alive while |worker| is added to |
// |idle_workers_stack_| to avoid this race: |
// 1. This thread creates a Transaction, finds |shared_priority_queue_| |
@@ -555,23 +416,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
return nullptr; |
} |
- // True if both PriorityQueues have Sequences and the Sequence at the top of |
- // the shared PriorityQueue is more important. |
- const bool shared_sequence_is_more_important = |
- !shared_transaction->IsEmpty() && |
- !single_threaded_transaction->IsEmpty() && |
- shared_transaction->PeekSortKey() > |
- single_threaded_transaction->PeekSortKey(); |
- |
- if (single_threaded_transaction->IsEmpty() || |
- shared_sequence_is_more_important) { |
- sequence = shared_transaction->PopSequence(); |
- last_sequence_is_single_threaded_ = false; |
- } else { |
- DCHECK(!single_threaded_transaction->IsEmpty()); |
- sequence = single_threaded_transaction->PopSequence(); |
- last_sequence_is_single_threaded_ = true; |
- } |
+ sequence = shared_transaction->PopSequence(); |
} |
DCHECK(sequence); |
@@ -590,17 +435,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { |
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
- if (last_sequence_is_single_threaded_) { |
- // A single-threaded Sequence is always re-enqueued in the single-threaded |
- // PriorityQueue from which it was extracted. |
- const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
- single_threaded_priority_queue_.BeginTransaction()->Push( |
- std::move(sequence), sequence_sort_key); |
- } else { |
- // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
- // |sequence| must be enqueued. |
- re_enqueue_sequence_callback_.Run(std::move(sequence)); |
- } |
+ // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
+ // |sequence| must be enqueued. |
+ re_enqueue_sequence_callback_.Run(std::move(sequence)); |
} |
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
@@ -610,15 +447,10 @@ TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
SchedulerWorker* worker) { |
- // It's not an issue if |num_single_threaded_runners_| is incremented after |
- // this because the newly created SingleThreadTaskRunner (from which no task |
- // has run yet) will simply run all its tasks on the next physical thread |
- // created by the worker. |
const bool can_detach = |
!idle_start_time_.is_null() && |
(TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
worker != outer_->PeekAtIdleWorkersStack() && |
- !subtle::NoBarrier_Load(&num_single_threaded_runners_) && |
outer_->CanWorkerDetachForTesting(); |
return can_detach; |
} |
@@ -700,7 +532,7 @@ bool SchedulerWorkerPoolImpl::Initialize( |
scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( |
params.priority_hint(), |
MakeUnique<SchedulerWorkerDelegateImpl>( |
- this, re_enqueue_sequence_callback, &shared_priority_queue_, index), |
+ this, re_enqueue_sequence_callback, index), |
task_tracker_, initial_state, params.backward_compatibility()); |
if (!worker) |
break; |
@@ -715,14 +547,6 @@ bool SchedulerWorkerPoolImpl::Initialize( |
return !workers_.empty(); |
} |
-void SchedulerWorkerPoolImpl::WakeUpWorker(SchedulerWorker* worker) { |
- DCHECK(worker); |
- RemoveFromIdleWorkersStack(worker); |
- worker->WakeUp(); |
- // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding |
- // hysteresis to the CanDetach check. See https://crbug.com/666041. |
-} |
- |
void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
SchedulerWorker* worker; |
{ |
@@ -731,6 +555,8 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
} |
if (worker) |
worker->WakeUp(); |
+ // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding |
gab
2017/03/15 20:20:58
Shouldn't this TODO be in CanDetach()?
robliao
2017/03/15 20:46:44
This was the best place to put it that respected t
|
+ // hysteresis to the CanDetach check. See https://crbug.com/666041. |
} |
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |