Index: runtime/vm/thread_pool.cc |
=================================================================== |
--- runtime/vm/thread_pool.cc (revision 45833) |
+++ runtime/vm/thread_pool.cc (working copy) |
@@ -11,9 +11,11 @@ |
DEFINE_FLAG(int, worker_timeout_millis, 5000, |
"Free workers when they have been idle for this amount of time."); |
+DEFINE_FLAG(int, shutdown_timeout_millis, 1000, |
+ "Amount of time to wait for a worker to stop during shutdown."); |
-Monitor* ThreadPool::exit_monitor_ = NULL; |
-int* ThreadPool::exit_count_ = NULL; |
+Monitor ThreadPool::exit_monitor_; |
+ThreadPool::Worker* ThreadPool::shutting_down_workers_ = NULL; |
ThreadPool::ThreadPool() |
: shutting_down_(false), |
@@ -94,14 +96,52 @@ |
} |
// Release ThreadPool::mutex_ before calling Worker functions. |
- Worker* current = saved; |
- while (current != NULL) { |
- // We may access all_next_ without holding ThreadPool::mutex_ here |
- // because the worker is no longer owned by the ThreadPool. |
- Worker* next = current->all_next_; |
- current->all_next_ = NULL; |
- current->Shutdown(); |
- current = next; |
+ { |
+ MonitorLocker eml(&ThreadPool::exit_monitor_); |
+ |
+ // First tell all the workers to shut down. |
+ Worker* current = saved; |
+ while (current != NULL) { |
+ Worker* next = current->all_next_; |
+ bool started = current->Shutdown(); |
+ // After Shutdown, if started is false, we can no longer touch the worker |
+ // because a worker that hasn't started yet may run at any time and |
+ // delete itself. |
+ if (started) { |
+ current->all_next_ = NULL; |
+ // We only ensure the shutdown of threads that have started. |
+ // Threads that have not started will shut down immediately as soon as |
+ // they run. |
+ AddWorkerToShutdownList(current); |
+ } |
+ current = next; |
+ } |
+ saved = NULL; |
+ |
+ // Give workers a chance to exit gracefully. |
+ const int64_t start_wait = OS::GetCurrentTimeMillis(); |
+ int timeout = FLAG_shutdown_timeout_millis; |
+ while (shutting_down_workers_ != NULL) { |
+ if (timeout > 0) { |
+ // Here, we are waiting for workers to exit. When a worker exits we will |
+ // be notified. |
+ eml.Wait(timeout); |
+ |
+ // We decrement the timeout for the next wait by the amount of time |
+ // we've already waited. If the new timeout drops below zero, we break |
+ // out of this loop, which triggers the termination code below. |
+ const int64_t after_wait = OS::GetCurrentTimeMillis(); |
+ timeout = FLAG_shutdown_timeout_millis - (after_wait - start_wait); |
+ } else { |
+ break; |
+ } |
+ } |
+ |
+ // It is an error if all workers have not exited within the timeout. We |
+ // assume that they have run off into the weeds, and it is a bug. |
+ if (shutting_down_workers_ != NULL) { |
+ FATAL("Thread pool worker threads failed to exit."); |
+ } |
} |
} |
@@ -206,6 +246,40 @@ |
} |
+// Prepends worker to shutting_down_workers_. Caller must hold exit_monitor_. |
+void ThreadPool::AddWorkerToShutdownList(Worker* worker) { |
+ worker->shutdown_next_ = shutting_down_workers_; |
+ shutting_down_workers_ = worker; |
+} |
+ |
+ |
+// Caller must hold exit_monitor_. Returns true iff worker was on the list. |
+bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { |
+ ASSERT(worker != NULL); |
+ if (shutting_down_workers_ == NULL) { |
+ return false; |
+ } |
+ |
+ // Special case: the worker is the head of the list. |
+ if (shutting_down_workers_ == worker) { |
+ shutting_down_workers_ = worker->shutdown_next_; |
+ worker->shutdown_next_ = NULL; |
+ return true; |
+ } |
+ |
+ for (Worker* current = shutting_down_workers_; |
+ current->shutdown_next_ != NULL; |
+ current = current->shutdown_next_) { |
+ if (current->shutdown_next_ == worker) { |
+ current->shutdown_next_ = worker->shutdown_next_; |
+ worker->shutdown_next_ = NULL; |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+ |
ThreadPool::Task::Task() { |
} |
@@ -217,9 +291,12 @@ |
ThreadPool::Worker::Worker(ThreadPool* pool) |
: pool_(pool), |
task_(NULL), |
+ id_(OSThread::kInvalidThreadId), |
+ started_(false), |
owned_(false), |
all_next_(NULL), |
- idle_next_(NULL) { |
+ idle_next_(NULL), |
+ shutdown_next_(NULL) { |
} |
@@ -264,7 +341,7 @@ |
} |
-void ThreadPool::Worker::Loop() { |
+bool ThreadPool::Worker::Loop() { |
MonitorLocker ml(&monitor_); |
int64_t idle_start; |
while (true) { |
@@ -281,7 +358,7 @@ |
ASSERT(task_ == NULL); |
if (IsDone()) { |
- return; |
+ return false; |
} |
ASSERT(pool_ != NULL); |
pool_->SetIdle(this); |
@@ -294,22 +371,24 @@ |
break; |
} |
if (IsDone()) { |
- return; |
+ return false; |
} |
- if (result == Monitor::kTimedOut && |
- pool_->ReleaseIdleWorker(this)) { |
- return; |
+ if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
+ return true; |
} |
} |
} |
UNREACHABLE(); |
+ return false; |
} |
-void ThreadPool::Worker::Shutdown() { |
+bool ThreadPool::Worker::Shutdown() { |
MonitorLocker ml(&monitor_); |
pool_ = NULL; // Fail fast if someone tries to access pool_. |
ml.Notify(); |
+ // Returns started_; if false, the unstarted worker deletes itself in Main. |
+ return started_; |
} |
@@ -317,20 +396,52 @@ |
void ThreadPool::Worker::Main(uword args) { |
Thread::EnsureInit(); |
Worker* worker = reinterpret_cast<Worker*>(args); |
- worker->Loop(); |
+ bool delete_self = false; |
+ { |
+ MonitorLocker ml(&(worker->monitor_)); |
+ if (worker->IsDone()) { |
+ // id_ hasn't been set yet, but the ThreadPool is being shut down. |
+ // Delete the task and return. |
+ ASSERT(worker->task_); |
+ delete worker->task_; |
+ worker->task_ = NULL; |
+ delete_self = true; |
+ } else { |
+ worker->id_ = OSThread::GetCurrentThreadId(); |
+ worker->started_ = true; |
+ } |
+ } |
+ |
+ // We aren't able to delete the worker while holding the worker's monitor. |
+ // Now that we have released it, and we know that ThreadPool::Shutdown |
+ // won't touch it again, we can delete it and return. |
+ if (delete_self) { |
+ delete worker; |
+ return; |
+ } |
+ |
+ bool released = worker->Loop(); |
+ |
// It should be okay to access these unlocked here in this assert. |
- ASSERT(!worker->owned_ && |
- worker->all_next_ == NULL && |
- worker->idle_next_ == NULL); |
+ // worker->all_next_ is retained by the pool for shutdown monitoring. |
+ ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
- // The exit monitor is only used during testing. |
- if (ThreadPool::exit_monitor_) { |
- MonitorLocker ml(ThreadPool::exit_monitor_); |
- (*ThreadPool::exit_count_)++; |
- ml.Notify(); |
+ if (!released) { |
+ // This worker is exiting because the thread pool is being shut down. |
+ // Inform the thread pool that we are exiting. We remove this worker from |
+ // shutting_down_workers_ list because there will be no need for the |
+ // ThreadPool to take action for this worker. |
+ MonitorLocker eml(&ThreadPool::exit_monitor_); |
+ worker->id_ = OSThread::kInvalidThreadId; |
+ RemoveWorkerFromShutdownList(worker); |
+ delete worker; |
+ eml.Notify(); |
+ } else { |
+ // This worker is going down because it was idle for too long. This case |
+ // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. |
+ delete worker; |
} |
- delete worker; |
#if defined(TARGET_OS_WINDOWS) |
Thread::CleanUp(); |
#endif |