Index: cc/resources/task_graph_runner.cc |
diff --git a/cc/resources/task_graph_runner.cc b/cc/resources/task_graph_runner.cc |
index 1a67faad2c349784395a5de8f00b4f9601ceefd4..c9ab6fea50834e0c75ae52c5aeaccd998eeb544a 100644 |
--- a/cc/resources/task_graph_runner.cc |
+++ b/cc/resources/task_graph_runner.cc |
@@ -134,7 +134,27 @@ void TaskGraph::Reset() { |
edges.clear(); |
} |
-TaskGraphRunner::TaskNamespace::TaskNamespace() {} |
+TaskGraphRunner::TaskSubNamespace::TaskSubNamespace() |
+ : task_namespace(nullptr), |
+ is_in_ready_sub_namespaces(false), |
+ running_task_count(0) { |
+} |
+ |
+TaskGraphRunner::TaskSubNamespace::~TaskSubNamespace() { |
+} |
+ |
+bool TaskGraphRunner::TaskSubNamespace::IsReadyToRun() const { |
+ // A sub namespace is ready to run if it has ready to run tasks, |
+ // and the number of running tasks in the sub namespace is not |
+ // greater than the next prioritized task's |max_concurrent_tasks|. |
+ return !ready_to_run_tasks.empty() && |
+ running_task_count < ready_to_run_tasks.front().max_concurrent_tasks; |
+} |
+ |
+TaskGraphRunner::TaskNamespace::TaskNamespace() { |
+ for (TaskSubNamespace& sub_namespace : sub_namespaces) |
+ sub_namespace.task_namespace = this; |
+} |
TaskGraphRunner::TaskNamespace::~TaskNamespace() {} |
@@ -149,7 +169,7 @@ TaskGraphRunner::~TaskGraphRunner() { |
{ |
base::AutoLock lock(lock_); |
- DCHECK_EQ(0u, ready_to_run_namespaces_.size()); |
+ DCHECK_EQ(0u, ready_to_run_sub_namespaces_.size()); |
DCHECK_EQ(0u, namespaces_.size()); |
} |
} |
@@ -195,7 +215,9 @@ void TaskGraphRunner::ScheduleTasks(NamespaceToken token, TaskGraph* graph) { |
} |
// Build new "ready to run" queue and remove nodes from old graph. |
- task_namespace.ready_to_run_tasks.clear(); |
+ for (TaskSubNamespace& sub_namespace : task_namespace.sub_namespaces) |
+ sub_namespace.ready_to_run_tasks.clear(); |
+ |
for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin(); |
it != graph->nodes.end(); |
++it) { |
@@ -227,15 +249,21 @@ void TaskGraphRunner::ScheduleTasks(NamespaceToken token, TaskGraph* graph) { |
node.task) != task_namespace.running_tasks.end()) |
continue; |
- task_namespace.ready_to_run_tasks.push_back( |
- PrioritizedTask(node.task, node.priority)); |
+ task_namespace.sub_namespaces[node.sub_namespace] |
+ .ready_to_run_tasks.push_back(PrioritizedTask( |
+ node.task, node.priority, node.max_concurrent_tasks)); |
} |
- // Rearrange the elements in |ready_to_run_tasks| in such a way that they |
- // form a heap. |
- std::make_heap(task_namespace.ready_to_run_tasks.begin(), |
- task_namespace.ready_to_run_tasks.end(), |
- CompareTaskPriority); |
+ // Prioritize tasks in each sub namespace. |
+ for (TaskSubNamespace& sub_namespace : task_namespace.sub_namespaces) { |
+ if (!sub_namespace.ready_to_run_tasks.empty()) { |
+ // Rearrange the elements in |ready_to_run_tasks| in |
+ // such a way that they form a heap. |
+ std::make_heap(sub_namespace.ready_to_run_tasks.begin(), |
+ sub_namespace.ready_to_run_tasks.end(), |
+ CompareTaskPriority); |
+ } |
+ } |
// Swap task graph. |
task_namespace.graph.Swap(graph); |
@@ -263,22 +291,28 @@ void TaskGraphRunner::ScheduleTasks(NamespaceToken token, TaskGraph* graph) { |
} |
// Build new "ready to run" task namespaces queue. |
- ready_to_run_namespaces_.clear(); |
+ ready_to_run_sub_namespaces_.clear(); |
for (TaskNamespaceMap::iterator it = namespaces_.begin(); |
it != namespaces_.end(); |
++it) { |
- if (!it->second.ready_to_run_tasks.empty()) |
- ready_to_run_namespaces_.push_back(&it->second); |
+ for (TaskSubNamespace& sub_namespace : it->second.sub_namespaces) { |
+ if (sub_namespace.IsReadyToRun()) { |
+ ready_to_run_sub_namespaces_.push_back(&sub_namespace); |
+ sub_namespace.is_in_ready_sub_namespaces = true; |
+ } else { |
+ sub_namespace.is_in_ready_sub_namespaces = false; |
+ } |
+ } |
} |
// Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way |
// that they form a heap. |
- std::make_heap(ready_to_run_namespaces_.begin(), |
- ready_to_run_namespaces_.end(), |
- CompareTaskNamespacePriority); |
+ std::make_heap(ready_to_run_sub_namespaces_.begin(), |
+ ready_to_run_sub_namespaces_.end(), |
+ CompareTaskSubNamespacePriority); |
// If there is more work available, wake up worker thread. |
- if (!ready_to_run_namespaces_.empty()) |
+ if (!ready_to_run_sub_namespaces_.empty()) |
has_ready_to_run_tasks_cv_.Signal(); |
} |
} |
@@ -326,10 +360,7 @@ void TaskGraphRunner::CollectCompletedTasks(NamespaceToken token, |
if (!HasFinishedRunningTasksInNamespace(&task_namespace)) |
return; |
- // Remove namespace if finished running tasks. |
- DCHECK_EQ(0u, task_namespace.completed_tasks.size()); |
- DCHECK_EQ(0u, task_namespace.ready_to_run_tasks.size()); |
- DCHECK_EQ(0u, task_namespace.running_tasks.size()); |
+ // Remove empty namespace. |
namespaces_.erase(it); |
} |
} |
@@ -337,7 +368,7 @@ void TaskGraphRunner::CollectCompletedTasks(NamespaceToken token, |
void TaskGraphRunner::Shutdown() { |
base::AutoLock lock(lock_); |
- DCHECK_EQ(0u, ready_to_run_namespaces_.size()); |
+ DCHECK_EQ(0u, ready_to_run_sub_namespaces_.size()); |
DCHECK_EQ(0u, namespaces_.size()); |
DCHECK(!shutdown_); |
@@ -352,7 +383,7 @@ void TaskGraphRunner::Run() { |
base::AutoLock lock(lock_); |
while (true) { |
- if (ready_to_run_namespaces_.empty()) { |
+ if (ready_to_run_sub_namespaces_.empty()) { |
// Exit when shutdown is set and no more tasks are pending. |
if (shutdown_) |
break; |
@@ -373,7 +404,7 @@ void TaskGraphRunner::Run() { |
void TaskGraphRunner::RunUntilIdle() { |
base::AutoLock lock(lock_); |
- while (!ready_to_run_namespaces_.empty()) |
+ while (!ready_to_run_sub_namespaces_.empty()) |
RunTaskWithLockAcquired(); |
} |
@@ -381,30 +412,40 @@ void TaskGraphRunner::RunTaskWithLockAcquired() { |
TRACE_EVENT0("toplevel", "TaskGraphRunner::RunTask"); |
lock_.AssertAcquired(); |
- DCHECK(!ready_to_run_namespaces_.empty()); |
+ DCHECK(!ready_to_run_sub_namespaces_.empty()); |
+ |
+ // Take top priority TaskSubNamespace from |ready_to_run_sub_namespaces_|. |
+ std::pop_heap(ready_to_run_sub_namespaces_.begin(), |
+ ready_to_run_sub_namespaces_.end(), |
+ CompareTaskSubNamespacePriority); |
+ TaskSubNamespace* sub_namespace = ready_to_run_sub_namespaces_.back(); |
+ ready_to_run_sub_namespaces_.pop_back(); |
+ sub_namespace->is_in_ready_sub_namespaces = false; |
- // Take top priority TaskNamespace from |ready_to_run_namespaces_|. |
- std::pop_heap(ready_to_run_namespaces_.begin(), |
- ready_to_run_namespaces_.end(), |
- CompareTaskNamespacePriority); |
- TaskNamespace* task_namespace = ready_to_run_namespaces_.back(); |
- ready_to_run_namespaces_.pop_back(); |
- DCHECK(!task_namespace->ready_to_run_tasks.empty()); |
+ TaskNamespace* task_namespace = sub_namespace->task_namespace; |
// Take top priority task from |ready_to_run_tasks|. |
- std::pop_heap(task_namespace->ready_to_run_tasks.begin(), |
- task_namespace->ready_to_run_tasks.end(), |
- CompareTaskPriority); |
- scoped_refptr<Task> task(task_namespace->ready_to_run_tasks.back().task); |
- task_namespace->ready_to_run_tasks.pop_back(); |
- |
- // Add task namespace back to |ready_to_run_namespaces_| if not empty after |
- // taking top priority task. |
- if (!task_namespace->ready_to_run_tasks.empty()) { |
- ready_to_run_namespaces_.push_back(task_namespace); |
- std::push_heap(ready_to_run_namespaces_.begin(), |
- ready_to_run_namespaces_.end(), |
- CompareTaskNamespacePriority); |
+ DCHECK(sub_namespace->IsReadyToRun()); |
+ std::pop_heap(sub_namespace->ready_to_run_tasks.begin(), |
+ sub_namespace->ready_to_run_tasks.end(), CompareTaskPriority); |
+ |
+ PrioritizedTask& prioritized_task = sub_namespace->ready_to_run_tasks.back(); |
+ scoped_refptr<Task> task(prioritized_task.task); |
+ sub_namespace->ready_to_run_tasks.pop_back(); |
+ |
+ // Update number of concurrently running tasks in sub namespace. |
+ sub_namespace->running_task_count++; |
+ DCHECK_LE(sub_namespace->running_task_count, |
+ prioritized_task.max_concurrent_tasks); |
+ |
+ // Add |sub_namespace| back to |ready_to_run_sub_namespaces_| if it's |
+ // still ready to run. |
+ if (sub_namespace->IsReadyToRun()) { |
+ ready_to_run_sub_namespaces_.push_back(sub_namespace); |
+ std::push_heap(ready_to_run_sub_namespaces_.begin(), |
+ ready_to_run_sub_namespaces_.end(), |
+ CompareTaskSubNamespacePriority); |
+ sub_namespace->is_in_ready_sub_namespaces = true; |
} |
// Add task to |running_tasks|. |
@@ -425,6 +466,20 @@ void TaskGraphRunner::RunTaskWithLockAcquired() { |
// This will mark task as finished running. |
task->DidRun(); |
+ // Update number of concurrently running tasks in sub namespace. |
+ sub_namespace->running_task_count--; |
+ |
+ // If sub namespace became ready to run, add it to |
+ // |ready_to_run_sub_namespaces_|. |
+ if (!sub_namespace->is_in_ready_sub_namespaces && |
+ sub_namespace->IsReadyToRun()) { |
+ ready_to_run_sub_namespaces_.push_back(sub_namespace); |
+ std::push_heap(ready_to_run_sub_namespaces_.begin(), |
+ ready_to_run_sub_namespaces_.end(), |
+ CompareTaskSubNamespacePriority); |
+ sub_namespace->is_in_ready_sub_namespaces = true; |
+ } |
+ |
// Remove task from |running_tasks|. |
TaskVector::iterator it = std::find(task_namespace->running_tasks.begin(), |
task_namespace->running_tasks.end(), |
@@ -435,38 +490,43 @@ void TaskGraphRunner::RunTaskWithLockAcquired() { |
// Now iterate over all dependents to decrement dependencies and check if they |
// are ready to run. |
- bool ready_to_run_namespaces_has_heap_properties = true; |
+ bool ready_to_run_sub_namespaces_has_heap_properties = true; |
for (DependentIterator it(&task_namespace->graph, task.get()); it; ++it) { |
TaskGraph::Node& dependent_node = *it; |
DCHECK_LT(0u, dependent_node.dependencies); |
dependent_node.dependencies--; |
- // Task is ready if it has no dependencies. Add it to |ready_to_run_tasks_|. |
+ // Task is ready if it has no dependencies. Add it to |ready_to_run_tasks|. |
if (!dependent_node.dependencies) { |
- bool was_empty = task_namespace->ready_to_run_tasks.empty(); |
- task_namespace->ready_to_run_tasks.push_back( |
- PrioritizedTask(dependent_node.task, dependent_node.priority)); |
- std::push_heap(task_namespace->ready_to_run_tasks.begin(), |
- task_namespace->ready_to_run_tasks.end(), |
+ TaskSubNamespace& dependent_sub_namespace = |
+ task_namespace->sub_namespaces[dependent_node.sub_namespace]; |
+ |
+ dependent_sub_namespace.ready_to_run_tasks.push_back( |
+ PrioritizedTask(dependent_node.task, dependent_node.priority, |
+ dependent_node.max_concurrent_tasks)); |
+ std::push_heap(dependent_sub_namespace.ready_to_run_tasks.begin(), |
+ dependent_sub_namespace.ready_to_run_tasks.end(), |
CompareTaskPriority); |
- // Task namespace is ready if it has at least one ready to run task. Add |
- // it to |ready_to_run_namespaces_| if it just become ready. |
- if (was_empty) { |
- DCHECK(std::find(ready_to_run_namespaces_.begin(), |
- ready_to_run_namespaces_.end(), |
- task_namespace) == ready_to_run_namespaces_.end()); |
- ready_to_run_namespaces_.push_back(task_namespace); |
+ |
+ // If sub namespace became ready to run, add it to |
+ // |ready_to_run_sub_namespaces_|. |
+ if (!dependent_sub_namespace.is_in_ready_sub_namespaces && |
+ dependent_sub_namespace.IsReadyToRun()) { |
+ ready_to_run_sub_namespaces_.push_back(&dependent_sub_namespace); |
+ dependent_sub_namespace.is_in_ready_sub_namespaces = true; |
} |
- ready_to_run_namespaces_has_heap_properties = false; |
+ |
+ ready_to_run_sub_namespaces_has_heap_properties = false; |
} |
} |
- // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way |
+ // Rearrange the sub namespaces in |ready_to_run_sub_namespaces_| in such a |
+ // way |
danakj
2015/02/23 18:45:01
wrapping
|
// that they yet again form a heap. |
- if (!ready_to_run_namespaces_has_heap_properties) { |
- std::make_heap(ready_to_run_namespaces_.begin(), |
- ready_to_run_namespaces_.end(), |
- CompareTaskNamespacePriority); |
+ if (!ready_to_run_sub_namespaces_has_heap_properties) { |
+ std::make_heap(ready_to_run_sub_namespaces_.begin(), |
+ ready_to_run_sub_namespaces_.end(), |
+ CompareTaskSubNamespacePriority); |
} |
// Finally add task to |completed_tasks_|. |