| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
| 6 | 6 |
| 7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
| 8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
| 9 | 9 |
| 10 namespace dart { | 10 namespace dart { |
| 11 | 11 |
| 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
| 13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
| 14 | 14 |
| 15 Monitor* ThreadPool::exit_monitor_ = NULL; |
| 16 int* ThreadPool::exit_count_ = NULL; |
| 17 |
| 15 ThreadPool::ThreadPool() | 18 ThreadPool::ThreadPool() |
| 16 : shutting_down_(false), | 19 : shutting_down_(false), |
| 17 all_workers_(NULL), | 20 all_workers_(NULL), |
| 18 idle_workers_(NULL), | 21 idle_workers_(NULL), |
| 19 count_started_(0), | 22 count_started_(0), |
| 20 count_stopped_(0), | 23 count_stopped_(0), |
| 21 count_running_(0), | 24 count_running_(0), |
| 22 count_idle_(0), | 25 count_idle_(0) { |
| 23 shutting_down_workers_(NULL) { | |
| 24 } | 26 } |
| 25 | 27 |
| 26 | 28 |
| 27 ThreadPool::~ThreadPool() { | 29 ThreadPool::~ThreadPool() { |
| 28 Shutdown(); | 30 Shutdown(); |
| 29 } | 31 } |
| 30 | 32 |
| 31 | 33 |
| 32 void ThreadPool::Run(Task* task) { | 34 void ThreadPool::Run(Task* task) { |
| 33 Worker* worker = NULL; | 35 Worker* worker = NULL; |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 85 current = next; | 87 current = next; |
| 86 count_stopped_++; | 88 count_stopped_++; |
| 87 } | 89 } |
| 88 | 90 |
| 89 count_idle_ = 0; | 91 count_idle_ = 0; |
| 90 count_running_ = 0; | 92 count_running_ = 0; |
| 91 ASSERT(count_started_ == count_stopped_); | 93 ASSERT(count_started_ == count_stopped_); |
| 92 } | 94 } |
| 93 // Release ThreadPool::mutex_ before calling Worker functions. | 95 // Release ThreadPool::mutex_ before calling Worker functions. |
| 94 | 96 |
| 95 { | 97 Worker* current = saved; |
| 96 MonitorLocker eml(&exit_monitor_); | 98 while (current != NULL) { |
| 97 | 99 // We may access all_next_ without holding ThreadPool::mutex_ here |
| 98 // First tell all the workers to shut down. | 100 // because the worker is no longer owned by the ThreadPool. |
| 99 Worker* current = saved; | 101 Worker* next = current->all_next_; |
| 100 while (current != NULL) { | 102 current->all_next_ = NULL; |
| 101 Worker* next = current->all_next_; | 103 current->Shutdown(); |
| 102 if (current->id_ != OSThread::GetCurrentThreadId()) { | 104 current = next; |
| 103 AddWorkerToShutdownList(current); | |
| 104 } | |
| 105 current->Shutdown(); | |
| 106 current = next; | |
| 107 } | |
| 108 saved = NULL; | |
| 109 | |
| 110 // Wait until all workers have exited. | |
| 111 while (shutting_down_workers_ != NULL) { | |
| 112 // Here, we are waiting for workers to exit. When a worker exits we will | |
| 113 // be notified. | |
| 114 eml.Wait(); | |
| 115 } | |
| 116 } | 105 } |
| 117 } | 106 } |
| 118 | 107 |
| 119 | 108 |
| 120 bool ThreadPool::IsIdle(Worker* worker) { | 109 bool ThreadPool::IsIdle(Worker* worker) { |
| 121 ASSERT(worker != NULL && worker->owned_); | 110 ASSERT(worker != NULL && worker->owned_); |
| 122 for (Worker* current = idle_workers_; | 111 for (Worker* current = idle_workers_; |
| 123 current != NULL; | 112 current != NULL; |
| 124 current = current->idle_next_) { | 113 current = current->idle_next_) { |
| 125 if (current == worker) { | 114 if (current == worker) { |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 161 if (all_workers_ == NULL) { | 150 if (all_workers_ == NULL) { |
| 162 return false; | 151 return false; |
| 163 } | 152 } |
| 164 | 153 |
| 165 // Special case head of list. | 154 // Special case head of list. |
| 166 if (all_workers_ == worker) { | 155 if (all_workers_ == worker) { |
| 167 all_workers_ = worker->all_next_; | 156 all_workers_ = worker->all_next_; |
| 168 worker->all_next_ = NULL; | 157 worker->all_next_ = NULL; |
| 169 worker->owned_ = false; | 158 worker->owned_ = false; |
| 170 worker->pool_ = NULL; | 159 worker->pool_ = NULL; |
| 171 worker->done_ = true; | |
| 172 return true; | 160 return true; |
| 173 } | 161 } |
| 174 | 162 |
| 175 for (Worker* current = all_workers_; | 163 for (Worker* current = all_workers_; |
| 176 current->all_next_ != NULL; | 164 current->all_next_ != NULL; |
| 177 current = current->all_next_) { | 165 current = current->all_next_) { |
| 178 if (current->all_next_ == worker) { | 166 if (current->all_next_ == worker) { |
| 179 current->all_next_ = worker->all_next_; | 167 current->all_next_ = worker->all_next_; |
| 180 worker->all_next_ = NULL; | 168 worker->all_next_ = NULL; |
| 181 worker->owned_ = false; | 169 worker->owned_ = false; |
| (...skipping 29 matching lines...) Expand all Loading... |
| 211 // Remove from all list. | 199 // Remove from all list. |
| 212 bool found = RemoveWorkerFromAllList(worker); | 200 bool found = RemoveWorkerFromAllList(worker); |
| 213 ASSERT(found); | 201 ASSERT(found); |
| 214 | 202 |
| 215 count_stopped_++; | 203 count_stopped_++; |
| 216 count_idle_--; | 204 count_idle_--; |
| 217 return true; | 205 return true; |
| 218 } | 206 } |
| 219 | 207 |
| 220 | 208 |
| 221 // Only call while holding the exit_monitor_ | |
| 222 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { | |
| 223 worker->shutdown_next_ = shutting_down_workers_; | |
| 224 shutting_down_workers_ = worker; | |
| 225 } | |
| 226 | |
| 227 | |
| 228 // Only call while holding the exit_monitor_ | |
| 229 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { | |
| 230 ASSERT(worker != NULL); | |
| 231 ASSERT(shutting_down_workers_ != NULL); | |
| 232 | |
| 233 // Special case head of list. | |
| 234 if (shutting_down_workers_ == worker) { | |
| 235 shutting_down_workers_ = worker->shutdown_next_; | |
| 236 worker->shutdown_next_ = NULL; | |
| 237 return true; | |
| 238 } | |
| 239 | |
| 240 for (Worker* current = shutting_down_workers_; | |
| 241 current->shutdown_next_ != NULL; | |
| 242 current = current->shutdown_next_) { | |
| 243 if (current->shutdown_next_ == worker) { | |
| 244 current->shutdown_next_ = worker->shutdown_next_; | |
| 245 worker->shutdown_next_ = NULL; | |
| 246 return true; | |
| 247 } | |
| 248 } | |
| 249 return false; | |
| 250 } | |
| 251 | |
| 252 | |
| 253 ThreadPool::Task::Task() { | 209 ThreadPool::Task::Task() { |
| 254 } | 210 } |
| 255 | 211 |
| 256 | 212 |
| 257 ThreadPool::Task::~Task() { | 213 ThreadPool::Task::~Task() { |
| 258 } | 214 } |
| 259 | 215 |
| 260 | 216 |
| 261 ThreadPool::Worker::Worker(ThreadPool* pool) | 217 ThreadPool::Worker::Worker(ThreadPool* pool) |
| 262 : pool_(pool), | 218 : pool_(pool), |
| 263 done_(false), | |
| 264 task_(NULL), | 219 task_(NULL), |
| 265 id_(OSThread::kInvalidThreadId), | |
| 266 started_(false), | |
| 267 owned_(false), | 220 owned_(false), |
| 268 all_next_(NULL), | 221 all_next_(NULL), |
| 269 idle_next_(NULL), | 222 idle_next_(NULL) { |
| 270 shutdown_next_(NULL) { | |
| 271 } | 223 } |
| 272 | 224 |
| 273 | 225 |
| 274 void ThreadPool::Worker::StartThread() { | 226 void ThreadPool::Worker::StartThread() { |
| 275 #if defined(DEBUG) | 227 #if defined(DEBUG) |
| 276 // Must call SetTask before StartThread. | 228 // Must call SetTask before StartThread. |
| 277 { // NOLINT | 229 { // NOLINT |
| 278 MonitorLocker ml(&monitor_); | 230 MonitorLocker ml(&monitor_); |
| 279 ASSERT(task_ != NULL); | 231 ASSERT(task_ != NULL); |
| 280 } | 232 } |
| (...skipping 24 matching lines...) Expand all Loading... |
| 305 // out. Give the worker one last desperate chance to live. We | 257 // out. Give the worker one last desperate chance to live. We |
| 306 // are merciful. | 258 // are merciful. |
| 307 return 1; | 259 return 1; |
| 308 } else { | 260 } else { |
| 309 return FLAG_worker_timeout_millis - waited; | 261 return FLAG_worker_timeout_millis - waited; |
| 310 } | 262 } |
| 311 } | 263 } |
| 312 } | 264 } |
| 313 | 265 |
| 314 | 266 |
| 315 bool ThreadPool::Worker::Loop() { | 267 void ThreadPool::Worker::Loop() { |
| 316 MonitorLocker ml(&monitor_); | 268 MonitorLocker ml(&monitor_); |
| 317 int64_t idle_start; | 269 int64_t idle_start; |
| 318 while (true) { | 270 while (true) { |
| 319 ASSERT(task_ != NULL); | 271 ASSERT(task_ != NULL); |
| 320 Task* task = task_; | 272 Task* task = task_; |
| 321 task_ = NULL; | 273 task_ = NULL; |
| 322 | 274 |
| 323 // Release monitor while handling the task. | 275 // Release monitor while handling the task. |
| 324 monitor_.Exit(); | 276 monitor_.Exit(); |
| 325 task->Run(); | 277 task->Run(); |
| 326 ASSERT(Isolate::Current() == NULL); | 278 ASSERT(Isolate::Current() == NULL); |
| 327 delete task; | 279 delete task; |
| 328 monitor_.Enter(); | 280 monitor_.Enter(); |
| 329 | 281 |
| 330 ASSERT(task_ == NULL); | 282 ASSERT(task_ == NULL); |
| 331 if (IsDone()) { | 283 if (IsDone()) { |
| 332 return false; | 284 return; |
| 333 } | 285 } |
| 334 ASSERT(!done_); | 286 ASSERT(pool_ != NULL); |
| 335 pool_->SetIdle(this); | 287 pool_->SetIdle(this); |
| 336 idle_start = OS::GetCurrentTimeMillis(); | 288 idle_start = OS::GetCurrentTimeMillis(); |
| 337 while (true) { | 289 while (true) { |
| 338 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
| 339 if (task_ != NULL) { | 291 if (task_ != NULL) { |
| 340 // We've found a task. Process it, regardless of whether the | 292 // We've found a task. Process it, regardless of whether the |
| 341 // worker is done_. | 293 // worker is done_. |
| 342 break; | 294 break; |
| 343 } | 295 } |
| 344 if (IsDone()) { | 296 if (IsDone()) { |
| 345 return false; | 297 return; |
| 346 } | 298 } |
| 347 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { | 299 if (result == Monitor::kTimedOut && |
| 348 return true; | 300 pool_->ReleaseIdleWorker(this)) { |
| 301 return; |
| 349 } | 302 } |
| 350 } | 303 } |
| 351 } | 304 } |
| 352 UNREACHABLE(); | 305 UNREACHABLE(); |
| 353 return false; | |
| 354 } | 306 } |
| 355 | 307 |
| 356 | 308 |
| 357 void ThreadPool::Worker::Shutdown() { | 309 void ThreadPool::Worker::Shutdown() { |
| 358 MonitorLocker ml(&monitor_); | 310 MonitorLocker ml(&monitor_); |
| 359 done_ = true; | 311 pool_ = NULL; // Fail fast if someone tries to access pool_. |
| 360 ml.Notify(); | 312 ml.Notify(); |
| 361 } | 313 } |
| 362 | 314 |
| 363 | 315 |
| 364 // static | 316 // static |
| 365 void ThreadPool::Worker::Main(uword args) { | 317 void ThreadPool::Worker::Main(uword args) { |
| 366 Thread::EnsureInit(); | 318 Thread::EnsureInit(); |
| 367 Worker* worker = reinterpret_cast<Worker*>(args); | 319 Worker* worker = reinterpret_cast<Worker*>(args); |
| 368 bool delete_self = false; | 320 worker->Loop(); |
| 369 | |
| 370 { | |
| 371 MonitorLocker ml(&(worker->monitor_)); | |
| 372 if (worker->IsDone()) { | |
| 373 // id_ hasn't been set yet, but the ThreadPool is being shutdown. | |
| 374 // Delete the task, and return. | |
| 375 ASSERT(worker->task_); | |
| 376 delete worker->task_; | |
| 377 worker->task_ = NULL; | |
| 378 delete_self = true; | |
| 379 } else { | |
| 380 worker->id_ = OSThread::GetCurrentThreadId(); | |
| 381 worker->started_ = true; | |
| 382 } | |
| 383 } | |
| 384 | |
| 385 // We aren't able to delete the worker while holding the worker's monitor. | |
| 386 // Now that we have released it, and we know that ThreadPool::Shutdown | |
| 387 // won't touch it again, we can delete it and return. | |
| 388 if (delete_self) { | |
| 389 MonitorLocker eml(&worker->pool_->exit_monitor_); | |
| 390 worker->pool_->RemoveWorkerFromShutdownList(worker); | |
| 391 delete worker; | |
| 392 eml.Notify(); | |
| 393 return; | |
| 394 } | |
| 395 | |
| 396 bool released = worker->Loop(); | |
| 397 | 321 |
| 398 // It should be okay to access these unlocked here in this assert. | 322 // It should be okay to access these unlocked here in this assert. |
| 399 // worker->all_next_ is retained by the pool for shutdown monitoring. | 323 ASSERT(!worker->owned_ && |
| 400 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); | 324 worker->all_next_ == NULL && |
| 325 worker->idle_next_ == NULL); |
| 401 | 326 |
| 402 if (!released) { | 327 // The exit monitor is only used during testing. |
| 403 // This worker is exiting because the thread pool is being shut down. | 328 if (ThreadPool::exit_monitor_) { |
| 404 // Inform the thread pool that we are exiting. We remove this worker from | 329 MonitorLocker ml(ThreadPool::exit_monitor_); |
| 405 // shutting_down_workers_ list because there will be no need for the | 330 (*ThreadPool::exit_count_)++; |
| 406 // ThreadPool to take action for this worker. | 331 ml.Notify(); |
| 407 MonitorLocker eml(&worker->pool_->exit_monitor_); | |
| 408 worker->id_ = OSThread::kInvalidThreadId; | |
| 409 worker->pool_->RemoveWorkerFromShutdownList(worker); | |
| 410 delete worker; | |
| 411 eml.Notify(); | |
| 412 } else { | |
| 413 // This worker is going down because it was idle for too long. This case | |
| 414 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. | |
| 415 delete worker; | |
| 416 } | 332 } |
| 333 delete worker; |
| 417 #if defined(TARGET_OS_WINDOWS) | 334 #if defined(TARGET_OS_WINDOWS) |
| 418 Thread::CleanUp(); | 335 Thread::CleanUp(); |
| 419 #endif | 336 #endif |
| 420 } | 337 } |
| 421 | 338 |
| 422 } // namespace dart | 339 } // namespace dart |
| OLD | NEW |