Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index d8d10f34e9f2998d79ba601a46fa6dfe0a4fdab4..e0bd2b7f3836af8e0c334a096857bec5254f1adb 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -25,6 +25,8 @@ |
#include "base/strings/stringprintf.h" |
#include "base/synchronization/condition_variable.h" |
#include "base/synchronization/lock.h" |
+#include "base/task_scheduler/post_task.h" |
+#include "base/task_scheduler/task_scheduler.h" |
#include "base/threading/platform_thread.h" |
#include "base/threading/simple_thread.h" |
#include "base/threading/thread_local.h" |
@@ -50,6 +52,18 @@ namespace base { |
namespace { |
+// An enum representing the state of all pools. Any given process should only |
+// ever transition from NONE_ACTIVE to the active states, transitions between |
+// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
+// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
+// and the WORKER_CREATED transition occurs when a Worker needs to be created |
+// because the first task was posted and the state is still NONE_ACTIVE. |
brettw
2016/08/12 19:30:00
This should have a reference to a bug to remove it
gab
2016/08/12 23:27:45
Done.
|
+enum class AllPoolsState { |
+ NONE_ACTIVE, |
+ WORKER_CREATED, |
+ REDIRECTED_TO_TASK_SCHEDULER, |
+} g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
+ |
struct SequencedTask : public TrackingInfo { |
SequencedTask() |
: sequence_token_id(0), |
@@ -502,10 +516,21 @@ class SequencedWorkerPool::Inner { |
TestingObserver* const testing_observer_; |
+ /* Members used for the experimental redirection to TaskScheduler. */ |
brettw
2016/08/12 17:44:39
Can this be a normal // comment?
gab
2016/08/12 23:27:46
Sure, a /* */ comment felt more like a split from
|
+ |
// The TaskPriority to be used for SequencedWorkerPool tasks redirected to the |
// TaskScheduler as an experiment (unused otherwise). |
const base::TaskPriority task_priority_; |
+ // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
brettw
2016/08/12 19:30:00
This should have a reference to a bug to remove it
gab
2016/08/12 23:27:46
Done.
|
+ // SequencedWorkerPool usage to the TaskScheduler. |
+ std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; |
+ |
+ // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as |
brettw
2016/08/12 19:30:00
This should have a reference to a bug to remove it
gab
2016/08/12 23:27:46
Done.
|
+ // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). |
+ // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). |
+ mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; |
+ |
DISALLOW_COPY_AND_ASSIGN(Inner); |
}; |
@@ -519,6 +544,7 @@ SequencedWorkerPool::Worker::Worker( |
worker_pool_(std::move(worker_pool)), |
task_shutdown_behavior_(BLOCK_SHUTDOWN), |
is_processing_task_(false) { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
Start(); |
} |
@@ -526,6 +552,8 @@ SequencedWorkerPool::Worker::~Worker() { |
} |
void SequencedWorkerPool::Worker::Run() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
#if defined(OS_WIN) |
win::ScopedCOMInitializer com_initializer; |
#endif |
@@ -669,36 +697,126 @@ bool SequencedWorkerPool::Inner::PostTask( |
if (optional_token_name) |
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
- pending_tasks_.insert(sequenced); |
- if (shutdown_behavior == BLOCK_SHUTDOWN) |
- blocking_shutdown_pending_task_count_++; |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ // Confirm that the TaskScheduler's shutdown behaviors use the same |
brettw
2016/08/12 19:30:00
This should have a reference to a bug to remove it
gab
2016/08/12 23:27:46
Doesn't |g_all_pools_state| cover this?
|
+ // underlying values as SequencedWorkerPool. |
+ static_assert( |
+ static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
+ static_cast<int>(CONTINUE_ON_SHUTDOWN), |
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
+ "CONTINUE_ON_SHUTDOWN."); |
+ static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == |
+ static_cast<int>(SKIP_ON_SHUTDOWN), |
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
+ "SKIP_ON_SHUTDOWN."); |
+ static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == |
+ static_cast<int>(BLOCK_SHUTDOWN), |
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
+ "BLOCK_SHUTDOWN."); |
+ |
+ const TaskShutdownBehavior task_shutdown_behavior = |
+ static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); |
+ const TaskTraits pool_traits = |
+ TaskTraits() |
+ .WithFileIO() |
+ .WithPriority(task_priority_) |
+ .WithShutdownBehavior(task_shutdown_behavior); |
+ |
+ // Find or create the TaskScheduler TaskRunner to redirect this task to if |
+ // it is posted to a specific sequence. |
+ scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; |
+ if (sequenced.sequence_token_id) { |
+ sequenced_task_runner = |
+ &sequenced_task_runner_map_[sequenced.sequence_token_id]; |
+ if (!*sequenced_task_runner) { |
+ const ExecutionMode execution_mode = |
+ max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED |
+ : ExecutionMode::SEQUENCED; |
+ *sequenced_task_runner = |
+ CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
+ } |
+ } |
+ |
+ if (sequenced_task_runner) { |
+ (*sequenced_task_runner) |
+ ->PostTask(sequenced.posted_from, sequenced.task); |
+ } else { |
+ // PostTaskWithTraits() posts a task with PARALLEL semantics. There are |
+ // however a few pools that use only one thread and therefore can |
+ // currently legitimatelly assuming thread affinity despite using |
+ // SequencedWorkerPool. Such pools typically only give access to their |
+ // TaskRunner which will be SINGLE_THREADED per nature of the pool |
+ // having only one thread but this DCHECK ensures no such pools use |
+ // SequencedWorkerPool::PostTask() directly. |
+ DCHECK_GT(max_threads_, 1U); |
+ base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
+ sequenced.task); |
+ } |
+ } else { // !REDIRECTED_TO_TASK_SCHEDULER |
+ pending_tasks_.insert(sequenced); |
+ |
+ if (shutdown_behavior == BLOCK_SHUTDOWN) |
+ blocking_shutdown_pending_task_count_++; |
- create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
+ create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
+ } |
} |
- // Actually start the additional thread or signal an existing one now that |
- // we're outside the lock. |
- if (create_thread_id) |
- FinishStartingAdditionalThread(create_thread_id); |
- else |
- SignalHasWork(); |
+ // Some variables are exposed in both modes for convenience but only really |
+ // intended for one of them at runtime, confirm exclusive usage here. |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ DCHECK(pending_tasks_.empty()); |
+ DCHECK(!create_thread_id); |
+ } else { |
+ DCHECK(sequenced_task_runner_map_.empty()); |
+ } |
+ |
+ if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ // Actually start the additional thread or signal an existing one now that |
+ // we're outside the lock. |
+ if (create_thread_id) |
+ FinishStartingAdditionalThread(create_thread_id); |
+ else |
+ SignalHasWork(); |
+ } |
return true; |
} |
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
- AutoLock lock(lock_); |
- return ContainsKey(threads_, PlatformThread::CurrentId()); |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (!runs_tasks_on_verifier_) { |
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
+ TaskTraits().WithFileIO().WithPriority(task_priority_), |
+ ExecutionMode::PARALLEL); |
+ } |
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
+ } else { |
+ AutoLock lock(lock_); |
+ return ContainsKey(threads_, PlatformThread::CurrentId()); |
+ } |
} |
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
SequenceToken sequence_token) const { |
- AutoLock lock(lock_); |
- ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
- if (found == threads_.end()) |
- return false; |
- return found->second->is_processing_task() && |
- sequence_token.Equals(found->second->task_sequence_token()); |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ // TODO(gab): This currently only verifies that the current thread is a |
+ // thread on which a task bound to |sequence_token| *could* run, but it |
+ // doesn't verify that the current is *currently running* a task bound to |
+ // |sequence_token|. |
+ const auto sequenced_task_runner_it = |
+ sequenced_task_runner_map_.find(sequence_token.id_); |
+ return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
+ sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
+ } else { |
+ AutoLock lock(lock_); |
+ ThreadMap::const_iterator found = |
+ threads_.find(PlatformThread::CurrentId()); |
+ if (found == threads_.end()) |
+ return false; |
+ return found->second->is_processing_task() && |
+ sequence_token.Equals(found->second->task_sequence_token()); |
+ } |
} |
// See https://code.google.com/p/chromium/issues/detail?id=168415 |
@@ -732,6 +850,10 @@ void SequencedWorkerPool::Inner::Shutdown( |
if (shutdown_called_) |
return; |
shutdown_called_ = true; |
+ |
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED) |
brettw
2016/08/12 19:30:00
This should have a reference to a bug to remove it
gab
2016/08/12 23:27:46
Doesn't |g_all_pools_state| cover this?
|
+ return; |
+ |
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
// Tickle the threads. This will wake up a waiting one so it will know that |
@@ -771,6 +893,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
} |
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
{ |
AutoLock lock(lock_); |
DCHECK(thread_being_created_); |
@@ -905,6 +1028,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
} |
void SequencedWorkerPool::Inner::HandleCleanup() { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
if (cleanup_state_ == CLEANUP_DONE) |
return; |
@@ -970,6 +1095,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
SequencedTask* task, |
TimeDelta* wait_time, |
std::vector<Closure>* delete_these_outside_lock) { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
// Find the next task with a sequence token that's not currently in use. |
@@ -1056,6 +1183,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
} |
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
// Mark the task's sequence number as in use. |
@@ -1087,6 +1216,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
} |
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
@@ -1100,6 +1231,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
int sequence_token_id) const { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
return !sequence_token_id || |
current_sequences_.find(sequence_token_id) == |
@@ -1107,6 +1240,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
} |
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
// How thread creation works: |
// |
@@ -1157,15 +1292,24 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
int thread_number) { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
// Called outside of the lock. |
DCHECK_GT(thread_number, 0); |
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { |
brettw
2016/08/12 19:30:00
This should have a reference to a bug to remove it
gab
2016/08/12 23:27:46
Doesn't |g_all_pools_state| cover this?
|
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
+ g_all_pools_state = AllPoolsState::WORKER_CREATED; |
+ } |
+ |
// The worker is assigned to the list when the thread actually starts, which |
// will manage the memory of the pointer. |
new Worker(worker_pool_, thread_number, thread_name_prefix_); |
} |
void SequencedWorkerPool::Inner::SignalHasWork() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
has_work_cv_.Signal(); |
if (testing_observer_) { |
testing_observer_->OnHasWork(); |
@@ -1173,6 +1317,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() { |
} |
bool SequencedWorkerPool::Inner::CanShutdown() const { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
lock_.AssertAcquired(); |
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
return !thread_being_created_ && |
@@ -1209,6 +1354,18 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
return worker->worker_pool(); |
} |
+// static |
+void SequencedWorkerPool:: |
+ RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
+ DCHECK(TaskScheduler::GetInstance()); |
+ // Hitting this DCHECK indicates that a task was posted to a |
+ // SequencedWorkerPool before the TaskScheduler was initialized and |
+ // redirected, posting task to SequencedWorkerPools needs to at least be |
+ // delayed until after that point. |
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
+ g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
+} |
+ |
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
const std::string& thread_name_prefix, |
base::TaskPriority task_priority) |