OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
6 | 6 |
7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 | 9 |
10 namespace dart { | 10 namespace dart { |
11 | 11 |
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
14 | 14 |
| 15 Monitor* ThreadPool::exit_monitor_ = NULL; |
| 16 int* ThreadPool::exit_count_ = NULL; |
| 17 |
15 ThreadPool::ThreadPool() | 18 ThreadPool::ThreadPool() |
16 : shutting_down_(false), | 19 : shutting_down_(false), |
17 all_workers_(NULL), | 20 all_workers_(NULL), |
18 idle_workers_(NULL), | 21 idle_workers_(NULL), |
19 count_started_(0), | 22 count_started_(0), |
20 count_stopped_(0), | 23 count_stopped_(0), |
21 count_running_(0), | 24 count_running_(0), |
22 count_idle_(0), | 25 count_idle_(0) { |
23 shutting_down_workers_(NULL) { | |
24 } | 26 } |
25 | 27 |
26 | 28 |
27 ThreadPool::~ThreadPool() { | 29 ThreadPool::~ThreadPool() { |
28 Shutdown(); | 30 Shutdown(); |
29 } | 31 } |
30 | 32 |
31 | 33 |
32 void ThreadPool::Run(Task* task) { | 34 void ThreadPool::Run(Task* task) { |
33 Worker* worker = NULL; | 35 Worker* worker = NULL; |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
85 current = next; | 87 current = next; |
86 count_stopped_++; | 88 count_stopped_++; |
87 } | 89 } |
88 | 90 |
89 count_idle_ = 0; | 91 count_idle_ = 0; |
90 count_running_ = 0; | 92 count_running_ = 0; |
91 ASSERT(count_started_ == count_stopped_); | 93 ASSERT(count_started_ == count_stopped_); |
92 } | 94 } |
93 // Release ThreadPool::mutex_ before calling Worker functions. | 95 // Release ThreadPool::mutex_ before calling Worker functions. |
94 | 96 |
95 { | 97 Worker* current = saved; |
96 MonitorLocker eml(&exit_monitor_); | 98 while (current != NULL) { |
97 | 99 // We may access all_next_ without holding ThreadPool::mutex_ here |
98 // First tell all the workers to shut down. | 100 // because the worker is no longer owned by the ThreadPool. |
99 Worker* current = saved; | 101 Worker* next = current->all_next_; |
100 while (current != NULL) { | 102 current->all_next_ = NULL; |
101 Worker* next = current->all_next_; | 103 current->Shutdown(); |
102 if (current->id_ != OSThread::GetCurrentThreadId()) { | 104 current = next; |
103 AddWorkerToShutdownList(current); | |
104 } | |
105 current->Shutdown(); | |
106 current = next; | |
107 } | |
108 saved = NULL; | |
109 | |
110 // Wait until all workers have exited. | |
111 while (shutting_down_workers_ != NULL) { | |
112 // Here, we are waiting for workers to exit. When a worker exits we will | |
113 // be notified. | |
114 eml.Wait(); | |
115 } | |
116 } | 105 } |
117 } | 106 } |
118 | 107 |
119 | 108 |
120 bool ThreadPool::IsIdle(Worker* worker) { | 109 bool ThreadPool::IsIdle(Worker* worker) { |
121 ASSERT(worker != NULL && worker->owned_); | 110 ASSERT(worker != NULL && worker->owned_); |
122 for (Worker* current = idle_workers_; | 111 for (Worker* current = idle_workers_; |
123 current != NULL; | 112 current != NULL; |
124 current = current->idle_next_) { | 113 current = current->idle_next_) { |
125 if (current == worker) { | 114 if (current == worker) { |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
161 if (all_workers_ == NULL) { | 150 if (all_workers_ == NULL) { |
162 return false; | 151 return false; |
163 } | 152 } |
164 | 153 |
165 // Special case head of list. | 154 // Special case head of list. |
166 if (all_workers_ == worker) { | 155 if (all_workers_ == worker) { |
167 all_workers_ = worker->all_next_; | 156 all_workers_ = worker->all_next_; |
168 worker->all_next_ = NULL; | 157 worker->all_next_ = NULL; |
169 worker->owned_ = false; | 158 worker->owned_ = false; |
170 worker->pool_ = NULL; | 159 worker->pool_ = NULL; |
171 worker->done_ = true; | |
172 return true; | 160 return true; |
173 } | 161 } |
174 | 162 |
175 for (Worker* current = all_workers_; | 163 for (Worker* current = all_workers_; |
176 current->all_next_ != NULL; | 164 current->all_next_ != NULL; |
177 current = current->all_next_) { | 165 current = current->all_next_) { |
178 if (current->all_next_ == worker) { | 166 if (current->all_next_ == worker) { |
179 current->all_next_ = worker->all_next_; | 167 current->all_next_ = worker->all_next_; |
180 worker->all_next_ = NULL; | 168 worker->all_next_ = NULL; |
181 worker->owned_ = false; | 169 worker->owned_ = false; |
(...skipping 29 matching lines...) Expand all Loading... |
211 // Remove from all list. | 199 // Remove from all list. |
212 bool found = RemoveWorkerFromAllList(worker); | 200 bool found = RemoveWorkerFromAllList(worker); |
213 ASSERT(found); | 201 ASSERT(found); |
214 | 202 |
215 count_stopped_++; | 203 count_stopped_++; |
216 count_idle_--; | 204 count_idle_--; |
217 return true; | 205 return true; |
218 } | 206 } |
219 | 207 |
220 | 208 |
221 // Only call while holding the exit_monitor_ | |
222 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { | |
223 worker->shutdown_next_ = shutting_down_workers_; | |
224 shutting_down_workers_ = worker; | |
225 } | |
226 | |
227 | |
228 // Only call while holding the exit_monitor_ | |
229 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { | |
230 ASSERT(worker != NULL); | |
231 ASSERT(shutting_down_workers_ != NULL); | |
232 | |
233 // Special case head of list. | |
234 if (shutting_down_workers_ == worker) { | |
235 shutting_down_workers_ = worker->shutdown_next_; | |
236 worker->shutdown_next_ = NULL; | |
237 return true; | |
238 } | |
239 | |
240 for (Worker* current = shutting_down_workers_; | |
241 current->shutdown_next_ != NULL; | |
242 current = current->shutdown_next_) { | |
243 if (current->shutdown_next_ == worker) { | |
244 current->shutdown_next_ = worker->shutdown_next_; | |
245 worker->shutdown_next_ = NULL; | |
246 return true; | |
247 } | |
248 } | |
249 return false; | |
250 } | |
251 | |
252 | |
253 ThreadPool::Task::Task() { | 209 ThreadPool::Task::Task() { |
254 } | 210 } |
255 | 211 |
256 | 212 |
257 ThreadPool::Task::~Task() { | 213 ThreadPool::Task::~Task() { |
258 } | 214 } |
259 | 215 |
260 | 216 |
261 ThreadPool::Worker::Worker(ThreadPool* pool) | 217 ThreadPool::Worker::Worker(ThreadPool* pool) |
262 : pool_(pool), | 218 : pool_(pool), |
263 done_(false), | |
264 task_(NULL), | 219 task_(NULL), |
265 id_(OSThread::kInvalidThreadId), | |
266 started_(false), | |
267 owned_(false), | 220 owned_(false), |
268 all_next_(NULL), | 221 all_next_(NULL), |
269 idle_next_(NULL), | 222 idle_next_(NULL) { |
270 shutdown_next_(NULL) { | |
271 } | 223 } |
272 | 224 |
273 | 225 |
274 void ThreadPool::Worker::StartThread() { | 226 void ThreadPool::Worker::StartThread() { |
275 #if defined(DEBUG) | 227 #if defined(DEBUG) |
276 // Must call SetTask before StartThread. | 228 // Must call SetTask before StartThread. |
277 { // NOLINT | 229 { // NOLINT |
278 MonitorLocker ml(&monitor_); | 230 MonitorLocker ml(&monitor_); |
279 ASSERT(task_ != NULL); | 231 ASSERT(task_ != NULL); |
280 } | 232 } |
(...skipping 24 matching lines...) Expand all Loading... |
305 // out. Give the worker one last desperate chance to live. We | 257 // out. Give the worker one last desperate chance to live. We |
306 // are merciful. | 258 // are merciful. |
307 return 1; | 259 return 1; |
308 } else { | 260 } else { |
309 return FLAG_worker_timeout_millis - waited; | 261 return FLAG_worker_timeout_millis - waited; |
310 } | 262 } |
311 } | 263 } |
312 } | 264 } |
313 | 265 |
314 | 266 |
315 bool ThreadPool::Worker::Loop() { | 267 void ThreadPool::Worker::Loop() { |
316 MonitorLocker ml(&monitor_); | 268 MonitorLocker ml(&monitor_); |
317 int64_t idle_start; | 269 int64_t idle_start; |
318 while (true) { | 270 while (true) { |
319 ASSERT(task_ != NULL); | 271 ASSERT(task_ != NULL); |
320 Task* task = task_; | 272 Task* task = task_; |
321 task_ = NULL; | 273 task_ = NULL; |
322 | 274 |
323 // Release monitor while handling the task. | 275 // Release monitor while handling the task. |
324 monitor_.Exit(); | 276 monitor_.Exit(); |
325 task->Run(); | 277 task->Run(); |
326 ASSERT(Isolate::Current() == NULL); | 278 ASSERT(Isolate::Current() == NULL); |
327 delete task; | 279 delete task; |
328 monitor_.Enter(); | 280 monitor_.Enter(); |
329 | 281 |
330 ASSERT(task_ == NULL); | 282 ASSERT(task_ == NULL); |
331 if (IsDone()) { | 283 if (IsDone()) { |
332 return false; | 284 return; |
333 } | 285 } |
334 ASSERT(!done_); | 286 ASSERT(pool_ != NULL); |
335 pool_->SetIdle(this); | 287 pool_->SetIdle(this); |
336 idle_start = OS::GetCurrentTimeMillis(); | 288 idle_start = OS::GetCurrentTimeMillis(); |
337 while (true) { | 289 while (true) { |
338 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
339 if (task_ != NULL) { | 291 if (task_ != NULL) { |
340 // We've found a task. Process it, regardless of whether the | 292 // We've found a task. Process it, regardless of whether the |
341 // worker is done_. | 293 // worker is done_. |
342 break; | 294 break; |
343 } | 295 } |
344 if (IsDone()) { | 296 if (IsDone()) { |
345 return false; | 297 return; |
346 } | 298 } |
347 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { | 299 if (result == Monitor::kTimedOut && |
348 return true; | 300 pool_->ReleaseIdleWorker(this)) { |
| 301 return; |
349 } | 302 } |
350 } | 303 } |
351 } | 304 } |
352 UNREACHABLE(); | 305 UNREACHABLE(); |
353 return false; | |
354 } | 306 } |
355 | 307 |
356 | 308 |
357 void ThreadPool::Worker::Shutdown() { | 309 void ThreadPool::Worker::Shutdown() { |
358 MonitorLocker ml(&monitor_); | 310 MonitorLocker ml(&monitor_); |
359 done_ = true; | 311 pool_ = NULL; // Fail fast if someone tries to access pool_. |
360 ml.Notify(); | 312 ml.Notify(); |
361 } | 313 } |
362 | 314 |
363 | 315 |
364 // static | 316 // static |
365 void ThreadPool::Worker::Main(uword args) { | 317 void ThreadPool::Worker::Main(uword args) { |
366 Thread::EnsureInit(); | 318 Thread::EnsureInit(); |
367 Worker* worker = reinterpret_cast<Worker*>(args); | 319 Worker* worker = reinterpret_cast<Worker*>(args); |
368 bool delete_self = false; | 320 worker->Loop(); |
369 | |
370 { | |
371 MonitorLocker ml(&(worker->monitor_)); | |
372 if (worker->IsDone()) { | |
373 // id_ hasn't been set yet, but the ThreadPool is being shutdown. | |
374 // Delete the task, and return. | |
375 ASSERT(worker->task_); | |
376 delete worker->task_; | |
377 worker->task_ = NULL; | |
378 delete_self = true; | |
379 } else { | |
380 worker->id_ = OSThread::GetCurrentThreadId(); | |
381 worker->started_ = true; | |
382 } | |
383 } | |
384 | |
385 // We aren't able to delete the worker while holding the worker's monitor. | |
386 // Now that we have released it, and we know that ThreadPool::Shutdown | |
387 // won't touch it again, we can delete it and return. | |
388 if (delete_self) { | |
389 MonitorLocker eml(&worker->pool_->exit_monitor_); | |
390 worker->pool_->RemoveWorkerFromShutdownList(worker); | |
391 delete worker; | |
392 eml.Notify(); | |
393 return; | |
394 } | |
395 | |
396 bool released = worker->Loop(); | |
397 | 321 |
398 // It should be okay to access these unlocked here in this assert. | 322 // It should be okay to access these unlocked here in this assert. |
399 // worker->all_next_ is retained by the pool for shutdown monitoring. | 323 ASSERT(!worker->owned_ && |
400 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); | 324 worker->all_next_ == NULL && |
| 325 worker->idle_next_ == NULL); |
401 | 326 |
402 if (!released) { | 327 // The exit monitor is only used during testing. |
403 // This worker is exiting because the thread pool is being shut down. | 328 if (ThreadPool::exit_monitor_) { |
404 // Inform the thread pool that we are exiting. We remove this worker from | 329 MonitorLocker ml(ThreadPool::exit_monitor_); |
405 // shutting_down_workers_ list because there will be no need for the | 330 (*ThreadPool::exit_count_)++; |
406 // ThreadPool to take action for this worker. | 331 ml.Notify(); |
407 MonitorLocker eml(&worker->pool_->exit_monitor_); | |
408 worker->id_ = OSThread::kInvalidThreadId; | |
409 worker->pool_->RemoveWorkerFromShutdownList(worker); | |
410 delete worker; | |
411 eml.Notify(); | |
412 } else { | |
413 // This worker is going down because it was idle for too long. This case | |
414 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. | |
415 delete worker; | |
416 } | 332 } |
| 333 delete worker; |
417 #if defined(TARGET_OS_WINDOWS) | 334 #if defined(TARGET_OS_WINDOWS) |
418 Thread::CleanUp(); | 335 Thread::CleanUp(); |
419 #endif | 336 #endif |
420 } | 337 } |
421 | 338 |
422 } // namespace dart | 339 } // namespace dart |
OLD | NEW |