Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
index 71f9c0bfb9383e4e16b5f21f02c29b70cbfcd566..dcacf879dd43c0911aabbf775d01d1c6b8c24df7 100644 |
--- a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
+++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
@@ -64,15 +64,17 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
AutoSchedulerLock auto_lock(sequence_lock_); |
- return std::move(sequence_); |
+ bool has_work = has_work_; |
+ has_work_ = false; |
+ return has_work ? sequence_ : nullptr; |
} |
void DidRunTask() override {} |
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
AutoSchedulerLock auto_lock(sequence_lock_); |
- DCHECK(!sequence_); |
- sequence_ = std::move(sequence); |
+ DCHECK_EQ(sequence, sequence_); |
+ has_work_ = true; |
} |
TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
@@ -87,15 +89,22 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); |
} |
+ void OnMainExit() override { |
+ // To reclaim skipped tasks on shutdown, we null out the sequence to allow |
+ // the tasks to destroy themselves. |
fdoray
2017/03/02 14:37:48
DCHECK(sequence->HasOneRef());
robliao
2017/03/03 04:24:17
In general for refcounted things, it's a dangerous
|
+ sequence_ = nullptr; |
+ } |
+ |
+ // SchedulerWorkerDelegate: |
+ Sequence* sequence() { return sequence_.get(); } |
+ |
private: |
const std::string thread_name_; |
- // Synchronizes access to |sequence_| and handles the fact that |
- // ReEnqueueSequence() is called on both the worker thread for reenqueuing |
- // the sequence and off of the worker thread to seed the sequence for |
- // GetWork(). |
+ // Synchronizes access to |sequence_| and |has_work_|. |
SchedulerLock sequence_lock_; |
- scoped_refptr<Sequence> sequence_; |
+ scoped_refptr<Sequence> sequence_ = new Sequence; |
+ bool has_work_ = false; |
AtomicThreadRefChecker thread_ref_checker_; |
@@ -121,7 +130,22 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
// SingleThreadTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
- TimeDelta delay) override; |
+ TimeDelta delay) override { |
+ auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
+ task->single_thread_task_runner_ref = this; |
+ |
+ if (!outer_->task_tracker_->WillPostTask(task.get())) |
+ return false; |
+ |
+ if (task->delayed_run_time.is_null()) { |
+ PostTaskNow(std::move(task)); |
+ } else { |
+ outer_->delayed_task_manager_->AddDelayedTask( |
+ std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, |
+ Unretained(this))); |
+ } |
+ return true; |
+ } |
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
@@ -131,8 +155,7 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
} |
bool RunsTasksOnCurrentThread() const override { |
- auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
- return delegate->RunsTasksOnCurrentThread(); |
+ return GetDelegate()->RunsTasksOnCurrentThread(); |
} |
private: |
@@ -140,10 +163,18 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
outer_->UnregisterSchedulerWorker(worker_); |
} |
- void PostTaskNow(std::unique_ptr<Task> task); |
+ void PostTaskNow(std::unique_ptr<Task> task) { |
+ Sequence* sequence = GetDelegate()->sequence(); |
fdoray
2017/03/02 14:37:48
When shutdown completes, the thread exits and the
robliao
2017/03/03 04:24:17
Handled by making this a scoped_refptr. At this po
|
+ const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
+ if (sequence_was_empty) { |
+ GetDelegate()->ReEnqueueSequence(sequence); |
+ worker_->WakeUp(); |
+ } |
+ } |
- // Sequence for all Tasks posted through this TaskRunner. |
- const scoped_refptr<Sequence> sequence_ = new Sequence; |
+ SchedulerWorkerDelegate* GetDelegate() const { |
+ return static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
+ } |
SchedulerSingleThreadTaskRunnerManager* const outer_; |
const TaskTraits traits_; |
@@ -152,36 +183,6 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
}; |
-bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
- PostDelayedTask(const tracked_objects::Location& from_here, |
- const Closure& closure, |
- TimeDelta delay) { |
- auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
- task->single_thread_task_runner_ref = this; |
- |
- if (!outer_->task_tracker_->WillPostTask(task.get())) |
- return false; |
- |
- if (task->delayed_run_time.is_null()) { |
- PostTaskNow(std::move(task)); |
- } else { |
- outer_->delayed_task_manager_->AddDelayedTask( |
- std::move(task), |
- Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this))); |
- } |
- return true; |
-} |
- |
-void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
- PostTaskNow(std::unique_ptr<Task> task) { |
- const bool sequence_was_empty = sequence_->PushTask(std::move(task)); |
- if (sequence_was_empty) { |
- auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
- delegate->ReEnqueueSequence(sequence_); |
- worker_->WakeUp(); |
- } |
-} |
- |
SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
@@ -201,8 +202,11 @@ SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
SchedulerSingleThreadTaskRunnerManager:: |
~SchedulerSingleThreadTaskRunnerManager() { |
- DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive " |
- "SchedulerSingleThreadTaskRunnerManager"; |
+ size_t workers_unregistered_during_join = |
+ subtle::NoBarrier_Load(&workers_unregistered_during_join_); |
+ DCHECK_EQ(workers_unregistered_during_join, workers_.size()) |
+ << "SchedulerSingleThreadTaskRunners must outlive " |
fdoray
2017/03/02 14:37:48
I think this would be clearer from a user point of
robliao
2017/03/03 04:24:17
While that's true, it doesn't make sense in the co
|
+ "SchedulerSingleThreadTaskRunnerManager"; |
} |
scoped_refptr<SingleThreadTaskRunner> |
@@ -254,9 +258,12 @@ void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
{ |
AutoSchedulerLock auto_lock(workers_lock_); |
- // We might be joining, so no-op this if |workers_| is empty. |
- if (workers_.empty()) |
+ // We might be joining, so record that a worker was unregistered for |
+ // verification at destruction. |
+ if (workers_.empty()) { |
+ subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1); |
fdoray
2017/03/02 14:37:48
Use base::AtomicRefCountInc and base::AtomicRefCou
robliao
2017/03/03 04:24:17
Those functions are intended for reference countin
fdoray
2017/03/03 05:31:13
They want people to use base/memory/ref_counted.h
fdoray
2017/03/03 07:22:39
ping
robliao
2017/03/03 07:50:05
The intent of atomic_ref_count.h as written is to
|
return; |
+ } |
auto worker_iter = |
std::find_if(workers_.begin(), workers_.end(), |