Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(61)

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 1275353005: VM thread shutdown. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/thread_pool.h" 5 #include "vm/thread_pool.h"
6 6
7 #include "vm/flags.h" 7 #include "vm/flags.h"
8 #include "vm/lockers.h" 8 #include "vm/lockers.h"
9 9
10 namespace dart { 10 namespace dart {
11 11
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, 12 DEFINE_FLAG(int, worker_timeout_millis, 5000,
13 "Free workers when they have been idle for this amount of time."); 13 "Free workers when they have been idle for this amount of time.");
14 14
15 Monitor* ThreadPool::exit_monitor_ = NULL;
16 int* ThreadPool::exit_count_ = NULL;
17
18 ThreadPool::ThreadPool() 15 ThreadPool::ThreadPool()
19 : shutting_down_(false), 16 : shutting_down_(false),
20 all_workers_(NULL), 17 all_workers_(NULL),
21 idle_workers_(NULL), 18 idle_workers_(NULL),
22 count_started_(0), 19 count_started_(0),
23 count_stopped_(0), 20 count_stopped_(0),
24 count_running_(0), 21 count_running_(0),
25 count_idle_(0) { 22 count_idle_(0),
23 idle_join_list_(NULL),
24 shutting_down_workers_(NULL),
25 join_list_(NULL) {
26 } 26 }
27 27
28 28
29 ThreadPool::~ThreadPool() { 29 ThreadPool::~ThreadPool() {
30 Shutdown(); 30 Shutdown();
31 } 31 }
32 32
33 33
34 void ThreadPool::Run(Task* task) { 34 void ThreadPool::Run(Task* task) {
35 Worker* worker = NULL; 35 Worker* worker = NULL;
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 current = next; 87 current = next;
88 count_stopped_++; 88 count_stopped_++;
89 } 89 }
90 90
91 count_idle_ = 0; 91 count_idle_ = 0;
92 count_running_ = 0; 92 count_running_ = 0;
93 ASSERT(count_started_ == count_stopped_); 93 ASSERT(count_started_ == count_stopped_);
94 } 94 }
95 // Release ThreadPool::mutex_ before calling Worker functions. 95 // Release ThreadPool::mutex_ before calling Worker functions.
96 96
97 Worker* current = saved; 97 {
98 while (current != NULL) { 98 MonitorLocker eml(&exit_monitor_);
99 // We may access all_next_ without holding ThreadPool::mutex_ here 99
100 // because the worker is no longer owned by the ThreadPool. 100 // First tell all the workers to shut down.
101 Worker* next = current->all_next_; 101 Worker* current = saved;
102 current->all_next_ = NULL; 102 while (current != NULL) {
103 current->Shutdown(); 103 Worker* next = current->all_next_;
104 current = next; 104 if (current->id_ != OSThread::GetCurrentThreadId()) {
105 AddWorkerToShutdownList(current);
106 }
107 current->Shutdown();
108 current = next;
109 }
110 saved = NULL;
111
112 // Wait until all workers will exit.
113 while (shutting_down_workers_ != NULL) {
114 // Here, we are waiting for workers to exit. When a worker exits we will
115 // be notified.
116 eml.Wait();
117 }
118
119 // Join non-idle threads.
120 JoinList::Join(&join_list_);
105 } 121 }
122
123 // Join any remaining idle threads.
124 JoinList::Join(&idle_join_list_);
106 } 125 }
107 126
108 127
109 bool ThreadPool::IsIdle(Worker* worker) { 128 bool ThreadPool::IsIdle(Worker* worker) {
110 ASSERT(worker != NULL && worker->owned_); 129 ASSERT(worker != NULL && worker->owned_);
111 for (Worker* current = idle_workers_; 130 for (Worker* current = idle_workers_;
112 current != NULL; 131 current != NULL;
113 current = current->idle_next_) { 132 current = current->idle_next_) {
114 if (current == worker) { 133 if (current == worker) {
115 return true; 134 return true;
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
149 ASSERT(worker != NULL && worker->owned_); 168 ASSERT(worker != NULL && worker->owned_);
150 if (all_workers_ == NULL) { 169 if (all_workers_ == NULL) {
151 return false; 170 return false;
152 } 171 }
153 172
154 // Special case head of list. 173 // Special case head of list.
155 if (all_workers_ == worker) { 174 if (all_workers_ == worker) {
156 all_workers_ = worker->all_next_; 175 all_workers_ = worker->all_next_;
157 worker->all_next_ = NULL; 176 worker->all_next_ = NULL;
158 worker->owned_ = false; 177 worker->owned_ = false;
159 worker->pool_ = NULL; 178 worker->done_ = true;
160 return true; 179 return true;
161 } 180 }
162 181
163 for (Worker* current = all_workers_; 182 for (Worker* current = all_workers_;
164 current->all_next_ != NULL; 183 current->all_next_ != NULL;
165 current = current->all_next_) { 184 current = current->all_next_) {
166 if (current->all_next_ == worker) { 185 if (current->all_next_ == worker) {
167 current->all_next_ = worker->all_next_; 186 current->all_next_ = worker->all_next_;
168 worker->all_next_ = NULL; 187 worker->all_next_ = NULL;
169 worker->owned_ = false; 188 worker->owned_ = false;
(...skipping 10 matching lines...) Expand all
180 return; 199 return;
181 } 200 }
182 ASSERT(worker->owned_ && !IsIdle(worker)); 201 ASSERT(worker->owned_ && !IsIdle(worker));
183 worker->idle_next_ = idle_workers_; 202 worker->idle_next_ = idle_workers_;
184 idle_workers_ = worker; 203 idle_workers_ = worker;
185 count_idle_++; 204 count_idle_++;
186 count_running_--; 205 count_running_--;
187 } 206 }
188 207
189 208
209 void ThreadPool::ReapExitedIdleThreads() {
210 JoinList* list = NULL;
211 {
212 MutexLocker ml(&mutex_);
213 if (shutting_down_) {
214 // If we're shutting down, the threads will be reaped in Shutdown.
215 return;
216 }
217 list = idle_join_list_;
218 idle_join_list_ = NULL;
219 }
220 JoinList::Join(&list);
221 }
222
223
224 class ReaperTask : public ThreadPool::Task {
225 public:
226 explicit ReaperTask(ThreadPool* pool) : pool_(pool) {}
227 virtual void Run() {
228 pool_->ReapExitedIdleThreads();
229 }
230
231 private:
232 ThreadPool* pool_;
233 };
234
235
190 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { 236 bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
191 MutexLocker ml(&mutex_); 237 Worker* idle_worker = NULL;
192 if (shutting_down_) { 238 {
193 return false; 239 MutexLocker ml(&mutex_);
240 if (shutting_down_) {
241 return false;
242 }
243 // Remove from idle list.
244 if (!RemoveWorkerFromIdleList(worker)) {
245 return false;
246 }
247 // Remove from all list.
248 bool found = RemoveWorkerFromAllList(worker);
249 ASSERT(found);
250
251 // The thread for worker will exit. Add its ThreadId to the idle_join_list_
252 // so that we can join on it at the next opportunity.
253 JoinList::Add(worker->id_, &idle_join_list_);
254
255 // If there's an idle worker hanging around, set it up to reap the exiting
256 // idle thread.
257 if (idle_workers_ != NULL) {
258 Worker* idle_worker = idle_workers_;
259 idle_workers_ = idle_workers_->idle_next_;
260 idle_worker->idle_next_ = NULL;
261 count_idle_--;
262 }
263
264 count_stopped_++;
265 count_idle_--;
194 } 266 }
195 // Remove from idle list. 267 if (idle_worker != NULL) {
196 if (!RemoveWorkerFromIdleList(worker)) { 268 idle_worker->SetTask(new ReaperTask(this));
197 return false;
198 } 269 }
199 // Remove from all list. 270 return true;
200 bool found = RemoveWorkerFromAllList(worker); 271 }
201 ASSERT(found);
202 272
203 count_stopped_++; 273
204 count_idle_--; 274 // Only call while holding the exit_monitor_
205 return true; 275 void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
276 worker->shutdown_next_ = shutting_down_workers_;
277 shutting_down_workers_ = worker;
278 }
279
280
281 // Only call while holding the exit_monitor_
282 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
283 ASSERT(worker != NULL);
284 ASSERT(shutting_down_workers_ != NULL);
285
286 // Special case head of list.
287 if (shutting_down_workers_ == worker) {
288 shutting_down_workers_ = worker->shutdown_next_;
289 worker->shutdown_next_ = NULL;
290 return true;
291 }
292
293 for (Worker* current = shutting_down_workers_;
294 current->shutdown_next_ != NULL;
295 current = current->shutdown_next_) {
296 if (current->shutdown_next_ == worker) {
297 current->shutdown_next_ = worker->shutdown_next_;
298 worker->shutdown_next_ = NULL;
299 return true;
300 }
301 }
302 return false;
303 }
304
305
306 void ThreadPool::JoinList::Add(ThreadId id, JoinList** list) {
307 *list = new JoinList(id, *list);
308 }
309
310
311 void ThreadPool::JoinList::Join(JoinList** list) {
312 while (*list) {
313 JoinList* current = *list;
314 *list = current->next();
315 OSThread::Join(current->id());
316 delete current;
317 }
206 } 318 }
207 319
208 320
209 ThreadPool::Task::Task() { 321 ThreadPool::Task::Task() {
210 } 322 }
211 323
212 324
213 ThreadPool::Task::~Task() { 325 ThreadPool::Task::~Task() {
214 } 326 }
215 327
216 328
217 ThreadPool::Worker::Worker(ThreadPool* pool) 329 ThreadPool::Worker::Worker(ThreadPool* pool)
218 : pool_(pool), 330 : pool_(pool),
331 done_(false),
219 task_(NULL), 332 task_(NULL),
333 id_(OSThread::kInvalidThreadId),
334 started_(false),
220 owned_(false), 335 owned_(false),
221 all_next_(NULL), 336 all_next_(NULL),
222 idle_next_(NULL) { 337 idle_next_(NULL),
338 shutdown_next_(NULL) {
223 } 339 }
224 340
225 341
226 void ThreadPool::Worker::StartThread() { 342 void ThreadPool::Worker::StartThread() {
227 #if defined(DEBUG) 343 #if defined(DEBUG)
228 // Must call SetTask before StartThread. 344 // Must call SetTask before StartThread.
229 { // NOLINT 345 { // NOLINT
230 MonitorLocker ml(&monitor_); 346 MonitorLocker ml(&monitor_);
231 ASSERT(task_ != NULL); 347 ASSERT(task_ != NULL);
232 } 348 }
(...skipping 24 matching lines...) Expand all
257 // out. Give the worker one last desperate chance to live. We 373 // out. Give the worker one last desperate chance to live. We
258 // are merciful. 374 // are merciful.
259 return 1; 375 return 1;
260 } else { 376 } else {
261 return FLAG_worker_timeout_millis - waited; 377 return FLAG_worker_timeout_millis - waited;
262 } 378 }
263 } 379 }
264 } 380 }
265 381
266 382
267 void ThreadPool::Worker::Loop() { 383 bool ThreadPool::Worker::Loop() {
268 MonitorLocker ml(&monitor_); 384 MonitorLocker ml(&monitor_);
269 int64_t idle_start; 385 int64_t idle_start;
270 while (true) { 386 while (true) {
271 ASSERT(task_ != NULL); 387 ASSERT(task_ != NULL);
272 Task* task = task_; 388 Task* task = task_;
273 task_ = NULL; 389 task_ = NULL;
274 390
275 // Release monitor while handling the task. 391 // Release monitor while handling the task.
276 monitor_.Exit(); 392 monitor_.Exit();
277 task->Run(); 393 task->Run();
278 ASSERT(Isolate::Current() == NULL); 394 ASSERT(Isolate::Current() == NULL);
279 delete task; 395 delete task;
280 monitor_.Enter(); 396 monitor_.Enter();
281 397
282 ASSERT(task_ == NULL); 398 ASSERT(task_ == NULL);
283 if (IsDone()) { 399 if (IsDone()) {
284 return; 400 return false;
285 } 401 }
286 ASSERT(pool_ != NULL); 402 ASSERT(!done_);
403 pool_->ReapExitedIdleThreads();
Ivan Posva 2015/08/17 13:35:52 There is no need to hold the monitor_ when calling
zra 2015/08/18 06:23:14 Done.
287 pool_->SetIdle(this); 404 pool_->SetIdle(this);
288 idle_start = OS::GetCurrentTimeMillis(); 405 idle_start = OS::GetCurrentTimeMillis();
289 while (true) { 406 while (true) {
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); 407 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
291 if (task_ != NULL) { 408 if (task_ != NULL) {
292 // We've found a task. Process it, regardless of whether the 409 // We've found a task. Process it, regardless of whether the
293 // worker is done_. 410 // worker is done_.
294 break; 411 break;
295 } 412 }
296 if (IsDone()) { 413 if (IsDone()) {
297 return; 414 return false;
298 } 415 }
299 if (result == Monitor::kTimedOut && 416 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
300 pool_->ReleaseIdleWorker(this)) { 417 return true;
301 return;
302 } 418 }
303 } 419 }
304 } 420 }
305 UNREACHABLE(); 421 UNREACHABLE();
422 return false;
306 } 423 }
307 424
308 425
309 void ThreadPool::Worker::Shutdown() { 426 void ThreadPool::Worker::Shutdown() {
310 MonitorLocker ml(&monitor_); 427 MonitorLocker ml(&monitor_);
311 pool_ = NULL; // Fail fast if someone tries to access pool_. 428 done_ = true;
312 ml.Notify(); 429 ml.Notify();
313 } 430 }
314 431
315 432
316 // static 433 // static
317 void ThreadPool::Worker::Main(uword args) { 434 void ThreadPool::Worker::Main(uword args) {
318 Thread::EnsureInit(); 435 Thread::EnsureInit();
319 Worker* worker = reinterpret_cast<Worker*>(args); 436 Worker* worker = reinterpret_cast<Worker*>(args);
320 worker->Loop(); 437 bool delete_self = false;
438
439 {
440 MonitorLocker ml(&(worker->monitor_));
441 if (worker->IsDone()) {
442 // id_ hasn't been set yet, but the ThreadPool is being shutdown.
443 // Delete the task, and return.
444 ASSERT(worker->task_);
445 delete worker->task_;
446 worker->task_ = NULL;
447 delete_self = true;
448 } else {
449 worker->id_ = OSThread::GetCurrentThreadId();
450 worker->started_ = true;
451 }
452 }
453
454 // We aren't able to delete the worker while holding the worker's monitor.
455 // Now that we have released it, and we know that ThreadPool::Shutdown
456 // won't touch it again, we can delete it and return.
457 if (delete_self) {
458 MonitorLocker eml(&worker->pool_->exit_monitor_);
459 ThreadPool::JoinList::Add(
460 OSThread::GetCurrentThreadId(), &worker->pool_->join_list_);
461 worker->pool_->RemoveWorkerFromShutdownList(worker);
462 delete worker;
463 eml.Notify();
464 return;
465 }
466
467 bool released = worker->Loop();
321 468
322 // It should be okay to access these unlocked here in this assert. 469 // It should be okay to access these unlocked here in this assert.
323 ASSERT(!worker->owned_ && 470 // worker->all_next_ is retained by the pool for shutdown monitoring.
324 worker->all_next_ == NULL && 471 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
325 worker->idle_next_ == NULL);
326 472
327 // The exit monitor is only used during testing. 473 if (!released) {
328 if (ThreadPool::exit_monitor_) { 474 // This worker is exiting because the thread pool is being shut down.
329 MonitorLocker ml(ThreadPool::exit_monitor_); 475 // Inform the thread pool that we are exiting. We remove this worker from
330 (*ThreadPool::exit_count_)++; 476 // shutting_down_workers_ list because there will be no need for the
331 ml.Notify(); 477 // ThreadPool to take action for this worker.
478 MonitorLocker eml(&worker->pool_->exit_monitor_);
479 JoinList::Add(worker->id_, &worker->pool_->join_list_);
480 worker->id_ = OSThread::kInvalidThreadId;
481 worker->pool_->RemoveWorkerFromShutdownList(worker);
482 delete worker;
483 eml.Notify();
484 } else {
485 // This worker is going down because it was idle for too long. This case
486 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
487 delete worker;
332 } 488 }
333 delete worker;
334 #if defined(TARGET_OS_WINDOWS) 489 #if defined(TARGET_OS_WINDOWS)
335 Thread::CleanUp(); 490 Thread::CleanUp();
336 #endif 491 #endif
337 } 492 }
338 493
339 } // namespace dart 494 } // namespace dart
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698