Chromium Code Reviews| Index: runtime/vm/thread_pool.cc |
| =================================================================== |
| --- runtime/vm/thread_pool.cc (revision 0) |
| +++ runtime/vm/thread_pool.cc (revision 0) |
| @@ -0,0 +1,317 @@ |
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +#include "vm/thread_pool.h" |
| + |
| +namespace dart { |
| + |
| +DEFINE_FLAG(int, worker_timeout_millis, 5000, |
| + "Free workers when they have been idle for this amount of time."); |
| + |
| +ThreadPool::ThreadPool() |
| + : shutting_down_(false), |
| + all_workers_(NULL), |
| + idle_workers_(NULL), |
| + count_started_(0), |
| + count_stopped_(0), |
| + count_running_(0), |
| + count_idle_(0) { |
| +} |
| + |
| + |
| +ThreadPool::~ThreadPool() { |
| + Shutdown(); |
| +} |
| + |
| + |
| +void ThreadPool::Run(Task* task) { |
| + Worker* worker = NULL; |
| + bool new_worker = false; |
| + { |
|
Ivan Posva
2012/03/14 18:51:26
// Grab ThreadPool::mutex_ before touching queues
turnidge
2012/03/14 21:00:27
Done.
|
| + MutexLocker ml(&mutex_); |
| + if (shutting_down_) { |
| + return; |
| + } |
| + if (idle_workers_ == NULL) { |
| + worker = new Worker(this); |
| + ASSERT(worker != NULL); |
| + new_worker = true; |
| + count_started_++; |
| + |
| + // Add worker to the all_workers_ list. |
| + worker->all_next_ = all_workers_; |
| + all_workers_ = worker; |
| + worker->owned_ = true; |
| + } else { |
| + // Get the first worker from the idle worker list. |
| + worker = idle_workers_; |
| + idle_workers_ = worker->idle_next_; |
| + worker->idle_next_ = NULL; |
| + count_idle_--; |
| + } |
| + count_running_++; |
| + } |
| + // Release ThreadPool::mutex_ before calling Worker functions. |
| + ASSERT(worker != NULL); |
| + worker->Run(task); |
| + if (new_worker) { |
| + // Call StartThread after we've assigned the first t |
|
siva
2012/03/14 17:36:09
first task.
Ivan Posva
2012/03/14 18:51:26
Comment cutoff.
turnidge
2012/03/14 21:00:27
Done.
|
| + worker->StartThread(); |
| + } |
| +} |
| + |
| + |
| +void ThreadPool::Shutdown() { |
| + Worker* saved = NULL; |
| + { |
| + MutexLocker ml(&mutex_); |
| + shutting_down_ = true; |
| + saved = all_workers_; |
| + all_workers_ = NULL; |
| + idle_workers_ = NULL; |
| + |
| + Worker* current = saved; |
| + while (current != NULL) { |
| + Worker* next = current->all_next_; |
| + current->idle_next_ = NULL; |
| + current->owned_ = false; |
| + current = next; |
| + } |
| + |
| + count_idle_ = 0; |
| + count_running_ = 0; |
| + } |
| + // Release ThreadPool::mutex_ before calling Worker functions. |
| + |
| + Worker* current = saved; |
| + while (current != NULL) { |
| + Worker* next = current->all_next_; |
|
Ivan Posva
2012/03/14 18:51:26
There is unprotected access to fields marked as pr
turnidge
2012/03/14 21:00:27
Done.
|
| + current->all_next_ = NULL; |
| + current->Shutdown(); |
| + current = next; |
| + } |
| +} |
| + |
| + |
| +bool ThreadPool::IsIdle(Worker* worker) { |
| + ASSERT(worker != NULL && worker->owned_); |
| + for (Worker* current = idle_workers_; |
| + current != NULL; |
| + current = current->idle_next_) { |
| + if (current == worker) { |
| + return true; |
| + } |
| + } |
| + return false; |
| +} |
| + |
| + |
| +bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) { |
| + ASSERT(worker != NULL && worker->owned_); |
| + if (idle_workers_ == NULL) { |
| + return false; |
| + } |
| + |
| + // Special case head of list. |
| + if (idle_workers_ == worker) { |
| + idle_workers_ = worker->idle_next_; |
| + worker->idle_next_ = NULL; |
| + return true; |
| + } |
| + |
| + for (Worker* current = idle_workers_; |
| + current->idle_next_ != NULL; |
| + current = current->idle_next_) { |
| + if (current->idle_next_ == worker) { |
| + current->idle_next_ = worker->idle_next_; |
| + worker->idle_next_ = NULL; |
| + return true; |
| + } |
| + } |
| + return false; |
| +} |
| + |
| + |
| +bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
| + ASSERT(worker != NULL && worker->owned_); |
| + if (all_workers_ == NULL) { |
| + return false; |
| + } |
| + |
| + // Special case head of list. |
| + if (all_workers_ == worker) { |
| + all_workers_ = worker->all_next_; |
| + worker->all_next_ = NULL; |
| + worker->owned_ = false; |
|
Ivan Posva
2012/03/14 18:51:26
worker->pool_ = NULL; ?
turnidge
2012/03/14 21:00:27
Done.
|
| + return true; |
| + } |
| + |
| + for (Worker* current = all_workers_; |
| + current->all_next_ != NULL; |
| + current = current->all_next_) { |
| + if (current->all_next_ == worker) { |
| + current->all_next_ = worker->all_next_; |
| + worker->all_next_ = NULL; |
| + worker->owned_ = false; |
| + return true; |
| + } |
| + } |
| + return false; |
| +} |
| + |
| + |
| +void ThreadPool::SetIdle(Worker* worker) { |
| + MutexLocker ml(&mutex_); |
| + if (shutting_down_) { |
| + return; |
| + } |
| + ASSERT(worker->owned_ && !IsIdle(worker)); |
| + worker->idle_next_ = idle_workers_; |
| + idle_workers_ = worker; |
| + count_idle_++; |
| + count_running_--; |
| +} |
| + |
| + |
| +bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
| + MutexLocker ml(&mutex_); |
| + if (shutting_down_) { |
| + return false; |
| + } |
| + // Remove from idle list. |
| + if (!RemoveWorkerFromIdleList(worker)) { |
| + return false; |
| + } |
| + // Remove from all list. |
| + bool found = RemoveWorkerFromAllList(worker); |
| + ASSERT(found); |
| + |
| + count_stopped_++; |
| + count_idle_--; |
|
siva
2012/03/14 17:36:09
Will these counts get updated when we are doing a
turnidge
2012/03/14 21:00:27
count_started_ == The number of workers started.
|
| + return true; |
| +} |
| + |
| + |
| +ThreadPool::Task::Task() { |
| +} |
| + |
| + |
| +ThreadPool::Task::~Task() { |
| +} |
| + |
| + |
| +ThreadPool::Worker::Worker(ThreadPool* pool) |
| + : pool_(pool), |
| + task_(NULL), |
| + done_(false), |
| + owned_(false), |
| + all_next_(NULL), |
| + idle_next_(NULL) { |
| +} |
| + |
| + |
| +void ThreadPool::Worker::StartThread() { |
| +#if defined(DEBUG) |
| + // Must call Run before StartThread. |
| + { // NOLINT |
|
Ivan Posva
2012/03/14 18:51:26
NOLINT?
turnidge
2012/03/14 21:00:27
Yeah. The linter seems to be confused that the {
|
| + MonitorLocker ml(&monitor_); |
| + ASSERT(task_ != NULL); |
| + } |
| +#endif |
| + Thread::Start(&Worker::Main, reinterpret_cast<uword>(this)); |
| +} |
| + |
| + |
| +void ThreadPool::Worker::Run(Task* task) { |
| + MonitorLocker ml(&monitor_); |
| + ASSERT(task_ == NULL); |
| + task_ = task; |
| + ml.Notify(); |
| +} |
| + |
| + |
| +static int64_t ComputeTimeout(int64_t idle_start) { |
| + if (FLAG_worker_timeout_millis <= 0) { |
| + // No timeout. |
| + return 0; |
| + } else { |
| + int64_t waited = OS::GetCurrentTimeMillis() - idle_start; |
| + if (waited >= FLAG_worker_timeout_millis) { |
| + // We must have gotten a spurious wakeup just before we timed |
| + // out. Give the worker one last desperate chance to live. We |
| + // are merciful. |
| + return 1; |
| + } else { |
| + return FLAG_worker_timeout_millis - waited; |
| + } |
| + } |
| +} |
| + |
| + |
| +void ThreadPool::Worker::Loop() { |
| + MonitorLocker ml(&monitor_); |
| + if (done_) { |
| + return; |
| + } |
|
Ivan Posva
2012/03/14 18:51:26
remove
turnidge
2012/03/14 21:00:27
Done.
|
| + |
| + int64_t idle_start; |
| + while (true) { |
| + ASSERT(task_ != NULL); |
| + Task* task = task_; |
| + task_ = NULL; |
| + |
| + // Release monitor while handling the task. |
| + monitor_.Exit(); |
| + task->Run(); |
| + delete task; |
| + monitor_.Enter(); |
| + |
| + if (done_) { |
| + return; |
| + } |
| + ASSERT(task_ == NULL); |
|
Ivan Posva
2012/03/14 18:51:26
Move up after the monitor enter.
turnidge
2012/03/14 21:00:27
Done.
|
| + ASSERT(pool_ != NULL); |
| + pool_->SetIdle(this); |
| + idle_start = OS::GetCurrentTimeMillis(); |
| + while (true) { |
| + Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
| + if (task_ != NULL) { |
| + // We've found a task. Process it, regardless of whether the |
| + // worker is done_. |
| + break; |
| + } |
| + if (done_) { |
| + return; |
| + } |
| + if (result == Monitor::kTimedOut && |
| + pool_->ReleaseIdleWorker(this)) { |
| + return; |
| + } |
| + } |
| + } |
| + UNREACHABLE(); |
| +} |
| + |
| + |
| +void ThreadPool::Worker::Shutdown() { |
| + MonitorLocker ml(&monitor_); |
| + done_ = true; |
| + pool_ = NULL; // Fail fast if someone tries to access pool_. |
| + ml.Notify(); |
| +} |
| + |
| + |
| +// static |
| +void ThreadPool::Worker::Main(uword args) { |
| + Worker* worker = reinterpret_cast<Worker*>(args); |
| + worker->Loop(); |
| + |
| + // It should be okay to access these unlocked here in this assert. |
| + ASSERT(!worker->owned_ && |
| + worker->all_next_ == NULL && |
| + worker->idle_next_ == NULL); |
| + delete worker; |
| +} |
| + |
| +} // namespace dart |