Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1008)

Unified Diff: runtime/vm/thread_pool.cc

Issue 850183003: During thread-pool shutdown, wait for worker threads to shutdown. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/vm/thread_pool.cc
===================================================================
--- runtime/vm/thread_pool.cc (revision 45833)
+++ runtime/vm/thread_pool.cc (working copy)
@@ -11,9 +11,11 @@
DEFINE_FLAG(int, worker_timeout_millis, 5000,
"Free workers when they have been idle for this amount of time.");
+DEFINE_FLAG(int, shutdown_timeout_millis, 1000,
+ "Amount of time to wait for a worker to stop during shutdown.");
-Monitor* ThreadPool::exit_monitor_ = NULL;
-int* ThreadPool::exit_count_ = NULL;
+Monitor ThreadPool::exit_monitor_;
+ThreadPool::Worker* ThreadPool::shutting_down_workers_ = NULL;
ThreadPool::ThreadPool()
: shutting_down_(false),
@@ -94,14 +96,52 @@
}
// Release ThreadPool::mutex_ before calling Worker functions.
- Worker* current = saved;
- while (current != NULL) {
- // We may access all_next_ without holding ThreadPool::mutex_ here
- // because the worker is no longer owned by the ThreadPool.
- Worker* next = current->all_next_;
- current->all_next_ = NULL;
- current->Shutdown();
- current = next;
+ {
+ MonitorLocker eml(&ThreadPool::exit_monitor_);
+
+ // First tell all the workers to shut down.
+ Worker* current = saved;
+ while (current != NULL) {
+ Worker* next = current->all_next_;
+ bool started = current->Shutdown();
+ // After Shutdown, if started is false, we can no longer touch the worker
+ // because a worker that hasn't started yet may run at any time and
+ // delete itself.
+ if (started) {
+ current->all_next_ = NULL;
+ // We only ensure the shutdown of threads that have started.
+      // Threads that have not started will shut down immediately as soon as
+ // they run.
+ AddWorkerToShutdownList(current);
+ }
+ current = next;
+ }
+ saved = NULL;
+
+ // Give workers a chance to exit gracefully.
+ const int64_t start_wait = OS::GetCurrentTimeMillis();
+ int timeout = FLAG_shutdown_timeout_millis;
+ while (shutting_down_workers_ != NULL) {
+ if (timeout > 0) {
+ // Here, we are waiting for workers to exit. When a worker exits we will
+ // be notified.
+ eml.Wait(timeout);
+
+ // We decrement the timeout for the next wait by the amount of time
+ // we've already waited. If the new timeout drops below zero, we break
+ // out of this loop, which triggers the termination code below.
+ const int64_t after_wait = OS::GetCurrentTimeMillis();
+ timeout = FLAG_shutdown_timeout_millis - (after_wait - start_wait);
+ } else {
+ break;
+ }
+ }
+
+ // It is an error if all workers have not exited within the timeout. We
+ // assume that they have run off into the weeds, and it is a bug.
+ if (shutting_down_workers_ != NULL) {
+ FATAL("Thread pool worker threads failed to exit.");
+ }
}
}
@@ -206,6 +246,40 @@
}
+// Pushes |worker| onto the head of the shutting_down_workers_ list, linked
+// through Worker::shutdown_next_. Workers on this list are awaited (and
+// notified about) during ThreadPool::Shutdown.
+// Only call while holding the exit_monitor_.
+void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
+  worker->shutdown_next_ = shutting_down_workers_;
+  shutting_down_workers_ = worker;
+}
+
+
+// Unlinks |worker| from the shutting_down_workers_ list, clearing its
+// shutdown_next_ pointer. Returns true if the worker was found and removed,
+// false if the list is empty or does not contain the worker.
+// Only call while holding the exit_monitor_.
+bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
+  ASSERT(worker != NULL);
+  if (shutting_down_workers_ == NULL) {
+    return false;
+  }
+
+  // Special case head of list.
+  if (shutting_down_workers_ == worker) {
+    shutting_down_workers_ = worker->shutdown_next_;
+    worker->shutdown_next_ = NULL;
+    return true;
+  }
+
+  // General case: walk the list looking at each node's successor so the
+  // predecessor's link can be rewired when |worker| is found.
+  for (Worker* current = shutting_down_workers_;
+      current->shutdown_next_ != NULL;
+      current = current->shutdown_next_) {
+    if (current->shutdown_next_ == worker) {
+      current->shutdown_next_ = worker->shutdown_next_;
+      worker->shutdown_next_ = NULL;
+      return true;
+    }
+  }
+  return false;
+}
+
+
ThreadPool::Task::Task() {
}
@@ -217,9 +291,12 @@
ThreadPool::Worker::Worker(ThreadPool* pool)
: pool_(pool),
task_(NULL),
+ id_(OSThread::kInvalidThreadId),
+ started_(false),
owned_(false),
all_next_(NULL),
- idle_next_(NULL) {
+ idle_next_(NULL),
+ shutdown_next_(NULL) {
}
@@ -264,7 +341,7 @@
}
-void ThreadPool::Worker::Loop() {
+bool ThreadPool::Worker::Loop() {
MonitorLocker ml(&monitor_);
int64_t idle_start;
while (true) {
@@ -281,7 +358,7 @@
ASSERT(task_ == NULL);
if (IsDone()) {
- return;
+ return false;
}
ASSERT(pool_ != NULL);
pool_->SetIdle(this);
@@ -294,22 +371,24 @@
break;
}
if (IsDone()) {
- return;
+ return false;
}
- if (result == Monitor::kTimedOut &&
- pool_->ReleaseIdleWorker(this)) {
- return;
+ if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
+ return true;
}
}
}
UNREACHABLE();
+ return false;
}
-void ThreadPool::Worker::Shutdown() {
+bool ThreadPool::Worker::Shutdown() {
MonitorLocker ml(&monitor_);
pool_ = NULL; // Fail fast if someone tries to access pool_.
ml.Notify();
+ // Return whether the worker thread has started.
+ return started_;
}
@@ -317,20 +396,52 @@
void ThreadPool::Worker::Main(uword args) {
Thread::EnsureInit();
Worker* worker = reinterpret_cast<Worker*>(args);
- worker->Loop();
+ bool delete_self = false;
+ {
+ MonitorLocker ml(&(worker->monitor_));
+ if (worker->IsDone()) {
+      // id_ hasn't been set yet, but the ThreadPool is being shut down.
+ // Delete the task, and return.
+ ASSERT(worker->task_);
+ delete worker->task_;
+ worker->task_ = NULL;
+ delete_self = true;
+ } else {
+ worker->id_ = OSThread::GetCurrentThreadId();
+ worker->started_ = true;
+ }
+ }
+
+ // We aren't able to delete the worker while holding the worker's monitor.
+ // Now that we have released it, and we know that ThreadPool::Shutdown
+ // won't touch it again, we can delete it and return.
+ if (delete_self) {
+ delete worker;
+ return;
+ }
+
+ bool released = worker->Loop();
+
// It should be okay to access these unlocked here in this assert.
- ASSERT(!worker->owned_ &&
- worker->all_next_ == NULL &&
- worker->idle_next_ == NULL);
+ // worker->all_next_ is retained by the pool for shutdown monitoring.
+ ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
- // The exit monitor is only used during testing.
- if (ThreadPool::exit_monitor_) {
- MonitorLocker ml(ThreadPool::exit_monitor_);
- (*ThreadPool::exit_count_)++;
- ml.Notify();
+ if (!released) {
+ // This worker is exiting because the thread pool is being shut down.
+ // Inform the thread pool that we are exiting. We remove this worker from
+ // shutting_down_workers_ list because there will be no need for the
+ // ThreadPool to take action for this worker.
+ MonitorLocker eml(&ThreadPool::exit_monitor_);
+ worker->id_ = OSThread::kInvalidThreadId;
+ RemoveWorkerFromShutdownList(worker);
+ delete worker;
+ eml.Notify();
+ } else {
+ // This worker is going down because it was idle for too long. This case
+ // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
+ delete worker;
}
- delete worker;
#if defined(TARGET_OS_WINDOWS)
Thread::CleanUp();
#endif
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698