Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index 13323d776339183a68aa313fd67edbf28f67c189..247e90783dfd179764827c617a855a95080d3470 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -10,6 +10,7 @@ |
#include <map> |
#include <memory> |
#include <set> |
+#include <unordered_map> |
#include <utility> |
#include <vector> |
@@ -52,12 +53,15 @@ namespace base { |
namespace { |
-// An enum representing the state of all pools. Any given process should only |
-// ever transition from NONE_ACTIVE to the active states, transitions between |
-// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
-// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
-// and the WORKER_CREATED transition occurs when a Worker needs to be created |
-// because the first task was posted and the state is still NONE_ACTIVE. |
+// An enum representing the state of all pools. Any given non-test process |
+// should only ever transition from NONE_ACTIVE to the active states, |
+// transitions between actives states are unexpected. The |
+// REDIRECTED_TO_TASK_SCHEDULER transition occurs when |
+// RedirectToTaskSchedulerForProcess() is called and the WORKER_CREATED |
+// transition occurs when a Worker needs to be created because the first task |
+// was posted and the state is still NONE_ACTIVE. In a test process, |
+// ResetRedirectToTaskSchedulerForProcessForTesting() causes a transition to |
+// the NONE_ACTIVE state. |
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
// will be phased out completely otherwise). |
enum class AllPoolsState { |
@@ -370,9 +374,17 @@ class SequencedWorkerPool::Inner { |
CLEANUP_DONE, |
}; |
+ struct TaskRunnerAndShutdownBehavior { |
+ scoped_refptr<TaskRunner> task_runner; |
+ TaskShutdownBehavior shutdown_behavior; |
+ }; |
+ |
+ bool RunsTasksOnCurrentThreadAssertSynchronized() const; |
+ |
// Helper used by PostTask() to complete the work when redirection is on. |
// Coalesce upon resolution of http://crbug.com/622400. |
- void PostTaskToTaskScheduler(const SequencedTask& sequenced); |
+ bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
+ const TimeDelta& delay); |
// Called from within the lock, this converts the given token name into a |
// token ID, creating a new one if necessary. |
@@ -530,9 +542,19 @@ class SequencedWorkerPool::Inner { |
// TaskScheduler as an experiment (unused otherwise). |
const base::TaskPriority task_priority_; |
+ // The single-threaded TaskRunner used to redirect tasks to the TaskScheduler. |
+ // Always null when the pool isn't single-threaded. |
+ TaskRunnerAndShutdownBehavior single_threaded_task_runner_; |
+ |
// A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
- // SequencedWorkerPool usage to the TaskScheduler. |
- std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; |
+ // sequenced tasks to the TaskScheduler. |
+ std::unordered_map<int, TaskRunnerAndShutdownBehavior> |
+ sequenced_task_runner_map_; |
+ |
+ // TaskScheduler TaskRunners to redirect unsequenced tasks to the |
+ // TaskScheduler. |
+ std::unordered_map<TaskShutdownBehavior, scoped_refptr<TaskRunner>> |
+ unsequenced_task_runners_; |
// A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as |
// used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). |
@@ -667,11 +689,14 @@ bool SequencedWorkerPool::Inner::PostTask( |
base::MakeCriticalClosure(task) : task; |
sequenced.time_to_run = TimeTicks::Now() + delay; |
+ bool task_successfully_posted = true; |
+ |
int create_thread_id = 0; |
{ |
AutoLock lock(lock_); |
- if (shutdown_called_) { |
+ if (shutdown_called_ && |
+ g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
gab
2016/09/07 14:59:09
Good catch, trunk uses atomics for this now though
|
// Don't allow a new task to be posted if it doesn't block shutdown. |
if (shutdown_behavior != BLOCK_SHUTDOWN) |
return false; |
@@ -707,7 +732,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
- PostTaskToTaskScheduler(sequenced); |
+ task_successfully_posted = PostTaskToTaskScheduler(sequenced, delay); |
} else { |
pending_tasks_.insert(sequenced); |
@@ -741,11 +766,27 @@ bool SequencedWorkerPool::Inner::PostTask( |
} |
#endif // DCHECK_IS_ON() |
- return true; |
+ return task_successfully_posted; |
} |
-void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
- const SequencedTask& sequenced) { |
+bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThreadAssertSynchronized() |
+ const { |
+ lock_.AssertAcquired(); |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
gab
2016/09/07 14:59:09
atomic
|
+ // TODO(fdoray): Add a special case for single-threaded pools. |
+ if (!runs_tasks_on_verifier_) { |
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
+ TaskTraits().WithFileIO().WithPriority(task_priority_), |
+ ExecutionMode::PARALLEL); |
+ } |
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
+ } |
+ return ContainsKey(threads_, PlatformThread::CurrentId()); |
+} |
+ |
+bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
+ const SequencedTask& sequenced, |
+ const TimeDelta& delay) { |
DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
lock_.AssertAcquired(); |
@@ -774,56 +815,65 @@ void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
.WithPriority(task_priority_) |
.WithShutdownBehavior(task_shutdown_behavior); |
- // Find or create the TaskScheduler TaskRunner to redirect this task to if |
- // it is posted to a specific sequence. |
- scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; |
- if (sequenced.sequence_token_id) { |
- sequenced_task_runner = |
- &sequenced_task_runner_map_[sequenced.sequence_token_id]; |
- if (!*sequenced_task_runner) { |
- const ExecutionMode execution_mode = |
- max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED |
- : ExecutionMode::SEQUENCED; |
- *sequenced_task_runner = |
- CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
+ // Find or create the TaskScheduler TaskRunner to redirect this task to. |
+ scoped_refptr<TaskRunner> task_runner; |
+ if (max_threads_ == 1) { |
gab
2016/09/07 14:59:08
I'm thinking we might just want to disallow pools
|
+ if (!single_threaded_task_runner_.task_runner) { |
+ single_threaded_task_runner_.task_runner = CreateTaskRunnerWithTraits( |
+ pool_traits, ExecutionMode::SINGLE_THREADED); |
+ single_threaded_task_runner_.shutdown_behavior = task_shutdown_behavior; |
} |
- } |
- if (sequenced_task_runner) { |
- (*sequenced_task_runner) |
- ->PostTask(sequenced.posted_from, sequenced.task); |
+ // Single-threaded pools can legitimately assume thread affinity. Disallow |
+ // posting tasks with different shutdown behaviors in such pools since the |
+ // TaskScheduler can't force them to run on the same thread. |
+ DCHECK_EQ(task_shutdown_behavior, |
+ single_threaded_task_runner_.shutdown_behavior); |
gab
2016/09/07 14:59:08
Different sequences on single-threaded pool could
|
+ |
+ task_runner = single_threaded_task_runner_.task_runner; |
+ } else if (sequenced.sequence_token_id) { |
+ TaskRunnerAndShutdownBehavior& task_runner_and_shutdown_behavior = |
+ sequenced_task_runner_map_[sequenced.sequence_token_id]; |
+ if (!task_runner_and_shutdown_behavior.task_runner) { |
+ task_runner_and_shutdown_behavior.task_runner = |
+ CreateTaskRunnerWithTraits(pool_traits, ExecutionMode::SEQUENCED); |
+ task_runner_and_shutdown_behavior.shutdown_behavior = |
+ task_shutdown_behavior; |
+ } |
+ |
+ // Posting tasks to the same sequence with different shutdown behaviors |
+ // isn't supported by the TaskScheduler. |
+ DCHECK_EQ(task_shutdown_behavior, |
+ task_runner_and_shutdown_behavior.shutdown_behavior); |
+ |
+ task_runner = task_runner_and_shutdown_behavior.task_runner; |
} else { |
- // PostTaskWithTraits() posts a task with PARALLEL semantics. There are |
- // however a few pools that use only one thread and therefore can currently |
- // legitimatelly assume thread affinity despite using SequencedWorkerPool. |
- // Such pools typically only give access to their TaskRunner which will be |
- // SINGLE_THREADED per nature of the pool having only one thread but this |
- // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() |
- // directly. |
- DCHECK_GT(max_threads_, 1U); |
- base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
- sequenced.task); |
+ scoped_refptr<TaskRunner>& task_runner_for_shutdown_behavior_ref = |
+ unsequenced_task_runners_[task_shutdown_behavior]; |
+ if (!task_runner_for_shutdown_behavior_ref) { |
+ task_runner_for_shutdown_behavior_ref = |
+ CreateTaskRunnerWithTraits(pool_traits, ExecutionMode::PARALLEL); |
+ } |
+ |
+ task_runner = task_runner_for_shutdown_behavior_ref; |
} |
+ |
+ return task_runner->PostDelayedTask(sequenced.posted_from, sequenced.task, |
+ delay); |
} |
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
AutoLock lock(lock_); |
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
- if (!runs_tasks_on_verifier_) { |
- runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
- TaskTraits().WithFileIO().WithPriority(task_priority_), |
- ExecutionMode::PARALLEL); |
- } |
- return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
- } else { |
- return ContainsKey(threads_, PlatformThread::CurrentId()); |
- } |
+ return RunsTasksOnCurrentThreadAssertSynchronized(); |
} |
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
SequenceToken sequence_token) const { |
AutoLock lock(lock_); |
if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (!sequence_token.IsValid()) |
+ return RunsTasksOnCurrentThreadAssertSynchronized(); |
danakj
2016/09/01 23:16:34
The non-task runner code would return false if the
|
+ |
// TODO(gab): This currently only verifies that the current thread is a |
danakj
2016/09/01 23:16:34
This TODO applies to the above also right
|
// thread on which a task bound to |sequence_token| *could* run, but it |
// doesn't verify that the current is *currently running* a task bound to |
@@ -831,7 +881,8 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
const auto sequenced_task_runner_it = |
sequenced_task_runner_map_.find(sequence_token.id_); |
return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
- sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
+ sequenced_task_runner_it->second.task_runner |
+ ->RunsTasksOnCurrentThread(); |
} else { |
ThreadMap::const_iterator found = |
threads_.find(PlatformThread::CurrentId()); |
@@ -1378,8 +1429,7 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
} |
// static |
-void SequencedWorkerPool:: |
- RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
+void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { |
DCHECK(TaskScheduler::GetInstance()); |
// Hitting this DCHECK indicates that a task was posted to a |
// SequencedWorkerPool before the TaskScheduler was initialized and |
@@ -1389,6 +1439,11 @@ void SequencedWorkerPool:: |
g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
} |
+// static |
+void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { |
+ g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
danakj
2016/09/01 23:16:34
DCHECK that it's REDIRECTED_TO_TASK_SCHEDULER curr
gab
2016/09/07 14:59:08
And use atomics now.
|
+} |
+ |
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
const std::string& thread_name_prefix, |
base::TaskPriority task_priority) |
@@ -1528,6 +1583,7 @@ bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
} |
void SequencedWorkerPool::FlushForTesting() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
gab
2016/09/07 14:59:08
Atomics
|
inner_->CleanupForTesting(); |
} |