Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index ef6047ef478411350d64469fa62d2ca7e9295dc2..13323d776339183a68aa313fd67edbf28f67c189 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -25,6 +25,8 @@ |
#include "base/strings/stringprintf.h" |
#include "base/synchronization/condition_variable.h" |
#include "base/synchronization/lock.h" |
+#include "base/task_scheduler/post_task.h" |
+#include "base/task_scheduler/task_scheduler.h" |
#include "base/threading/platform_thread.h" |
#include "base/threading/simple_thread.h" |
#include "base/threading/thread_local.h" |
@@ -50,6 +52,20 @@ namespace base { |
namespace { |
+// An enum representing the state of all pools. Any given process should only |
+// ever transition from NONE_ACTIVE to one of the active states; transitions |
+// between active states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER |
+// transition occurs when |
+// RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called, and the |
+// WORKER_CREATED transition occurs when a Worker needs to be created because |
+// the first task was posted while the state is still NONE_ACTIVE. |
+// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
+// will be phased out completely otherwise). |
+enum class AllPoolsState { |
+ NONE_ACTIVE, |
+ WORKER_CREATED, |
+ REDIRECTED_TO_TASK_SCHEDULER, |
+} g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
+ |
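To make the transition rule above concrete, here is a minimal standalone sketch of the only expected edges: NONE_ACTIVE may move to either active state, and anything else trips the assertion, mirroring the DCHECK_EQ/DCHECK_NE checks added throughout this patch. The names below are illustrative stand-ins, not Chromium code.

    // Standalone sketch (not Chromium code) of the transition rule described
    // in the comment above.
    #include <cassert>

    enum class AllPoolsState {
      NONE_ACTIVE,
      WORKER_CREATED,
      REDIRECTED_TO_TASK_SCHEDULER,
    };

    AllPoolsState g_all_pools_state = AllPoolsState::NONE_ACTIVE;

    void TransitionTo(AllPoolsState next_state) {
      // Only NONE_ACTIVE -> {WORKER_CREATED, REDIRECTED_TO_TASK_SCHEDULER} is
      // expected; re-asserting the current state is treated as a no-op in
      // this sketch.
      assert(g_all_pools_state == AllPoolsState::NONE_ACTIVE ||
             g_all_pools_state == next_state);
      g_all_pools_state = next_state;
    }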
struct SequencedTask : public TrackingInfo { |
SequencedTask() |
: sequence_token_id(0), |
@@ -354,6 +370,10 @@ class SequencedWorkerPool::Inner { |
CLEANUP_DONE, |
}; |
+ // Helper used by PostTask() to complete the work when redirection is on. |
+ // Coalesce upon resolution of http://crbug.com/622400. |
+ void PostTaskToTaskScheduler(const SequencedTask& sequenced); |
+ |
// Called from within the lock, this converts the given token name into a |
// token ID, creating a new one if necessary. |
int LockedGetNamedTokenID(const std::string& name); |
@@ -402,7 +422,7 @@ class SequencedWorkerPool::Inner { |
// 0 or more. The caller should then call FinishStartingAdditionalThread to |
// complete initialization once the lock is released. |
// |
- // If another thread is not necessary, returne 0; |
+ // If another thread is not necessary, return 0; |
// |
// See the implementedion for more. |
int PrepareToStartAdditionalThreadIfHelpful(); |
@@ -502,10 +522,23 @@ class SequencedWorkerPool::Inner { |
TestingObserver* const testing_observer_; |
+ // Members below are used for the experimental redirection to TaskScheduler. |
+ // TODO(gab): Remove these if http://crbug.com/622400 fails |
+ // (SequencedWorkerPool will be phased out completely otherwise). |
+ |
// The TaskPriority to be used for SequencedWorkerPool tasks redirected to the |
// TaskScheduler as an experiment (unused otherwise). |
const base::TaskPriority task_priority_; |
+ // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
+ // SequencedWorkerPool usage to the TaskScheduler. |
+ std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; |
+ |
+ // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits |
+ // as this SequencedWorkerPool, used to answer RunsTasksOnCurrentThread(). |
+ // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). |
+ mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; |
+ |
DISALLOW_COPY_AND_ASSIGN(Inner); |
}; |
@@ -519,6 +552,7 @@ SequencedWorkerPool::Worker::Worker( |
worker_pool_(std::move(worker_pool)), |
task_shutdown_behavior_(BLOCK_SHUTDOWN), |
is_processing_task_(false) { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
Start(); |
} |
@@ -526,6 +560,8 @@ SequencedWorkerPool::Worker::~Worker() { |
} |
void SequencedWorkerPool::Worker::Run() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
#if defined(OS_WIN) |
win::ScopedCOMInitializer com_initializer; |
#endif |
@@ -634,6 +670,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
int create_thread_id = 0; |
{ |
AutoLock lock(lock_); |
+ |
danakj
2016/08/15 22:05:49
(you added whitespace here as a result)
gab
2016/09/07 15:00:10
True, but I kind of think it's cleaner that way as
|
if (shutdown_called_) { |
// Don't allow a new task to be posted if it doesn't block shutdown. |
if (shutdown_behavior != BLOCK_SHUTDOWN) |
@@ -669,36 +706,140 @@ bool SequencedWorkerPool::Inner::PostTask( |
if (optional_token_name) |
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
- pending_tasks_.insert(sequenced); |
- if (shutdown_behavior == BLOCK_SHUTDOWN) |
- blocking_shutdown_pending_task_count_++; |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ PostTaskToTaskScheduler(sequenced); |
+ } else { |
+ pending_tasks_.insert(sequenced); |
+ |
+ if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
+ blocking_shutdown_pending_task_count_++; |
+ |
+ create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
+ } |
+ } |
- create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
+ if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ // Actually start the additional thread or signal an existing one outside |
+ // the lock. |
+ if (create_thread_id) |
+ FinishStartingAdditionalThread(create_thread_id); |
+ else |
+ SignalHasWork(); |
} |
- // Actually start the additional thread or signal an existing one now that |
- // we're outside the lock. |
- if (create_thread_id) |
- FinishStartingAdditionalThread(create_thread_id); |
- else |
- SignalHasWork(); |
+#if DCHECK_IS_ON() |
+ { |
+ AutoLock lock_for_dcheck(lock_); |
+ // Some variables are exposed in both modes for convenience but are only |
+ // really intended for one of them at runtime; confirm exclusive usage here. |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ DCHECK(pending_tasks_.empty()); |
+ DCHECK_EQ(0, create_thread_id); |
+ } else { |
+ DCHECK(sequenced_task_runner_map_.empty()); |
+ } |
+ } |
+#endif // DCHECK_IS_ON() |
return true; |
} |
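The reworked PostTask() above keeps the same lock discipline as before the change: the redirect-or-enqueue decision and its bookkeeping happen under lock_, while thread creation and signaling stay outside it. A condensed standalone sketch of that shape, with stand-in names (nothing here is the actual Chromium API):

    // Sketch of "decide under the lock, act outside it"; stand-in names only.
    #include <mutex>

    std::mutex g_lock;
    // Stands in for g_all_pools_state; like the patch, it is read outside the
    // lock and is expected to be set once, before any task is posted.
    bool g_redirected = false;

    void FinishStartingAdditionalThread(int id) {}  // stand-in
    void SignalHasWork() {}                         // stand-in
    void RedirectToScheduler() {}                   // stand-in

    void Post() {
      int create_thread_id = 0;
      {
        std::lock_guard<std::mutex> lock(g_lock);
        if (g_redirected) {
          RedirectToScheduler();  // all redirected-mode work is done under lock
        } else {
          // ...enqueue the task and decide whether another thread would help...
          create_thread_id = 1;
        }
      }
      if (!g_redirected) {
        // Thread creation / signaling is done outside the lock, as before.
        if (create_thread_id)
          FinishStartingAdditionalThread(create_thread_id);
        else
          SignalHasWork();
      }
    }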
+void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
+ const SequencedTask& sequenced) { |
fdoray
2016/08/25 15:14:01
This method doesn't honor delays :(
gab
2016/09/07 15:00:10
Oops, looks like you're addressing that in https:/
|
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
+ lock_.AssertAcquired(); |
+ |
+ // Confirm that the TaskScheduler's shutdown behaviors use the same |
+ // underlying values as SequencedWorkerPool. |
+ static_assert( |
+ static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
+ static_cast<int>(CONTINUE_ON_SHUTDOWN), |
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
+ "CONTINUE_ON_SHUTDOWN."); |
+ static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == |
+ static_cast<int>(SKIP_ON_SHUTDOWN), |
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
+ "SKIP_ON_SHUTDOWN."); |
+ static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == |
+ static_cast<int>(BLOCK_SHUTDOWN), |
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
+ "BLOCK_SHUTDOWN."); |
+ |
+ const TaskShutdownBehavior task_shutdown_behavior = |
+ static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); |
+ const TaskTraits pool_traits = |
+ TaskTraits() |
+ .WithFileIO() |
+ .WithPriority(task_priority_) |
+ .WithShutdownBehavior(task_shutdown_behavior); |
fdoray
2016/08/25 15:14:01
SequencedWorkerPool allows tasks with different sh
gab
2016/09/07 15:00:10
Yes, let's do that (seems like that's what you're
|
+ |
+ // Find or create the TaskScheduler TaskRunner to redirect this task to if |
+ // it is posted to a specific sequence. |
+ scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; |
+ if (sequenced.sequence_token_id) { |
+ sequenced_task_runner = |
+ &sequenced_task_runner_map_[sequenced.sequence_token_id]; |
+ if (!*sequenced_task_runner) { |
+ const ExecutionMode execution_mode = |
+ max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED |
+ : ExecutionMode::SEQUENCED; |
fdoray
2016/08/25 15:14:01
Should we DCHECK_LE(sequenced_task_runner_map_.siz
gab
2016/09/07 15:00:10
Good point, I think this is the case in practice s
|
+ *sequenced_task_runner = |
+ CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
+ } |
+ } |
+ |
+ if (sequenced_task_runner) { |
+ (*sequenced_task_runner) |
+ ->PostTask(sequenced.posted_from, sequenced.task); |
+ } else { |
+ // PostTaskWithTraits() posts a task with PARALLEL semantics. A few pools, |
+ // however, use only one thread and can therefore currently legitimately |
+ // assume thread affinity despite using SequencedWorkerPool. Such pools |
+ // typically only give access to their TaskRunner, which will be |
+ // SINGLE_THREADED by nature of the pool having only one thread, but this |
+ // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() |
+ // directly. |
+ DCHECK_GT(max_threads_, 1U); |
+ base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
+ sequenced.task); |
+ } |
+} |
+ |
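PostTaskToTaskScheduler() caches one TaskScheduler TaskRunner per SequenceToken ID with a plain find-or-create lookup: std::map::operator[] inserts a null entry the first time a token is seen, and that entry is then populated exactly once (all under lock_ in the real code). A standalone sketch of the same pattern with stand-in types, not Chromium's:

    // Lazy find-or-create per sequence token; FakeTaskRunner is a stand-in.
    #include <map>
    #include <memory>

    struct FakeTaskRunner {};

    std::map<int, std::shared_ptr<FakeTaskRunner>> g_runner_for_token;

    std::shared_ptr<FakeTaskRunner> GetRunnerForToken(int sequence_token_id) {
      // operator[] default-constructs a null shared_ptr on first access.
      std::shared_ptr<FakeTaskRunner>& runner =
          g_runner_for_token[sequence_token_id];
      if (!runner)
        runner = std::make_shared<FakeTaskRunner>();  // created once per token
      return runner;
    }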
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
AutoLock lock(lock_); |
- return ContainsKey(threads_, PlatformThread::CurrentId()); |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (!runs_tasks_on_verifier_) { |
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
+ TaskTraits().WithFileIO().WithPriority(task_priority_), |
+ ExecutionMode::PARALLEL); |
fdoray
2016/08/25 15:14:01
Should we have a special case for single-threaded
gab
2016/09/07 15:00:10
Yes, looks like you got this too :-)
|
+ } |
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
+ } else { |
+ return ContainsKey(threads_, PlatformThread::CurrentId()); |
+ } |
} |
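RunsTasksOnCurrentThread() above lazily builds its verifier from a const method, which is why runs_tasks_on_verifier_ is declared mutable. A minimal standalone sketch of that lazily-initialized-mutable-member pattern, using stand-in types rather than Chromium's:

    // Lazily created mutable member behind a lock; stand-in types only.
    #include <memory>
    #include <mutex>

    struct Verifier {
      bool RunsTasksOnCurrentThread() const { return false; }
    };

    class Inner {
     public:
      bool RunsTasksOnCurrentThread() const {
        std::lock_guard<std::mutex> lock(lock_);
        // |verifier_| is mutable so it can be instantiated on first use even
        // though this method is const.
        if (!verifier_)
          verifier_ = std::make_unique<Verifier>();
        return verifier_->RunsTasksOnCurrentThread();
      }

     private:
      mutable std::mutex lock_;
      mutable std::unique_ptr<Verifier> verifier_;
    };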
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
SequenceToken sequence_token) const { |
AutoLock lock(lock_); |
- ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
- if (found == threads_.end()) |
- return false; |
- return found->second->is_processing_task() && |
- sequence_token.Equals(found->second->task_sequence_token()); |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ // TODO(gab): This currently only verifies that the current thread is a |
+ // thread on which a task bound to |sequence_token| *could* run, but it |
+ // doesn't verify that this thread is *currently running* a task bound to |
+ // |sequence_token|. |
+ const auto sequenced_task_runner_it = |
+ sequenced_task_runner_map_.find(sequence_token.id_); |
+ return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
+ sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
+ } else { |
+ ThreadMap::const_iterator found = |
+ threads_.find(PlatformThread::CurrentId()); |
+ if (found == threads_.end()) |
+ return false; |
+ return found->second->is_processing_task() && |
+ sequence_token.Equals(found->second->task_sequence_token()); |
+ } |
} |
// See https://code.google.com/p/chromium/issues/detail?id=168415 |
@@ -732,6 +873,10 @@ void SequencedWorkerPool::Inner::Shutdown( |
if (shutdown_called_) |
return; |
shutdown_called_ = true; |
+ |
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED) |
+ return; |
+ |
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
// Tickle the threads. This will wake up a waiting one so it will know that |
@@ -771,6 +916,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
} |
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
{ |
AutoLock lock(lock_); |
DCHECK(thread_being_created_); |
@@ -905,6 +1051,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
} |
void SequencedWorkerPool::Inner::HandleCleanup() { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
if (cleanup_state_ == CLEANUP_DONE) |
return; |
@@ -970,6 +1118,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
SequencedTask* task, |
TimeDelta* wait_time, |
std::vector<Closure>* delete_these_outside_lock) { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
// Find the next task with a sequence token that's not currently in use. |
@@ -1056,6 +1206,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
} |
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
// Mark the task's sequence number as in use. |
@@ -1087,6 +1239,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
} |
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
@@ -1100,6 +1254,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
int sequence_token_id) const { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
return !sequence_token_id || |
current_sequences_.find(sequence_token_id) == |
@@ -1107,6 +1263,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
} |
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
lock_.AssertAcquired(); |
// How thread creation works: |
// |
@@ -1157,15 +1315,24 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
int thread_number) { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
// Called outside of the lock. |
DCHECK_GT(thread_number, 0); |
+ if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { |
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
+ g_all_pools_state = AllPoolsState::WORKER_CREATED; |
+ } |
+ |
// The worker is assigned to the list when the thread actually starts, which |
// will manage the memory of the pointer. |
new Worker(worker_pool_, thread_number, thread_name_prefix_); |
} |
void SequencedWorkerPool::Inner::SignalHasWork() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
+ |
has_work_cv_.Signal(); |
if (testing_observer_) { |
testing_observer_->OnHasWork(); |
@@ -1173,6 +1340,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() { |
} |
bool SequencedWorkerPool::Inner::CanShutdown() const { |
+ DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); |
lock_.AssertAcquired(); |
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
return !thread_being_created_ && |
@@ -1209,6 +1377,18 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
return worker->worker_pool(); |
} |
+// static |
+void SequencedWorkerPool:: |
+ RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
+ DCHECK(TaskScheduler::GetInstance()); |
+ // Hitting this DCHECK indicates that a task was posted to a |
+ // SequencedWorkerPool before the TaskScheduler was initialized and |
+ // redirected; posting tasks to SequencedWorkerPools needs to at least be |
+ // delayed until after that point. |
+ DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
+ g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
+} |
+ |
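The DCHECKs in RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() encode an ordering requirement for callers: a TaskScheduler instance must already exist, and no SequencedWorkerPool task may have been posted yet, when redirection is turned on. A hedged sketch of process bring-up under that constraint; the TaskScheduler setup step is left as a comment because the exact initialization call is an assumption, not something this CL defines:

    #include "base/threading/sequenced_worker_pool.h"

    void InitializeProcessTaskEnvironment() {
      // 1) Bring up the TaskScheduler first (the exact setup call is assumed
      //    and intentionally elided here).
      // 2) Opt the whole process into redirection before any
      //    SequencedWorkerPool task is posted.
      base::SequencedWorkerPool::
          RedirectSequencedWorkerPoolsToTaskSchedulerForProcess();
      // 3) Only after this point may tasks be posted to SequencedWorkerPools.
    }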
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
const std::string& thread_name_prefix, |
base::TaskPriority task_priority) |