Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1150)

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 2285633003: Test SequencedWorkerPool with redirection to the TaskScheduler. (Closed)
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 13323d776339183a68aa313fd67edbf28f67c189..bc30222b5d8ca1f492602f1b00507f28e362f1cb 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -52,12 +52,15 @@ namespace base {
namespace {
-// An enum representing the state of all pools. Any given process should only
-// ever transition from NONE_ACTIVE to the active states, transitions between
-// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition
-// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
-// and the WORKER_CREATED transition occurs when a Worker needs to be created
-// because the first task was posted and the state is still NONE_ACTIVE.
+// An enum representing the state of all pools. Any given non-test process
+// should only ever transition from NONE_ACTIVE to the active states,
+// transitions between active states are unexpected. The
+// REDIRECTED_TO_TASK_SCHEDULER transition occurs when
+// RedirectToTaskSchedulerForProcess() is called and the WORKER_CREATED
+// transition occurs when a Worker needs to be created because the first task
+// was posted and the state is still NONE_ACTIVE. In a test process,
+// ResetRedirectToTaskSchedulerForProcessForTesting() causes a transition to
+// the NONE_ACTIVE state.
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
// will be phased out completely otherwise).
enum class AllPoolsState {
@@ -370,9 +373,17 @@ class SequencedWorkerPool::Inner {
CLEANUP_DONE,
};
+ struct TaskRunnerAndShutdownBehavior {
+ scoped_refptr<TaskRunner> task_runner;
+ TaskShutdownBehavior shutdown_behavior;
+ };
+
+ bool RunsTasksOnCurrentThreadAssertSynchronized() const;
+
// Helper used by PostTask() to complete the work when redirection is on.
// Coalesce upon resolution of http://crbug.com/622400.
- void PostTaskToTaskScheduler(const SequencedTask& sequenced);
+ void PostTaskToTaskScheduler(const SequencedTask& sequenced,
+ const TimeDelta& delay);
// Called from within the lock, this converts the given token name into a
// token ID, creating a new one if necessary.
@@ -531,8 +542,13 @@ class SequencedWorkerPool::Inner {
const base::TaskPriority task_priority_;
// A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
- // SequencedWorkerPool usage to the TaskScheduler.
- std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
+ // sequenced tasks to the TaskScheduler.
+ std::map<int, TaskRunnerAndShutdownBehavior> sequenced_task_runner_map_;
robliao 2016/08/27 00:28:59 Unrelated Optional: Should these be unordered_maps
fdoray 2016/08/29 15:07:11 Done.
+
+ // TaskScheduler TaskRunners to redirect unsequenced tasks to the
+ // TaskScheduler.
+ std::map<TaskShutdownBehavior, scoped_refptr<TaskRunner>>
+ unsequenced_task_runners_;
// A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
// used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
@@ -707,7 +723,7 @@ bool SequencedWorkerPool::Inner::PostTask(
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
- PostTaskToTaskScheduler(sequenced);
+ PostTaskToTaskScheduler(sequenced, delay);
} else {
pending_tasks_.insert(sequenced);
@@ -744,8 +760,25 @@ bool SequencedWorkerPool::Inner::PostTask(
return true;
}
+bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThreadAssertSynchronized()
+ const {
+ lock_.AssertAcquired();
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ // TODO(fdoray): Add a special case for single-threaded pools.
+ if (!runs_tasks_on_verifier_) {
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
+ TaskTraits().WithFileIO().WithPriority(task_priority_),
+ ExecutionMode::PARALLEL);
+ }
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
+ } else {
+ return ContainsKey(threads_, PlatformThread::CurrentId());
+ }
+}
+
void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
- const SequencedTask& sequenced) {
+ const SequencedTask& sequenced,
+ const TimeDelta& delay) {
DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
lock_.AssertAcquired();
@@ -774,56 +807,63 @@ void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
.WithPriority(task_priority_)
.WithShutdownBehavior(task_shutdown_behavior);
- // Find or create the TaskScheduler TaskRunner to redirect this task to if
- // it is posted to a specific sequence.
- scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr;
+ // Find or create the TaskScheduler TaskRunner to redirect this task to.
+ scoped_refptr<TaskRunner> task_runner;
if (sequenced.sequence_token_id) {
- sequenced_task_runner =
- &sequenced_task_runner_map_[sequenced.sequence_token_id];
- if (!*sequenced_task_runner) {
+ TaskRunnerAndShutdownBehavior& task_runner_and_shutdown_behavior =
+ sequenced_task_runner_map_[sequenced.sequence_token_id];
+ if (!task_runner_and_shutdown_behavior.task_runner) {
const ExecutionMode execution_mode =
max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
: ExecutionMode::SEQUENCED;
- *sequenced_task_runner =
+ task_runner_and_shutdown_behavior.task_runner =
CreateTaskRunnerWithTraits(pool_traits, execution_mode);
+ task_runner_and_shutdown_behavior.shutdown_behavior =
+ task_shutdown_behavior;
}
- }
- if (sequenced_task_runner) {
- (*sequenced_task_runner)
- ->PostTask(sequenced.posted_from, sequenced.task);
+ // Posting tasks to the same sequence with different shutdown behaviors
+ // isn't supported by the TaskScheduler.
+ DCHECK_EQ(task_shutdown_behavior,
+ task_runner_and_shutdown_behavior.shutdown_behavior);
+
+ task_runner = task_runner_and_shutdown_behavior.task_runner;
} else {
- // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
- // however a few pools that use only one thread and therefore can currently
- // legitimatelly assume thread affinity despite using SequencedWorkerPool.
- // Such pools typically only give access to their TaskRunner which will be
- // SINGLE_THREADED per nature of the pool having only one thread but this
- // DCHECK ensures no such pools use SequencedWorkerPool::PostTask()
- // directly.
- DCHECK_GT(max_threads_, 1U);
- base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
- sequenced.task);
+ scoped_refptr<TaskRunner>& task_runner_for_shutdown_behavior_ref =
+ unsequenced_task_runners_[task_shutdown_behavior];
+ if (!task_runner_for_shutdown_behavior_ref) {
+ const ExecutionMode execution_mode = max_threads_ == 1U
+ ? ExecutionMode::SINGLE_THREADED
+ : ExecutionMode::PARALLEL;
+ task_runner_for_shutdown_behavior_ref =
+ CreateTaskRunnerWithTraits(pool_traits, execution_mode);
+ }
+
+ task_runner = task_runner_for_shutdown_behavior_ref;
}
+
+  // Single-threaded pools can legitimately assume thread affinity. Disallow
+ // posting tasks with different sequence tokens or shutdown behaviors in such
+ // pools since the TaskScheduler can't force them to run on the same thread.
+ DCHECK(max_threads_ > 1 ||
+ (unsequenced_task_runners_.size() +
+ sequenced_task_runner_map_.size()) == 1);
+
+ task_runner->PostDelayedTask(sequenced.posted_from, sequenced.task, delay);
}
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
AutoLock lock(lock_);
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
- if (!runs_tasks_on_verifier_) {
- runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
- TaskTraits().WithFileIO().WithPriority(task_priority_),
- ExecutionMode::PARALLEL);
- }
- return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
- } else {
- return ContainsKey(threads_, PlatformThread::CurrentId());
- }
+ return RunsTasksOnCurrentThreadAssertSynchronized();
}
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
SequenceToken sequence_token) const {
AutoLock lock(lock_);
if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (!sequence_token.IsValid())
+ return RunsTasksOnCurrentThreadAssertSynchronized();
+
// TODO(gab): This currently only verifies that the current thread is a
// thread on which a task bound to |sequence_token| *could* run, but it
// doesn't verify that the current thread is *currently running* a task bound to
@@ -831,7 +871,8 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
const auto sequenced_task_runner_it =
sequenced_task_runner_map_.find(sequence_token.id_);
return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
- sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
+ sequenced_task_runner_it->second.task_runner
+ ->RunsTasksOnCurrentThread();
} else {
ThreadMap::const_iterator found =
threads_.find(PlatformThread::CurrentId());
@@ -1378,8 +1419,7 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
}
// static
-void SequencedWorkerPool::
- RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
+void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() {
DCHECK(TaskScheduler::GetInstance());
// Hitting this DCHECK indicates that a task was posted to a
// SequencedWorkerPool before the TaskScheduler was initialized and
@@ -1389,6 +1429,11 @@ void SequencedWorkerPool::
g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
}
+// static
+void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() {
+ g_all_pools_state = AllPoolsState::NONE_ACTIVE;
+}
+
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix,
base::TaskPriority task_priority)
@@ -1528,6 +1573,7 @@ bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
}
void SequencedWorkerPool::FlushForTesting() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
inner_->CleanupForTesting();
}

Powered by Google App Engine
This is Rietveld 408576698