Chromium Code Reviews| Index: runtime/vm/thread_pool.cc |
| diff --git a/runtime/vm/thread_pool.cc b/runtime/vm/thread_pool.cc |
| index 5b3a713d2d09b2c32be7605f622e7671669e36c6..489acef4a8acd7c81a71d3d2d871a29c48b0dbd6 100644 |
| --- a/runtime/vm/thread_pool.cc |
| +++ b/runtime/vm/thread_pool.cc |
| @@ -12,9 +12,6 @@ namespace dart { |
| DEFINE_FLAG(int, worker_timeout_millis, 5000, |
| "Free workers when they have been idle for this amount of time."); |
| -Monitor* ThreadPool::exit_monitor_ = NULL; |
| -int* ThreadPool::exit_count_ = NULL; |
| - |
| ThreadPool::ThreadPool() |
| : shutting_down_(false), |
| all_workers_(NULL), |
| @@ -22,7 +19,9 @@ ThreadPool::ThreadPool() |
| count_started_(0), |
| count_stopped_(0), |
| count_running_(0), |
| - count_idle_(0) { |
| + count_idle_(0), |
| + shutting_down_workers_(NULL), |
| + join_list_(NULL) { |
| } |
| @@ -31,7 +30,7 @@ ThreadPool::~ThreadPool() { |
| } |
| -void ThreadPool::Run(Task* task) { |
| +bool ThreadPool::Run(Task* task) { |
| Worker* worker = NULL; |
| bool new_worker = false; |
| { |
| @@ -39,7 +38,7 @@ void ThreadPool::Run(Task* task) { |
| // ThreadPool state. |
| MutexLocker ml(&mutex_); |
| if (shutting_down_) { |
| - return; |
| + return false; |
| } |
| if (idle_workers_ == NULL) { |
| worker = new Worker(this); |
| @@ -51,15 +50,17 @@ void ThreadPool::Run(Task* task) { |
| worker->all_next_ = all_workers_; |
| all_workers_ = worker; |
| worker->owned_ = true; |
| + count_running_++; |
| } else { |
| // Get the first worker from the idle worker list. |
| worker = idle_workers_; |
| idle_workers_ = worker->idle_next_; |
| worker->idle_next_ = NULL; |
| count_idle_--; |
| + count_running_++; |
| } |
| - count_running_++; |
| } |
| + |
| // Release ThreadPool::mutex_ before calling Worker functions. |
| ASSERT(worker != NULL); |
| worker->SetTask(task); |
| @@ -67,6 +68,7 @@ void ThreadPool::Run(Task* task) { |
| // Call StartThread after we've assigned the first task. |
| worker->StartThread(); |
| } |
| + return true; |
| } |
| @@ -94,14 +96,30 @@ void ThreadPool::Shutdown() { |
| } |
| // Release ThreadPool::mutex_ before calling Worker functions. |
| - Worker* current = saved; |
| - while (current != NULL) { |
| - // We may access all_next_ without holding ThreadPool::mutex_ here |
| - // because the worker is no longer owned by the ThreadPool. |
| - Worker* next = current->all_next_; |
| - current->all_next_ = NULL; |
| - current->Shutdown(); |
| - current = next; |
| + { |
| + MonitorLocker eml(&exit_monitor_); |
| + |
| + // First tell all the workers to shut down. |
| + Worker* current = saved; |
| + while (current != NULL) { |
| + Worker* next = current->all_next_; |
| + if (current->id_ != OSThread::GetCurrentThreadId()) { |
|
Ivan Posva
2015/09/14 21:52:14
Cache the current thread it outside the loop?
zra
2015/09/14 22:59:01
Done.
|
| + AddWorkerToShutdownList(current); |
| + } |
| + current->Shutdown(); |
| + current = next; |
| + } |
| + saved = NULL; |
| + |
| + // Wait until all workers will exit. |
| + while (shutting_down_workers_ != NULL) { |
| + // Here, we are waiting for workers to exit. When a worker exits we will |
| + // be notified. |
| + eml.Wait(); |
| + } |
| + |
| + // Join non-idle threads. |
|
Ivan Posva
2015/09/14 21:52:13
Move outside the exit_monitor_ block.
Verify that
zra
2015/09/14 22:59:01
Done.
|
| + JoinList::Join(&join_list_); |
| } |
| } |
| @@ -156,7 +174,7 @@ bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
| all_workers_ = worker->all_next_; |
| worker->all_next_ = NULL; |
| worker->owned_ = false; |
| - worker->pool_ = NULL; |
| + worker->done_ = true; |
| return true; |
| } |
| @@ -174,16 +192,22 @@ bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
| } |
| -void ThreadPool::SetIdle(Worker* worker) { |
| - MutexLocker ml(&mutex_); |
| - if (shutting_down_) { |
| - return; |
| +void ThreadPool::SetIdleAndReapExited(Worker* worker) { |
| + JoinList* list = NULL; |
| + { |
| + MutexLocker ml(&mutex_); |
| + if (shutting_down_) { |
| + return; |
| + } |
| + ASSERT(worker->owned_ && !IsIdle(worker)); |
| + worker->idle_next_ = idle_workers_; |
| + idle_workers_ = worker; |
| + count_idle_++; |
| + count_running_--; |
| + list = join_list_; |
| + join_list_ = NULL; |
| } |
| - ASSERT(worker->owned_ && !IsIdle(worker)); |
| - worker->idle_next_ = idle_workers_; |
| - idle_workers_ = worker; |
| - count_idle_++; |
| - count_running_--; |
| + JoinList::Join(&list); |
| } |
| @@ -200,12 +224,62 @@ bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
| bool found = RemoveWorkerFromAllList(worker); |
| ASSERT(found); |
| + // The thread for worker will exit. Add its ThreadId to the join_list_ |
| + // so that we can join on it at the next opportunity. |
| + JoinList::Add(worker->join_id_, &join_list_); |
| count_stopped_++; |
| count_idle_--; |
| return true; |
| } |
| +// Only call while holding the exit_monitor_ |
| +void ThreadPool::AddWorkerToShutdownList(Worker* worker) { |
| + worker->shutdown_next_ = shutting_down_workers_; |
| + shutting_down_workers_ = worker; |
| +} |
| + |
| + |
| +// Only call while holding the exit_monitor_ |
| +bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { |
| + ASSERT(worker != NULL); |
| + ASSERT(shutting_down_workers_ != NULL); |
| + |
| + // Special case head of list. |
| + if (shutting_down_workers_ == worker) { |
| + shutting_down_workers_ = worker->shutdown_next_; |
| + worker->shutdown_next_ = NULL; |
| + return true; |
| + } |
| + |
| + for (Worker* current = shutting_down_workers_; |
| + current->shutdown_next_ != NULL; |
| + current = current->shutdown_next_) { |
| + if (current->shutdown_next_ == worker) { |
| + current->shutdown_next_ = worker->shutdown_next_; |
| + worker->shutdown_next_ = NULL; |
| + return true; |
| + } |
| + } |
| + return false; |
| +} |
| + |
| + |
| +void ThreadPool::JoinList::Add(ThreadJoinId id, JoinList** list) { |
|
Ivan Posva
2015/09/14 21:52:13
AddLocked(). Ideally we want to assert that we hol
zra
2015/09/14 22:59:01
Done.
|
| + *list = new JoinList(id, *list); |
| +} |
| + |
| + |
| +void ThreadPool::JoinList::Join(JoinList** list) { |
| + while (*list) { |
|
Ivan Posva
2015/09/14 21:52:14
*list is not a bool
zra
2015/09/14 22:59:01
Done.
|
| + JoinList* current = *list; |
| + *list = current->next(); |
| + OSThread::Join(current->id()); |
| + delete current; |
| + } |
| +} |
| + |
| + |
| ThreadPool::Task::Task() { |
| } |
| @@ -217,9 +291,13 @@ ThreadPool::Task::~Task() { |
| ThreadPool::Worker::Worker(ThreadPool* pool) |
| : pool_(pool), |
| task_(NULL), |
| + id_(OSThread::kInvalidThreadId), |
| + join_id_(OSThread::kInvalidThreadJoinId), |
| + done_(false), |
| owned_(false), |
| all_next_(NULL), |
| - idle_next_(NULL) { |
| + idle_next_(NULL), |
| + shutdown_next_(NULL) { |
| } |
| @@ -264,7 +342,7 @@ static int64_t ComputeTimeout(int64_t idle_start) { |
| } |
| -void ThreadPool::Worker::Loop() { |
| +bool ThreadPool::Worker::Loop() { |
| MonitorLocker ml(&monitor_); |
| int64_t idle_start; |
| while (true) { |
| @@ -281,10 +359,10 @@ void ThreadPool::Worker::Loop() { |
| ASSERT(task_ == NULL); |
| if (IsDone()) { |
| - return; |
| + return false; |
| } |
| - ASSERT(pool_ != NULL); |
| - pool_->SetIdle(this); |
| + ASSERT(!done_); |
| + pool_->SetIdleAndReapExited(this); |
| idle_start = OS::GetCurrentTimeMillis(); |
| while (true) { |
| Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
| @@ -294,21 +372,21 @@ void ThreadPool::Worker::Loop() { |
| break; |
| } |
| if (IsDone()) { |
| - return; |
| + return false; |
| } |
| - if (result == Monitor::kTimedOut && |
| - pool_->ReleaseIdleWorker(this)) { |
| - return; |
| + if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
| + return true; |
| } |
| } |
| } |
| UNREACHABLE(); |
| + return false; |
| } |
| void ThreadPool::Worker::Shutdown() { |
| MonitorLocker ml(&monitor_); |
| - pool_ = NULL; // Fail fast if someone tries to access pool_. |
| + done_ = true; |
| ml.Notify(); |
| } |
| @@ -317,20 +395,37 @@ void ThreadPool::Worker::Shutdown() { |
| void ThreadPool::Worker::Main(uword args) { |
| Thread::EnsureInit(); |
| Worker* worker = reinterpret_cast<Worker*>(args); |
| - worker->Loop(); |
| + |
| + { |
| + MonitorLocker ml(&(worker->monitor_)); |
| + ASSERT(worker->task_); |
| + worker->id_ = OSThread::GetCurrentThreadId(); |
| + worker->join_id_ = OSThread::GetCurrentThreadJoinId(); |
| + } |
| + |
| + bool released = worker->Loop(); |
| // It should be okay to access these unlocked here in this assert. |
| - ASSERT(!worker->owned_ && |
| - worker->all_next_ == NULL && |
| - worker->idle_next_ == NULL); |
| - |
| - // The exit monitor is only used during testing. |
| - if (ThreadPool::exit_monitor_) { |
| - MonitorLocker ml(ThreadPool::exit_monitor_); |
| - (*ThreadPool::exit_count_)++; |
| - ml.Notify(); |
| + // worker->all_next_ is retained by the pool for shutdown monitoring. |
| + ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
| + |
| + if (!released) { |
| + // This worker is exiting because the thread pool is being shut down. |
| + // Inform the thread pool that we are exiting. We remove this worker from |
| + // shutting_down_workers_ list because there will be no need for the |
| + // ThreadPool to take action for this worker. |
| + MonitorLocker eml(&worker->pool_->exit_monitor_); |
| + JoinList::Add(worker->join_id_, &worker->pool_->join_list_); |
|
Ivan Posva
2015/09/14 21:52:14
JoinList should be consistent on the lock which pr
zra
2015/09/14 22:59:01
Done.
|
| + worker->id_ = OSThread::kInvalidThreadId; |
| + worker->join_id_ = OSThread::kInvalidThreadJoinId; |
| + worker->pool_->RemoveWorkerFromShutdownList(worker); |
| + delete worker; |
| + eml.Notify(); |
| + } else { |
| + // This worker is going down because it was idle for too long. This case |
| + // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. |
|
Ivan Posva
2015/09/14 21:52:13
Please add comment: If there is a shutdown race, t
zra
2015/09/14 22:59:01
Done.
|
| + delete worker; |
| } |
| - delete worker; |
| #if defined(TARGET_OS_WINDOWS) |
| Thread::CleanUp(); |
| #endif |