Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
index 71f9c0bfb9383e4e16b5f21f02c29b70cbfcd566..0d9d55dbfab7f40183ff5f95b154c0b395868c8c 100644 |
--- a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
+++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
@@ -64,15 +64,22 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
AutoSchedulerLock auto_lock(sequence_lock_); |
- return std::move(sequence_); |
+ bool has_work = has_work_; |
+ has_work_ = false; |
+ return has_work ? sequence_ : nullptr; |
} |
void DidRunTask() override {} |
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
AutoSchedulerLock auto_lock(sequence_lock_); |
- DCHECK(!sequence_); |
- sequence_ = std::move(sequence); |
+ // We've shut down, so no-op this work request. Any sequence cleanup will |
+ // occur in the caller's context. |
+ if (!sequence_) |
+ return; |
+ |
+ DCHECK_EQ(sequence, sequence_); |
+ has_work_ = true; |
} |
TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
@@ -87,15 +94,35 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); |
} |
+ void OnMainExit() override { |
+ // Move |sequence_| to |local_sequence| so that if we have the last |
+ // reference to the sequence we don't destroy it (and its tasks) within |
+ // |sequence_lock_|. |
+ scoped_refptr<Sequence> local_sequence; |
+ { |
+ AutoSchedulerLock auto_lock(sequence_lock_); |
+ // To reclaim skipped tasks on shutdown, we null out the sequence to allow |
+ // the tasks to destroy themselves. |
+ local_sequence = std::move(sequence_); |
+ } |
+ } |
+ |
+ // SchedulerWorkerDelegate: |
+ |
+ // Consumers should release their sequence reference as soon as possible to |
+ // ensure timely cleanup for general shutdown. |
+ scoped_refptr<Sequence> sequence() { |
+ AutoSchedulerLock auto_lock(sequence_lock_); |
+ return sequence_; |
+ } |
+ |
private: |
const std::string thread_name_; |
- // Synchronizes access to |sequence_| and handles the fact that |
- // ReEnqueueSequence() is called on both the worker thread for reenqueuing |
- // the sequence and off of the worker thread to seed the sequence for |
- // GetWork(). |
+ // Synchronizes access to |sequence_| and |has_work_|. |
SchedulerLock sequence_lock_; |
- scoped_refptr<Sequence> sequence_; |
+ scoped_refptr<Sequence> sequence_ = new Sequence; |
+ bool has_work_ = false; |
AtomicThreadRefChecker thread_ref_checker_; |
@@ -121,7 +148,22 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
// SingleThreadTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
- TimeDelta delay) override; |
+ TimeDelta delay) override { |
+ auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
+ task->single_thread_task_runner_ref = this; |
+ |
+ if (!outer_->task_tracker_->WillPostTask(task.get())) |
+ return false; |
+ |
+ if (task->delayed_run_time.is_null()) { |
+ PostTaskNow(std::move(task)); |
+ } else { |
+ outer_->delayed_task_manager_->AddDelayedTask( |
+ std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, |
+ Unretained(this))); |
+ } |
+ return true; |
+ } |
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
@@ -131,8 +173,7 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
} |
bool RunsTasksOnCurrentThread() const override { |
- auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
- return delegate->RunsTasksOnCurrentThread(); |
+ return GetDelegate()->RunsTasksOnCurrentThread(); |
} |
private: |
@@ -140,10 +181,23 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
outer_->UnregisterSchedulerWorker(worker_); |
} |
- void PostTaskNow(std::unique_ptr<Task> task); |
+ void PostTaskNow(std::unique_ptr<Task> task) { |
+ scoped_refptr<Sequence> sequence = GetDelegate()->sequence(); |
+ // If |sequence| is null, then the thread is effectively gone (either |
+ // shutdown or joined). |
+ if (!sequence) |
+ return; |
- // Sequence for all Tasks posted through this TaskRunner. |
- const scoped_refptr<Sequence> sequence_ = new Sequence; |
+ const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
+ if (sequence_was_empty) { |
+ GetDelegate()->ReEnqueueSequence(std::move(sequence)); |
+ worker_->WakeUp(); |
+ } |
+ } |
+ |
+ SchedulerWorkerDelegate* GetDelegate() const { |
+ return static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
+ } |
SchedulerSingleThreadTaskRunnerManager* const outer_; |
const TaskTraits traits_; |
@@ -152,36 +206,6 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
}; |
-bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
- PostDelayedTask(const tracked_objects::Location& from_here, |
- const Closure& closure, |
- TimeDelta delay) { |
- auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
- task->single_thread_task_runner_ref = this; |
- |
- if (!outer_->task_tracker_->WillPostTask(task.get())) |
- return false; |
- |
- if (task->delayed_run_time.is_null()) { |
- PostTaskNow(std::move(task)); |
- } else { |
- outer_->delayed_task_manager_->AddDelayedTask( |
- std::move(task), |
- Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this))); |
- } |
- return true; |
-} |
- |
-void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
- PostTaskNow(std::unique_ptr<Task> task) { |
- const bool sequence_was_empty = sequence_->PushTask(std::move(task)); |
- if (sequence_was_empty) { |
- auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
- delegate->ReEnqueueSequence(sequence_); |
- worker_->WakeUp(); |
- } |
-} |
- |
SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
@@ -201,8 +225,13 @@ SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
SchedulerSingleThreadTaskRunnerManager:: |
~SchedulerSingleThreadTaskRunnerManager() { |
- DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive " |
- "SchedulerSingleThreadTaskRunnerManager"; |
+#if DCHECK_IS_ON() |
+ size_t workers_unregistered_during_join = |
+ subtle::NoBarrier_Load(&workers_unregistered_during_join_); |
+ DCHECK_EQ(workers_unregistered_during_join, workers_.size()) |
+ << "There cannot be outstanding SingleThreadTaskRunners upon destruction" |
gab
2017/03/15 20:00:13
nit: space before closing " to lead next word.
robliao
2017/03/15 20:40:16
Done.
|
+ "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; |
+#endif |
} |
scoped_refptr<SingleThreadTaskRunner> |
@@ -254,9 +283,14 @@ void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
{ |
AutoSchedulerLock auto_lock(workers_lock_); |
- // We might be joining, so no-op this if |workers_| is empty. |
- if (workers_.empty()) |
+ // We might be joining, so record that a worker was unregistered for |
+ // verification at destruction. |
+ if (workers_.empty()) { |
+#if DCHECK_IS_ON() |
+ subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1); |
+#endif |
return; |
+ } |
auto worker_iter = |
std::find_if(workers_.begin(), workers_.end(), |