Chromium Code Reviews| Index: base/task_scheduler/scheduler_worker_thread.cc |
| diff --git a/base/task_scheduler/scheduler_worker_thread.cc b/base/task_scheduler/scheduler_worker_thread.cc |
| index 60d05e1ca7b97ec213e8c24f86534e54395f2282..3565bd5e6b9f816cb1cf1d33074aea606e2255e0 100644 |
| --- a/base/task_scheduler/scheduler_worker_thread.cc |
| +++ b/base/task_scheduler/scheduler_worker_thread.cc |
| @@ -8,19 +8,82 @@ |
| #include <utility> |
| +#include "base/lazy_instance.h" |
| #include "base/logging.h" |
| +#include "base/memory/ptr_util.h" |
| #include "base/task_scheduler/task_tracker.h" |
| +#include "base/task_scheduler/utils.h" |
| +#include "base/threading/thread_local.h" |
| +#include "base/time/time.h" |
| namespace base { |
| namespace internal { |
| +namespace { |
| + |
| +// SchedulerWorkerThread that owns the current thread, if any. |
| +LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky |
| + tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; |
| + |
| +// A task runner that runs tasks on a single SchedulerWorkerThread. |
| +class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| + public: |
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| + SchedulerTaskExecutor* executor, |
|
gab
2016/04/18 18:04:02
TODO on memory management for this one as well for
fdoray
2016/04/18 19:04:50
Done.
|
| + TaskTracker* task_tracker, |
| + DelayedTaskManager* delayed_task_manager) |
| + : traits_(traits), |
| + executor_(executor), |
| + task_tracker_(task_tracker), |
| + delayed_task_manager_(delayed_task_manager) {} |
| + |
| + // SingleThreadTaskRunner: |
| + bool PostDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + TimeDelta delay) override { |
| + // Post the task as part of |sequence|. |
| + return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, |
| + executor_, task_tracker_, delayed_task_manager_); |
| + } |
| + |
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + base::TimeDelta delay) override { |
| + // Tasks are never nested within the task scheduler. |
| + return PostDelayedTask(from_here, closure, delay); |
| + } |
| + |
| + bool RunsTasksOnCurrentThread() const override { |
| + return tls_current_worker_thread.Get().Get() == executor_; |
| + } |
| + |
| + private: |
| + ~SchedulerSingleThreadTaskRunner() override = default; |
| + |
| + // Sequence for all Tasks posted through this TaskRunner. |
| + const scoped_refptr<Sequence> sequence_ = new Sequence; |
| + |
| + const TaskTraits traits_; |
| + SchedulerTaskExecutor* const executor_; |
| + TaskTracker* const task_tracker_; |
| + DelayedTaskManager* const delayed_task_manager_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| +}; |
| + |
| +} // namespace |
| + |
| std::unique_ptr<SchedulerWorkerThread> |
| SchedulerWorkerThread::CreateSchedulerWorkerThread( |
| ThreadPriority thread_priority, |
| Delegate* delegate, |
| - TaskTracker* task_tracker) { |
| + TaskTracker* task_tracker, |
| + DelayedTaskManager* delayed_task_manager, |
| + const PriorityQueue* predecessor_priority_queue) { |
| std::unique_ptr<SchedulerWorkerThread> worker_thread( |
| - new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); |
| + new SchedulerWorkerThread(thread_priority, delegate, task_tracker, |
| + delayed_task_manager, |
| + predecessor_priority_queue)); |
| if (worker_thread->thread_handle_.is_null()) |
| return nullptr; |
| @@ -31,6 +94,12 @@ SchedulerWorkerThread::~SchedulerWorkerThread() { |
| DCHECK(ShouldExitForTesting()); |
| } |
| +scoped_refptr<SingleThreadTaskRunner> |
| +SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) { |
| + return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
| + traits, this, task_tracker_, delayed_task_manager_)); |
| +} |
| + |
| void SchedulerWorkerThread::WakeUp() { |
| wake_up_event_.Signal(); |
| } |
| @@ -44,14 +113,36 @@ void SchedulerWorkerThread::JoinForTesting() { |
| PlatformThread::Join(thread_handle_); |
| } |
| -SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, |
| - Delegate* delegate, |
| - TaskTracker* task_tracker) |
| +void SchedulerWorkerThread::PostTaskWithSequence( |
| + std::unique_ptr<Task> task, |
| + scoped_refptr<Sequence> sequence) { |
| + DCHECK(task); |
| + DCHECK(sequence); |
| + |
| + const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( |
| + std::move(task), std::move(sequence), &single_threaded_priority_queue_); |
| + |
| + // If |sequence| wasn't empty before |task| was inserted into it, the worker |
| + // thread has already been woken up to run it. |
|
gab
2016/04/18 18:04:02
How about s/run/process/ (at first I thought "it"
fdoray
2016/04/18 19:04:50
Done.
|
| + if (sequence_was_empty) |
| + WakeUp(); |
|
gab
2016/04/18 18:04:02
Add a TODO to check for removal from idle stack (a
fdoray
2016/04/18 19:04:50
Done.
|
| +} |
| + |
| +SchedulerWorkerThread::SchedulerWorkerThread( |
| + ThreadPriority thread_priority, |
| + Delegate* delegate, |
| + TaskTracker* task_tracker, |
| + DelayedTaskManager* delayed_task_manager, |
| + const PriorityQueue* predecessor_priority_queue) |
| : wake_up_event_(false, false), |
| + single_threaded_priority_queue_(predecessor_priority_queue), |
| delegate_(delegate), |
| - task_tracker_(task_tracker) { |
| + task_tracker_(task_tracker), |
| + delayed_task_manager_(delayed_task_manager) { |
| DCHECK(delegate_); |
| DCHECK(task_tracker_); |
| + DCHECK(delayed_task_manager_); |
| + DCHECK(predecessor_priority_queue); |
| const size_t kDefaultStackSize = 0; |
| PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
| @@ -60,13 +151,16 @@ SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, |
| void SchedulerWorkerThread::ThreadMain() { |
| delegate_->OnMainEntry(); |
| + tls_current_worker_thread.Get().Set(this); |
| // A SchedulerWorkerThread starts out sleeping. |
| wake_up_event_.Wait(); |
| while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
| // Get the sequence containing the next task to execute. |
| - scoped_refptr<Sequence> sequence = delegate_->GetWork(this); |
| + bool is_single_threaded_sequence = false; |
| + scoped_refptr<Sequence> sequence = delegate_->GetWork( |
| + this, &single_threaded_priority_queue_, &is_single_threaded_sequence); |
| if (!sequence) { |
| wake_up_event_.Wait(); |
| @@ -82,8 +176,16 @@ void SchedulerWorkerThread::ThreadMain() { |
| // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and |
| // there are live references to it, it will be enqueued when a Task is added |
| // to it. Otherwise, it will be destroyed at the end of this scope. |
| - if (!sequence_became_empty) |
| - delegate_->EnqueueSequence(std::move(sequence)); |
| + if (!sequence_became_empty) { |
| + if (is_single_threaded_sequence) { |
| + const auto sort_key = sequence->GetSortKey(); |
| + single_threaded_priority_queue_.BeginTransaction()->Push( |
| + WrapUnique(new PriorityQueue::SequenceAndSortKey( |
| + std::move(sequence), sort_key))); |
| + } else { |
| + delegate_->EnqueueSequence(std::move(sequence)); |
| + } |
| + } |
| // Calling WakeUp() guarantees that this SchedulerWorkerThread will run |
| // Tasks from Sequences returned by the GetWork() method of |delegate_| |