| OLD | NEW |
| (Empty) |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "cc/resources/task_graph_runner.h" | |
| 6 | |
| 7 #include <algorithm> | |
| 8 | |
| 9 #include "base/containers/hash_tables.h" | |
| 10 #include "base/debug/trace_event.h" | |
| 11 #include "base/strings/stringprintf.h" | |
| 12 #include "base/threading/thread_restrictions.h" | |
| 13 | |
| 14 namespace cc { | |
| 15 namespace internal { | |
| 16 | |
| 17 Task::Task() | |
| 18 : did_schedule_(false), | |
| 19 did_run_(false) { | |
| 20 } | |
| 21 | |
| 22 Task::~Task() { | |
| 23 DCHECK(!did_run_ || did_schedule_); | |
| 24 } | |
| 25 | |
| 26 void Task::DidSchedule() { | |
| 27 did_schedule_ = true; | |
| 28 } | |
| 29 | |
| 30 void Task::WillRun() { | |
| 31 DCHECK(did_schedule_); | |
| 32 DCHECK(!did_run_); | |
| 33 } | |
| 34 | |
| 35 void Task::DidRun() { | |
| 36 did_run_ = true; | |
| 37 } | |
| 38 | |
| 39 bool Task::HasFinishedRunning() const { | |
| 40 return did_run_; | |
| 41 } | |
| 42 | |
| 43 GraphNode::GraphNode(Task* task, unsigned priority) | |
| 44 : task_(task), | |
| 45 priority_(priority), | |
| 46 num_dependencies_(0) { | |
| 47 } | |
| 48 | |
| 49 GraphNode::~GraphNode() {} | |
| 50 | |
| 51 TaskGraphRunner::TaskNamespace::TaskNamespace() {} | |
| 52 | |
| 53 TaskGraphRunner::TaskNamespace::~TaskNamespace() {} | |
| 54 | |
| 55 TaskGraphRunner::TaskGraphRunner( | |
| 56 size_t num_threads, const std::string& thread_name_prefix) | |
| 57 : lock_(), | |
| 58 has_ready_to_run_tasks_cv_(&lock_), | |
| 59 has_namespaces_with_finished_running_tasks_cv_(&lock_), | |
| 60 next_namespace_id_(1), | |
| 61 next_thread_index_(0u), | |
| 62 shutdown_(false) { | |
| 63 base::AutoLock lock(lock_); | |
| 64 | |
| 65 while (workers_.size() < num_threads) { | |
| 66 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | |
| 67 new base::DelegateSimpleThread( | |
| 68 this, | |
| 69 thread_name_prefix + | |
| 70 base::StringPrintf( | |
| 71 "Worker%u", | |
| 72 static_cast<unsigned>(workers_.size() + 1)).c_str())); | |
| 73 worker->Start(); | |
| 74 #if defined(OS_ANDROID) || defined(OS_LINUX) | |
| 75 worker->SetThreadPriority(base::kThreadPriority_Background); | |
| 76 #endif | |
| 77 workers_.push_back(worker.Pass()); | |
| 78 } | |
| 79 } | |
| 80 | |
| 81 TaskGraphRunner::~TaskGraphRunner() { | |
| 82 { | |
| 83 base::AutoLock lock(lock_); | |
| 84 | |
| 85 DCHECK_EQ(0u, ready_to_run_namespaces_.size()); | |
| 86 DCHECK_EQ(0u, namespaces_.size()); | |
| 87 | |
| 88 DCHECK(!shutdown_); | |
| 89 shutdown_ = true; | |
| 90 | |
| 91 // Wake up a worker so it knows it should exit. This will cause all workers | |
| 92 // to exit as each will wake up another worker before exiting. | |
| 93 has_ready_to_run_tasks_cv_.Signal(); | |
| 94 } | |
| 95 | |
| 96 while (workers_.size()) { | |
| 97 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | |
| 98 // Join() is considered IO and will block this thread. | |
| 99 base::ThreadRestrictions::ScopedAllowIO allow_io; | |
| 100 worker->Join(); | |
| 101 } | |
| 102 } | |
| 103 | |
| 104 NamespaceToken TaskGraphRunner::GetNamespaceToken() { | |
| 105 base::AutoLock lock(lock_); | |
| 106 | |
| 107 NamespaceToken token(next_namespace_id_++); | |
| 108 DCHECK(namespaces_.find(token.id_) == namespaces_.end()); | |
| 109 return token; | |
| 110 } | |
| 111 | |
| 112 void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) { | |
| 113 base::AutoLock lock(lock_); | |
| 114 | |
| 115 DCHECK(token.IsValid()); | |
| 116 TaskNamespaceMap::iterator it = namespaces_.find(token.id_); | |
| 117 if (it == namespaces_.end()) | |
| 118 return; | |
| 119 | |
| 120 TaskNamespace* task_namespace = it->second; | |
| 121 while (!HasFinishedRunningTasksInNamespace(task_namespace)) | |
| 122 has_namespaces_with_finished_running_tasks_cv_.Wait(); | |
| 123 | |
| 124 // There may be other namespaces that have finished running | |
| 125 // tasks, so wake up another origin thread. | |
| 126 has_namespaces_with_finished_running_tasks_cv_.Signal(); | |
| 127 } | |
| 128 | |
| 129 void TaskGraphRunner::SetTaskGraph(NamespaceToken token, TaskGraph* graph) { | |
| 130 DCHECK(token.IsValid()); | |
| 131 | |
| 132 TaskGraph new_pending_tasks; | |
| 133 TaskGraph new_running_tasks; | |
| 134 | |
| 135 new_pending_tasks.swap(*graph); | |
| 136 | |
| 137 { | |
| 138 base::AutoLock lock(lock_); | |
| 139 | |
| 140 DCHECK(!shutdown_); | |
| 141 | |
| 142 scoped_ptr<TaskNamespace> task_namespace = namespaces_.take_and_erase( | |
| 143 token.id_); | |
| 144 | |
| 145 // Create task namespace if it doesn't exist. | |
| 146 if (!task_namespace) | |
| 147 task_namespace.reset(new TaskNamespace); | |
| 148 | |
| 149 // First remove all completed tasks from |new_pending_tasks| and | |
| 150 // adjust number of dependencies. | |
| 151 for (Task::Vector::iterator it = task_namespace->completed_tasks.begin(); | |
| 152 it != task_namespace->completed_tasks.end(); ++it) { | |
| 153 Task* task = it->get(); | |
| 154 | |
| 155 scoped_ptr<GraphNode> node = new_pending_tasks.take_and_erase(task); | |
| 156 if (node) { | |
| 157 for (GraphNode::Vector::const_iterator it = node->dependents().begin(); | |
| 158 it != node->dependents().end(); ++it) { | |
| 159 GraphNode* dependent_node = *it; | |
| 160 dependent_node->remove_dependency(); | |
| 161 } | |
| 162 } | |
| 163 } | |
| 164 | |
| 165 // Build new running task set. | |
| 166 for (TaskGraph::iterator it = task_namespace->running_tasks.begin(); | |
| 167 it != task_namespace->running_tasks.end(); ++it) { | |
| 168 Task* task = it->first; | |
| 169 // Transfer scheduled task value from |new_pending_tasks| to | |
| 170 // |new_running_tasks| if currently running. Value must be set to | |
| 171 // NULL if |new_pending_tasks| doesn't contain task. This does | |
| 172 // the right in both cases. | |
| 173 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); | |
| 174 } | |
| 175 | |
| 176 // Build new "ready to run" tasks queue. | |
| 177 task_namespace->ready_to_run_tasks.clear(); | |
| 178 for (TaskGraph::iterator it = new_pending_tasks.begin(); | |
| 179 it != new_pending_tasks.end(); ++it) { | |
| 180 Task* task = it->first; | |
| 181 DCHECK(task); | |
| 182 GraphNode* node = it->second; | |
| 183 | |
| 184 // Completed tasks should not exist in |new_pending_tasks|. | |
| 185 DCHECK(!task->HasFinishedRunning()); | |
| 186 | |
| 187 // Call DidSchedule() to indicate that this task has been scheduled. | |
| 188 // Note: This is only for debugging purposes. | |
| 189 task->DidSchedule(); | |
| 190 | |
| 191 if (!node->num_dependencies()) | |
| 192 task_namespace->ready_to_run_tasks.push_back(node); | |
| 193 | |
| 194 // Erase the task from old pending tasks. | |
| 195 task_namespace->pending_tasks.erase(task); | |
| 196 } | |
| 197 | |
| 198 // Rearrange the elements in |ready_to_run_tasks| in such a way that | |
| 199 // they form a heap. | |
| 200 std::make_heap(task_namespace->ready_to_run_tasks.begin(), | |
| 201 task_namespace->ready_to_run_tasks.end(), | |
| 202 CompareTaskPriority); | |
| 203 | |
| 204 task_namespace->completed_tasks.reserve( | |
| 205 task_namespace->completed_tasks.size() + | |
| 206 task_namespace->pending_tasks.size()); | |
| 207 | |
| 208 // The items left in |pending_tasks| need to be canceled. | |
| 209 for (TaskGraph::const_iterator it = task_namespace->pending_tasks.begin(); | |
| 210 it != task_namespace->pending_tasks.end(); ++it) { | |
| 211 task_namespace->completed_tasks.push_back(it->first); | |
| 212 } | |
| 213 | |
| 214 // Swap task sets. | |
| 215 // Note: old tasks are intentionally destroyed after releasing |lock_|. | |
| 216 task_namespace->pending_tasks.swap(new_pending_tasks); | |
| 217 task_namespace->running_tasks.swap(new_running_tasks); | |
| 218 | |
| 219 // If |ready_to_run_tasks| is empty, it means we either have | |
| 220 // running tasks, or we have no pending tasks. | |
| 221 DCHECK(!task_namespace->ready_to_run_tasks.empty() || | |
| 222 (task_namespace->pending_tasks.empty() || | |
| 223 !task_namespace->running_tasks.empty())); | |
| 224 | |
| 225 // Add task namespace if not empty. | |
| 226 if (!task_namespace->pending_tasks.empty() || | |
| 227 !task_namespace->running_tasks.empty() || | |
| 228 !task_namespace->completed_tasks.empty()) { | |
| 229 namespaces_.set(token.id_, task_namespace.Pass()); | |
| 230 } | |
| 231 | |
| 232 // Build new "ready to run" task namespaces queue. | |
| 233 ready_to_run_namespaces_.clear(); | |
| 234 for (TaskNamespaceMap::iterator it = namespaces_.begin(); | |
| 235 it != namespaces_.end(); ++it) { | |
| 236 if (!it->second->ready_to_run_tasks.empty()) | |
| 237 ready_to_run_namespaces_.push_back(it->second); | |
| 238 } | |
| 239 | |
| 240 // Rearrange the task namespaces in |ready_to_run_namespaces_| | |
| 241 // in such a way that they form a heap. | |
| 242 std::make_heap(ready_to_run_namespaces_.begin(), | |
| 243 ready_to_run_namespaces_.end(), | |
| 244 CompareTaskNamespacePriority); | |
| 245 | |
| 246 // If there is more work available, wake up worker thread. | |
| 247 if (!ready_to_run_namespaces_.empty()) | |
| 248 has_ready_to_run_tasks_cv_.Signal(); | |
| 249 } | |
| 250 } | |
| 251 | |
| 252 void TaskGraphRunner::CollectCompletedTasks( | |
| 253 NamespaceToken token, Task::Vector* completed_tasks) { | |
| 254 base::AutoLock lock(lock_); | |
| 255 | |
| 256 DCHECK(token.IsValid()); | |
| 257 TaskNamespaceMap::iterator it = namespaces_.find(token.id_); | |
| 258 if (it == namespaces_.end()) | |
| 259 return; | |
| 260 | |
| 261 TaskNamespace* task_namespace = it->second; | |
| 262 | |
| 263 DCHECK_EQ(0u, completed_tasks->size()); | |
| 264 completed_tasks->swap(task_namespace->completed_tasks); | |
| 265 if (!HasFinishedRunningTasksInNamespace(task_namespace)) | |
| 266 return; | |
| 267 | |
| 268 // Remove namespace if finished running tasks. | |
| 269 DCHECK_EQ(0u, task_namespace->pending_tasks.size()); | |
| 270 DCHECK_EQ(0u, task_namespace->running_tasks.size()); | |
| 271 DCHECK_EQ(0u, task_namespace->completed_tasks.size()); | |
| 272 DCHECK_EQ(0u, task_namespace->ready_to_run_tasks.size()); | |
| 273 namespaces_.erase(it); | |
| 274 } | |
| 275 | |
| 276 void TaskGraphRunner::Run() { | |
| 277 base::AutoLock lock(lock_); | |
| 278 | |
| 279 // Get a unique thread index. | |
| 280 int thread_index = next_thread_index_++; | |
| 281 | |
| 282 while (true) { | |
| 283 if (ready_to_run_namespaces_.empty()) { | |
| 284 // Exit when shutdown is set and no more tasks are pending. | |
| 285 if (shutdown_) | |
| 286 break; | |
| 287 | |
| 288 // Wait for more tasks. | |
| 289 has_ready_to_run_tasks_cv_.Wait(); | |
| 290 continue; | |
| 291 } | |
| 292 | |
| 293 // Take top priority TaskNamespace from |ready_to_run_namespaces_|. | |
| 294 std::pop_heap(ready_to_run_namespaces_.begin(), | |
| 295 ready_to_run_namespaces_.end(), | |
| 296 CompareTaskNamespacePriority); | |
| 297 TaskNamespace* task_namespace = ready_to_run_namespaces_.back(); | |
| 298 ready_to_run_namespaces_.pop_back(); | |
| 299 DCHECK(!task_namespace->ready_to_run_tasks.empty()); | |
| 300 | |
| 301 // Take top priority task from |ready_to_run_tasks|. | |
| 302 std::pop_heap(task_namespace->ready_to_run_tasks.begin(), | |
| 303 task_namespace->ready_to_run_tasks.end(), | |
| 304 CompareTaskPriority); | |
| 305 scoped_refptr<Task> task( | |
| 306 task_namespace->ready_to_run_tasks.back()->task()); | |
| 307 task_namespace->ready_to_run_tasks.pop_back(); | |
| 308 | |
| 309 // Add task namespace back to |ready_to_run_namespaces_| if not | |
| 310 // empty after taking top priority task. | |
| 311 if (!task_namespace->ready_to_run_tasks.empty()) { | |
| 312 ready_to_run_namespaces_.push_back(task_namespace); | |
| 313 std::push_heap(ready_to_run_namespaces_.begin(), | |
| 314 ready_to_run_namespaces_.end(), | |
| 315 CompareTaskNamespacePriority); | |
| 316 } | |
| 317 | |
| 318 // Move task from |pending_tasks| to |running_tasks|. | |
| 319 DCHECK(task_namespace->pending_tasks.contains(task.get())); | |
| 320 DCHECK(!task_namespace->running_tasks.contains(task.get())); | |
| 321 task_namespace->running_tasks.set( | |
| 322 task.get(), | |
| 323 task_namespace->pending_tasks.take_and_erase(task.get())); | |
| 324 | |
| 325 // There may be more work available, so wake up another worker thread. | |
| 326 has_ready_to_run_tasks_cv_.Signal(); | |
| 327 | |
| 328 // Call WillRun() before releasing |lock_| and running task. | |
| 329 task->WillRun(); | |
| 330 | |
| 331 { | |
| 332 base::AutoUnlock unlock(lock_); | |
| 333 | |
| 334 task->RunOnWorkerThread(thread_index); | |
| 335 } | |
| 336 | |
| 337 // This will mark task as finished running. | |
| 338 task->DidRun(); | |
| 339 | |
| 340 // Now iterate over all dependents to remove dependency and check | |
| 341 // if they are ready to run. | |
| 342 scoped_ptr<GraphNode> node = | |
| 343 task_namespace->running_tasks.take_and_erase(task.get()); | |
| 344 if (node) { | |
| 345 bool ready_to_run_namespaces_has_heap_properties = true; | |
| 346 | |
| 347 for (GraphNode::Vector::const_iterator it = node->dependents().begin(); | |
| 348 it != node->dependents().end(); ++it) { | |
| 349 GraphNode* dependent_node = *it; | |
| 350 | |
| 351 dependent_node->remove_dependency(); | |
| 352 // Task is ready if it has no dependencies. Add it to | |
| 353 // |ready_to_run_tasks_|. | |
| 354 if (!dependent_node->num_dependencies()) { | |
| 355 bool was_empty = task_namespace->ready_to_run_tasks.empty(); | |
| 356 task_namespace->ready_to_run_tasks.push_back(dependent_node); | |
| 357 std::push_heap(task_namespace->ready_to_run_tasks.begin(), | |
| 358 task_namespace->ready_to_run_tasks.end(), | |
| 359 CompareTaskPriority); | |
| 360 // Task namespace is ready if it has at least one ready | |
| 361 // to run task. Add it to |ready_to_run_namespaces_| if | |
| 362 // it just become ready. | |
| 363 if (was_empty) { | |
| 364 DCHECK(std::find(ready_to_run_namespaces_.begin(), | |
| 365 ready_to_run_namespaces_.end(), | |
| 366 task_namespace) == | |
| 367 ready_to_run_namespaces_.end()); | |
| 368 ready_to_run_namespaces_.push_back(task_namespace); | |
| 369 } | |
| 370 ready_to_run_namespaces_has_heap_properties = false; | |
| 371 } | |
| 372 } | |
| 373 | |
| 374 // Rearrange the task namespaces in |ready_to_run_namespaces_| | |
| 375 // in such a way that they yet again form a heap. | |
| 376 if (!ready_to_run_namespaces_has_heap_properties) { | |
| 377 std::make_heap(ready_to_run_namespaces_.begin(), | |
| 378 ready_to_run_namespaces_.end(), | |
| 379 CompareTaskNamespacePriority); | |
| 380 } | |
| 381 } | |
| 382 | |
| 383 // Finally add task to |completed_tasks_|. | |
| 384 task_namespace->completed_tasks.push_back(task); | |
| 385 | |
| 386 // If namespace has finished running all tasks, wake up origin thread. | |
| 387 if (HasFinishedRunningTasksInNamespace(task_namespace)) | |
| 388 has_namespaces_with_finished_running_tasks_cv_.Signal(); | |
| 389 } | |
| 390 | |
| 391 // We noticed we should exit. Wake up the next worker so it knows it should | |
| 392 // exit as well (because the Shutdown() code only signals once). | |
| 393 has_ready_to_run_tasks_cv_.Signal(); | |
| 394 } | |
| 395 | |
| 396 } // namespace internal | |
| 397 } // namespace cc | |
| OLD | NEW |