Chromium Code Reviews| Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
| diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
| index 71f9c0bfb9383e4e16b5f21f02c29b70cbfcd566..dbdbc5d738f51379c0169969a08c3b36e1e245f9 100644 |
| --- a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
| +++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
| @@ -64,15 +64,22 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
| scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
| AutoSchedulerLock auto_lock(sequence_lock_); |
| - return std::move(sequence_); |
| + bool has_work = has_work_; |
| + has_work_ = false; |
| + return has_work ? sequence_ : nullptr; |
| } |
| void DidRunTask() override {} |
| void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
| AutoSchedulerLock auto_lock(sequence_lock_); |
| - DCHECK(!sequence_); |
| - sequence_ = std::move(sequence); |
| + // We've shut down, so no-op this work request. Any sequence cleanup will |
| + // occur in the caller's context. |
| + if (!sequence_) |
| + return; |
| + |
| + DCHECK_EQ(sequence, sequence_); |
| + has_work_ = true; |
| } |
| TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
| @@ -87,15 +94,28 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
| return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); |
| } |
| + void OnMainExit() override { |
| + // To reclaim skipped tasks on shutdown, we null out the sequence to allow |
| + // the tasks to destroy themselves. |
|
fdoray
2017/03/03 05:31:13
Need to grab the lock to clear the pointer. To avo
robliao
2017/03/03 07:07:14
Nice. Yes indeed!
|
| + sequence_ = nullptr; |
| + } |
| + |
| + // SchedulerWorkerDelegate: |
| + |
| + // Consumers should release their sequence reference as soon as possible to |
| + // ensure timely cleanup for general shutdown. |
| + scoped_refptr<Sequence> sequence() { |
| + AutoSchedulerLock auto_lock(sequence_lock_); |
| + return sequence_.get(); |
|
fdoray
2017/03/03 05:31:13
no .get();
robliao
2017/03/03 07:07:14
Done.
|
| + } |
| + |
| private: |
| const std::string thread_name_; |
| - // Synchronizes access to |sequence_| and handles the fact that |
| - // ReEnqueueSequence() is called on both the worker thread for reenqueuing |
| - // the sequence and off of the worker thread to seed the sequence for |
| - // GetWork(). |
| + // Synchronizes access to |sequence_| and |has_work_|. |
| SchedulerLock sequence_lock_; |
| - scoped_refptr<Sequence> sequence_; |
| + scoped_refptr<Sequence> sequence_ = new Sequence; |
| + bool has_work_ = false; |
| AtomicThreadRefChecker thread_ref_checker_; |
| @@ -121,7 +141,22 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| // SingleThreadTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| - TimeDelta delay) override; |
| + TimeDelta delay) override { |
| + auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
| + task->single_thread_task_runner_ref = this; |
| + |
| + if (!outer_->task_tracker_->WillPostTask(task.get())) |
| + return false; |
| + |
| + if (task->delayed_run_time.is_null()) { |
| + PostTaskNow(std::move(task)); |
| + } else { |
| + outer_->delayed_task_manager_->AddDelayedTask( |
| + std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, |
| + Unretained(this))); |
| + } |
| + return true; |
| + } |
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| @@ -131,8 +166,7 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| } |
| bool RunsTasksOnCurrentThread() const override { |
| - auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| - return delegate->RunsTasksOnCurrentThread(); |
| + return GetDelegate()->RunsTasksOnCurrentThread(); |
| } |
| private: |
| @@ -140,10 +174,23 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| outer_->UnregisterSchedulerWorker(worker_); |
| } |
| - void PostTaskNow(std::unique_ptr<Task> task); |
| + void PostTaskNow(std::unique_ptr<Task> task) { |
| + scoped_refptr<Sequence> sequence = GetDelegate()->sequence(); |
| + // If |sequence| is null, then the thread is effectively gone (either |
| + // shutdonw or joined). We will destroy the task in this context instead. |
|
fdoray
2017/03/03 05:31:13
I don't find "We will destroy the task in this con
fdoray
2017/03/03 05:31:13
s/shutdonw/shutdown/
robliao
2017/03/03 07:07:13
Done.
robliao
2017/03/03 07:07:14
Sounds good. Removed.
|
| + if (!sequence) |
| + return; |
| - // Sequence for all Tasks posted through this TaskRunner. |
| - const scoped_refptr<Sequence> sequence_ = new Sequence; |
| + const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
| + if (sequence_was_empty) { |
| + GetDelegate()->ReEnqueueSequence(std::move(sequence)); |
| + worker_->WakeUp(); |
| + } |
|
fdoray
2017/03/03 05:31:13
The sequence and tasks it contained before the Pus
robliao
2017/03/03 07:07:14
Offline discussion:
There is still a potential for
|
| + } |
| + |
| + SchedulerWorkerDelegate* GetDelegate() const { |
| + return static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| + } |
| SchedulerSingleThreadTaskRunnerManager* const outer_; |
| const TaskTraits traits_; |
| @@ -152,36 +199,6 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| }; |
| -bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
| - PostDelayedTask(const tracked_objects::Location& from_here, |
| - const Closure& closure, |
| - TimeDelta delay) { |
| - auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
| - task->single_thread_task_runner_ref = this; |
| - |
| - if (!outer_->task_tracker_->WillPostTask(task.get())) |
| - return false; |
| - |
| - if (task->delayed_run_time.is_null()) { |
| - PostTaskNow(std::move(task)); |
| - } else { |
| - outer_->delayed_task_manager_->AddDelayedTask( |
| - std::move(task), |
| - Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this))); |
| - } |
| - return true; |
| -} |
| - |
| -void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
| - PostTaskNow(std::unique_ptr<Task> task) { |
| - const bool sequence_was_empty = sequence_->PushTask(std::move(task)); |
| - if (sequence_was_empty) { |
| - auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| - delegate->ReEnqueueSequence(sequence_); |
| - worker_->WakeUp(); |
| - } |
| -} |
| - |
| SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
| const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
| const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
| @@ -201,8 +218,13 @@ SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
| SchedulerSingleThreadTaskRunnerManager:: |
| ~SchedulerSingleThreadTaskRunnerManager() { |
| - DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive " |
| - "SchedulerSingleThreadTaskRunnerManager"; |
| +#if DCHECK_IS_ON() |
| + size_t workers_unregistered_during_join = |
| + subtle::NoBarrier_Load(&workers_unregistered_during_join_); |
| + DCHECK_EQ(workers_unregistered_during_join, workers_.size()) |
| + << "There cannot be outstanding SingleThreadTaskRunners upon destruction" |
| + "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; |
| +#endif |
| } |
| scoped_refptr<SingleThreadTaskRunner> |
| @@ -254,9 +276,14 @@ void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
| { |
| AutoSchedulerLock auto_lock(workers_lock_); |
| - // We might be joining, so no-op this if |workers_| is empty. |
| - if (workers_.empty()) |
| + // We might be joining, so record that a worker was unregistered for |
| + // verification at destruction. |
| + if (workers_.empty()) { |
| +#if DCHECK_IS_ON() |
| + subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1); |
| +#endif |
| return; |
| + } |
| auto worker_iter = |
| std::find_if(workers_.begin(), workers_.end(), |