Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index 13323d776339183a68aa313fd67edbf28f67c189..5d6efe167d43bfbe647bb60c74edb13ffb625eeb 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -10,6 +10,7 @@ |
#include <map> |
#include <memory> |
#include <set> |
+#include <unordered_map> |
#include <utility> |
#include <vector> |
@@ -52,12 +53,15 @@ namespace base { |
namespace { |
-// An enum representing the state of all pools. Any given process should only |
-// ever transition from NONE_ACTIVE to the active states, transitions between |
-// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
-// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
-// and the WORKER_CREATED transition occurs when a Worker needs to be created |
-// because the first task was posted and the state is still NONE_ACTIVE. |
+// An enum representing the state of all pools. Any given non-test process |
+// should only ever transition from NONE_ACTIVE to the active states; |
+// transitions between active states are unexpected. The |
+// REDIRECTED_TO_TASK_SCHEDULER transition occurs when |
+// RedirectToTaskSchedulerForProcess() is called and the WORKER_CREATED |
+// transition occurs when a Worker needs to be created because the first task |
+// was posted and the state is still NONE_ACTIVE. In a test process, |
+// ResetRedirectToTaskSchedulerForProcessForTesting() causes a transition to |
+// the NONE_ACTIVE state. |
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
// will be phased out completely otherwise). |
enum class AllPoolsState { |
@@ -370,9 +374,17 @@ class SequencedWorkerPool::Inner { |
CLEANUP_DONE, |
}; |
+ struct TaskRunnerAndShutdownBehavior { |
+ scoped_refptr<TaskRunner> task_runner; |
+ TaskShutdownBehavior shutdown_behavior; |
+ }; |
+ |
+ bool RunsTasksOnCurrentThreadAssertSynchronized() const; |
+ |
// Helper used by PostTask() to complete the work when redirection is on. |
// Coalesce upon resolution of http://crbug.com/622400. |
- void PostTaskToTaskScheduler(const SequencedTask& sequenced); |
+ bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
+ const TimeDelta& delay); |
// Called from within the lock, this converts the given token name into a |
// token ID, creating a new one if necessary. |
@@ -531,8 +543,14 @@ class SequencedWorkerPool::Inner { |
const base::TaskPriority task_priority_; |
// A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
- // SequencedWorkerPool usage to the TaskScheduler. |
- std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; |
+ // sequenced tasks to the TaskScheduler. |
+ std::unordered_map<int, TaskRunnerAndShutdownBehavior> |
+ sequenced_task_runner_map_; |
+ |
+ // TaskScheduler TaskRunners to redirect unsequenced tasks to the |
+ // TaskScheduler. |
+ std::unordered_map<TaskShutdownBehavior, scoped_refptr<TaskRunner>> |
+ unsequenced_task_runners_; |
// A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as |
// used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). |
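For illustration, a minimal sketch (hypothetical code, not part of this CL; it assumes FROM_HERE, Bind() and base::DoNothing() are available as usual in base/) of how these two caches are exercised once the pool is redirected:

void PostRedirectedTasksExample(SequencedWorkerPool* pool) {
  SequencedWorkerPool::SequenceToken token =
      pool->GetNamedSequenceToken("example");
  // Both tasks reuse the TaskRunner cached under |token|'s ID in
  // |sequenced_task_runner_map_|.
  pool->PostSequencedWorkerTask(token, FROM_HERE, Bind(&DoNothing));
  pool->PostSequencedWorkerTask(token, FROM_HERE, Bind(&DoNothing));
  // An unsequenced task reuses the TaskRunner cached under its shutdown
  // behavior in |unsequenced_task_runners_|.
  pool->PostWorkerTask(FROM_HERE, Bind(&DoNothing));
}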
@@ -667,11 +685,14 @@ bool SequencedWorkerPool::Inner::PostTask( |
base::MakeCriticalClosure(task) : task; |
sequenced.time_to_run = TimeTicks::Now() + delay; |
+ bool task_successfully_posted = true; |
+ |
int create_thread_id = 0; |
{ |
AutoLock lock(lock_); |
- if (shutdown_called_) { |
+ if (shutdown_called_ && |
+ g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
// Don't allow a new task to be posted if it doesn't block shutdown. |
if (shutdown_behavior != BLOCK_SHUTDOWN) |
return false; |
@@ -707,7 +728,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
- PostTaskToTaskScheduler(sequenced); |
+ task_successfully_posted = PostTaskToTaskScheduler(sequenced, delay); |
} else { |
pending_tasks_.insert(sequenced); |
@@ -741,11 +762,28 @@ bool SequencedWorkerPool::Inner::PostTask( |
} |
#endif // DCHECK_IS_ON() |
- return true; |
+ return task_successfully_posted; |
+} |
+ |
+bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThreadAssertSynchronized() |
+ const { |
+ lock_.AssertAcquired(); |
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ // TODO(fdoray): Add a special case for single-threaded pools. |
+ if (!runs_tasks_on_verifier_) { |
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
+ TaskTraits().WithFileIO().WithPriority(task_priority_), |
+ ExecutionMode::PARALLEL); |
+ } |
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
+ } else { |
robliao 2016/08/29 23:28:50: Nit: Remove else
fdoray 2016/08/30 17:20:49: Done.
+ return ContainsKey(threads_, PlatformThread::CurrentId()); |
+ } |
} |
-void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
- const SequencedTask& sequenced) { |
+bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
+ const SequencedTask& sequenced, |
+ const TimeDelta& delay) { |
DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
lock_.AssertAcquired(); |
@@ -774,56 +812,64 @@ void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
.WithPriority(task_priority_) |
.WithShutdownBehavior(task_shutdown_behavior); |
- // Find or create the TaskScheduler TaskRunner to redirect this task to if |
- // it is posted to a specific sequence. |
- scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; |
+ // Find or create the TaskScheduler TaskRunner to redirect this task to. |
+ scoped_refptr<TaskRunner> task_runner; |
if (sequenced.sequence_token_id) { |
- sequenced_task_runner = |
- &sequenced_task_runner_map_[sequenced.sequence_token_id]; |
- if (!*sequenced_task_runner) { |
+ TaskRunnerAndShutdownBehavior& task_runner_and_shutdown_behavior = |
+ sequenced_task_runner_map_[sequenced.sequence_token_id]; |
+ if (!task_runner_and_shutdown_behavior.task_runner) { |
const ExecutionMode execution_mode = |
max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED |
: ExecutionMode::SEQUENCED; |
- *sequenced_task_runner = |
+ task_runner_and_shutdown_behavior.task_runner = |
CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
+ task_runner_and_shutdown_behavior.shutdown_behavior = |
+ task_shutdown_behavior; |
} |
- } |
- if (sequenced_task_runner) { |
- (*sequenced_task_runner) |
- ->PostTask(sequenced.posted_from, sequenced.task); |
+ // Posting tasks to the same sequence with different shutdown behaviors |
+ // isn't supported by the TaskScheduler. |
+ DCHECK_EQ(task_shutdown_behavior, |
+ task_runner_and_shutdown_behavior.shutdown_behavior); |
+ |
+ task_runner = task_runner_and_shutdown_behavior.task_runner; |
} else { |
- // PostTaskWithTraits() posts a task with PARALLEL semantics. There are |
- // however a few pools that use only one thread and therefore can currently |
- // legitimatelly assume thread affinity despite using SequencedWorkerPool. |
- // Such pools typically only give access to their TaskRunner which will be |
- // SINGLE_THREADED per nature of the pool having only one thread but this |
- // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() |
- // directly. |
- DCHECK_GT(max_threads_, 1U); |
- base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
- sequenced.task); |
+ scoped_refptr<TaskRunner>& task_runner_for_shutdown_behavior_ref = |
+ unsequenced_task_runners_[task_shutdown_behavior]; |
+ if (!task_runner_for_shutdown_behavior_ref) { |
+ const ExecutionMode execution_mode = max_threads_ == 1U |
+ ? ExecutionMode::SINGLE_THREADED |
+ : ExecutionMode::PARALLEL; |
+ task_runner_for_shutdown_behavior_ref = |
+ CreateTaskRunnerWithTraits(pool_traits, execution_mode); |
+ } |
+ |
+ task_runner = task_runner_for_shutdown_behavior_ref; |
} |
+ |
+ // Single-threaded pools can legitimatelly assume thread affinity. Disallow |
robliao 2016/08/29 23:28:50: Nit: s/legitimatelly/legitimately/
fdoray 2016/08/30 17:20:49: Done.
+ // posting tasks with different sequence tokens or shutdown behaviors in such |
+ // pools since the TaskScheduler can't force them to run on the same thread. |
+ DCHECK(max_threads_ > 1 || |
+ (unsequenced_task_runners_.size() + |
robliao 2016/08/29 23:28:50: A caller could plausibly call GetTaskRunnerWithShu
fdoray 2016/08/30 17:20:49: No, we shouldn't disallow that. Done.
+ sequenced_task_runner_map_.size()) == 1); |
+ |
+ return task_runner->PostDelayedTask(sequenced.posted_from, sequenced.task, |
+ delay); |
} |
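As a hedged illustration (hypothetical code, not part of this CL; |pool|, FROM_HERE, Bind() and DoNothing() are assumed), this is the pattern that the DCHECK_EQ on the cached shutdown behavior rejects once redirection is enabled:

void PostWithMismatchedShutdownBehaviors(SequencedWorkerPool* pool) {
  SequencedWorkerPool::SequenceToken token =
      pool->GetNamedSequenceToken("mismatched");
  pool->PostSequencedWorkerTaskWithShutdownBehavior(
      token, FROM_HERE, Bind(&DoNothing), SequencedWorkerPool::BLOCK_SHUTDOWN);
  // |token| is now bound to a BLOCK_SHUTDOWN TaskRunner in
  // |sequenced_task_runner_map_|, so posting to the same sequence with a
  // different shutdown behavior trips the DCHECK above when redirected.
  pool->PostSequencedWorkerTaskWithShutdownBehavior(
      token, FROM_HERE, Bind(&DoNothing),
      SequencedWorkerPool::SKIP_ON_SHUTDOWN);
}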
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
AutoLock lock(lock_); |
- if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
- if (!runs_tasks_on_verifier_) { |
- runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
- TaskTraits().WithFileIO().WithPriority(task_priority_), |
- ExecutionMode::PARALLEL); |
- } |
- return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
- } else { |
- return ContainsKey(threads_, PlatformThread::CurrentId()); |
- } |
+ return RunsTasksOnCurrentThreadAssertSynchronized(); |
} |
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
SequenceToken sequence_token) const { |
AutoLock lock(lock_); |
if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
+ if (!sequence_token.IsValid()) |
+ return RunsTasksOnCurrentThreadAssertSynchronized(); |
+ |
// TODO(gab): This currently only verifies that the current thread is a |
// thread on which a task bound to |sequence_token| *could* run, but it |
// doesn't verify that the current is *currently running* a task bound to |
@@ -831,7 +877,8 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
const auto sequenced_task_runner_it = |
sequenced_task_runner_map_.find(sequence_token.id_); |
return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
- sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
+ sequenced_task_runner_it->second.task_runner |
+ ->RunsTasksOnCurrentThread(); |
} else { |
ThreadMap::const_iterator found = |
threads_.find(PlatformThread::CurrentId()); |
@@ -1378,8 +1425,7 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
} |
// static |
-void SequencedWorkerPool:: |
- RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
+void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { |
DCHECK(TaskScheduler::GetInstance()); |
// Hitting this DCHECK indicates that a task was posted to a |
// SequencedWorkerPool before the TaskScheduler was initialized and |
@@ -1389,6 +1435,11 @@ void SequencedWorkerPool:: |
g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
} |
+// static |
+void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { |
+ g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
+} |
+ |
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
const std::string& thread_name_prefix, |
base::TaskPriority task_priority) |
@@ -1528,6 +1579,7 @@ bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
} |
void SequencedWorkerPool::FlushForTesting() { |
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
inner_->CleanupForTesting(); |
} |
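Finally, a minimal test-style sketch (hypothetical, not part of this CL) of the redirection lifecycle this patch documents: NONE_ACTIVE to REDIRECTED_TO_TASK_SCHEDULER via RedirectToTaskSchedulerForProcess(), and back to NONE_ACTIVE via ResetRedirectToTaskSchedulerForProcessForTesting(). It assumes a TaskScheduler instance already exists for the process, as required by the DCHECK in RedirectToTaskSchedulerForProcess().

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/task_scheduler/task_traits.h"
#include "base/threading/sequenced_worker_pool.h"

void RedirectAndResetExample() {
  // All pools in the process now forward their tasks to the TaskScheduler.
  base::SequencedWorkerPool::RedirectToTaskSchedulerForProcess();

  scoped_refptr<base::SequencedWorkerPool> pool(new base::SequencedWorkerPool(
      2, "ExamplePool", base::TaskPriority::USER_VISIBLE));
  pool->PostWorkerTask(FROM_HERE, base::Bind(&base::DoNothing));

  // Note: FlushForTesting() would hit the DCHECK_NE added above while the
  // pool is redirected.

  // Tests may return the process to the NONE_ACTIVE state afterwards.
  base::SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting();
}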