Chromium Code Reviews| Index: base/task_scheduler/worker_thread.cc |
| diff --git a/base/task_scheduler/worker_thread.cc b/base/task_scheduler/worker_thread.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..e87119643fa146c216cc61e5fedd7ee5c4bdc51d |
| --- /dev/null |
| +++ b/base/task_scheduler/worker_thread.cc |
| @@ -0,0 +1,269 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "base/task_scheduler/worker_thread.h" |
| + |
| +#include <ostream> |
| +#include <utility> |
| + |
| +#include "base/bind.h" |
| +#include "base/logging.h" |
| +#include "base/task_scheduler/task_tracker.h" |
| +#include "base/task_scheduler/utils.h" |
| +#include "base/time/time.h" |
| + |
| +namespace base { |
| +namespace internal { |
| + |
| +namespace { |
| + |
| +class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| + public: |
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| + PriorityQueue* single_thread_priority_queue, |
| + TaskTracker* task_tracker); |
| + |
| + // SingleThreadTaskRunner: |
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& task, |
| + TimeDelta delay) override; |
| + bool PostDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + TimeDelta delay) override; |
| + bool RunsTasksOnCurrentThread() const override; |
| + |
| + private: |
| + ~SchedulerSingleThreadTaskRunner() override; |
| + |
| + const TaskTraits traits_; |
| + const scoped_refptr<Sequence> sequence_; |
| + PriorityQueue* const priority_queue_; |
| + TaskTracker* const task_tracker_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| +}; |
| + |
| +SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner( |
| + const TaskTraits& traits, |
| + PriorityQueue* single_thread_priority_queue, |
| + TaskTracker* task_tracker) |
| + : traits_(traits), |
| + sequence_(new Sequence), |
| + priority_queue_(single_thread_priority_queue), |
| + task_tracker_(task_tracker) {} |
| + |
| +bool SchedulerSingleThreadTaskRunner::PostDelayedTask( |
| + const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + TimeDelta delay) { |
| + // TODO(fdoray): Support delayed tasks. |
| + DCHECK(delay.is_zero()); |
| + PostTaskHelper(make_scoped_ptr(new Task(from_here, closure, traits_)), |
| + sequence_, priority_queue_, task_tracker_); |
| + return true; |
| +} |
| + |
| +bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const { |
| + // TODO(fdoray): Return true only if tasks posted may actually run on the |
| + // current thread. It is valid, but not ideal, to always return true. |
| + return true; |
| +} |
| + |
| +bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask( |
| + const tracked_objects::Location& from_here, |
| + const Closure& task, |
| + TimeDelta delay) { |
| + // Tasks are never nested on WorkerThread. |
| + return PostDelayedTask(from_here, task, delay); |
| +} |
| + |
| +SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default; |
| + |
| +// Extracts the Sequence with the highest priority from |shared_transaction| or |
| +// |single_thread_transaction|. |is_single_threaded| is set to true if the |
| +// returned Sequence comes from |single_thread_transaction|. |
| +scoped_refptr<Sequence> GetWork( |
| + PriorityQueue::Transaction* shared_transaction, |
| + PriorityQueue::Transaction* single_thread_transaction, |
| + bool* is_single_threaded) { |
| + DCHECK(shared_transaction); |
| + DCHECK(single_thread_transaction); |
| + DCHECK(is_single_threaded); |
|
robliao
2016/03/30 00:42:42
Done as in DCHECK necessary or DCHECK unnecessary?
fdoray
2016/03/30 18:44:49
I removed it in patch set 11... and I re-added it
|
| + |
| + *is_single_threaded = false; |
| + |
| + const PriorityQueue::SequenceAndSortKey shared_sequence = |
| + shared_transaction->Peek(); |
| + const PriorityQueue::SequenceAndSortKey single_thread_sequence = |
| + single_thread_transaction->Peek(); |
| + |
| + if (single_thread_sequence.is_null() && shared_sequence.is_null()) |
| + return scoped_refptr<Sequence>(); |
| + |
| + if (single_thread_sequence.is_null() || |
| + (!shared_sequence.is_null() && |
| + single_thread_sequence.sort_key < shared_sequence.sort_key)) { |
| + shared_transaction->Pop(); |
| + return shared_sequence.sequence; |
| + } |
| + |
| + DCHECK(!single_thread_sequence.is_null()); |
| + single_thread_transaction->Pop(); |
| + *is_single_threaded = true; |
| + return single_thread_sequence.sequence; |
| +} |
| + |
| +} // namespace |
| + |
| +scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread( |
| + ThreadPriority thread_priority, |
| + PriorityQueue* shared_priority_queue, |
| + const SharedSequenceStillHasTasksCallback& |
| + shared_sequence_still_has_tasks_callback, |
| + const StateChangedCallback& state_changed_callback, |
| + TaskTracker* task_tracker) { |
| + scoped_ptr<WorkerThread> worker_thread( |
| + new WorkerThread(thread_priority, shared_priority_queue, |
| + shared_sequence_still_has_tasks_callback, |
| + state_changed_callback, task_tracker)); |
| + |
| + if (worker_thread->thread_handle_.is_null()) |
| + return scoped_ptr<WorkerThread>(); |
| + return worker_thread; |
| +} |
| + |
| +WorkerThread::~WorkerThread() { |
| + DCHECK(should_exit_for_testing()); |
| +} |
| + |
| +void WorkerThread::WakeUp() { |
| + wake_up_event_.Signal(); |
| +} |
| + |
| +scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits( |
| + const TaskTraits& traits) { |
| + // A WorkerThread is never destroyed, except in tests in which we don't use |
| + // task runners after their WorkerThread has been destroyed. Because of that, |
| + // it is correct to keep pointers to WorkerThread members in the constructed |
| + // task runner. |
| + return scoped_refptr<SingleThreadTaskRunner>( |
| + new SchedulerSingleThreadTaskRunner( |
| + traits, &single_thread_priority_queue_, task_tracker_)); |
| +} |
| + |
| +void WorkerThread::JoinForTesting() { |
| + should_exit_for_testing_ = true; |
| + base::subtle::MemoryBarrier(); |
| + WakeUp(); |
| + PlatformThread::Join(thread_handle_); |
| +} |
| + |
| +WorkerThread::WorkerThread(ThreadPriority thread_priority, |
| + PriorityQueue* shared_priority_queue, |
| + const SharedSequenceStillHasTasksCallback& |
| + shared_sequence_still_has_tasks_callback, |
| + const StateChangedCallback& state_changed_callback, |
| + TaskTracker* task_tracker) |
| + : wake_up_event_(false, false), |
| + single_thread_priority_queue_( |
| + Bind(&WorkerThread::WakeUp, Unretained(this)), |
| + shared_priority_queue), |
| + shared_priority_queue_(shared_priority_queue), |
| + shared_sequence_still_has_tasks_callback_( |
| + shared_sequence_still_has_tasks_callback), |
| + state_changed_callback_(state_changed_callback), |
| + task_tracker_(task_tracker) { |
| + DCHECK(shared_priority_queue_); |
| + DCHECK(!shared_sequence_still_has_tasks_callback_.is_null()); |
| + DCHECK(!state_changed_callback_.is_null()); |
| + DCHECK(task_tracker_); |
| + |
| + static const size_t kDefaultStackSize = 0; |
| + PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
| + thread_priority); |
| +} |
| + |
| +void WorkerThread::SetState(State state) { |
| + DCHECK_NE(state_, state); |
| + shared_priority_queue_->container_lock()->AssertAcquired(); |
| + state_ = state; |
| + state_changed_callback_.Run(this, state); |
|
robliao
2016/03/30 00:42:42
I'm not sure I like this callback as we're holding
robliao
2016/03/30 00:47:28
This also means that the threadpool will have to m
fdoray
2016/03/30 18:44:49
I like the idea of delegating GetWork() to the Thr
|
| +} |
| + |
| +void WorkerThread::ThreadMain() { |
| + while (!task_tracker_->shutdown_completed() && !should_exit_for_testing()) { |
| + // Get the sequence containing the next task to execute. |
| + bool sequence_is_single_threaded = false; |
| + scoped_refptr<Sequence> sequence; |
| + { |
| + scoped_ptr<PriorityQueue::Transaction> shared_transaction( |
| + shared_priority_queue_->BeginTransaction()); |
| + scoped_ptr<PriorityQueue::Transaction> single_thread_transaction( |
| + single_thread_priority_queue_.BeginTransaction()); |
| + sequence = |
| + GetWork(shared_transaction.get(), single_thread_transaction.get(), |
| + &sequence_is_single_threaded); |
| + single_thread_transaction.reset(); |
| + |
| + if (!sequence) { |
| + // Mark this WorkerThread as IDLE. This must be done within the scope of |
| + // a |shared_priority_queue_| Transaction and |shared_priority_queue_| |
| + // must be empty. |
| + SetState(State::IDLE); |
| + shared_transaction.reset(); |
| + |
| + // Wait for a wake-up. |
| + wake_up_event_.Wait(); |
|
robliao
2016/03/30 00:42:42
Between shared_transaction.reset and wake_up_event
fdoray
2016/03/30 18:44:49
Why is it problematic? What shouldn't happen betwe
robliao
2016/03/30 19:38:26
Previously, we had a CV do the work here so we cou
fdoray
2016/03/30 19:56:37
ok, I agree.
|
| + |
| + // Mark this WorkerThread as BUSY. This must be done within the scope of |
| + // a |shared_priority_queue_| Transaction. |
| + shared_transaction = shared_priority_queue_->BeginTransaction(); |
| + SetState(State::BUSY); |
| + |
| + // Try to get work again. |
| + continue; |
| + } |
| + } |
| + |
| + DCHECK_EQ(state_, State::BUSY); |
| + |
| + // Peek the next task in |sequence| and run it. |
| + task_tracker_->RunTask(sequence->PeekTask()); |
| + |
| + // Pop a task from |sequence|. Reinsert it in the appropriate PriorityQueue |
| + // if it's not empty. |
| + if (!sequence->PopTask()) { |
| + if (sequence_is_single_threaded) { |
| + const SequenceSortKey sort_key = sequence->GetSortKey(); |
| + single_thread_priority_queue_.BeginTransaction()->Push( |
| + make_scoped_ptr(new PriorityQueue::SequenceAndSortKey( |
| + std::move(sequence), sort_key))); |
| + } else { |
| + shared_sequence_still_has_tasks_callback_.Run(this, |
| + std::move(sequence)); |
| + } |
| + } |
| + |
| + // Reset |wake_up_event_| to avoid an extra loop iteration before going to |
| + // sleep if the PriorityQueues are empty. This WorkerThread will check its |
| + // PriorityQueues and exit conditions at least once before going to sleep |
| + // despite the fact that the event is reset here. |
| + wake_up_event_.Reset(); |
| + } |
| +} |
| + |
| +std::ostream& operator<<(std::ostream& os, WorkerThread::State state) { |
| + switch (state) { |
| + case WorkerThread::State::BUSY: |
| + os << "BUSY"; |
| + break; |
| + case WorkerThread::State::IDLE: |
| + os << "IDLE"; |
| + break; |
| + } |
| + return os; |
| +} |
| + |
| +} // namespace internal |
| +} // namespace base |