Chromium Code Reviews

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 850183003: During thread-pool shutdown, wait for worker threads to shut down. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 5 years, 6 months ago
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/thread_pool.h" 5 #include "vm/thread_pool.h"
6 6
7 #include "vm/flags.h" 7 #include "vm/flags.h"
8 #include "vm/lockers.h" 8 #include "vm/lockers.h"
9 9
10 namespace dart { 10 namespace dart {
11 11
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, 12 DEFINE_FLAG(int, worker_timeout_millis, 5000,
13 "Free workers when they have been idle for this amount of time."); 13 "Free workers when they have been idle for this amount of time.");
14 DEFINE_FLAG(int, shutdown_timeout_millis, 1000,
15 "Amount of time to wait for a worker to stop during shutdown.");
14 16
15 Monitor* ThreadPool::exit_monitor_ = NULL; 17 Monitor ThreadPool::exit_monitor_;
16 int* ThreadPool::exit_count_ = NULL; 18 ThreadPool::Worker* ThreadPool::shutting_down_workers_ = NULL;
17 19
18 ThreadPool::ThreadPool() 20 ThreadPool::ThreadPool()
19 : shutting_down_(false), 21 : shutting_down_(false),
20 all_workers_(NULL), 22 all_workers_(NULL),
21 idle_workers_(NULL), 23 idle_workers_(NULL),
22 count_started_(0), 24 count_started_(0),
23 count_stopped_(0), 25 count_stopped_(0),
24 count_running_(0), 26 count_running_(0),
25 count_idle_(0) { 27 count_idle_(0) {
26 } 28 }
(...skipping 60 matching lines...)
87 current = next; 89 current = next;
88 count_stopped_++; 90 count_stopped_++;
89 } 91 }
90 92
91 count_idle_ = 0; 93 count_idle_ = 0;
92 count_running_ = 0; 94 count_running_ = 0;
93 ASSERT(count_started_ == count_stopped_); 95 ASSERT(count_started_ == count_stopped_);
94 } 96 }
95 // Release ThreadPool::mutex_ before calling Worker functions. 97 // Release ThreadPool::mutex_ before calling Worker functions.
96 98
97 Worker* current = saved; 99 {
98 while (current != NULL) { 100 MonitorLocker eml(&ThreadPool::exit_monitor_);
99 // We may access all_next_ without holding ThreadPool::mutex_ here 101
100 // because the worker is no longer owned by the ThreadPool. 102 // First tell all the workers to shut down.
101 Worker* next = current->all_next_; 103 Worker* current = saved;
102 current->all_next_ = NULL; 104 while (current != NULL) {
103 current->Shutdown(); 105 Worker* next = current->all_next_;
104 current = next; 106 bool started = current->Shutdown();
107 // After Shutdown, if started is false, we can no longer touch the worker
108 // because a worker that hasn't started yet may run at any time and
109 // delete itself.
110 if (started) {
111 current->all_next_ = NULL;
112 // We only ensure the shutdown of threads that have started.
113 // Threads that have not started will shutdown immediately as soon as
114 // they run.
115 AddWorkerToShutdownList(current);
116 }
117 current = next;
118 }
119 saved = NULL;
120
121 // Give workers a chance to exit gracefully.
122 const int64_t start_wait = OS::GetCurrentTimeMillis();
123 int timeout = FLAG_shutdown_timeout_millis;
124 while (shutting_down_workers_ != NULL) {
125 if (timeout > 0) {
126 // Here, we are waiting for workers to exit. When a worker exits we will
127 // be notified.
128 eml.Wait(timeout);
129
130 // We decrement the timeout for the next wait by the amount of time
131 // we've already waited. If the new timeout drops below zero, we break
132 // out of this loop, which triggers the termination code below.
133 const int64_t after_wait = OS::GetCurrentTimeMillis();
134 timeout = FLAG_shutdown_timeout_millis - (after_wait - start_wait);
135 } else {
136 break;
137 }
138 }
139
140 // It is an error if all workers have not exited within the timeout. We
141 // assume that they have run off into the weeds, and it is a bug.
142 if (shutting_down_workers_ != NULL) {
143 FATAL("Thread pool worker threads failed to exit.");
144 }
105 } 145 }
106 } 146 }
107 147
108 148
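The rewritten Shutdown() above tells each started worker to shut down, links it into the shutting_down_workers_ list, and then waits on exit_monitor_ until the list drains or FLAG_shutdown_timeout_millis elapses, recomputing the remaining timeout after every wakeup and calling FATAL if any worker is still alive at the deadline. Below is a minimal standalone sketch of that bounded-wait pattern; it substitutes std::mutex/std::condition_variable and a plain counter for the VM's Monitor, MonitorLocker, and intrusive worker list, so all names and values here are illustrative only.

// Bounded wait for worker exit, sketched with the C++ standard library.
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <mutex>
#include <thread>

int main() {
  std::mutex mutex;
  std::condition_variable exited;
  int live_workers = 3;  // Stand-in for the shutting_down_workers_ list.
  const auto kShutdownTimeout = std::chrono::milliseconds(1000);

  // Each worker notifies as it exits, as Worker::Main() does via eml.Notify().
  std::thread workers[3];
  for (auto& w : workers) {
    w = std::thread([&] {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      std::lock_guard<std::mutex> lock(mutex);
      --live_workers;
      exited.notify_one();
    });
  }

  {
    std::unique_lock<std::mutex> lock(mutex);
    // Wait until every worker has exited or the deadline passes; spurious
    // wakeups simply re-check the predicate and wait again.
    const auto deadline = std::chrono::steady_clock::now() + kShutdownTimeout;
    while (live_workers > 0) {
      if (exited.wait_until(lock, deadline) == std::cv_status::timeout) break;
    }
    if (live_workers > 0) {
      std::fprintf(stderr, "Thread pool worker threads failed to exit.\n");
      std::abort();  // Stands in for FATAL() above.
    }
  }

  for (auto& w : workers) w.join();
  return 0;
}

Waiting against an absolute deadline (or, as in the patch, subtracting the time already waited before each Wait call) is what keeps repeated wakeups from stretching the total shutdown budget.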
109 bool ThreadPool::IsIdle(Worker* worker) { 149 bool ThreadPool::IsIdle(Worker* worker) {
110 ASSERT(worker != NULL && worker->owned_); 150 ASSERT(worker != NULL && worker->owned_);
111 for (Worker* current = idle_workers_; 151 for (Worker* current = idle_workers_;
112 current != NULL; 152 current != NULL;
113 current = current->idle_next_) { 153 current = current->idle_next_) {
114 if (current == worker) { 154 if (current == worker) {
(...skipping 84 matching lines...)
199 // Remove from all list. 239 // Remove from all list.
200 bool found = RemoveWorkerFromAllList(worker); 240 bool found = RemoveWorkerFromAllList(worker);
201 ASSERT(found); 241 ASSERT(found);
202 242
203 count_stopped_++; 243 count_stopped_++;
204 count_idle_--; 244 count_idle_--;
205 return true; 245 return true;
206 } 246 }
207 247
208 248
249 // Only call while holding the exit_monitor_
250 void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
251 worker->shutdown_next_ = shutting_down_workers_;
252 shutting_down_workers_ = worker;
253 }
254
255
256 // Only call while holding the exit_monitor_
257 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
258 ASSERT(worker != NULL);
259 if (shutting_down_workers_ == NULL) {
260 return false;
261 }
262
263 // Special case head of list.
264 if (shutting_down_workers_ == worker) {
265 shutting_down_workers_ = worker->shutdown_next_;
266 worker->shutdown_next_ = NULL;
267 return true;
268 }
269
270 for (Worker* current = shutting_down_workers_;
271 current->shutdown_next_ != NULL;
272 current = current->shutdown_next_) {
273 if (current->shutdown_next_ == worker) {
274 current->shutdown_next_ = worker->shutdown_next_;
275 worker->shutdown_next_ = NULL;
276 return true;
277 }
278 }
279 return false;
280 }
281
282
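AddWorkerToShutdownList() and RemoveWorkerFromShutdownList() above maintain an intrusive singly-linked list: each Worker carries its own shutdown_next_ pointer, so the bookkeeping allocates nothing, and removal special-cases the head before walking the next pointers. Here is a minimal sketch of the same list shape with illustrative stand-in names; it omits the exit_monitor_ locking that the real code requires around every access.

#include <cassert>

struct Node {
  int id;
  Node* next;
};

static Node* head = nullptr;

// Mirrors AddWorkerToShutdownList(): push onto the front of the list.
void Push(Node* node) {
  node->next = head;
  head = node;
}

// Mirrors RemoveWorkerFromShutdownList(): unlink a node if present.
bool Remove(Node* node) {
  if (head == nullptr) return false;
  if (head == node) {  // Special case the head of the list.
    head = node->next;
    node->next = nullptr;
    return true;
  }
  for (Node* current = head; current->next != nullptr; current = current->next) {
    if (current->next == node) {
      current->next = node->next;
      node->next = nullptr;
      return true;
    }
  }
  return false;
}

int main() {
  Node a{1, nullptr}, b{2, nullptr}, c{3, nullptr};
  Push(&a); Push(&b); Push(&c);  // List is now c -> b -> a.
  assert(Remove(&b));            // Unlink from the middle.
  assert(Remove(&c));            // Unlink the head.
  assert(!Remove(&b));           // Already removed.
  assert(Remove(&a));
  assert(head == nullptr);
  return 0;
}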
209 ThreadPool::Task::Task() { 283 ThreadPool::Task::Task() {
210 } 284 }
211 285
212 286
213 ThreadPool::Task::~Task() { 287 ThreadPool::Task::~Task() {
214 } 288 }
215 289
216 290
217 ThreadPool::Worker::Worker(ThreadPool* pool) 291 ThreadPool::Worker::Worker(ThreadPool* pool)
218 : pool_(pool), 292 : pool_(pool),
219 task_(NULL), 293 task_(NULL),
294 id_(OSThread::kInvalidThreadId),
295 started_(false),
220 owned_(false), 296 owned_(false),
221 all_next_(NULL), 297 all_next_(NULL),
222 idle_next_(NULL) { 298 idle_next_(NULL),
299 shutdown_next_(NULL) {
223 } 300 }
224 301
225 302
226 void ThreadPool::Worker::StartThread() { 303 void ThreadPool::Worker::StartThread() {
227 #if defined(DEBUG) 304 #if defined(DEBUG)
228 // Must call SetTask before StartThread. 305 // Must call SetTask before StartThread.
229 { // NOLINT 306 { // NOLINT
230 MonitorLocker ml(&monitor_); 307 MonitorLocker ml(&monitor_);
231 ASSERT(task_ != NULL); 308 ASSERT(task_ != NULL);
232 } 309 }
(...skipping 24 matching lines...)
257 // out. Give the worker one last desperate chance to live. We 334 // out. Give the worker one last desperate chance to live. We
258 // are merciful. 335 // are merciful.
259 return 1; 336 return 1;
260 } else { 337 } else {
261 return FLAG_worker_timeout_millis - waited; 338 return FLAG_worker_timeout_millis - waited;
262 } 339 }
263 } 340 }
264 } 341 }
265 342
266 343
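The tail of ComputeTimeout() visible above returns however much of FLAG_worker_timeout_millis remains, and returns 1 when the worker appears to have already waited past the deadline so that the next Wait() still gets a positive timeout. A small sketch of that arithmetic, under the assumption that the waited value is the idle time in milliseconds; the elided head of the function is not reproduced here.

#include <cstdint>
#include <cstdio>

static const int64_t kWorkerTimeoutMillis = 5000;  // FLAG_worker_timeout_millis

// Illustrative stand-in: remaining idle timeout given when idling began.
int64_t ComputeTimeoutMillis(int64_t idle_start, int64_t now) {
  const int64_t waited = now - idle_start;
  if (waited >= kWorkerTimeoutMillis) {
    // Already past the deadline; give the worker one last short wait
    // rather than a zero or negative timeout.
    return 1;
  }
  return kWorkerTimeoutMillis - waited;
}

int main() {
  std::printf("%lld\n", static_cast<long long>(ComputeTimeoutMillis(0, 1500)));  // 3500
  std::printf("%lld\n", static_cast<long long>(ComputeTimeoutMillis(0, 9000)));  // 1
  return 0;
}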
267 void ThreadPool::Worker::Loop() { 344 bool ThreadPool::Worker::Loop() {
268 MonitorLocker ml(&monitor_); 345 MonitorLocker ml(&monitor_);
269 int64_t idle_start; 346 int64_t idle_start;
270 while (true) { 347 while (true) {
271 ASSERT(task_ != NULL); 348 ASSERT(task_ != NULL);
272 Task* task = task_; 349 Task* task = task_;
273 task_ = NULL; 350 task_ = NULL;
274 351
275 // Release monitor while handling the task. 352 // Release monitor while handling the task.
276 monitor_.Exit(); 353 monitor_.Exit();
277 task->Run(); 354 task->Run();
278 ASSERT(Isolate::Current() == NULL); 355 ASSERT(Isolate::Current() == NULL);
279 delete task; 356 delete task;
280 monitor_.Enter(); 357 monitor_.Enter();
281 358
282 ASSERT(task_ == NULL); 359 ASSERT(task_ == NULL);
283 if (IsDone()) { 360 if (IsDone()) {
284 return; 361 return false;
285 } 362 }
286 ASSERT(pool_ != NULL); 363 ASSERT(pool_ != NULL);
287 pool_->SetIdle(this); 364 pool_->SetIdle(this);
288 idle_start = OS::GetCurrentTimeMillis(); 365 idle_start = OS::GetCurrentTimeMillis();
289 while (true) { 366 while (true) {
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); 367 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
291 if (task_ != NULL) { 368 if (task_ != NULL) {
292 // We've found a task. Process it, regardless of whether the 369 // We've found a task. Process it, regardless of whether the
293 // worker is done_. 370 // worker is done_.
294 break; 371 break;
295 } 372 }
296 if (IsDone()) { 373 if (IsDone()) {
297 return; 374 return false;
298 } 375 }
299 if (result == Monitor::kTimedOut && 376 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
300 pool_->ReleaseIdleWorker(this)) { 377 return true;
301 return;
302 } 378 }
303 } 379 }
304 } 380 }
305 UNREACHABLE(); 381 UNREACHABLE();
382 return false;
306 } 383 }
307 384
308 385
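Worker::Loop() above drops the worker's monitor around task->Run() (the monitor_.Exit() / monitor_.Enter() pair) so the task never executes while the lock is held, then reacquires it before touching task_ or the done state; its new bool return tells Main() whether the worker released itself for idleness rather than for pool shutdown. A minimal sketch of the drop-the-lock-around-the-work part of that pattern, with std::mutex and a task queue as illustrative stand-ins:

#include <functional>
#include <iostream>
#include <mutex>
#include <queue>

std::mutex mutex;
std::queue<std::function<void()>> tasks;

void DrainTasks() {
  std::unique_lock<std::mutex> lock(mutex);
  while (!tasks.empty()) {
    std::function<void()> task = std::move(tasks.front());
    tasks.pop();
    lock.unlock();  // Like monitor_.Exit(): run the task without the lock so
    task();         // other threads can queue work or signal shutdown.
    lock.lock();    // Like monitor_.Enter(): reacquire before reading state.
  }
}

int main() {
  {
    std::lock_guard<std::mutex> lock(mutex);
    tasks.push([] { std::cout << "task 1\n"; });
    tasks.push([] { std::cout << "task 2\n"; });
  }
  DrainTasks();
  return 0;
}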
309 void ThreadPool::Worker::Shutdown() { 386 bool ThreadPool::Worker::Shutdown() {
310 MonitorLocker ml(&monitor_); 387 MonitorLocker ml(&monitor_);
311 pool_ = NULL; // Fail fast if someone tries to access pool_. 388 pool_ = NULL; // Fail fast if someone tries to access pool_.
312 ml.Notify(); 389 ml.Notify();
390 // Return whether the worker thread has started.
391 return started_;
313 } 392 }
314 393
315 394
316 // static 395 // static
317 void ThreadPool::Worker::Main(uword args) { 396 void ThreadPool::Worker::Main(uword args) {
318 Thread::EnsureInit(); 397 Thread::EnsureInit();
319 Worker* worker = reinterpret_cast<Worker*>(args); 398 Worker* worker = reinterpret_cast<Worker*>(args);
320 worker->Loop(); 399 bool delete_self = false;
400
401 {
402 MonitorLocker ml(&(worker->monitor_));
403 if (worker->IsDone()) {
404 // id_ hasn't been set, yet, but the ThreadPool is being shutdown.
405 // Delete the task, and return.
406 ASSERT(worker->task_);
407 delete worker->task_;
408 worker->task_ = NULL;
409 delete_self = true;
410 } else {
411 worker->id_ = OSThread::GetCurrentThreadId();
412 worker->started_ = true;
413 }
414 }
415
416 // We aren't able to delete the worker while holding the worker's monitor.
417 // Now that we have released it, and we know that ThreadPool::Shutdown
418 // won't touch it again, we can delete it and return.
419 if (delete_self) {
420 delete worker;
421 return;
422 }
423
424 bool released = worker->Loop();
321 425
322 // It should be okay to access these unlocked here in this assert. 426 // It should be okay to access these unlocked here in this assert.
323 ASSERT(!worker->owned_ && 427 // worker->all_next_ is retained by the pool for shutdown monitoring.
324 worker->all_next_ == NULL && 428 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
325 worker->idle_next_ == NULL);
326 429
327 // The exit monitor is only used during testing. 430 if (!released) {
328 if (ThreadPool::exit_monitor_) { 431 // This worker is exiting because the thread pool is being shut down.
329 MonitorLocker ml(ThreadPool::exit_monitor_); 432 // Inform the thread pool that we are exiting. We remove this worker from
330 (*ThreadPool::exit_count_)++; 433 // shutting_down_workers_ list because there will be no need for the
331 ml.Notify(); 434 // ThreadPool to take action for this worker.
435 MonitorLocker eml(&ThreadPool::exit_monitor_);
436 worker->id_ = OSThread::kInvalidThreadId;
437 RemoveWorkerFromShutdownList(worker);
438 delete worker;
439 eml.Notify();
440 } else {
441 // This worker is going down because it was idle for too long. This case
442 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
443 delete worker;
332 } 444 }
333 delete worker;
334 #if defined(TARGET_OS_WINDOWS) 445 #if defined(TARGET_OS_WINDOWS)
335 Thread::CleanUp(); 446 Thread::CleanUp();
336 #endif 447 #endif
337 } 448 }
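Worker::Main() above decides whether to self-delete while holding the worker's monitor but performs the delete only after the MonitorLocker has gone out of scope, since deleting the worker would destroy the very monitor the locker still holds. A minimal sketch of that decide-under-the-lock, delete-outside-it pattern, with illustrative stand-in types:

#include <mutex>

struct Worker {
  std::mutex monitor;
  bool done = false;
};

void MaybeDeleteSelf(Worker* worker) {
  bool delete_self = false;
  {
    std::lock_guard<std::mutex> lock(worker->monitor);
    if (worker->done) {
      delete_self = true;  // Record the decision only; deleting here would
    }                      // destroy a mutex that is currently locked.
  }
  // The lock has been released, so deleting the worker is now safe.
  if (delete_self) {
    delete worker;
  }
}

int main() {
  Worker* worker = new Worker();
  worker->done = true;
  MaybeDeleteSelf(worker);  // Deletes worker.
  return 0;
}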
338 449
339 } // namespace dart 450 } // namespace dart