OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
6 | 6 |
7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 | 9 |
10 namespace dart { | 10 namespace dart { |
11 | 11 |
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
14 #if defined(USING_SIMULATOR) | |
15 // The simulator can take much longer to finish running Dart code. | |
16 DEFINE_FLAG(int, shutdown_timeout_millis, 15000, | |
17 "Amount of time to wait for a worker to stop during shutdown."); | |
18 #else | |
19 DEFINE_FLAG(int, shutdown_timeout_millis, 1000, | |
20 "Amount of time to wait for a worker to stop during shutdown."); | |
21 #endif | |
14 | 22 |
15 Monitor* ThreadPool::exit_monitor_ = NULL; | 23 Mutex ThreadPool::mutex_; |
16 int* ThreadPool::exit_count_ = NULL; | 24 bool ThreadPool::shutting_down_ = false; |
25 ThreadPool::Worker* ThreadPool::all_workers_ = NULL; | |
26 ThreadPool::Worker* ThreadPool::idle_workers_ = NULL; | |
27 uint64_t ThreadPool::count_started_ = 0; | |
28 uint64_t ThreadPool::count_stopped_ = 0; | |
29 uint64_t ThreadPool::count_running_ = 0; | |
30 uint64_t ThreadPool::count_idle_ = 0; | |
31 bool ThreadPool::shutdown_timeout_fatal_ = true; | |
17 | 32 |
18 ThreadPool::ThreadPool() | 33 Monitor ThreadPool::exit_monitor_; |
19 : shutting_down_(false), | 34 ThreadPool::Worker* ThreadPool::shutting_down_workers_ = NULL; |
20 all_workers_(NULL), | |
21 idle_workers_(NULL), | |
22 count_started_(0), | |
23 count_stopped_(0), | |
24 count_running_(0), | |
25 count_idle_(0) { | |
26 } | |
27 | |
28 | |
29 ThreadPool::~ThreadPool() { | |
30 Shutdown(); | |
31 } | |
32 | 35 |
33 | 36 |
34 void ThreadPool::Run(Task* task) { | 37 void ThreadPool::Run(Task* task) { |
35 Worker* worker = NULL; | 38 Worker* worker = NULL; |
36 bool new_worker = false; | 39 bool new_worker = false; |
37 { | 40 { |
38 // We need ThreadPool::mutex_ to access worker lists and other | 41 // We need ThreadPool::mutex_ to access worker lists and other |
39 // ThreadPool state. | 42 // ThreadPool state. |
40 MutexLocker ml(&mutex_); | 43 MutexLocker ml(&mutex_); |
41 if (shutting_down_) { | 44 if (shutting_down_) { |
42 return; | 45 return; |
43 } | 46 } |
44 if (idle_workers_ == NULL) { | 47 if (idle_workers_ == NULL) { |
45 worker = new Worker(this); | 48 worker = new Worker(); |
46 ASSERT(worker != NULL); | 49 ASSERT(worker != NULL); |
47 new_worker = true; | 50 new_worker = true; |
48 count_started_++; | 51 count_started_++; |
49 | 52 |
50 // Add worker to the all_workers_ list. | 53 // Add worker to the all_workers_ list. |
51 worker->all_next_ = all_workers_; | 54 worker->all_next_ = all_workers_; |
52 all_workers_ = worker; | 55 all_workers_ = worker; |
53 worker->owned_ = true; | 56 worker->owned_ = true; |
54 } else { | 57 } else { |
55 // Get the first worker from the idle worker list. | 58 // Get the first worker from the idle worker list. |
56 worker = idle_workers_; | 59 worker = idle_workers_; |
57 idle_workers_ = worker->idle_next_; | 60 idle_workers_ = worker->idle_next_; |
58 worker->idle_next_ = NULL; | 61 worker->idle_next_ = NULL; |
59 count_idle_--; | 62 count_idle_--; |
60 } | 63 } |
61 count_running_++; | 64 count_running_++; |
62 } | 65 } |
63 // Release ThreadPool::mutex_ before calling Worker functions. | 66 // Release ThreadPool::mutex_ before calling Worker functions. |
64 ASSERT(worker != NULL); | 67 ASSERT(worker != NULL); |
65 worker->SetTask(task); | 68 worker->SetTask(task); |
66 if (new_worker) { | 69 if (new_worker) { |
67 // Call StartThread after we've assigned the first task. | 70 // Call StartThread after we've assigned the first task. |
68 worker->StartThread(); | 71 worker->StartThread(); |
69 } | 72 } |
70 } | 73 } |
71 | 74 |
72 | 75 |
73 void ThreadPool::Shutdown() { | 76 bool ThreadPool::Shutdown() { |
74 Worker* saved = NULL; | 77 Worker* saved = NULL; |
75 { | 78 { |
76 MutexLocker ml(&mutex_); | 79 MutexLocker ml(&mutex_); |
77 shutting_down_ = true; | 80 shutting_down_ = true; |
78 saved = all_workers_; | 81 saved = all_workers_; |
79 all_workers_ = NULL; | 82 all_workers_ = NULL; |
80 idle_workers_ = NULL; | 83 idle_workers_ = NULL; |
81 | 84 |
82 Worker* current = saved; | 85 Worker* current = saved; |
83 while (current != NULL) { | 86 while (current != NULL) { |
84 Worker* next = current->all_next_; | 87 Worker* next = current->all_next_; |
85 current->idle_next_ = NULL; | 88 current->idle_next_ = NULL; |
86 current->owned_ = false; | 89 current->owned_ = false; |
87 current = next; | 90 current = next; |
88 count_stopped_++; | 91 count_stopped_++; |
89 } | 92 } |
90 | 93 |
91 count_idle_ = 0; | 94 count_idle_ = 0; |
92 count_running_ = 0; | 95 count_running_ = 0; |
93 ASSERT(count_started_ == count_stopped_); | 96 ASSERT(count_started_ == count_stopped_); |
94 } | 97 } |
95 // Release ThreadPool::mutex_ before calling Worker functions. | 98 // Release ThreadPool::mutex_ before calling Worker functions. |
96 | 99 |
97 Worker* current = saved; | 100 { |
98 while (current != NULL) { | 101 MonitorLocker eml(&ThreadPool::exit_monitor_); |
99 // We may access all_next_ without holding ThreadPool::mutex_ here | 102 |
100 // because the worker is no longer owned by the ThreadPool. | 103 // First tell all the workers to shut down. |
101 Worker* next = current->all_next_; | 104 Worker* current = saved; |
102 current->all_next_ = NULL; | 105 while (current != NULL) { |
103 current->Shutdown(); | 106 Worker* next = current->all_next_; |
104 current = next; | 107 if (current->id_ != OSThread::GetCurrentThreadId()) { |
108 AddWorkerToShutdownList(current); | |
109 } | |
110 current->Shutdown(); | |
111 current = next; | |
112 } | |
113 saved = NULL; | |
114 | |
115 // Give workers a chance to exit gracefully. | |
116 const int64_t start_wait = OS::GetCurrentTimeMillis(); | |
117 int timeout = FLAG_shutdown_timeout_millis; | |
118 while (shutting_down_workers_ != NULL) { | |
119 if (timeout > 0) { | |
120 // Here, we are waiting for workers to exit. When a worker exits we will | |
121 // be notified. | |
122 eml.Wait(timeout); | |
123 | |
124 // We decrement the timeout for the next wait by the amount of time | |
125 // we've already waited. If the new timeout drops below zero, we break | |
126 // out of this loop, which triggers the termination code below. | |
127 const int64_t after_wait = OS::GetCurrentTimeMillis(); | |
128 timeout = FLAG_shutdown_timeout_millis - (after_wait - start_wait); | |
129 } else { | |
130 break; | |
131 } | |
132 } | |
133 | |
134 // It is an error if all workers have not exited within the timeout. We | |
135 // assume that they have run off into the weeds, and it is a bug. | |
136 bool timely_shutdown = shutting_down_workers_ == NULL; | |
137 if (!timely_shutdown && shutdown_timeout_fatal_) { | |
138 FATAL("Thread pool worker threads failed to exit."); | |
139 } | |
140 | |
141 return timely_shutdown; | |
105 } | 142 } |
106 } | 143 } |
107 | 144 |
108 | 145 |
109 bool ThreadPool::IsIdle(Worker* worker) { | 146 bool ThreadPool::IsIdle(Worker* worker) { |
110 ASSERT(worker != NULL && worker->owned_); | 147 ASSERT(worker != NULL && worker->owned_); |
111 for (Worker* current = idle_workers_; | 148 for (Worker* current = idle_workers_; |
112 current != NULL; | 149 current != NULL; |
113 current = current->idle_next_) { | 150 current = current->idle_next_) { |
114 if (current == worker) { | 151 if (current == worker) { |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
149 ASSERT(worker != NULL && worker->owned_); | 186 ASSERT(worker != NULL && worker->owned_); |
150 if (all_workers_ == NULL) { | 187 if (all_workers_ == NULL) { |
151 return false; | 188 return false; |
152 } | 189 } |
153 | 190 |
154 // Special case head of list. | 191 // Special case head of list. |
155 if (all_workers_ == worker) { | 192 if (all_workers_ == worker) { |
156 all_workers_ = worker->all_next_; | 193 all_workers_ = worker->all_next_; |
157 worker->all_next_ = NULL; | 194 worker->all_next_ = NULL; |
158 worker->owned_ = false; | 195 worker->owned_ = false; |
159 worker->pool_ = NULL; | 196 worker->done_ = true; |
160 return true; | 197 return true; |
161 } | 198 } |
162 | 199 |
163 for (Worker* current = all_workers_; | 200 for (Worker* current = all_workers_; |
164 current->all_next_ != NULL; | 201 current->all_next_ != NULL; |
165 current = current->all_next_) { | 202 current = current->all_next_) { |
166 if (current->all_next_ == worker) { | 203 if (current->all_next_ == worker) { |
167 current->all_next_ = worker->all_next_; | 204 current->all_next_ = worker->all_next_; |
168 worker->all_next_ = NULL; | 205 worker->all_next_ = NULL; |
169 worker->owned_ = false; | 206 worker->owned_ = false; |
(...skipping 29 matching lines...) Expand all Loading... | |
199 // Remove from all list. | 236 // Remove from all list. |
200 bool found = RemoveWorkerFromAllList(worker); | 237 bool found = RemoveWorkerFromAllList(worker); |
201 ASSERT(found); | 238 ASSERT(found); |
202 | 239 |
203 count_stopped_++; | 240 count_stopped_++; |
204 count_idle_--; | 241 count_idle_--; |
205 return true; | 242 return true; |
206 } | 243 } |
207 | 244 |
208 | 245 |
246 // Only call while holding the exit_monitor_ | |
247 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { | |
248 worker->shutdown_next_ = shutting_down_workers_; | |
249 shutting_down_workers_ = worker; | |
250 } | |
251 | |
252 | |
253 // Only call while holding the exit_monitor_ | |
254 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { | |
255 ASSERT(worker != NULL); | |
256 if (shutting_down_workers_ == NULL) { | |
257 return false; | |
258 } | |
259 | |
260 // Special case head of list. | |
261 if (shutting_down_workers_ == worker) { | |
262 shutting_down_workers_ = worker->shutdown_next_; | |
263 worker->shutdown_next_ = NULL; | |
264 return true; | |
265 } | |
266 | |
267 for (Worker* current = shutting_down_workers_; | |
268 current->shutdown_next_ != NULL; | |
269 current = current->shutdown_next_) { | |
270 if (current->shutdown_next_ == worker) { | |
271 current->shutdown_next_ = worker->shutdown_next_; | |
272 worker->shutdown_next_ = NULL; | |
273 return true; | |
274 } | |
275 } | |
276 return false; | |
277 } | |
278 | |
279 | |
209 ThreadPool::Task::Task() { | 280 ThreadPool::Task::Task() { |
210 } | 281 } |
211 | 282 |
212 | 283 |
213 ThreadPool::Task::~Task() { | 284 ThreadPool::Task::~Task() { |
214 } | 285 } |
215 | 286 |
216 | 287 |
217 ThreadPool::Worker::Worker(ThreadPool* pool) | 288 ThreadPool::Worker::Worker() |
218 : pool_(pool), | 289 : done_(false), |
219 task_(NULL), | 290 task_(NULL), |
291 id_(OSThread::kInvalidThreadId), | |
292 started_(false), | |
220 owned_(false), | 293 owned_(false), |
221 all_next_(NULL), | 294 all_next_(NULL), |
222 idle_next_(NULL) { | 295 idle_next_(NULL), |
296 shutdown_next_(NULL) { | |
223 } | 297 } |
224 | 298 |
225 | 299 |
226 void ThreadPool::Worker::StartThread() { | 300 void ThreadPool::Worker::StartThread() { |
227 #if defined(DEBUG) | 301 #if defined(DEBUG) |
228 // Must call SetTask before StartThread. | 302 // Must call SetTask before StartThread. |
229 { // NOLINT | 303 { // NOLINT |
230 MonitorLocker ml(&monitor_); | 304 MonitorLocker ml(&monitor_); |
231 ASSERT(task_ != NULL); | 305 ASSERT(task_ != NULL); |
232 } | 306 } |
(...skipping 24 matching lines...) Expand all Loading... | |
257 // out. Give the worker one last desperate chance to live. We | 331 // out. Give the worker one last desperate chance to live. We |
258 // are merciful. | 332 // are merciful. |
259 return 1; | 333 return 1; |
260 } else { | 334 } else { |
261 return FLAG_worker_timeout_millis - waited; | 335 return FLAG_worker_timeout_millis - waited; |
262 } | 336 } |
263 } | 337 } |
264 } | 338 } |
265 | 339 |
266 | 340 |
267 void ThreadPool::Worker::Loop() { | 341 bool ThreadPool::Worker::Loop() { |
268 MonitorLocker ml(&monitor_); | 342 MonitorLocker ml(&monitor_); |
269 int64_t idle_start; | 343 int64_t idle_start; |
270 while (true) { | 344 while (true) { |
271 ASSERT(task_ != NULL); | 345 ASSERT(task_ != NULL); |
272 Task* task = task_; | 346 Task* task = task_; |
273 task_ = NULL; | 347 task_ = NULL; |
274 | 348 |
275 // Release monitor while handling the task. | 349 // Release monitor while handling the task. |
276 monitor_.Exit(); | 350 monitor_.Exit(); |
277 task->Run(); | 351 task->Run(); |
278 ASSERT(Isolate::Current() == NULL); | 352 ASSERT(Isolate::Current() == NULL); |
279 delete task; | 353 delete task; |
280 monitor_.Enter(); | 354 monitor_.Enter(); |
281 | 355 |
282 ASSERT(task_ == NULL); | 356 ASSERT(task_ == NULL); |
283 if (IsDone()) { | 357 if (IsDone()) { |
284 return; | 358 return false; |
285 } | 359 } |
286 ASSERT(pool_ != NULL); | 360 ASSERT(!done_); |
287 pool_->SetIdle(this); | 361 ThreadPool::SetIdle(this); |
288 idle_start = OS::GetCurrentTimeMillis(); | 362 idle_start = OS::GetCurrentTimeMillis(); |
289 while (true) { | 363 while (true) { |
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 364 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
291 if (task_ != NULL) { | 365 if (task_ != NULL) { |
292 // We've found a task. Process it, regardless of whether the | 366 // We've found a task. Process it, regardless of whether the |
293 // worker is done_. | 367 // worker is done_. |
turnidge
2015/06/30 22:15:46
Note to self: figure out when this case happens an
zra
2015/07/20 22:23:39
Acknowledged.
| |
294 break; | 368 break; |
295 } | 369 } |
296 if (IsDone()) { | 370 if (IsDone()) { |
297 return; | 371 return false; |
298 } | 372 } |
299 if (result == Monitor::kTimedOut && | 373 if ((result == Monitor::kTimedOut) && |
300 pool_->ReleaseIdleWorker(this)) { | 374 ThreadPool::ReleaseIdleWorker(this)) { |
301 return; | 375 return true; |
302 } | 376 } |
303 } | 377 } |
304 } | 378 } |
305 UNREACHABLE(); | 379 UNREACHABLE(); |
380 return false; | |
306 } | 381 } |
307 | 382 |
308 | 383 |
309 void ThreadPool::Worker::Shutdown() { | 384 void ThreadPool::Worker::Shutdown() { |
310 MonitorLocker ml(&monitor_); | 385 MonitorLocker ml(&monitor_); |
311 pool_ = NULL; // Fail fast if someone tries to access pool_. | 386 done_ = true; |
312 ml.Notify(); | 387 ml.Notify(); |
313 } | 388 } |
314 | 389 |
315 | 390 |
316 // static | 391 // static |
317 void ThreadPool::Worker::Main(uword args) { | 392 void ThreadPool::Worker::Main(uword args) { |
318 Thread::EnsureInit(); | 393 Thread::EnsureInit(); |
319 Worker* worker = reinterpret_cast<Worker*>(args); | 394 Worker* worker = reinterpret_cast<Worker*>(args); |
320 worker->Loop(); | 395 bool delete_self = false; |
396 | |
397 { | |
398 MonitorLocker ml(&(worker->monitor_)); | |
399 if (worker->IsDone()) { | |
400 // id_ hasn't been set, yet, but the ThreadPool is being shutdown. | |
401 // Delete the task, and return. | |
402 ASSERT(worker->task_); | |
403 delete worker->task_; | |
404 worker->task_ = NULL; | |
405 delete_self = true; | |
406 } else { | |
407 worker->id_ = OSThread::GetCurrentThreadId(); | |
408 worker->started_ = true; | |
409 } | |
410 } | |
411 | |
412 // We aren't able to delete the worker while holding the worker's monitor. | |
413 // Now that we have released it, and we know that ThreadPool::Shutdown | |
414 // won't touch it again, we can delete it and return. | |
415 if (delete_self) { | |
416 MonitorLocker eml(&ThreadPool::exit_monitor_); | |
417 RemoveWorkerFromShutdownList(worker); | |
418 delete worker; | |
419 eml.Notify(); | |
420 return; | |
421 } | |
422 | |
423 bool released = worker->Loop(); | |
321 | 424 |
322 // It should be okay to access these unlocked here in this assert. | 425 // It should be okay to access these unlocked here in this assert. |
323 ASSERT(!worker->owned_ && | 426 // worker->all_next_ is retained by the pool for shutdown monitoring. |
324 worker->all_next_ == NULL && | 427 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
325 worker->idle_next_ == NULL); | |
326 | 428 |
327 // The exit monitor is only used during testing. | 429 if (!released) { |
328 if (ThreadPool::exit_monitor_) { | 430 // This worker is exiting because the thread pool is being shut down. |
329 MonitorLocker ml(ThreadPool::exit_monitor_); | 431 // Inform the thread pool that we are exiting. We remove this worker from |
330 (*ThreadPool::exit_count_)++; | 432 // shutting_down_workers_ list because there will be no need for the |
331 ml.Notify(); | 433 // ThreadPool to take action for this worker. |
434 MonitorLocker eml(&ThreadPool::exit_monitor_); | |
435 worker->id_ = OSThread::kInvalidThreadId; | |
436 RemoveWorkerFromShutdownList(worker); | |
437 delete worker; | |
438 eml.Notify(); | |
439 } else { | |
440 // This worker is going down because it was idle for too long. This case | |
441 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. | |
442 delete worker; | |
332 } | 443 } |
333 delete worker; | |
334 #if defined(TARGET_OS_WINDOWS) | 444 #if defined(TARGET_OS_WINDOWS) |
335 Thread::CleanUp(); | 445 Thread::CleanUp(); |
336 #endif | 446 #endif |
337 } | 447 } |
338 | 448 |
339 } // namespace dart | 449 } // namespace dart |
OLD | NEW |