OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
6 | 6 |
7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 | 9 |
10 namespace dart { | 10 namespace dart { |
11 | 11 |
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
14 | 14 |
15 Monitor* ThreadPool::exit_monitor_ = NULL; | |
16 int* ThreadPool::exit_count_ = NULL; | |
17 | |
18 ThreadPool::ThreadPool() | 15 ThreadPool::ThreadPool() |
19 : shutting_down_(false), | 16 : shutting_down_(false), |
20 all_workers_(NULL), | 17 all_workers_(NULL), |
21 idle_workers_(NULL), | 18 idle_workers_(NULL), |
22 count_started_(0), | 19 count_started_(0), |
23 count_stopped_(0), | 20 count_stopped_(0), |
24 count_running_(0), | 21 count_running_(0), |
25 count_idle_(0) { | 22 count_idle_(0), |
| 23 idle_join_list_(NULL), |
| 24 shutting_down_workers_(NULL), |
| 25 join_list_(NULL) { |
26 } | 26 } |
27 | 27 |
28 | 28 |
29 ThreadPool::~ThreadPool() { | 29 ThreadPool::~ThreadPool() { |
30 Shutdown(); | 30 Shutdown(); |
31 } | 31 } |
32 | 32 |
33 | 33 |
34 void ThreadPool::Run(Task* task) { | 34 void ThreadPool::Run(Task* task) { |
35 Worker* worker = NULL; | 35 Worker* worker = NULL; |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
87 current = next; | 87 current = next; |
88 count_stopped_++; | 88 count_stopped_++; |
89 } | 89 } |
90 | 90 |
91 count_idle_ = 0; | 91 count_idle_ = 0; |
92 count_running_ = 0; | 92 count_running_ = 0; |
93 ASSERT(count_started_ == count_stopped_); | 93 ASSERT(count_started_ == count_stopped_); |
94 } | 94 } |
95 // Release ThreadPool::mutex_ before calling Worker functions. | 95 // Release ThreadPool::mutex_ before calling Worker functions. |
96 | 96 |
97 Worker* current = saved; | 97 { |
98 while (current != NULL) { | 98 MonitorLocker eml(&exit_monitor_); |
99 // We may access all_next_ without holding ThreadPool::mutex_ here | 99 |
100 // because the worker is no longer owned by the ThreadPool. | 100 // First tell all the workers to shut down. |
101 Worker* next = current->all_next_; | 101 Worker* current = saved; |
102 current->all_next_ = NULL; | 102 while (current != NULL) { |
103 current->Shutdown(); | 103 Worker* next = current->all_next_; |
104 current = next; | 104 if (current->id_ != OSThread::GetCurrentThreadJoinId()) { |
| 105 AddWorkerToShutdownList(current); |
| 106 } |
| 107 current->Shutdown(); |
| 108 current = next; |
| 109 } |
| 110 saved = NULL; |
| 111 |
| 112 // Wait until all workers will exit. |
| 113 while (shutting_down_workers_ != NULL) { |
| 114 // Here, we are waiting for workers to exit. When a worker exits we will |
| 115 // be notified. |
| 116 eml.Wait(); |
| 117 } |
| 118 |
| 119 // Join non-idle threads. |
| 120 JoinList::Join(&join_list_); |
105 } | 121 } |
| 122 |
| 123 // Join any remaining idle threads. |
| 124 JoinList::Join(&idle_join_list_); |
106 } | 125 } |
107 | 126 |
108 | 127 |
109 bool ThreadPool::IsIdle(Worker* worker) { | 128 bool ThreadPool::IsIdle(Worker* worker) { |
110 ASSERT(worker != NULL && worker->owned_); | 129 ASSERT(worker != NULL && worker->owned_); |
111 for (Worker* current = idle_workers_; | 130 for (Worker* current = idle_workers_; |
112 current != NULL; | 131 current != NULL; |
113 current = current->idle_next_) { | 132 current = current->idle_next_) { |
114 if (current == worker) { | 133 if (current == worker) { |
115 return true; | 134 return true; |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
149 ASSERT(worker != NULL && worker->owned_); | 168 ASSERT(worker != NULL && worker->owned_); |
150 if (all_workers_ == NULL) { | 169 if (all_workers_ == NULL) { |
151 return false; | 170 return false; |
152 } | 171 } |
153 | 172 |
154 // Special case head of list. | 173 // Special case head of list. |
155 if (all_workers_ == worker) { | 174 if (all_workers_ == worker) { |
156 all_workers_ = worker->all_next_; | 175 all_workers_ = worker->all_next_; |
157 worker->all_next_ = NULL; | 176 worker->all_next_ = NULL; |
158 worker->owned_ = false; | 177 worker->owned_ = false; |
159 worker->pool_ = NULL; | 178 worker->done_ = true; |
160 return true; | 179 return true; |
161 } | 180 } |
162 | 181 |
163 for (Worker* current = all_workers_; | 182 for (Worker* current = all_workers_; |
164 current->all_next_ != NULL; | 183 current->all_next_ != NULL; |
165 current = current->all_next_) { | 184 current = current->all_next_) { |
166 if (current->all_next_ == worker) { | 185 if (current->all_next_ == worker) { |
167 current->all_next_ = worker->all_next_; | 186 current->all_next_ = worker->all_next_; |
168 worker->all_next_ = NULL; | 187 worker->all_next_ = NULL; |
169 worker->owned_ = false; | 188 worker->owned_ = false; |
(...skipping 10 matching lines...) Expand all Loading... |
180 return; | 199 return; |
181 } | 200 } |
182 ASSERT(worker->owned_ && !IsIdle(worker)); | 201 ASSERT(worker->owned_ && !IsIdle(worker)); |
183 worker->idle_next_ = idle_workers_; | 202 worker->idle_next_ = idle_workers_; |
184 idle_workers_ = worker; | 203 idle_workers_ = worker; |
185 count_idle_++; | 204 count_idle_++; |
186 count_running_--; | 205 count_running_--; |
187 } | 206 } |
188 | 207 |
189 | 208 |
| 209 void ThreadPool::ReapExitedIdleThreads() { |
| 210 JoinList* list = NULL; |
| 211 { |
| 212 MutexLocker ml(&mutex_); |
| 213 if (shutting_down_) { |
| 214 // If we're shutting down, the threads will be reaped in Shutdown. |
| 215 return; |
| 216 } |
| 217 list = idle_join_list_; |
| 218 idle_join_list_ = NULL; |
| 219 } |
| 220 JoinList::Join(&list); |
| 221 } |
| 222 |
| 223 |
| 224 class ReaperTask : public ThreadPool::Task { |
| 225 public: |
| 226 explicit ReaperTask(ThreadPool* pool) : pool_(pool) {} |
| 227 virtual void Run() { |
| 228 pool_->ReapExitedIdleThreads(); |
| 229 } |
| 230 |
| 231 private: |
| 232 ThreadPool* pool_; |
| 233 }; |
| 234 |
| 235 |
190 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { | 236 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
191 MutexLocker ml(&mutex_); | 237 Worker* idle_worker = NULL; |
192 if (shutting_down_) { | 238 { |
193 return false; | 239 MutexLocker ml(&mutex_); |
| 240 if (shutting_down_) { |
| 241 return false; |
| 242 } |
| 243 // Remove from idle list. |
| 244 if (!RemoveWorkerFromIdleList(worker)) { |
| 245 return false; |
| 246 } |
| 247 // Remove from all list. |
| 248 bool found = RemoveWorkerFromAllList(worker); |
| 249 ASSERT(found); |
| 250 |
| 251 // The thread for worker will exit. Add its ThreadId to the idle_join_list_ |
| 252 // so that we can join on it at the next opportunity. |
| 253 JoinList::Add(worker->id_, &idle_join_list_); |
| 254 |
| 255 // If there's an idle worker hanging around, set it up to reap the exiting |
| 256 // idle thread. |
| 257 if (idle_workers_ != NULL) { |
| 258 Worker* idle_worker = idle_workers_; |
| 259 idle_workers_ = idle_workers_->idle_next_; |
| 260 idle_worker->idle_next_ = NULL; |
| 261 count_idle_--; |
| 262 } |
| 263 |
| 264 count_stopped_++; |
| 265 count_idle_--; |
194 } | 266 } |
195 // Remove from idle list. | 267 if (idle_worker != NULL) { |
196 if (!RemoveWorkerFromIdleList(worker)) { | 268 idle_worker->SetTask(new ReaperTask(this)); |
197 return false; | |
198 } | 269 } |
199 // Remove from all list. | 270 return true; |
200 bool found = RemoveWorkerFromAllList(worker); | 271 } |
201 ASSERT(found); | |
202 | 272 |
203 count_stopped_++; | 273 |
204 count_idle_--; | 274 // Only call while holding the exit_monitor_ |
205 return true; | 275 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { |
| 276 worker->shutdown_next_ = shutting_down_workers_; |
| 277 shutting_down_workers_ = worker; |
| 278 } |
| 279 |
| 280 |
| 281 // Only call while holding the exit_monitor_ |
| 282 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { |
| 283 ASSERT(worker != NULL); |
| 284 ASSERT(shutting_down_workers_ != NULL); |
| 285 |
| 286 // Special case head of list. |
| 287 if (shutting_down_workers_ == worker) { |
| 288 shutting_down_workers_ = worker->shutdown_next_; |
| 289 worker->shutdown_next_ = NULL; |
| 290 return true; |
| 291 } |
| 292 |
| 293 for (Worker* current = shutting_down_workers_; |
| 294 current->shutdown_next_ != NULL; |
| 295 current = current->shutdown_next_) { |
| 296 if (current->shutdown_next_ == worker) { |
| 297 current->shutdown_next_ = worker->shutdown_next_; |
| 298 worker->shutdown_next_ = NULL; |
| 299 return true; |
| 300 } |
| 301 } |
| 302 return false; |
| 303 } |
| 304 |
| 305 |
| 306 void ThreadPool::JoinList::Add(ThreadJoinId id, JoinList** list) { |
| 307 *list = new JoinList(id, *list); |
| 308 } |
| 309 |
| 310 |
| 311 void ThreadPool::JoinList::Join(JoinList** list) { |
| 312 while (*list) { |
| 313 JoinList* current = *list; |
| 314 *list = current->next(); |
| 315 OSThread::Join(current->id()); |
| 316 delete current; |
| 317 } |
206 } | 318 } |
207 | 319 |
208 | 320 |
209 ThreadPool::Task::Task() { | 321 ThreadPool::Task::Task() { |
210 } | 322 } |
211 | 323 |
212 | 324 |
213 ThreadPool::Task::~Task() { | 325 ThreadPool::Task::~Task() { |
214 } | 326 } |
215 | 327 |
216 | 328 |
217 ThreadPool::Worker::Worker(ThreadPool* pool) | 329 ThreadPool::Worker::Worker(ThreadPool* pool) |
218 : pool_(pool), | 330 : pool_(pool), |
219 task_(NULL), | 331 task_(NULL), |
| 332 id_(OSThread::kInvalidThreadJoinId), |
| 333 started_(false), |
| 334 done_(false), |
220 owned_(false), | 335 owned_(false), |
221 all_next_(NULL), | 336 all_next_(NULL), |
222 idle_next_(NULL) { | 337 idle_next_(NULL), |
| 338 shutdown_next_(NULL) { |
223 } | 339 } |
224 | 340 |
225 | 341 |
226 void ThreadPool::Worker::StartThread() { | 342 void ThreadPool::Worker::StartThread() { |
227 #if defined(DEBUG) | 343 #if defined(DEBUG) |
228 // Must call SetTask before StartThread. | 344 // Must call SetTask before StartThread. |
229 { // NOLINT | 345 { // NOLINT |
230 MonitorLocker ml(&monitor_); | 346 MonitorLocker ml(&monitor_); |
231 ASSERT(task_ != NULL); | 347 ASSERT(task_ != NULL); |
232 } | 348 } |
(...skipping 24 matching lines...) Expand all Loading... |
257 // out. Give the worker one last desperate chance to live. We | 373 // out. Give the worker one last desperate chance to live. We |
258 // are merciful. | 374 // are merciful. |
259 return 1; | 375 return 1; |
260 } else { | 376 } else { |
261 return FLAG_worker_timeout_millis - waited; | 377 return FLAG_worker_timeout_millis - waited; |
262 } | 378 } |
263 } | 379 } |
264 } | 380 } |
265 | 381 |
266 | 382 |
267 void ThreadPool::Worker::Loop() { | 383 bool ThreadPool::Worker::Loop() { |
268 MonitorLocker ml(&monitor_); | 384 MonitorLocker ml(&monitor_); |
269 int64_t idle_start; | 385 int64_t idle_start; |
270 while (true) { | 386 while (true) { |
271 ASSERT(task_ != NULL); | 387 ASSERT(task_ != NULL); |
272 Task* task = task_; | 388 Task* task = task_; |
273 task_ = NULL; | 389 task_ = NULL; |
274 | 390 |
275 // Release monitor while handling the task. | 391 // Release monitor while handling the task. |
276 monitor_.Exit(); | 392 monitor_.Exit(); |
277 task->Run(); | 393 task->Run(); |
278 ASSERT(Isolate::Current() == NULL); | 394 ASSERT(Isolate::Current() == NULL); |
279 delete task; | 395 delete task; |
| 396 pool_->ReapExitedIdleThreads(); |
280 monitor_.Enter(); | 397 monitor_.Enter(); |
281 | 398 |
282 ASSERT(task_ == NULL); | 399 ASSERT(task_ == NULL); |
283 if (IsDone()) { | 400 if (IsDone()) { |
284 return; | 401 return false; |
285 } | 402 } |
286 ASSERT(pool_ != NULL); | 403 ASSERT(!done_); |
287 pool_->SetIdle(this); | 404 pool_->SetIdle(this); |
288 idle_start = OS::GetCurrentTimeMillis(); | 405 idle_start = OS::GetCurrentTimeMillis(); |
289 while (true) { | 406 while (true) { |
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 407 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
291 if (task_ != NULL) { | 408 if (task_ != NULL) { |
292 // We've found a task. Process it, regardless of whether the | 409 // We've found a task. Process it, regardless of whether the |
293 // worker is done_. | 410 // worker is done_. |
294 break; | 411 break; |
295 } | 412 } |
296 if (IsDone()) { | 413 if (IsDone()) { |
297 return; | 414 return false; |
298 } | 415 } |
299 if (result == Monitor::kTimedOut && | 416 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
300 pool_->ReleaseIdleWorker(this)) { | 417 return true; |
301 return; | |
302 } | 418 } |
303 } | 419 } |
304 } | 420 } |
305 UNREACHABLE(); | 421 UNREACHABLE(); |
| 422 return false; |
306 } | 423 } |
307 | 424 |
308 | 425 |
309 void ThreadPool::Worker::Shutdown() { | 426 void ThreadPool::Worker::Shutdown() { |
310 MonitorLocker ml(&monitor_); | 427 MonitorLocker ml(&monitor_); |
311 pool_ = NULL; // Fail fast if someone tries to access pool_. | 428 done_ = true; |
312 ml.Notify(); | 429 ml.Notify(); |
313 } | 430 } |
314 | 431 |
315 | 432 |
316 // static | 433 // static |
317 void ThreadPool::Worker::Main(uword args) { | 434 void ThreadPool::Worker::Main(uword args) { |
318 Thread::EnsureInit(); | 435 Thread::EnsureInit(); |
319 Worker* worker = reinterpret_cast<Worker*>(args); | 436 Worker* worker = reinterpret_cast<Worker*>(args); |
320 worker->Loop(); | 437 bool delete_self = false; |
| 438 |
| 439 { |
| 440 MonitorLocker ml(&(worker->monitor_)); |
| 441 if (worker->IsDone()) { |
| 442 // id_ hasn't been set yet, but the ThreadPool is being shutdown. |
| 443 // Delete the task, and return. |
| 444 ASSERT(worker->task_); |
| 445 delete worker->task_; |
| 446 worker->task_ = NULL; |
| 447 delete_self = true; |
| 448 } else { |
| 449 worker->id_ = OSThread::GetCurrentThreadJoinId(); |
| 450 worker->started_ = true; |
| 451 } |
| 452 } |
| 453 |
| 454 // We aren't able to delete the worker while holding the worker's monitor. |
| 455 // Now that we have released it, and we know that ThreadPool::Shutdown |
| 456 // won't touch it again, we can delete it and return. |
| 457 if (delete_self) { |
| 458 MonitorLocker eml(&worker->pool_->exit_monitor_); |
| 459 ThreadPool::JoinList::Add( |
| 460 OSThread::GetCurrentThreadJoinId(), &worker->pool_->join_list_); |
| 461 worker->pool_->RemoveWorkerFromShutdownList(worker); |
| 462 delete worker; |
| 463 eml.Notify(); |
| 464 return; |
| 465 } |
| 466 |
| 467 bool released = worker->Loop(); |
321 | 468 |
322 // It should be okay to access these unlocked here in this assert. | 469 // It should be okay to access these unlocked here in this assert. |
323 ASSERT(!worker->owned_ && | 470 // worker->all_next_ is retained by the pool for shutdown monitoring. |
324 worker->all_next_ == NULL && | 471 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
325 worker->idle_next_ == NULL); | |
326 | 472 |
327 // The exit monitor is only used during testing. | 473 if (!released) { |
328 if (ThreadPool::exit_monitor_) { | 474 // This worker is exiting because the thread pool is being shut down. |
329 MonitorLocker ml(ThreadPool::exit_monitor_); | 475 // Inform the thread pool that we are exiting. We remove this worker from |
330 (*ThreadPool::exit_count_)++; | 476 // shutting_down_workers_ list because there will be no need for the |
331 ml.Notify(); | 477 // ThreadPool to take action for this worker. |
| 478 MonitorLocker eml(&worker->pool_->exit_monitor_); |
| 479 JoinList::Add(worker->id_, &worker->pool_->join_list_); |
| 480 worker->id_ = OSThread::kInvalidThreadJoinId; |
| 481 worker->pool_->RemoveWorkerFromShutdownList(worker); |
| 482 delete worker; |
| 483 eml.Notify(); |
| 484 } else { |
| 485 // This worker is going down because it was idle for too long. This case |
| 486 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. |
| 487 delete worker; |
332 } | 488 } |
333 delete worker; | |
334 #if defined(TARGET_OS_WINDOWS) | 489 #if defined(TARGET_OS_WINDOWS) |
335 Thread::CleanUp(); | 490 Thread::CleanUp(); |
336 #endif | 491 #endif |
337 } | 492 } |
338 | 493 |
339 } // namespace dart | 494 } // namespace dart |
OLD | NEW |