Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
| 6 | 6 |
| 7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
| 8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
| 9 | 9 |
| 10 namespace dart { | 10 namespace dart { |
| 11 | 11 |
| 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
| 13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
| 14 | 14 |
| 15 Monitor* ThreadPool::exit_monitor_ = NULL; | |
| 16 int* ThreadPool::exit_count_ = NULL; | |
| 17 | |
| 18 ThreadPool::ThreadPool() | 15 ThreadPool::ThreadPool() |
| 19 : shutting_down_(false), | 16 : shutting_down_(false), |
| 20 all_workers_(NULL), | 17 all_workers_(NULL), |
| 21 idle_workers_(NULL), | 18 idle_workers_(NULL), |
| 22 count_started_(0), | 19 count_started_(0), |
| 23 count_stopped_(0), | 20 count_stopped_(0), |
| 24 count_running_(0), | 21 count_running_(0), |
| 25 count_idle_(0) { | 22 count_idle_(0), |
| 23 shutting_down_workers_(NULL), | |
| 24 join_list_(NULL) { | |
| 26 } | 25 } |
| 27 | 26 |
| 28 | 27 |
| 29 ThreadPool::~ThreadPool() { | 28 ThreadPool::~ThreadPool() { |
| 30 Shutdown(); | 29 Shutdown(); |
| 31 } | 30 } |
| 32 | 31 |
| 33 | 32 |
| 34 void ThreadPool::Run(Task* task) { | 33 bool ThreadPool::Run(Task* task) { |
| 35 Worker* worker = NULL; | 34 Worker* worker = NULL; |
| 36 bool new_worker = false; | 35 bool new_worker = false; |
| 37 { | 36 { |
| 38 // We need ThreadPool::mutex_ to access worker lists and other | 37 // We need ThreadPool::mutex_ to access worker lists and other |
| 39 // ThreadPool state. | 38 // ThreadPool state. |
| 40 MutexLocker ml(&mutex_); | 39 MutexLocker ml(&mutex_); |
| 41 if (shutting_down_) { | 40 if (shutting_down_) { |
| 42 return; | 41 return false; |
| 43 } | 42 } |
| 44 if (idle_workers_ == NULL) { | 43 if (idle_workers_ == NULL) { |
| 45 worker = new Worker(this); | 44 worker = new Worker(this); |
| 46 ASSERT(worker != NULL); | 45 ASSERT(worker != NULL); |
| 47 new_worker = true; | 46 new_worker = true; |
| 48 count_started_++; | 47 count_started_++; |
| 49 | 48 |
| 50 // Add worker to the all_workers_ list. | 49 // Add worker to the all_workers_ list. |
| 51 worker->all_next_ = all_workers_; | 50 worker->all_next_ = all_workers_; |
| 52 all_workers_ = worker; | 51 all_workers_ = worker; |
| 53 worker->owned_ = true; | 52 worker->owned_ = true; |
| 53 count_running_++; | |
| 54 } else { | 54 } else { |
| 55 // Get the first worker from the idle worker list. | 55 // Get the first worker from the idle worker list. |
| 56 worker = idle_workers_; | 56 worker = idle_workers_; |
| 57 idle_workers_ = worker->idle_next_; | 57 idle_workers_ = worker->idle_next_; |
| 58 worker->idle_next_ = NULL; | 58 worker->idle_next_ = NULL; |
| 59 count_idle_--; | 59 count_idle_--; |
| 60 count_running_++; | |
| 60 } | 61 } |
| 61 count_running_++; | |
| 62 } | 62 } |
| 63 | |
| 63 // Release ThreadPool::mutex_ before calling Worker functions. | 64 // Release ThreadPool::mutex_ before calling Worker functions. |
| 64 ASSERT(worker != NULL); | 65 ASSERT(worker != NULL); |
| 65 worker->SetTask(task); | 66 worker->SetTask(task); |
| 66 if (new_worker) { | 67 if (new_worker) { |
| 67 // Call StartThread after we've assigned the first task. | 68 // Call StartThread after we've assigned the first task. |
| 68 worker->StartThread(); | 69 worker->StartThread(); |
| 69 } | 70 } |
| 71 return true; | |
| 70 } | 72 } |
| 71 | 73 |
| 72 | 74 |
| 73 void ThreadPool::Shutdown() { | 75 void ThreadPool::Shutdown() { |
| 74 Worker* saved = NULL; | 76 Worker* saved = NULL; |
| 75 { | 77 { |
| 76 MutexLocker ml(&mutex_); | 78 MutexLocker ml(&mutex_); |
| 77 shutting_down_ = true; | 79 shutting_down_ = true; |
| 78 saved = all_workers_; | 80 saved = all_workers_; |
| 79 all_workers_ = NULL; | 81 all_workers_ = NULL; |
| 80 idle_workers_ = NULL; | 82 idle_workers_ = NULL; |
| 81 | 83 |
| 82 Worker* current = saved; | 84 Worker* current = saved; |
| 83 while (current != NULL) { | 85 while (current != NULL) { |
| 84 Worker* next = current->all_next_; | 86 Worker* next = current->all_next_; |
| 85 current->idle_next_ = NULL; | 87 current->idle_next_ = NULL; |
| 86 current->owned_ = false; | 88 current->owned_ = false; |
| 87 current = next; | 89 current = next; |
| 88 count_stopped_++; | 90 count_stopped_++; |
| 89 } | 91 } |
| 90 | 92 |
| 91 count_idle_ = 0; | 93 count_idle_ = 0; |
| 92 count_running_ = 0; | 94 count_running_ = 0; |
| 93 ASSERT(count_started_ == count_stopped_); | 95 ASSERT(count_started_ == count_stopped_); |
| 94 } | 96 } |
| 95 // Release ThreadPool::mutex_ before calling Worker functions. | 97 // Release ThreadPool::mutex_ before calling Worker functions. |
| 96 | 98 |
| 97 Worker* current = saved; | 99 { |
| 98 while (current != NULL) { | 100 MonitorLocker eml(&exit_monitor_); |
| 99 // We may access all_next_ without holding ThreadPool::mutex_ here | 101 |
| 100 // because the worker is no longer owned by the ThreadPool. | 102 // First tell all the workers to shut down. |
| 101 Worker* next = current->all_next_; | 103 Worker* current = saved; |
| 102 current->all_next_ = NULL; | 104 ThreadId id = OSThread::GetCurrentThreadId(); |
| 103 current->Shutdown(); | 105 while (current != NULL) { |
| 104 current = next; | 106 Worker* next = current->all_next_; |
| 107 if (current->id_ != id) { | |
|
turnidge
2015/09/15 17:19:59
May need worker lock while reading current->id.
| |
| 108 AddWorkerToShutdownList(current); | |
| 109 } | |
| 110 current->Shutdown(); | |
| 111 current = next; | |
| 112 } | |
| 113 saved = NULL; | |
| 114 | |
| 115 // Wait until all workers will exit. | |
| 116 while (shutting_down_workers_ != NULL) { | |
| 117 // Here, we are waiting for workers to exit. When a worker exits we will | |
| 118 // be notified. | |
| 119 eml.Wait(); | |
| 120 } | |
| 105 } | 121 } |
| 122 | |
| 123 // Extract the join list, and join on the threads. | |
| 124 JoinList* list = NULL; | |
| 125 { | |
| 126 MutexLocker ml(&mutex_); | |
| 127 list = join_list_; | |
| 128 join_list_ = NULL; | |
| 129 } | |
| 130 | |
| 131 // Join non-idle threads. | |
| 132 JoinList::Join(&list); | |
| 133 | |
| 134 #if defined(DEBUG) | |
| 135 { | |
| 136 MutexLocker ml(&mutex_); | |
| 137 ASSERT(join_list_ == NULL); | |
| 138 } | |
| 139 #endif | |
| 106 } | 140 } |
| 107 | 141 |
| 108 | 142 |
| 109 bool ThreadPool::IsIdle(Worker* worker) { | 143 bool ThreadPool::IsIdle(Worker* worker) { |
| 110 ASSERT(worker != NULL && worker->owned_); | 144 ASSERT(worker != NULL && worker->owned_); |
| 111 for (Worker* current = idle_workers_; | 145 for (Worker* current = idle_workers_; |
| 112 current != NULL; | 146 current != NULL; |
| 113 current = current->idle_next_) { | 147 current = current->idle_next_) { |
| 114 if (current == worker) { | 148 if (current == worker) { |
| 115 return true; | 149 return true; |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 149 ASSERT(worker != NULL && worker->owned_); | 183 ASSERT(worker != NULL && worker->owned_); |
| 150 if (all_workers_ == NULL) { | 184 if (all_workers_ == NULL) { |
| 151 return false; | 185 return false; |
| 152 } | 186 } |
| 153 | 187 |
| 154 // Special case head of list. | 188 // Special case head of list. |
| 155 if (all_workers_ == worker) { | 189 if (all_workers_ == worker) { |
| 156 all_workers_ = worker->all_next_; | 190 all_workers_ = worker->all_next_; |
| 157 worker->all_next_ = NULL; | 191 worker->all_next_ = NULL; |
| 158 worker->owned_ = false; | 192 worker->owned_ = false; |
| 159 worker->pool_ = NULL; | 193 worker->done_ = true; |
| 160 return true; | 194 return true; |
| 161 } | 195 } |
| 162 | 196 |
| 163 for (Worker* current = all_workers_; | 197 for (Worker* current = all_workers_; |
| 164 current->all_next_ != NULL; | 198 current->all_next_ != NULL; |
| 165 current = current->all_next_) { | 199 current = current->all_next_) { |
| 166 if (current->all_next_ == worker) { | 200 if (current->all_next_ == worker) { |
| 167 current->all_next_ = worker->all_next_; | 201 current->all_next_ = worker->all_next_; |
| 168 worker->all_next_ = NULL; | 202 worker->all_next_ = NULL; |
| 169 worker->owned_ = false; | 203 worker->owned_ = false; |
| 170 return true; | 204 return true; |
| 171 } | 205 } |
| 172 } | 206 } |
| 173 return false; | 207 return false; |
| 174 } | 208 } |
| 175 | 209 |
| 176 | 210 |
| 177 void ThreadPool::SetIdle(Worker* worker) { | 211 void ThreadPool::SetIdleAndReapExited(Worker* worker) { |
| 178 MutexLocker ml(&mutex_); | 212 JoinList* list = NULL; |
| 179 if (shutting_down_) { | 213 { |
| 180 return; | 214 MutexLocker ml(&mutex_); |
| 215 if (shutting_down_) { | |
| 216 return; | |
| 217 } | |
| 218 ASSERT(worker->owned_ && !IsIdle(worker)); | |
| 219 worker->idle_next_ = idle_workers_; | |
| 220 idle_workers_ = worker; | |
| 221 count_idle_++; | |
| 222 count_running_--; | |
|
turnidge
2015/09/15 17:19:59
Add blank line and a comment...
| |
| 223 list = join_list_; | |
| 224 join_list_ = NULL; | |
| 181 } | 225 } |
| 182 ASSERT(worker->owned_ && !IsIdle(worker)); | 226 JoinList::Join(&list); |
| 183 worker->idle_next_ = idle_workers_; | |
| 184 idle_workers_ = worker; | |
| 185 count_idle_++; | |
| 186 count_running_--; | |
| 187 } | 227 } |
| 188 | 228 |
| 189 | 229 |
| 190 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { | 230 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
| 191 MutexLocker ml(&mutex_); | 231 MutexLocker ml(&mutex_); |
| 192 if (shutting_down_) { | 232 if (shutting_down_) { |
| 193 return false; | 233 return false; |
| 194 } | 234 } |
| 195 // Remove from idle list. | 235 // Remove from idle list. |
| 196 if (!RemoveWorkerFromIdleList(worker)) { | 236 if (!RemoveWorkerFromIdleList(worker)) { |
| 197 return false; | 237 return false; |
| 198 } | 238 } |
| 199 // Remove from all list. | 239 // Remove from all list. |
| 200 bool found = RemoveWorkerFromAllList(worker); | 240 bool found = RemoveWorkerFromAllList(worker); |
| 201 ASSERT(found); | 241 ASSERT(found); |
| 202 | 242 |
| 243 // The thread for worker will exit. Add its ThreadId to the join_list_ | |
| 244 // so that we can join on it at the next opportunity. | |
| 245 JoinList::AddLocked(worker->join_id_, &join_list_); | |
| 203 count_stopped_++; | 246 count_stopped_++; |
| 204 count_idle_--; | 247 count_idle_--; |
| 205 return true; | 248 return true; |
| 206 } | 249 } |
| 207 | 250 |
| 208 | 251 |
| 252 // Only call while holding the exit_monitor_ | |
| 253 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { | |
| 254 worker->shutdown_next_ = shutting_down_workers_; | |
| 255 shutting_down_workers_ = worker; | |
| 256 } | |
| 257 | |
| 258 | |
| 259 // Only call while holding the exit_monitor_ | |
| 260 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { | |
| 261 ASSERT(worker != NULL); | |
| 262 ASSERT(shutting_down_workers_ != NULL); | |
| 263 | |
| 264 // Special case head of list. | |
| 265 if (shutting_down_workers_ == worker) { | |
| 266 shutting_down_workers_ = worker->shutdown_next_; | |
| 267 worker->shutdown_next_ = NULL; | |
| 268 return true; | |
| 269 } | |
| 270 | |
| 271 for (Worker* current = shutting_down_workers_; | |
| 272 current->shutdown_next_ != NULL; | |
| 273 current = current->shutdown_next_) { | |
| 274 if (current->shutdown_next_ == worker) { | |
| 275 current->shutdown_next_ = worker->shutdown_next_; | |
| 276 worker->shutdown_next_ = NULL; | |
| 277 return true; | |
| 278 } | |
| 279 } | |
| 280 return false; | |
| 281 } | |
| 282 | |
| 283 | |
| 284 void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) { | |
| 285 *list = new JoinList(id, *list); | |
| 286 } | |
| 287 | |
| 288 | |
| 289 void ThreadPool::JoinList::Join(JoinList** list) { | |
| 290 while (*list != NULL) { | |
| 291 JoinList* current = *list; | |
| 292 *list = current->next(); | |
| 293 OSThread::Join(current->id()); | |
| 294 delete current; | |
| 295 } | |
| 296 } | |
| 297 | |
| 298 | |
| 209 ThreadPool::Task::Task() { | 299 ThreadPool::Task::Task() { |
| 210 } | 300 } |
| 211 | 301 |
| 212 | 302 |
| 213 ThreadPool::Task::~Task() { | 303 ThreadPool::Task::~Task() { |
| 214 } | 304 } |
| 215 | 305 |
| 216 | 306 |
| 217 ThreadPool::Worker::Worker(ThreadPool* pool) | 307 ThreadPool::Worker::Worker(ThreadPool* pool) |
| 218 : pool_(pool), | 308 : pool_(pool), |
| 219 task_(NULL), | 309 task_(NULL), |
| 310 id_(OSThread::kInvalidThreadId), | |
| 311 join_id_(OSThread::kInvalidThreadJoinId), | |
| 312 done_(false), | |
| 220 owned_(false), | 313 owned_(false), |
| 221 all_next_(NULL), | 314 all_next_(NULL), |
| 222 idle_next_(NULL) { | 315 idle_next_(NULL), |
| 316 shutdown_next_(NULL) { | |
| 223 } | 317 } |
| 224 | 318 |
| 225 | 319 |
| 226 void ThreadPool::Worker::StartThread() { | 320 void ThreadPool::Worker::StartThread() { |
| 227 #if defined(DEBUG) | 321 #if defined(DEBUG) |
| 228 // Must call SetTask before StartThread. | 322 // Must call SetTask before StartThread. |
| 229 { // NOLINT | 323 { // NOLINT |
| 230 MonitorLocker ml(&monitor_); | 324 MonitorLocker ml(&monitor_); |
| 231 ASSERT(task_ != NULL); | 325 ASSERT(task_ != NULL); |
| 232 } | 326 } |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 257 // out. Give the worker one last desperate chance to live. We | 351 // out. Give the worker one last desperate chance to live. We |
| 258 // are merciful. | 352 // are merciful. |
| 259 return 1; | 353 return 1; |
| 260 } else { | 354 } else { |
| 261 return FLAG_worker_timeout_millis - waited; | 355 return FLAG_worker_timeout_millis - waited; |
| 262 } | 356 } |
| 263 } | 357 } |
| 264 } | 358 } |
| 265 | 359 |
| 266 | 360 |
| 267 void ThreadPool::Worker::Loop() { | 361 bool ThreadPool::Worker::Loop() { |
| 268 MonitorLocker ml(&monitor_); | 362 MonitorLocker ml(&monitor_); |
| 269 int64_t idle_start; | 363 int64_t idle_start; |
| 270 while (true) { | 364 while (true) { |
| 271 ASSERT(task_ != NULL); | 365 ASSERT(task_ != NULL); |
| 272 Task* task = task_; | 366 Task* task = task_; |
| 273 task_ = NULL; | 367 task_ = NULL; |
| 274 | 368 |
| 275 // Release monitor while handling the task. | 369 // Release monitor while handling the task. |
| 276 monitor_.Exit(); | 370 monitor_.Exit(); |
| 277 task->Run(); | 371 task->Run(); |
| 278 ASSERT(Isolate::Current() == NULL); | 372 ASSERT(Isolate::Current() == NULL); |
| 279 delete task; | 373 delete task; |
| 280 monitor_.Enter(); | 374 monitor_.Enter(); |
| 281 | 375 |
| 282 ASSERT(task_ == NULL); | 376 ASSERT(task_ == NULL); |
| 283 if (IsDone()) { | 377 if (IsDone()) { |
| 284 return; | 378 return false; |
| 285 } | 379 } |
| 286 ASSERT(pool_ != NULL); | 380 ASSERT(!done_); |
| 287 pool_->SetIdle(this); | 381 pool_->SetIdleAndReapExited(this); |
| 288 idle_start = OS::GetCurrentTimeMillis(); | 382 idle_start = OS::GetCurrentTimeMillis(); |
| 289 while (true) { | 383 while (true) { |
| 290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 384 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
| 291 if (task_ != NULL) { | 385 if (task_ != NULL) { |
| 292 // We've found a task. Process it, regardless of whether the | 386 // We've found a task. Process it, regardless of whether the |
| 293 // worker is done_. | 387 // worker is done_. |
| 294 break; | 388 break; |
| 295 } | 389 } |
| 296 if (IsDone()) { | 390 if (IsDone()) { |
| 297 return; | 391 return false; |
| 298 } | 392 } |
| 299 if (result == Monitor::kTimedOut && | 393 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
| 300 pool_->ReleaseIdleWorker(this)) { | 394 return true; |
| 301 return; | |
| 302 } | 395 } |
| 303 } | 396 } |
| 304 } | 397 } |
| 305 UNREACHABLE(); | 398 UNREACHABLE(); |
| 399 return false; | |
| 306 } | 400 } |
| 307 | 401 |
| 308 | 402 |
| 309 void ThreadPool::Worker::Shutdown() { | 403 void ThreadPool::Worker::Shutdown() { |
| 310 MonitorLocker ml(&monitor_); | 404 MonitorLocker ml(&monitor_); |
| 311 pool_ = NULL; // Fail fast if someone tries to access pool_. | 405 done_ = true; |
| 312 ml.Notify(); | 406 ml.Notify(); |
| 313 } | 407 } |
| 314 | 408 |
| 315 | 409 |
| 316 // static | 410 // static |
| 317 void ThreadPool::Worker::Main(uword args) { | 411 void ThreadPool::Worker::Main(uword args) { |
| 318 Thread::EnsureInit(); | 412 Thread::EnsureInit(); |
| 319 Worker* worker = reinterpret_cast<Worker*>(args); | 413 Worker* worker = reinterpret_cast<Worker*>(args); |
| 320 worker->Loop(); | 414 |
| 415 { | |
| 416 MonitorLocker ml(&(worker->monitor_)); | |
| 417 ASSERT(worker->task_); | |
| 418 worker->id_ = OSThread::GetCurrentThreadId(); | |
| 419 worker->join_id_ = OSThread::GetCurrentThreadJoinId(); | |
|
turnidge
2015/09/15 17:19:59
cache these in locals so we don't need to lock to
| |
| 420 } | |
| 421 | |
| 422 bool released = worker->Loop(); | |
| 321 | 423 |
| 322 // It should be okay to access these unlocked here in this assert. | 424 // It should be okay to access these unlocked here in this assert. |
| 323 ASSERT(!worker->owned_ && | 425 // worker->all_next_ is retained by the pool for shutdown monitoring. |
| 324 worker->all_next_ == NULL && | 426 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
| 325 worker->idle_next_ == NULL); | |
| 326 | 427 |
| 327 // The exit monitor is only used during testing. | 428 if (!released) { |
| 328 if (ThreadPool::exit_monitor_) { | 429 // This worker is exiting because the thread pool is being shut down. |
| 329 MonitorLocker ml(ThreadPool::exit_monitor_); | 430 // Inform the thread pool that we are exiting. We remove this worker from |
| 330 (*ThreadPool::exit_count_)++; | 431 // shutting_down_workers_ list because there will be no need for the |
| 331 ml.Notify(); | 432 // ThreadPool to take action for this worker. |
| 433 { | |
| 434 MutexLocker ml(&worker->pool_->mutex_); | |
| 435 JoinList::AddLocked(worker->join_id_, &worker->pool_->join_list_); | |
| 436 } | |
| 437 MonitorLocker eml(&worker->pool_->exit_monitor_); | |
| 438 worker->id_ = OSThread::kInvalidThreadId; | |
| 439 worker->join_id_ = OSThread::kInvalidThreadJoinId; | |
|
turnidge
2015/09/15 17:19:59
Setting these not under lock should work, but is i
| |
| 440 worker->pool_->RemoveWorkerFromShutdownList(worker); | |
| 441 delete worker; | |
| 442 eml.Notify(); | |
| 443 } else { | |
| 444 // This worker is going down because it was idle for too long. This case | |
| 445 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. | |
| 446 // The worker's id is added to the thread pool's join list by | |
| 447 // ReleaseIdleWorker, so in the case that the thread pool begins shutting | |
| 448 // down immediately after returning from worker->Loop() above, we still | |
| 449 // wait for the thread to exit by joining on it in Shutdown(). | |
| 450 delete worker; | |
| 332 } | 451 } |
| 333 delete worker; | |
| 334 #if defined(TARGET_OS_WINDOWS) | 452 #if defined(TARGET_OS_WINDOWS) |
| 335 Thread::CleanUp(); | 453 Thread::CleanUp(); |
| 336 #endif | 454 #endif |
| 337 } | 455 } |
| 338 | 456 |
| 339 } // namespace dart | 457 } // namespace dart |
| OLD | NEW |