OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
6 | 6 |
7 #include "vm/dart.h" | 7 #include "vm/dart.h" |
8 #include "vm/flags.h" | 8 #include "vm/flags.h" |
9 #include "vm/lockers.h" | 9 #include "vm/lockers.h" |
10 | 10 |
11 namespace dart { | 11 namespace dart { |
12 | 12 |
13 DEFINE_FLAG(int, | 13 DEFINE_FLAG(int, |
14 worker_timeout_millis, | 14 worker_timeout_millis, |
15 5000, | 15 5000, |
16 "Free workers when they have been idle for this amount of time."); | 16 "Free workers when they have been idle for this amount of time."); |
17 | 17 |
18 ThreadPool::ThreadPool() | 18 ThreadPool::ThreadPool() |
19 : shutting_down_(false), | 19 : shutting_down_(false), |
20 all_workers_(NULL), | 20 all_workers_(NULL), |
21 idle_workers_(NULL), | 21 idle_workers_(NULL), |
22 count_started_(0), | 22 count_started_(0), |
23 count_stopped_(0), | 23 count_stopped_(0), |
24 count_running_(0), | 24 count_running_(0), |
25 count_idle_(0), | 25 count_idle_(0), |
26 shutting_down_workers_(NULL), | 26 shutting_down_workers_(NULL), |
27 join_list_(NULL) {} | 27 join_list_(NULL) {} |
28 | 28 |
29 | |
30 ThreadPool::~ThreadPool() { | 29 ThreadPool::~ThreadPool() { |
31 Shutdown(); | 30 Shutdown(); |
32 } | 31 } |
33 | 32 |
34 | |
35 bool ThreadPool::Run(Task* task) { | 33 bool ThreadPool::Run(Task* task) { |
36 Worker* worker = NULL; | 34 Worker* worker = NULL; |
37 bool new_worker = false; | 35 bool new_worker = false; |
38 { | 36 { |
39 // We need ThreadPool::mutex_ to access worker lists and other | 37 // We need ThreadPool::mutex_ to access worker lists and other |
40 // ThreadPool state. | 38 // ThreadPool state. |
41 MutexLocker ml(&mutex_); | 39 MutexLocker ml(&mutex_); |
42 if (shutting_down_) { | 40 if (shutting_down_) { |
43 return false; | 41 return false; |
44 } | 42 } |
(...skipping 21 matching lines...) Expand all Loading... |
66 // Release ThreadPool::mutex_ before calling Worker functions. | 64 // Release ThreadPool::mutex_ before calling Worker functions. |
67 ASSERT(worker != NULL); | 65 ASSERT(worker != NULL); |
68 worker->SetTask(task); | 66 worker->SetTask(task); |
69 if (new_worker) { | 67 if (new_worker) { |
70 // Call StartThread after we've assigned the first task. | 68 // Call StartThread after we've assigned the first task. |
71 worker->StartThread(); | 69 worker->StartThread(); |
72 } | 70 } |
73 return true; | 71 return true; |
74 } | 72 } |
75 | 73 |
76 | |
77 void ThreadPool::Shutdown() { | 74 void ThreadPool::Shutdown() { |
78 Worker* saved = NULL; | 75 Worker* saved = NULL; |
79 { | 76 { |
80 MutexLocker ml(&mutex_); | 77 MutexLocker ml(&mutex_); |
81 shutting_down_ = true; | 78 shutting_down_ = true; |
82 saved = all_workers_; | 79 saved = all_workers_; |
83 all_workers_ = NULL; | 80 all_workers_ = NULL; |
84 idle_workers_ = NULL; | 81 idle_workers_ = NULL; |
85 | 82 |
86 Worker* current = saved; | 83 Worker* current = saved; |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
137 JoinList::Join(&list); | 134 JoinList::Join(&list); |
138 | 135 |
139 #if defined(DEBUG) | 136 #if defined(DEBUG) |
140 { | 137 { |
141 MutexLocker ml(&mutex_); | 138 MutexLocker ml(&mutex_); |
142 ASSERT(join_list_ == NULL); | 139 ASSERT(join_list_ == NULL); |
143 } | 140 } |
144 #endif | 141 #endif |
145 } | 142 } |
146 | 143 |
147 | |
148 bool ThreadPool::IsIdle(Worker* worker) { | 144 bool ThreadPool::IsIdle(Worker* worker) { |
149 ASSERT(worker != NULL && worker->owned_); | 145 ASSERT(worker != NULL && worker->owned_); |
150 for (Worker* current = idle_workers_; current != NULL; | 146 for (Worker* current = idle_workers_; current != NULL; |
151 current = current->idle_next_) { | 147 current = current->idle_next_) { |
152 if (current == worker) { | 148 if (current == worker) { |
153 return true; | 149 return true; |
154 } | 150 } |
155 } | 151 } |
156 return false; | 152 return false; |
157 } | 153 } |
158 | 154 |
159 | |
160 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) { | 155 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) { |
161 ASSERT(worker != NULL && worker->owned_); | 156 ASSERT(worker != NULL && worker->owned_); |
162 if (idle_workers_ == NULL) { | 157 if (idle_workers_ == NULL) { |
163 return false; | 158 return false; |
164 } | 159 } |
165 | 160 |
166 // Special case head of list. | 161 // Special case head of list. |
167 if (idle_workers_ == worker) { | 162 if (idle_workers_ == worker) { |
168 idle_workers_ = worker->idle_next_; | 163 idle_workers_ = worker->idle_next_; |
169 worker->idle_next_ = NULL; | 164 worker->idle_next_ = NULL; |
170 return true; | 165 return true; |
171 } | 166 } |
172 | 167 |
173 for (Worker* current = idle_workers_; current->idle_next_ != NULL; | 168 for (Worker* current = idle_workers_; current->idle_next_ != NULL; |
174 current = current->idle_next_) { | 169 current = current->idle_next_) { |
175 if (current->idle_next_ == worker) { | 170 if (current->idle_next_ == worker) { |
176 current->idle_next_ = worker->idle_next_; | 171 current->idle_next_ = worker->idle_next_; |
177 worker->idle_next_ = NULL; | 172 worker->idle_next_ = NULL; |
178 return true; | 173 return true; |
179 } | 174 } |
180 } | 175 } |
181 return false; | 176 return false; |
182 } | 177 } |
183 | 178 |
184 | |
185 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { | 179 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
186 ASSERT(worker != NULL && worker->owned_); | 180 ASSERT(worker != NULL && worker->owned_); |
187 if (all_workers_ == NULL) { | 181 if (all_workers_ == NULL) { |
188 return false; | 182 return false; |
189 } | 183 } |
190 | 184 |
191 // Special case head of list. | 185 // Special case head of list. |
192 if (all_workers_ == worker) { | 186 if (all_workers_ == worker) { |
193 all_workers_ = worker->all_next_; | 187 all_workers_ = worker->all_next_; |
194 worker->all_next_ = NULL; | 188 worker->all_next_ = NULL; |
195 worker->owned_ = false; | 189 worker->owned_ = false; |
196 worker->done_ = true; | 190 worker->done_ = true; |
197 return true; | 191 return true; |
198 } | 192 } |
199 | 193 |
200 for (Worker* current = all_workers_; current->all_next_ != NULL; | 194 for (Worker* current = all_workers_; current->all_next_ != NULL; |
201 current = current->all_next_) { | 195 current = current->all_next_) { |
202 if (current->all_next_ == worker) { | 196 if (current->all_next_ == worker) { |
203 current->all_next_ = worker->all_next_; | 197 current->all_next_ = worker->all_next_; |
204 worker->all_next_ = NULL; | 198 worker->all_next_ = NULL; |
205 worker->owned_ = false; | 199 worker->owned_ = false; |
206 return true; | 200 return true; |
207 } | 201 } |
208 } | 202 } |
209 return false; | 203 return false; |
210 } | 204 } |
211 | 205 |
212 | |
213 void ThreadPool::SetIdleLocked(Worker* worker) { | 206 void ThreadPool::SetIdleLocked(Worker* worker) { |
214 ASSERT(mutex_.IsOwnedByCurrentThread()); | 207 ASSERT(mutex_.IsOwnedByCurrentThread()); |
215 ASSERT(worker->owned_ && !IsIdle(worker)); | 208 ASSERT(worker->owned_ && !IsIdle(worker)); |
216 worker->idle_next_ = idle_workers_; | 209 worker->idle_next_ = idle_workers_; |
217 idle_workers_ = worker; | 210 idle_workers_ = worker; |
218 count_idle_++; | 211 count_idle_++; |
219 count_running_--; | 212 count_running_--; |
220 } | 213 } |
221 | 214 |
222 | |
223 void ThreadPool::SetIdleAndReapExited(Worker* worker) { | 215 void ThreadPool::SetIdleAndReapExited(Worker* worker) { |
224 JoinList* list = NULL; | 216 JoinList* list = NULL; |
225 { | 217 { |
226 MutexLocker ml(&mutex_); | 218 MutexLocker ml(&mutex_); |
227 if (shutting_down_) { | 219 if (shutting_down_) { |
228 return; | 220 return; |
229 } | 221 } |
230 if (join_list_ == NULL) { | 222 if (join_list_ == NULL) { |
231 // Nothing to join, add to the idle list and return. | 223 // Nothing to join, add to the idle list and return. |
232 SetIdleLocked(worker); | 224 SetIdleLocked(worker); |
233 return; | 225 return; |
234 } | 226 } |
235 // There is something to join. Grab the join list, drop the lock, do the | 227 // There is something to join. Grab the join list, drop the lock, do the |
236 // join, then grab the lock again and add to the idle list. | 228 // join, then grab the lock again and add to the idle list. |
237 list = join_list_; | 229 list = join_list_; |
238 join_list_ = NULL; | 230 join_list_ = NULL; |
239 } | 231 } |
240 JoinList::Join(&list); | 232 JoinList::Join(&list); |
241 | 233 |
242 { | 234 { |
243 MutexLocker ml(&mutex_); | 235 MutexLocker ml(&mutex_); |
244 if (shutting_down_) { | 236 if (shutting_down_) { |
245 return; | 237 return; |
246 } | 238 } |
247 SetIdleLocked(worker); | 239 SetIdleLocked(worker); |
248 } | 240 } |
249 } | 241 } |
250 | 242 |
251 | |
252 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { | 243 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
253 MutexLocker ml(&mutex_); | 244 MutexLocker ml(&mutex_); |
254 if (shutting_down_) { | 245 if (shutting_down_) { |
255 return false; | 246 return false; |
256 } | 247 } |
257 // Remove from idle list. | 248 // Remove from idle list. |
258 if (!RemoveWorkerFromIdleList(worker)) { | 249 if (!RemoveWorkerFromIdleList(worker)) { |
259 return false; | 250 return false; |
260 } | 251 } |
261 // Remove from all list. | 252 // Remove from all list. |
262 bool found = RemoveWorkerFromAllList(worker); | 253 bool found = RemoveWorkerFromAllList(worker); |
263 ASSERT(found); | 254 ASSERT(found); |
264 | 255 |
265 // The thread for worker will exit. Add its ThreadId to the join_list_ | 256 // The thread for worker will exit. Add its ThreadId to the join_list_ |
266 // so that we can join on it at the next opportunity. | 257 // so that we can join on it at the next opportunity. |
267 OSThread* os_thread = OSThread::Current(); | 258 OSThread* os_thread = OSThread::Current(); |
268 ASSERT(os_thread != NULL); | 259 ASSERT(os_thread != NULL); |
269 ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId(os_thread); | 260 ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId(os_thread); |
270 JoinList::AddLocked(join_id, &join_list_); | 261 JoinList::AddLocked(join_id, &join_list_); |
271 count_stopped_++; | 262 count_stopped_++; |
272 count_idle_--; | 263 count_idle_--; |
273 return true; | 264 return true; |
274 } | 265 } |
275 | 266 |
276 | |
277 // Only call while holding the exit_monitor_ | 267 // Only call while holding the exit_monitor_ |
278 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { | 268 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { |
279 ASSERT(exit_monitor_.IsOwnedByCurrentThread()); | 269 ASSERT(exit_monitor_.IsOwnedByCurrentThread()); |
280 worker->shutdown_next_ = shutting_down_workers_; | 270 worker->shutdown_next_ = shutting_down_workers_; |
281 shutting_down_workers_ = worker; | 271 shutting_down_workers_ = worker; |
282 } | 272 } |
283 | 273 |
284 | |
285 // Only call while holding the exit_monitor_ | 274 // Only call while holding the exit_monitor_ |
286 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { | 275 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { |
287 ASSERT(worker != NULL); | 276 ASSERT(worker != NULL); |
288 ASSERT(shutting_down_workers_ != NULL); | 277 ASSERT(shutting_down_workers_ != NULL); |
289 ASSERT(exit_monitor_.IsOwnedByCurrentThread()); | 278 ASSERT(exit_monitor_.IsOwnedByCurrentThread()); |
290 | 279 |
291 // Special case head of list. | 280 // Special case head of list. |
292 if (shutting_down_workers_ == worker) { | 281 if (shutting_down_workers_ == worker) { |
293 shutting_down_workers_ = worker->shutdown_next_; | 282 shutting_down_workers_ = worker->shutdown_next_; |
294 worker->shutdown_next_ = NULL; | 283 worker->shutdown_next_ = NULL; |
295 return true; | 284 return true; |
296 } | 285 } |
297 | 286 |
298 for (Worker* current = shutting_down_workers_; | 287 for (Worker* current = shutting_down_workers_; |
299 current->shutdown_next_ != NULL; current = current->shutdown_next_) { | 288 current->shutdown_next_ != NULL; current = current->shutdown_next_) { |
300 if (current->shutdown_next_ == worker) { | 289 if (current->shutdown_next_ == worker) { |
301 current->shutdown_next_ = worker->shutdown_next_; | 290 current->shutdown_next_ = worker->shutdown_next_; |
302 worker->shutdown_next_ = NULL; | 291 worker->shutdown_next_ = NULL; |
303 return true; | 292 return true; |
304 } | 293 } |
305 } | 294 } |
306 return false; | 295 return false; |
307 } | 296 } |
308 | 297 |
309 | |
310 void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) { | 298 void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) { |
311 *list = new JoinList(id, *list); | 299 *list = new JoinList(id, *list); |
312 } | 300 } |
313 | 301 |
314 | |
315 void ThreadPool::JoinList::Join(JoinList** list) { | 302 void ThreadPool::JoinList::Join(JoinList** list) { |
316 while (*list != NULL) { | 303 while (*list != NULL) { |
317 JoinList* current = *list; | 304 JoinList* current = *list; |
318 *list = current->next(); | 305 *list = current->next(); |
319 OSThread::Join(current->id()); | 306 OSThread::Join(current->id()); |
320 delete current; | 307 delete current; |
321 } | 308 } |
322 } | 309 } |
323 | 310 |
324 | |
325 ThreadPool::Task::Task() {} | 311 ThreadPool::Task::Task() {} |
326 | 312 |
327 | |
328 ThreadPool::Task::~Task() {} | 313 ThreadPool::Task::~Task() {} |
329 | 314 |
330 | |
331 ThreadPool::Worker::Worker(ThreadPool* pool) | 315 ThreadPool::Worker::Worker(ThreadPool* pool) |
332 : pool_(pool), | 316 : pool_(pool), |
333 task_(NULL), | 317 task_(NULL), |
334 id_(OSThread::kInvalidThreadId), | 318 id_(OSThread::kInvalidThreadId), |
335 done_(false), | 319 done_(false), |
336 owned_(false), | 320 owned_(false), |
337 all_next_(NULL), | 321 all_next_(NULL), |
338 idle_next_(NULL), | 322 idle_next_(NULL), |
339 shutdown_next_(NULL) {} | 323 shutdown_next_(NULL) {} |
340 | 324 |
341 | |
342 ThreadId ThreadPool::Worker::id() { | 325 ThreadId ThreadPool::Worker::id() { |
343 MonitorLocker ml(&monitor_); | 326 MonitorLocker ml(&monitor_); |
344 return id_; | 327 return id_; |
345 } | 328 } |
346 | 329 |
347 | |
348 void ThreadPool::Worker::StartThread() { | 330 void ThreadPool::Worker::StartThread() { |
349 #if defined(DEBUG) | 331 #if defined(DEBUG) |
350 // Must call SetTask before StartThread. | 332 // Must call SetTask before StartThread. |
351 { // NOLINT | 333 { // NOLINT |
352 MonitorLocker ml(&monitor_); | 334 MonitorLocker ml(&monitor_); |
353 ASSERT(task_ != NULL); | 335 ASSERT(task_ != NULL); |
354 } | 336 } |
355 #endif | 337 #endif |
356 int result = OSThread::Start("Dart ThreadPool Worker", &Worker::Main, | 338 int result = OSThread::Start("Dart ThreadPool Worker", &Worker::Main, |
357 reinterpret_cast<uword>(this)); | 339 reinterpret_cast<uword>(this)); |
358 if (result != 0) { | 340 if (result != 0) { |
359 FATAL1("Could not start worker thread: result = %d.", result); | 341 FATAL1("Could not start worker thread: result = %d.", result); |
360 } | 342 } |
361 } | 343 } |
362 | 344 |
363 | |
364 void ThreadPool::Worker::SetTask(Task* task) { | 345 void ThreadPool::Worker::SetTask(Task* task) { |
365 MonitorLocker ml(&monitor_); | 346 MonitorLocker ml(&monitor_); |
366 ASSERT(task_ == NULL); | 347 ASSERT(task_ == NULL); |
367 task_ = task; | 348 task_ = task; |
368 ml.Notify(); | 349 ml.Notify(); |
369 } | 350 } |
370 | 351 |
371 | |
372 static int64_t ComputeTimeout(int64_t idle_start) { | 352 static int64_t ComputeTimeout(int64_t idle_start) { |
373 int64_t worker_timeout_micros = | 353 int64_t worker_timeout_micros = |
374 FLAG_worker_timeout_millis * kMicrosecondsPerMillisecond; | 354 FLAG_worker_timeout_millis * kMicrosecondsPerMillisecond; |
375 if (worker_timeout_micros <= 0) { | 355 if (worker_timeout_micros <= 0) { |
376 // No timeout. | 356 // No timeout. |
377 return 0; | 357 return 0; |
378 } else { | 358 } else { |
379 int64_t waited = OS::GetCurrentMonotonicMicros() - idle_start; | 359 int64_t waited = OS::GetCurrentMonotonicMicros() - idle_start; |
380 if (waited >= worker_timeout_micros) { | 360 if (waited >= worker_timeout_micros) { |
381 // We must have gotten a spurious wakeup just before we timed | 361 // We must have gotten a spurious wakeup just before we timed |
382 // out. Give the worker one last desperate chance to live. We | 362 // out. Give the worker one last desperate chance to live. We |
383 // are merciful. | 363 // are merciful. |
384 return 1; | 364 return 1; |
385 } else { | 365 } else { |
386 return worker_timeout_micros - waited; | 366 return worker_timeout_micros - waited; |
387 } | 367 } |
388 } | 368 } |
389 } | 369 } |
390 | 370 |
391 | |
392 bool ThreadPool::Worker::Loop() { | 371 bool ThreadPool::Worker::Loop() { |
393 MonitorLocker ml(&monitor_); | 372 MonitorLocker ml(&monitor_); |
394 int64_t idle_start; | 373 int64_t idle_start; |
395 while (true) { | 374 while (true) { |
396 ASSERT(task_ != NULL); | 375 ASSERT(task_ != NULL); |
397 Task* task = task_; | 376 Task* task = task_; |
398 task_ = NULL; | 377 task_ = NULL; |
399 | 378 |
400 // Release monitor while handling the task. | 379 // Release monitor while handling the task. |
401 ml.Exit(); | 380 ml.Exit(); |
(...skipping 21 matching lines...) Expand all Loading... |
423 } | 402 } |
424 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { | 403 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
425 return true; | 404 return true; |
426 } | 405 } |
427 } | 406 } |
428 } | 407 } |
429 UNREACHABLE(); | 408 UNREACHABLE(); |
430 return false; | 409 return false; |
431 } | 410 } |
432 | 411 |
433 | |
434 void ThreadPool::Worker::Shutdown() { | 412 void ThreadPool::Worker::Shutdown() { |
435 MonitorLocker ml(&monitor_); | 413 MonitorLocker ml(&monitor_); |
436 done_ = true; | 414 done_ = true; |
437 ml.Notify(); | 415 ml.Notify(); |
438 } | 416 } |
439 | 417 |
440 | |
441 // static | 418 // static |
442 void ThreadPool::Worker::Main(uword args) { | 419 void ThreadPool::Worker::Main(uword args) { |
443 Worker* worker = reinterpret_cast<Worker*>(args); | 420 Worker* worker = reinterpret_cast<Worker*>(args); |
444 OSThread* os_thread = OSThread::Current(); | 421 OSThread* os_thread = OSThread::Current(); |
445 ASSERT(os_thread != NULL); | 422 ASSERT(os_thread != NULL); |
446 ThreadId id = os_thread->id(); | 423 ThreadId id = os_thread->id(); |
447 ThreadPool* pool; | 424 ThreadPool* pool; |
448 | 425 |
449 // Set the thread's stack_base based on the current stack pointer. | 426 // Set the thread's stack_base based on the current stack pointer. |
450 uword current_sp = Thread::GetCurrentStackPointer(); | 427 uword current_sp = Thread::GetCurrentStackPointer(); |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
503 } | 480 } |
504 | 481 |
505 // Call the thread exit hook here to notify the embedder that the | 482 // Call the thread exit hook here to notify the embedder that the |
506 // thread pool thread is exiting. | 483 // thread pool thread is exiting. |
507 if (Dart::thread_exit_callback() != NULL) { | 484 if (Dart::thread_exit_callback() != NULL) { |
508 (*Dart::thread_exit_callback())(); | 485 (*Dart::thread_exit_callback())(); |
509 } | 486 } |
510 } | 487 } |
511 | 488 |
512 } // namespace dart | 489 } // namespace dart |
OLD | NEW |