OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
6 | 6 |
7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 | 9 |
10 namespace dart { | 10 namespace dart { |
11 | 11 |
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
14 | 14 |
15 Monitor* ThreadPool::exit_monitor_ = NULL; | |
16 int* ThreadPool::exit_count_ = NULL; | |
17 | |
18 ThreadPool::ThreadPool() | 15 ThreadPool::ThreadPool() |
19 : shutting_down_(false), | 16 : shutting_down_(false), |
20 all_workers_(NULL), | 17 all_workers_(NULL), |
21 idle_workers_(NULL), | 18 idle_workers_(NULL), |
22 count_started_(0), | 19 count_started_(0), |
23 count_stopped_(0), | 20 count_stopped_(0), |
24 count_running_(0), | 21 count_running_(0), |
25 count_idle_(0) { | 22 count_idle_(0), |
23 shutting_down_workers_(NULL) { | |
26 } | 24 } |
27 | 25 |
28 | 26 |
29 ThreadPool::~ThreadPool() { | 27 ThreadPool::~ThreadPool() { |
30 Shutdown(); | 28 Shutdown(); |
31 } | 29 } |
32 | 30 |
33 | 31 |
34 void ThreadPool::Run(Task* task) { | 32 void ThreadPool::Run(Task* task) { |
35 Worker* worker = NULL; | 33 Worker* worker = NULL; |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
87 current = next; | 85 current = next; |
88 count_stopped_++; | 86 count_stopped_++; |
89 } | 87 } |
90 | 88 |
91 count_idle_ = 0; | 89 count_idle_ = 0; |
92 count_running_ = 0; | 90 count_running_ = 0; |
93 ASSERT(count_started_ == count_stopped_); | 91 ASSERT(count_started_ == count_stopped_); |
94 } | 92 } |
95 // Release ThreadPool::mutex_ before calling Worker functions. | 93 // Release ThreadPool::mutex_ before calling Worker functions. |
96 | 94 |
97 Worker* current = saved; | 95 { |
98 while (current != NULL) { | 96 MonitorLocker eml(&exit_monitor_); |
99 // We may access all_next_ without holding ThreadPool::mutex_ here | 97 |
100 // because the worker is no longer owned by the ThreadPool. | 98 // First tell all the workers to shut down. |
101 Worker* next = current->all_next_; | 99 Worker* current = saved; |
102 current->all_next_ = NULL; | 100 while (current != NULL) { |
103 current->Shutdown(); | 101 Worker* next = current->all_next_; |
104 current = next; | 102 if (current->id_ != OSThread::GetCurrentThreadId()) { |
103 AddWorkerToShutdownList(current); | |
104 } | |
105 current->Shutdown(); | |
106 current = next; | |
107 } | |
108 saved = NULL; | |
109 | |
110 // Wait until all workers have exited. | |
111 while (shutting_down_workers_ != NULL) { | |
112 // Here, we are waiting for workers to exit. When a worker exits we will | |
113 // be notified. | |
114 eml.Wait(); | |
115 } | |
105 } | 116 } |
106 } | 117 } |
107 | 118 |
108 | 119 |
109 bool ThreadPool::IsIdle(Worker* worker) { | 120 bool ThreadPool::IsIdle(Worker* worker) { |
110 ASSERT(worker != NULL && worker->owned_); | 121 ASSERT(worker != NULL && worker->owned_); |
111 for (Worker* current = idle_workers_; | 122 for (Worker* current = idle_workers_; |
112 current != NULL; | 123 current != NULL; |
113 current = current->idle_next_) { | 124 current = current->idle_next_) { |
114 if (current == worker) { | 125 if (current == worker) { |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
150 if (all_workers_ == NULL) { | 161 if (all_workers_ == NULL) { |
151 return false; | 162 return false; |
152 } | 163 } |
153 | 164 |
154 // Special case head of list. | 165 // Special case head of list. |
155 if (all_workers_ == worker) { | 166 if (all_workers_ == worker) { |
156 all_workers_ = worker->all_next_; | 167 all_workers_ = worker->all_next_; |
157 worker->all_next_ = NULL; | 168 worker->all_next_ = NULL; |
158 worker->owned_ = false; | 169 worker->owned_ = false; |
159 worker->pool_ = NULL; | 170 worker->pool_ = NULL; |
171 worker->done_ = true; | |
160 return true; | 172 return true; |
161 } | 173 } |
162 | 174 |
163 for (Worker* current = all_workers_; | 175 for (Worker* current = all_workers_; |
164 current->all_next_ != NULL; | 176 current->all_next_ != NULL; |
165 current = current->all_next_) { | 177 current = current->all_next_) { |
166 if (current->all_next_ == worker) { | 178 if (current->all_next_ == worker) { |
167 current->all_next_ = worker->all_next_; | 179 current->all_next_ = worker->all_next_; |
168 worker->all_next_ = NULL; | 180 worker->all_next_ = NULL; |
169 worker->owned_ = false; | 181 worker->owned_ = false; |
(...skipping 29 matching lines...) Expand all Loading... | |
199 // Remove from all list. | 211 // Remove from all list. |
200 bool found = RemoveWorkerFromAllList(worker); | 212 bool found = RemoveWorkerFromAllList(worker); |
201 ASSERT(found); | 213 ASSERT(found); |
202 | 214 |
203 count_stopped_++; | 215 count_stopped_++; |
204 count_idle_--; | 216 count_idle_--; |
205 return true; | 217 return true; |
206 } | 218 } |
207 | 219 |
208 | 220 |
221 // Only call while holding the exit_monitor_ | |
222 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { | |
223 worker->shutdown_next_ = shutting_down_workers_; | |
224 shutting_down_workers_ = worker; | |
225 } | |
226 | |
227 | |
228 // Only call while holding the exit_monitor_ | |
229 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { | |
230 ASSERT(worker != NULL); | |
231 if (shutting_down_workers_ == NULL) { | |
turnidge
2015/08/04 21:39:02
I believe this can be an ASSERT.
zra
2015/08/05 06:23:06
Done.
| |
232 return false; | |
233 } | |
234 | |
235 // Special case head of list. | |
236 if (shutting_down_workers_ == worker) { | |
237 shutting_down_workers_ = worker->shutdown_next_; | |
238 worker->shutdown_next_ = NULL; | |
239 return true; | |
240 } | |
241 | |
242 for (Worker* current = shutting_down_workers_; | |
243 current->shutdown_next_ != NULL; | |
244 current = current->shutdown_next_) { | |
245 if (current->shutdown_next_ == worker) { | |
246 current->shutdown_next_ = worker->shutdown_next_; | |
247 worker->shutdown_next_ = NULL; | |
248 return true; | |
249 } | |
250 } | |
251 return false; | |
252 } | |
253 | |
254 | |
209 ThreadPool::Task::Task() { | 255 ThreadPool::Task::Task() { |
210 } | 256 } |
211 | 257 |
212 | 258 |
213 ThreadPool::Task::~Task() { | 259 ThreadPool::Task::~Task() { |
214 } | 260 } |
215 | 261 |
216 | 262 |
217 ThreadPool::Worker::Worker(ThreadPool* pool) | 263 ThreadPool::Worker::Worker(ThreadPool* pool) |
218 : pool_(pool), | 264 : pool_(pool), |
265 done_(false), | |
219 task_(NULL), | 266 task_(NULL), |
267 id_(OSThread::kInvalidThreadId), | |
268 started_(false), | |
220 owned_(false), | 269 owned_(false), |
221 all_next_(NULL), | 270 all_next_(NULL), |
222 idle_next_(NULL) { | 271 idle_next_(NULL), |
272 shutdown_next_(NULL) { | |
223 } | 273 } |
224 | 274 |
225 | 275 |
226 void ThreadPool::Worker::StartThread() { | 276 void ThreadPool::Worker::StartThread() { |
227 #if defined(DEBUG) | 277 #if defined(DEBUG) |
228 // Must call SetTask before StartThread. | 278 // Must call SetTask before StartThread. |
229 { // NOLINT | 279 { // NOLINT |
230 MonitorLocker ml(&monitor_); | 280 MonitorLocker ml(&monitor_); |
231 ASSERT(task_ != NULL); | 281 ASSERT(task_ != NULL); |
232 } | 282 } |
(...skipping 24 matching lines...) Expand all Loading... | |
257 // out. Give the worker one last desperate chance to live. We | 307 // out. Give the worker one last desperate chance to live. We |
258 // are merciful. | 308 // are merciful. |
259 return 1; | 309 return 1; |
260 } else { | 310 } else { |
261 return FLAG_worker_timeout_millis - waited; | 311 return FLAG_worker_timeout_millis - waited; |
262 } | 312 } |
263 } | 313 } |
264 } | 314 } |
265 | 315 |
266 | 316 |
267 void ThreadPool::Worker::Loop() { | 317 bool ThreadPool::Worker::Loop() { |
268 MonitorLocker ml(&monitor_); | 318 MonitorLocker ml(&monitor_); |
269 int64_t idle_start; | 319 int64_t idle_start; |
270 while (true) { | 320 while (true) { |
271 ASSERT(task_ != NULL); | 321 ASSERT(task_ != NULL); |
272 Task* task = task_; | 322 Task* task = task_; |
273 task_ = NULL; | 323 task_ = NULL; |
274 | 324 |
275 // Release monitor while handling the task. | 325 // Release monitor while handling the task. |
276 monitor_.Exit(); | 326 monitor_.Exit(); |
277 task->Run(); | 327 task->Run(); |
278 ASSERT(Isolate::Current() == NULL); | 328 ASSERT(Isolate::Current() == NULL); |
279 delete task; | 329 delete task; |
280 monitor_.Enter(); | 330 monitor_.Enter(); |
281 | 331 |
282 ASSERT(task_ == NULL); | 332 ASSERT(task_ == NULL); |
283 if (IsDone()) { | 333 if (IsDone()) { |
284 return; | 334 return false; |
285 } | 335 } |
286 ASSERT(pool_ != NULL); | 336 ASSERT(!done_); |
287 pool_->SetIdle(this); | 337 pool_->SetIdle(this); |
288 idle_start = OS::GetCurrentTimeMillis(); | 338 idle_start = OS::GetCurrentTimeMillis(); |
289 while (true) { | 339 while (true) { |
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 340 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
291 if (task_ != NULL) { | 341 if (task_ != NULL) { |
292 // We've found a task. Process it, regardless of whether the | 342 // We've found a task. Process it, regardless of whether the |
293 // worker is done_. | 343 // worker is done_. |
294 break; | 344 break; |
295 } | 345 } |
296 if (IsDone()) { | 346 if (IsDone()) { |
297 return; | 347 return false; |
298 } | 348 } |
299 if (result == Monitor::kTimedOut && | 349 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
300 pool_->ReleaseIdleWorker(this)) { | 350 return true; |
301 return; | |
302 } | 351 } |
303 } | 352 } |
304 } | 353 } |
305 UNREACHABLE(); | 354 UNREACHABLE(); |
355 return false; | |
306 } | 356 } |
307 | 357 |
308 | 358 |
309 void ThreadPool::Worker::Shutdown() { | 359 void ThreadPool::Worker::Shutdown() { |
310 MonitorLocker ml(&monitor_); | 360 MonitorLocker ml(&monitor_); |
311 pool_ = NULL; // Fail fast if someone tries to access pool_. | 361 done_ = true; |
312 ml.Notify(); | 362 ml.Notify(); |
313 } | 363 } |
314 | 364 |
315 | 365 |
316 // static | 366 // static |
317 void ThreadPool::Worker::Main(uword args) { | 367 void ThreadPool::Worker::Main(uword args) { |
318 Thread::EnsureInit(); | 368 Thread::EnsureInit(); |
319 Worker* worker = reinterpret_cast<Worker*>(args); | 369 Worker* worker = reinterpret_cast<Worker*>(args); |
320 worker->Loop(); | 370 bool delete_self = false; |
371 | |
372 { | |
373 MonitorLocker ml(&(worker->monitor_)); | |
374 if (worker->IsDone()) { | |
375 // id_ hasn't been set, yet, but the ThreadPool is being shutdown. | |
turnidge
2015/08/04 21:39:02
Remove comma between "set" and "yet"?
zra
2015/08/05 06:23:07
Done.
| |
376 // Delete the task, and return. | |
377 ASSERT(worker->task_); | |
378 delete worker->task_; | |
379 worker->task_ = NULL; | |
380 delete_self = true; | |
381 } else { | |
382 worker->id_ = OSThread::GetCurrentThreadId(); | |
383 worker->started_ = true; | |
384 } | |
385 } | |
386 | |
387 // We aren't able to delete the worker while holding the worker's monitor. | |
388 // Now that we have released it, and we know that ThreadPool::Shutdown | |
389 // won't touch it again, we can delete it and return. | |
390 if (delete_self) { | |
391 MonitorLocker eml(&worker->pool_->exit_monitor_); | |
392 worker->pool_->RemoveWorkerFromShutdownList(worker); | |
393 delete worker; | |
394 eml.Notify(); | |
395 return; | |
396 } | |
397 | |
398 bool released = worker->Loop(); | |
321 | 399 |
322 // It should be okay to access these unlocked here in this assert. | 400 // It should be okay to access these unlocked here in this assert. |
323 ASSERT(!worker->owned_ && | 401 // worker->all_next_ is retained by the pool for shutdown monitoring. |
324 worker->all_next_ == NULL && | 402 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
325 worker->idle_next_ == NULL); | |
326 | 403 |
327 // The exit monitor is only used during testing. | 404 if (!released) { |
328 if (ThreadPool::exit_monitor_) { | 405 // This worker is exiting because the thread pool is being shut down. |
329 MonitorLocker ml(ThreadPool::exit_monitor_); | 406 // Inform the thread pool that we are exiting. We remove this worker from |
330 (*ThreadPool::exit_count_)++; | 407 // shutting_down_workers_ list because there will be no need for the |
331 ml.Notify(); | 408 // ThreadPool to take action for this worker. |
409 MonitorLocker eml(&worker->pool_->exit_monitor_); | |
410 worker->id_ = OSThread::kInvalidThreadId; | |
411 worker->pool_->RemoveWorkerFromShutdownList(worker); | |
412 delete worker; | |
413 eml.Notify(); | |
414 } else { | |
415 // This worker is going down because it was idle for too long. This case | |
416 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. | |
417 delete worker; | |
332 } | 418 } |
333 delete worker; | |
334 #if defined(TARGET_OS_WINDOWS) | 419 #if defined(TARGET_OS_WINDOWS) |
335 Thread::CleanUp(); | 420 Thread::CleanUp(); |
336 #endif | 421 #endif |
337 } | 422 } |
338 | 423 |
339 } // namespace dart | 424 } // namespace dart |
OLD | NEW |