OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
6 | 6 |
7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 | 9 |
10 namespace dart { | 10 namespace dart { |
11 | 11 |
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
| 14 DEFINE_FLAG(int, shutdown_timeout_millis, 1000, |
| 15 "Amount of time to wait for a worker to stop during shutdown."); |
14 | 16 |
15 Monitor* ThreadPool::exit_monitor_ = NULL; | 17 Monitor ThreadPool::exit_monitor_; |
16 int* ThreadPool::exit_count_ = NULL; | 18 ThreadPool::Worker* ThreadPool::shutting_down_workers_ = NULL; |
17 | 19 |
18 ThreadPool::ThreadPool() | 20 ThreadPool::ThreadPool() |
19 : shutting_down_(false), | 21 : shutting_down_(false), |
20 all_workers_(NULL), | 22 all_workers_(NULL), |
21 idle_workers_(NULL), | 23 idle_workers_(NULL), |
22 count_started_(0), | 24 count_started_(0), |
23 count_stopped_(0), | 25 count_stopped_(0), |
24 count_running_(0), | 26 count_running_(0), |
25 count_idle_(0) { | 27 count_idle_(0) { |
26 } | 28 } |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
87 current = next; | 89 current = next; |
88 count_stopped_++; | 90 count_stopped_++; |
89 } | 91 } |
90 | 92 |
91 count_idle_ = 0; | 93 count_idle_ = 0; |
92 count_running_ = 0; | 94 count_running_ = 0; |
93 ASSERT(count_started_ == count_stopped_); | 95 ASSERT(count_started_ == count_stopped_); |
94 } | 96 } |
95 // Release ThreadPool::mutex_ before calling Worker functions. | 97 // Release ThreadPool::mutex_ before calling Worker functions. |
96 | 98 |
97 Worker* current = saved; | 99 { |
98 while (current != NULL) { | 100 MonitorLocker eml(&ThreadPool::exit_monitor_); |
99 // We may access all_next_ without holding ThreadPool::mutex_ here | 101 |
100 // because the worker is no longer owned by the ThreadPool. | 102 // First tell all the workers to shut down. |
101 Worker* next = current->all_next_; | 103 Worker* current = saved; |
102 current->all_next_ = NULL; | 104 while (current != NULL) { |
103 current->Shutdown(); | 105 Worker* next = current->all_next_; |
104 current = next; | 106 bool started = current->Shutdown(); |
| 107 // After Shutdown, if started is false, we can no longer touch the worker |
| 108 // because a worker that hasn't started yet may run at any time and |
| 109 // delete itself. |
| 110 if (started) { |
| 111 current->all_next_ = NULL; |
| 112 // We only ensure the shutdown of threads that have started. |
| 113 // Threads that have not started will shutdown immediately as soon as |
| 114 // they run. |
| 115 AddWorkerToShutdownList(current); |
| 116 } |
| 117 current = next; |
| 118 } |
| 119 saved = NULL; |
| 120 |
| 121 // Give workers a chance to exit gracefully. |
| 122 const int64_t start_wait = OS::GetCurrentTimeMillis(); |
| 123 int timeout = FLAG_shutdown_timeout_millis; |
| 124 while (shutting_down_workers_ != NULL) { |
| 125 if (timeout > 0) { |
| 126 // Here, we are waiting for workers to exit. When a worker exits we will |
| 127 // be notified. |
| 128 eml.Wait(timeout); |
| 129 |
| 130 // We decrement the timeout for the next wait by the amount of time |
| 131 // we've already waited. If the new timeout drops below zero, we break |
| 132 // out of this loop, which triggers the termination code below. |
| 133 const int64_t after_wait = OS::GetCurrentTimeMillis(); |
| 134 timeout = FLAG_shutdown_timeout_millis - (after_wait - start_wait); |
| 135 } else { |
| 136 break; |
| 137 } |
| 138 } |
| 139 |
| 140 // It is an error if all workers have not exited within the timeout. We |
| 141 // assume that they have run off into the weeds, and it is a bug. |
| 142 if (shutting_down_workers_ != NULL) { |
| 143 FATAL("Thread pool worker threads failed to exit."); |
| 144 } |
105 } | 145 } |
106 } | 146 } |
107 | 147 |
108 | 148 |
109 bool ThreadPool::IsIdle(Worker* worker) { | 149 bool ThreadPool::IsIdle(Worker* worker) { |
110 ASSERT(worker != NULL && worker->owned_); | 150 ASSERT(worker != NULL && worker->owned_); |
111 for (Worker* current = idle_workers_; | 151 for (Worker* current = idle_workers_; |
112 current != NULL; | 152 current != NULL; |
113 current = current->idle_next_) { | 153 current = current->idle_next_) { |
114 if (current == worker) { | 154 if (current == worker) { |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
199 // Remove from all list. | 239 // Remove from all list. |
200 bool found = RemoveWorkerFromAllList(worker); | 240 bool found = RemoveWorkerFromAllList(worker); |
201 ASSERT(found); | 241 ASSERT(found); |
202 | 242 |
203 count_stopped_++; | 243 count_stopped_++; |
204 count_idle_--; | 244 count_idle_--; |
205 return true; | 245 return true; |
206 } | 246 } |
207 | 247 |
208 | 248 |
| 249 // Only call while holding the exit_monitor_ |
| 250 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { |
| 251 worker->shutdown_next_ = shutting_down_workers_; |
| 252 shutting_down_workers_ = worker; |
| 253 } |
| 254 |
| 255 |
| 256 // Only call while holding the exit_monitor_ |
| 257 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { |
| 258 ASSERT(worker != NULL); |
| 259 if (shutting_down_workers_ == NULL) { |
| 260 return false; |
| 261 } |
| 262 |
| 263 // Special case head of list. |
| 264 if (shutting_down_workers_ == worker) { |
| 265 shutting_down_workers_ = worker->shutdown_next_; |
| 266 worker->shutdown_next_ = NULL; |
| 267 return true; |
| 268 } |
| 269 |
| 270 for (Worker* current = shutting_down_workers_; |
| 271 current->shutdown_next_ != NULL; |
| 272 current = current->shutdown_next_) { |
| 273 if (current->shutdown_next_ == worker) { |
| 274 current->shutdown_next_ = worker->shutdown_next_; |
| 275 worker->shutdown_next_ = NULL; |
| 276 return true; |
| 277 } |
| 278 } |
| 279 return false; |
| 280 } |
| 281 |
| 282 |
209 ThreadPool::Task::Task() { | 283 ThreadPool::Task::Task() { |
210 } | 284 } |
211 | 285 |
212 | 286 |
213 ThreadPool::Task::~Task() { | 287 ThreadPool::Task::~Task() { |
214 } | 288 } |
215 | 289 |
216 | 290 |
217 ThreadPool::Worker::Worker(ThreadPool* pool) | 291 ThreadPool::Worker::Worker(ThreadPool* pool) |
218 : pool_(pool), | 292 : pool_(pool), |
219 task_(NULL), | 293 task_(NULL), |
| 294 id_(OSThread::kInvalidThreadId), |
| 295 started_(false), |
220 owned_(false), | 296 owned_(false), |
221 all_next_(NULL), | 297 all_next_(NULL), |
222 idle_next_(NULL) { | 298 idle_next_(NULL), |
| 299 shutdown_next_(NULL) { |
223 } | 300 } |
224 | 301 |
225 | 302 |
226 void ThreadPool::Worker::StartThread() { | 303 void ThreadPool::Worker::StartThread() { |
227 #if defined(DEBUG) | 304 #if defined(DEBUG) |
228 // Must call SetTask before StartThread. | 305 // Must call SetTask before StartThread. |
229 { // NOLINT | 306 { // NOLINT |
230 MonitorLocker ml(&monitor_); | 307 MonitorLocker ml(&monitor_); |
231 ASSERT(task_ != NULL); | 308 ASSERT(task_ != NULL); |
232 } | 309 } |
(...skipping 24 matching lines...) Expand all Loading... |
257 // out. Give the worker one last desperate chance to live. We | 334 // out. Give the worker one last desperate chance to live. We |
258 // are merciful. | 335 // are merciful. |
259 return 1; | 336 return 1; |
260 } else { | 337 } else { |
261 return FLAG_worker_timeout_millis - waited; | 338 return FLAG_worker_timeout_millis - waited; |
262 } | 339 } |
263 } | 340 } |
264 } | 341 } |
265 | 342 |
266 | 343 |
267 void ThreadPool::Worker::Loop() { | 344 bool ThreadPool::Worker::Loop() { |
268 MonitorLocker ml(&monitor_); | 345 MonitorLocker ml(&monitor_); |
269 int64_t idle_start; | 346 int64_t idle_start; |
270 while (true) { | 347 while (true) { |
271 ASSERT(task_ != NULL); | 348 ASSERT(task_ != NULL); |
272 Task* task = task_; | 349 Task* task = task_; |
273 task_ = NULL; | 350 task_ = NULL; |
274 | 351 |
275 // Release monitor while handling the task. | 352 // Release monitor while handling the task. |
276 monitor_.Exit(); | 353 monitor_.Exit(); |
277 task->Run(); | 354 task->Run(); |
278 ASSERT(Isolate::Current() == NULL); | 355 ASSERT(Isolate::Current() == NULL); |
279 delete task; | 356 delete task; |
280 monitor_.Enter(); | 357 monitor_.Enter(); |
281 | 358 |
282 ASSERT(task_ == NULL); | 359 ASSERT(task_ == NULL); |
283 if (IsDone()) { | 360 if (IsDone()) { |
284 return; | 361 return false; |
285 } | 362 } |
286 ASSERT(pool_ != NULL); | 363 ASSERT(pool_ != NULL); |
287 pool_->SetIdle(this); | 364 pool_->SetIdle(this); |
288 idle_start = OS::GetCurrentTimeMillis(); | 365 idle_start = OS::GetCurrentTimeMillis(); |
289 while (true) { | 366 while (true) { |
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 367 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
291 if (task_ != NULL) { | 368 if (task_ != NULL) { |
292 // We've found a task. Process it, regardless of whether the | 369 // We've found a task. Process it, regardless of whether the |
293 // worker is done_. | 370 // worker is done_. |
294 break; | 371 break; |
295 } | 372 } |
296 if (IsDone()) { | 373 if (IsDone()) { |
297 return; | 374 return false; |
298 } | 375 } |
299 if (result == Monitor::kTimedOut && | 376 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
300 pool_->ReleaseIdleWorker(this)) { | 377 return true; |
301 return; | |
302 } | 378 } |
303 } | 379 } |
304 } | 380 } |
305 UNREACHABLE(); | 381 UNREACHABLE(); |
| 382 return false; |
306 } | 383 } |
307 | 384 |
308 | 385 |
309 void ThreadPool::Worker::Shutdown() { | 386 bool ThreadPool::Worker::Shutdown() { |
310 MonitorLocker ml(&monitor_); | 387 MonitorLocker ml(&monitor_); |
311 pool_ = NULL; // Fail fast if someone tries to access pool_. | 388 pool_ = NULL; // Fail fast if someone tries to access pool_. |
312 ml.Notify(); | 389 ml.Notify(); |
| 390 // Return whether the worker thread has started. |
| 391 return started_; |
313 } | 392 } |
314 | 393 |
315 | 394 |
316 // static | 395 // static |
317 void ThreadPool::Worker::Main(uword args) { | 396 void ThreadPool::Worker::Main(uword args) { |
318 Thread::EnsureInit(); | 397 Thread::EnsureInit(); |
319 Worker* worker = reinterpret_cast<Worker*>(args); | 398 Worker* worker = reinterpret_cast<Worker*>(args); |
320 worker->Loop(); | 399 bool delete_self = false; |
| 400 |
| 401 { |
| 402 MonitorLocker ml(&(worker->monitor_)); |
| 403 if (worker->IsDone()) { |
| 404 // id_ hasn't been set, yet, but the ThreadPool is being shutdown. |
| 405 // Delete the task, and return. |
| 406 ASSERT(worker->task_); |
| 407 delete worker->task_; |
| 408 worker->task_ = NULL; |
| 409 delete_self = true; |
| 410 } else { |
| 411 worker->id_ = OSThread::GetCurrentThreadId(); |
| 412 worker->started_ = true; |
| 413 } |
| 414 } |
| 415 |
| 416 // We aren't able to delete the worker while holding the worker's monitor. |
| 417 // Now that we have released it, and we know that ThreadPool::Shutdown |
| 418 // won't touch it again, we can delete it and return. |
| 419 if (delete_self) { |
| 420 delete worker; |
| 421 return; |
| 422 } |
| 423 |
| 424 bool released = worker->Loop(); |
321 | 425 |
322 // It should be okay to access these unlocked here in this assert. | 426 // It should be okay to access these unlocked here in this assert. |
323 ASSERT(!worker->owned_ && | 427 // worker->all_next_ is retained by the pool for shutdown monitoring. |
324 worker->all_next_ == NULL && | 428 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
325 worker->idle_next_ == NULL); | |
326 | 429 |
327 // The exit monitor is only used during testing. | 430 if (!released) { |
328 if (ThreadPool::exit_monitor_) { | 431 // This worker is exiting because the thread pool is being shut down. |
329 MonitorLocker ml(ThreadPool::exit_monitor_); | 432 // Inform the thread pool that we are exiting. We remove this worker from |
330 (*ThreadPool::exit_count_)++; | 433 // shutting_down_workers_ list because there will be no need for the |
331 ml.Notify(); | 434 // ThreadPool to take action for this worker. |
| 435 MonitorLocker eml(&ThreadPool::exit_monitor_); |
| 436 worker->id_ = OSThread::kInvalidThreadId; |
| 437 RemoveWorkerFromShutdownList(worker); |
| 438 delete worker; |
| 439 eml.Notify(); |
| 440 } else { |
| 441 // This worker is going down because it was idle for too long. This case |
| 442 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. |
| 443 delete worker; |
332 } | 444 } |
333 delete worker; | |
334 #if defined(TARGET_OS_WINDOWS) | 445 #if defined(TARGET_OS_WINDOWS) |
335 Thread::CleanUp(); | 446 Thread::CleanUp(); |
336 #endif | 447 #endif |
337 } | 448 } |
338 | 449 |
339 } // namespace dart | 450 } // namespace dart |
OLD | NEW |