Index: base/task_scheduler/scheduler_thread_pool.cc |
diff --git a/base/task_scheduler/scheduler_thread_pool.cc b/base/task_scheduler/scheduler_thread_pool.cc |
index e1e4f4e49456faf41499663f2dac537b7698e015..a1a22180df6428ae3e8dcebd4e50b677337e26cc 100644 |
--- a/base/task_scheduler/scheduler_thread_pool.cc |
+++ b/base/task_scheduler/scheduler_thread_pool.cc |
@@ -12,7 +12,8 @@ |
#include "base/logging.h" |
#include "base/memory/ptr_util.h" |
#include "base/sequenced_task_runner.h" |
-#include "base/task_scheduler/utils.h" |
+#include "base/task_scheduler/delayed_task_manager.h" |
+#include "base/task_scheduler/task_tracker.h" |
#include "base/threading/thread_local.h" |
namespace base { |
@@ -21,48 +22,40 @@ namespace internal { |
namespace { |
// SchedulerThreadPool that owns the current thread, if any. |
-LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky |
+LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky |
tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; |
// A task runner that runs tasks with the PARALLEL ExecutionMode. |
class SchedulerParallelTaskRunner : public TaskRunner { |
public: |
// Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
- // long as |executor| is alive. |
- // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
+ // long as |thread_pool| is alive. |
+ // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory. |
SchedulerParallelTaskRunner(const TaskTraits& traits, |
- SchedulerTaskExecutor* executor, |
- TaskTracker* task_tracker, |
- DelayedTaskManager* delayed_task_manager) |
- : traits_(traits), |
- executor_(executor), |
- task_tracker_(task_tracker), |
- delayed_task_manager_(delayed_task_manager) {} |
+ SchedulerThreadPool* thread_pool) |
+ : traits_(traits), thread_pool_(thread_pool) {} |
// TaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
TimeDelta delay) override { |
// Post the task as part of a one-off single-task Sequence. |
- return PostTaskToExecutor( |
+ return thread_pool_->PostTaskWithSequence( |
WrapUnique( |
new Task(from_here, closure, traits_, |
delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), |
- make_scoped_refptr(new Sequence), executor_, task_tracker_, |
- delayed_task_manager_); |
+ make_scoped_refptr(new Sequence)); |
} |
bool RunsTasksOnCurrentThread() const override { |
- return tls_current_thread_pool.Get().Get() == executor_; |
+ return tls_current_thread_pool.Get().Get() == thread_pool_; |
} |
private: |
~SchedulerParallelTaskRunner() override = default; |
const TaskTraits traits_; |
- SchedulerTaskExecutor* const executor_; |
- TaskTracker* const task_tracker_; |
- DelayedTaskManager* const delayed_task_manager_; |
+ SchedulerThreadPool* const thread_pool_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
}; |
@@ -70,28 +63,23 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
// A task runner that runs tasks with the SEQUENCED ExecutionMode. |
class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
public: |
- // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
- // long as |executor| is alive. |
- // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
+ // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks |
+ // so long as |thread_pool| is alive. |
+ // TODO(robliao): Find a concrete way to manage |thread_pool|'s memory. |
SchedulerSequencedTaskRunner(const TaskTraits& traits, |
- SchedulerTaskExecutor* executor, |
- TaskTracker* task_tracker, |
- DelayedTaskManager* delayed_task_manager) |
- : traits_(traits), |
- executor_(executor), |
- task_tracker_(task_tracker), |
- delayed_task_manager_(delayed_task_manager) {} |
+ SchedulerThreadPool* thread_pool) |
+ : traits_(traits), thread_pool_(thread_pool) {} |
// SequencedTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
const Closure& closure, |
TimeDelta delay) override { |
// Post the task as part of |sequence|. |
- return PostTaskToExecutor( |
+ return thread_pool_->PostTaskWithSequence( |
WrapUnique( |
new Task(from_here, closure, traits_, |
delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), |
- sequence_, executor_, task_tracker_, delayed_task_manager_); |
+ sequence_); |
} |
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
@@ -102,7 +90,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
} |
bool RunsTasksOnCurrentThread() const override { |
- return tls_current_thread_pool.Get().Get() == executor_; |
+ return tls_current_thread_pool.Get().Get() == thread_pool_; |
} |
private: |
@@ -112,9 +100,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
const scoped_refptr<Sequence> sequence_ = new Sequence; |
const TaskTraits traits_; |
- SchedulerTaskExecutor* const executor_; |
- TaskTracker* const task_tracker_; |
- DelayedTaskManager* const delayed_task_manager_; |
+ SchedulerThreadPool* const thread_pool_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
}; |
@@ -166,12 +152,10 @@ scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
ExecutionMode execution_mode) { |
switch (execution_mode) { |
case ExecutionMode::PARALLEL: |
- return make_scoped_refptr(new SchedulerParallelTaskRunner( |
- traits, this, task_tracker_, delayed_task_manager_)); |
+ return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
case ExecutionMode::SEQUENCED: |
- return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
- traits, this, task_tracker_, delayed_task_manager_)); |
+ return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
case ExecutionMode::SINGLE_THREADED: |
// TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
@@ -216,19 +200,51 @@ void SchedulerThreadPool::JoinForTesting() { |
join_for_testing_returned_.Signal(); |
} |
-void SchedulerThreadPool::PostTaskWithSequence( |
+bool SchedulerThreadPool::PostTaskWithSequence( |
std::unique_ptr<Task> task, |
scoped_refptr<Sequence> sequence) { |
DCHECK(task); |
DCHECK(sequence); |
- const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( |
- std::move(task), std::move(sequence), &shared_priority_queue_); |
+ if (!task_tracker_->WillPostTask(task.get())) |
+ return false; |
+ |
+ if (task->delayed_run_time.is_null()) { |
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
+ } else { |
+ delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), |
+ this); |
+ } |
+ |
+ return true; |
+} |
- // No thread has already been woken up to run Tasks from |sequence| if it was |
- // empty before |task| was inserted into it. |
- if (sequence_was_empty) |
+void SchedulerThreadPool::PostTaskWithSequenceNow( |
+ std::unique_ptr<Task> task, |
+ scoped_refptr<Sequence> sequence) { |
+ DCHECK(task); |
+ DCHECK(sequence); |
+ |
+ // Confirm that |task| is ready to run (its delayed run time is either null or |
+ // in the past). |
+ DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); |
+ |
+ const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
+ if (sequence_was_empty) { |
+ // Insert |sequence| in |shared_priority_queue_| if it was empty before |
+ // |task| was inserted into it. Otherwise, one of these must be true: |
+ // - |sequence| is already in a PriorityQueue (not necessarily |
+ // |shared_priority_queue_|), or, |
+ // - A worker thread is running a Task from |sequence|. It will insert |
+ // |sequence| in a PriorityQueue once it's done running the Task. |
+ const auto sequence_sort_key = sequence->GetSortKey(); |
+ shared_priority_queue_.BeginTransaction()->Push( |
+ WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), |
+ sequence_sort_key))); |
+ |
+ // Wake up a worker thread to process |sequence|. |
WakeUpOneThread(); |
+ } |
} |
SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl:: |