| Index: runtime/vm/thread_pool.cc
|
| ===================================================================
|
| --- runtime/vm/thread_pool.cc (revision 45833)
|
| +++ runtime/vm/thread_pool.cc (working copy)
|
| @@ -11,9 +11,11 @@
|
|
|
| DEFINE_FLAG(int, worker_timeout_millis, 5000,
|
| "Free workers when they have been idle for this amount of time.");
|
| +DEFINE_FLAG(int, shutdown_timeout_millis, 1000,
|
| + "Amount of time to wait for a worker to stop during shutdown.");
|
|
|
| -Monitor* ThreadPool::exit_monitor_ = NULL;
|
| -int* ThreadPool::exit_count_ = NULL;
|
| +Monitor ThreadPool::exit_monitor_;
|
| +ThreadPool::Worker* ThreadPool::shutting_down_workers_ = NULL;
|
|
|
| ThreadPool::ThreadPool()
|
| : shutting_down_(false),
|
| @@ -94,14 +96,52 @@
|
| }
|
| // Release ThreadPool::mutex_ before calling Worker functions.
|
|
|
| - Worker* current = saved;
|
| - while (current != NULL) {
|
| - // We may access all_next_ without holding ThreadPool::mutex_ here
|
| - // because the worker is no longer owned by the ThreadPool.
|
| - Worker* next = current->all_next_;
|
| - current->all_next_ = NULL;
|
| - current->Shutdown();
|
| - current = next;
|
| + {
|
| + MonitorLocker eml(&ThreadPool::exit_monitor_);
|
| +
|
| + // First tell all the workers to shut down.
|
| + Worker* current = saved;
|
| + while (current != NULL) {
|
| + Worker* next = current->all_next_;
|
| + bool started = current->Shutdown();
|
| + // After Shutdown, if started is false, we can no longer touch the worker
|
| + // because a worker that hasn't started yet may run at any time and
|
| + // delete itself.
|
| + if (started) {
|
| + current->all_next_ = NULL;
|
| + // We only ensure the shutdown of threads that have started.
|
| + // Threads that have not started will shutdown immediately as soon as
|
| + // they run.
|
| + AddWorkerToShutdownList(current);
|
| + }
|
| + current = next;
|
| + }
|
| + saved = NULL;
|
| +
|
| + // Give workers a chance to exit gracefully.
|
| + const int64_t start_wait = OS::GetCurrentTimeMillis();
|
| + int timeout = FLAG_shutdown_timeout_millis;
|
| + while (shutting_down_workers_ != NULL) {
|
| + if (timeout > 0) {
|
| + // Here, we are waiting for workers to exit. When a worker exits we will
|
| + // be notified.
|
| + eml.Wait(timeout);
|
| +
|
| + // We decrement the timeout for the next wait by the amount of time
|
| + // we've already waited. If the new timeout drops below zero, we break
|
| + // out of this loop, which triggers the termination code below.
|
| + const int64_t after_wait = OS::GetCurrentTimeMillis();
|
| + timeout = FLAG_shutdown_timeout_millis - (after_wait - start_wait);
|
| + } else {
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // It is an error if all workers have not exited within the timeout. We
|
| + // assume that they have run off into the weeds, and it is a bug.
|
| + if (shutting_down_workers_ != NULL) {
|
| + FATAL("Thread pool worker threads failed to exit.");
|
| + }
|
| }
|
| }
|
|
|
| @@ -206,6 +246,40 @@
|
| }
|
|
|
|
|
| +// Only call while holding the exit_monitor_
|
| +void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
|
| + worker->shutdown_next_ = shutting_down_workers_;
|
| + shutting_down_workers_ = worker;
|
| +}
|
| +
|
| +
|
| +// Only call while holding the exit_monitor_
|
| +bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
|
| + ASSERT(worker != NULL);
|
| + if (shutting_down_workers_ == NULL) {
|
| + return false;
|
| + }
|
| +
|
| + // Special case head of list.
|
| + if (shutting_down_workers_ == worker) {
|
| + shutting_down_workers_ = worker->shutdown_next_;
|
| + worker->shutdown_next_ = NULL;
|
| + return true;
|
| + }
|
| +
|
| + for (Worker* current = shutting_down_workers_;
|
| + current->shutdown_next_ != NULL;
|
| + current = current->shutdown_next_) {
|
| + if (current->shutdown_next_ == worker) {
|
| + current->shutdown_next_ = worker->shutdown_next_;
|
| + worker->shutdown_next_ = NULL;
|
| + return true;
|
| + }
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +
|
| ThreadPool::Task::Task() {
|
| }
|
|
|
| @@ -217,9 +291,12 @@
|
| ThreadPool::Worker::Worker(ThreadPool* pool)
|
| : pool_(pool),
|
| task_(NULL),
|
| + id_(OSThread::kInvalidThreadId),
|
| + started_(false),
|
| owned_(false),
|
| all_next_(NULL),
|
| - idle_next_(NULL) {
|
| + idle_next_(NULL),
|
| + shutdown_next_(NULL) {
|
| }
|
|
|
|
|
| @@ -264,7 +341,7 @@
|
| }
|
|
|
|
|
| -void ThreadPool::Worker::Loop() {
|
| +bool ThreadPool::Worker::Loop() {
|
| MonitorLocker ml(&monitor_);
|
| int64_t idle_start;
|
| while (true) {
|
| @@ -281,7 +358,7 @@
|
|
|
| ASSERT(task_ == NULL);
|
| if (IsDone()) {
|
| - return;
|
| + return false;
|
| }
|
| ASSERT(pool_ != NULL);
|
| pool_->SetIdle(this);
|
| @@ -294,22 +371,24 @@
|
| break;
|
| }
|
| if (IsDone()) {
|
| - return;
|
| + return false;
|
| }
|
| - if (result == Monitor::kTimedOut &&
|
| - pool_->ReleaseIdleWorker(this)) {
|
| - return;
|
| + if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
|
| + return true;
|
| }
|
| }
|
| }
|
| UNREACHABLE();
|
| + return false;
|
| }
|
|
|
|
|
| -void ThreadPool::Worker::Shutdown() {
|
| +bool ThreadPool::Worker::Shutdown() {
|
| MonitorLocker ml(&monitor_);
|
| pool_ = NULL; // Fail fast if someone tries to access pool_.
|
| ml.Notify();
|
| + // Return whether the worker thread has started.
|
| + return started_;
|
| }
|
|
|
|
|
| @@ -317,20 +396,52 @@
|
| void ThreadPool::Worker::Main(uword args) {
|
| Thread::EnsureInit();
|
| Worker* worker = reinterpret_cast<Worker*>(args);
|
| - worker->Loop();
|
| + bool delete_self = false;
|
|
|
| + {
|
| + MonitorLocker ml(&(worker->monitor_));
|
| + if (worker->IsDone()) {
|
| + // id_ hasn't been set, yet, but the ThreadPool is being shutdown.
|
| + // Delete the task, and return.
|
| + ASSERT(worker->task_);
|
| + delete worker->task_;
|
| + worker->task_ = NULL;
|
| + delete_self = true;
|
| + } else {
|
| + worker->id_ = OSThread::GetCurrentThreadId();
|
| + worker->started_ = true;
|
| + }
|
| + }
|
| +
|
| + // We aren't able to delete the worker while holding the worker's monitor.
|
| + // Now that we have released it, and we know that ThreadPool::Shutdown
|
| + // won't touch it again, we can delete it and return.
|
| + if (delete_self) {
|
| + delete worker;
|
| + return;
|
| + }
|
| +
|
| + bool released = worker->Loop();
|
| +
|
| // It should be okay to access these unlocked here in this assert.
|
| - ASSERT(!worker->owned_ &&
|
| - worker->all_next_ == NULL &&
|
| - worker->idle_next_ == NULL);
|
| + // worker->all_next_ is retained by the pool for shutdown monitoring.
|
| + ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
|
|
|
| - // The exit monitor is only used during testing.
|
| - if (ThreadPool::exit_monitor_) {
|
| - MonitorLocker ml(ThreadPool::exit_monitor_);
|
| - (*ThreadPool::exit_count_)++;
|
| - ml.Notify();
|
| + if (!released) {
|
| + // This worker is exiting because the thread pool is being shut down.
|
| + // Inform the thread pool that we are exiting. We remove this worker from
|
| + // shutting_down_workers_ list because there will be no need for the
|
| + // ThreadPool to take action for this worker.
|
| + MonitorLocker eml(&ThreadPool::exit_monitor_);
|
| + worker->id_ = OSThread::kInvalidThreadId;
|
| + RemoveWorkerFromShutdownList(worker);
|
| + delete worker;
|
| + eml.Notify();
|
| + } else {
|
| + // This worker is going down because it was idle for too long. This case
|
| + // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
|
| + delete worker;
|
| }
|
| - delete worker;
|
| #if defined(TARGET_OS_WINDOWS)
|
| Thread::CleanUp();
|
| #endif
|
|
|