Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(61)

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 1177153005: Enables clean VM shutdown. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Kill isolates from the service isolate Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/thread_pool.h" 5 #include "vm/thread_pool.h"
6 6
7 #include "vm/flags.h" 7 #include "vm/flags.h"
8 #include "vm/lockers.h" 8 #include "vm/lockers.h"
9 9
10 namespace dart { 10 namespace dart {
11 11
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, 12 DEFINE_FLAG(int, worker_timeout_millis, 5000,
13 "Free workers when they have been idle for this amount of time."); 13 "Free workers when they have been idle for this amount of time.");
14 #if defined(USING_SIMULATOR)
15 // The simulator can take much longer to finish running Dart code.
16 DEFINE_FLAG(int, shutdown_timeout_millis, 15000,
17 "Amount of time to wait for a worker to stop during shutdown.");
18 #else
19 DEFINE_FLAG(int, shutdown_timeout_millis, 1000,
20 "Amount of time to wait for a worker to stop during shutdown.");
21 #endif
14 22
15 Monitor* ThreadPool::exit_monitor_ = NULL; 23 Mutex ThreadPool::mutex_;
16 int* ThreadPool::exit_count_ = NULL; 24 bool ThreadPool::shutting_down_ = false;
25 ThreadPool::Worker* ThreadPool::all_workers_ = NULL;
26 ThreadPool::Worker* ThreadPool::idle_workers_ = NULL;
27 uint64_t ThreadPool::count_started_ = 0;
28 uint64_t ThreadPool::count_stopped_ = 0;
29 uint64_t ThreadPool::count_running_ = 0;
30 uint64_t ThreadPool::count_idle_ = 0;
31 bool ThreadPool::shutdown_timeout_fatal_ = true;
17 32
18 ThreadPool::ThreadPool() 33 Monitor ThreadPool::exit_monitor_;
19 : shutting_down_(false), 34 ThreadPool::Worker* ThreadPool::shutting_down_workers_ = NULL;
20 all_workers_(NULL),
21 idle_workers_(NULL),
22 count_started_(0),
23 count_stopped_(0),
24 count_running_(0),
25 count_idle_(0) {
26 }
27
28
29 ThreadPool::~ThreadPool() {
30 Shutdown();
31 }
32 35
33 36
34 void ThreadPool::Run(Task* task) { 37 void ThreadPool::Run(Task* task) {
35 Worker* worker = NULL; 38 Worker* worker = NULL;
36 bool new_worker = false; 39 bool new_worker = false;
37 { 40 {
38 // We need ThreadPool::mutex_ to access worker lists and other 41 // We need ThreadPool::mutex_ to access worker lists and other
39 // ThreadPool state. 42 // ThreadPool state.
40 MutexLocker ml(&mutex_); 43 MutexLocker ml(&mutex_);
41 if (shutting_down_) { 44 if (shutting_down_) {
42 return; 45 return;
43 } 46 }
44 if (idle_workers_ == NULL) { 47 if (idle_workers_ == NULL) {
45 worker = new Worker(this); 48 worker = new Worker();
46 ASSERT(worker != NULL); 49 ASSERT(worker != NULL);
47 new_worker = true; 50 new_worker = true;
48 count_started_++; 51 count_started_++;
49 52
50 // Add worker to the all_workers_ list. 53 // Add worker to the all_workers_ list.
51 worker->all_next_ = all_workers_; 54 worker->all_next_ = all_workers_;
52 all_workers_ = worker; 55 all_workers_ = worker;
53 worker->owned_ = true; 56 worker->owned_ = true;
54 } else { 57 } else {
55 // Get the first worker from the idle worker list. 58 // Get the first worker from the idle worker list.
56 worker = idle_workers_; 59 worker = idle_workers_;
57 idle_workers_ = worker->idle_next_; 60 idle_workers_ = worker->idle_next_;
58 worker->idle_next_ = NULL; 61 worker->idle_next_ = NULL;
59 count_idle_--; 62 count_idle_--;
60 } 63 }
61 count_running_++; 64 count_running_++;
62 } 65 }
63 // Release ThreadPool::mutex_ before calling Worker functions. 66 // Release ThreadPool::mutex_ before calling Worker functions.
64 ASSERT(worker != NULL); 67 ASSERT(worker != NULL);
65 worker->SetTask(task); 68 worker->SetTask(task);
66 if (new_worker) { 69 if (new_worker) {
67 // Call StartThread after we've assigned the first task. 70 // Call StartThread after we've assigned the first task.
68 worker->StartThread(); 71 worker->StartThread();
69 } 72 }
70 } 73 }
71 74
72 75
73 void ThreadPool::Shutdown() { 76 bool ThreadPool::Shutdown() {
74 Worker* saved = NULL; 77 Worker* saved = NULL;
75 { 78 {
76 MutexLocker ml(&mutex_); 79 MutexLocker ml(&mutex_);
77 shutting_down_ = true; 80 shutting_down_ = true;
78 saved = all_workers_; 81 saved = all_workers_;
79 all_workers_ = NULL; 82 all_workers_ = NULL;
80 idle_workers_ = NULL; 83 idle_workers_ = NULL;
81 84
82 Worker* current = saved; 85 Worker* current = saved;
83 while (current != NULL) { 86 while (current != NULL) {
84 Worker* next = current->all_next_; 87 Worker* next = current->all_next_;
85 current->idle_next_ = NULL; 88 current->idle_next_ = NULL;
86 current->owned_ = false; 89 current->owned_ = false;
87 current = next; 90 current = next;
88 count_stopped_++; 91 count_stopped_++;
89 } 92 }
90 93
91 count_idle_ = 0; 94 count_idle_ = 0;
92 count_running_ = 0; 95 count_running_ = 0;
93 ASSERT(count_started_ == count_stopped_); 96 ASSERT(count_started_ == count_stopped_);
94 } 97 }
95 // Release ThreadPool::mutex_ before calling Worker functions. 98 // Release ThreadPool::mutex_ before calling Worker functions.
96 99
97 Worker* current = saved; 100 {
98 while (current != NULL) { 101 MonitorLocker eml(&ThreadPool::exit_monitor_);
99 // We may access all_next_ without holding ThreadPool::mutex_ here 102
100 // because the worker is no longer owned by the ThreadPool. 103 // First tell all the workers to shut down.
101 Worker* next = current->all_next_; 104 Worker* current = saved;
102 current->all_next_ = NULL; 105 while (current != NULL) {
103 current->Shutdown(); 106 Worker* next = current->all_next_;
104 current = next; 107 if (current->id_ != OSThread::GetCurrentThreadId()) {
108 AddWorkerToShutdownList(current);
109 }
110 current->Shutdown();
111 current = next;
112 }
113 saved = NULL;
114
115 // Give workers a chance to exit gracefully.
116 const int64_t start_wait = OS::GetCurrentTimeMillis();
117 int timeout = FLAG_shutdown_timeout_millis;
118 while (shutting_down_workers_ != NULL) {
119 if (timeout > 0) {
120 // Here, we are waiting for workers to exit. When a worker exits we will
121 // be notified.
122 eml.Wait(timeout);
123
124 // We decrement the timeout for the next wait by the amount of time
125 // we've already waited. If the new timeout drops below zero, we break
126 // out of this loop, which triggers the termination code below.
127 const int64_t after_wait = OS::GetCurrentTimeMillis();
128 timeout = FLAG_shutdown_timeout_millis - (after_wait - start_wait);
129 } else {
130 break;
131 }
132 }
133
134 // It is an error if all workers have not exited within the timeout. We
135 // assume that they have run off into the weeds, and it is a bug.
136 bool timely_shutdown = shutting_down_workers_ == NULL;
137 if (!timely_shutdown && shutdown_timeout_fatal_) {
138 FATAL("Thread pool worker threads failed to exit.");
139 }
140
141 return timely_shutdown;
105 } 142 }
106 } 143 }
107 144
108 145
109 bool ThreadPool::IsIdle(Worker* worker) { 146 bool ThreadPool::IsIdle(Worker* worker) {
110 ASSERT(worker != NULL && worker->owned_); 147 ASSERT(worker != NULL && worker->owned_);
111 for (Worker* current = idle_workers_; 148 for (Worker* current = idle_workers_;
112 current != NULL; 149 current != NULL;
113 current = current->idle_next_) { 150 current = current->idle_next_) {
114 if (current == worker) { 151 if (current == worker) {
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
149 ASSERT(worker != NULL && worker->owned_); 186 ASSERT(worker != NULL && worker->owned_);
150 if (all_workers_ == NULL) { 187 if (all_workers_ == NULL) {
151 return false; 188 return false;
152 } 189 }
153 190
154 // Special case head of list. 191 // Special case head of list.
155 if (all_workers_ == worker) { 192 if (all_workers_ == worker) {
156 all_workers_ = worker->all_next_; 193 all_workers_ = worker->all_next_;
157 worker->all_next_ = NULL; 194 worker->all_next_ = NULL;
158 worker->owned_ = false; 195 worker->owned_ = false;
159 worker->pool_ = NULL; 196 worker->done_ = true;
160 return true; 197 return true;
161 } 198 }
162 199
163 for (Worker* current = all_workers_; 200 for (Worker* current = all_workers_;
164 current->all_next_ != NULL; 201 current->all_next_ != NULL;
165 current = current->all_next_) { 202 current = current->all_next_) {
166 if (current->all_next_ == worker) { 203 if (current->all_next_ == worker) {
167 current->all_next_ = worker->all_next_; 204 current->all_next_ = worker->all_next_;
168 worker->all_next_ = NULL; 205 worker->all_next_ = NULL;
169 worker->owned_ = false; 206 worker->owned_ = false;
(...skipping 29 matching lines...) Expand all
199 // Remove from all list. 236 // Remove from all list.
200 bool found = RemoveWorkerFromAllList(worker); 237 bool found = RemoveWorkerFromAllList(worker);
201 ASSERT(found); 238 ASSERT(found);
202 239
203 count_stopped_++; 240 count_stopped_++;
204 count_idle_--; 241 count_idle_--;
205 return true; 242 return true;
206 } 243 }
207 244
208 245
246 // Only call while holding the exit_monitor_
247 void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
248 worker->shutdown_next_ = shutting_down_workers_;
249 shutting_down_workers_ = worker;
250 }
251
252
253 // Only call while holding the exit_monitor_
254 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
255 ASSERT(worker != NULL);
256 if (shutting_down_workers_ == NULL) {
257 return false;
258 }
259
260 // Special case head of list.
261 if (shutting_down_workers_ == worker) {
262 shutting_down_workers_ = worker->shutdown_next_;
263 worker->shutdown_next_ = NULL;
264 return true;
265 }
266
267 for (Worker* current = shutting_down_workers_;
268 current->shutdown_next_ != NULL;
269 current = current->shutdown_next_) {
270 if (current->shutdown_next_ == worker) {
271 current->shutdown_next_ = worker->shutdown_next_;
272 worker->shutdown_next_ = NULL;
273 return true;
274 }
275 }
276 return false;
277 }
278
279
209 ThreadPool::Task::Task() { 280 ThreadPool::Task::Task() {
210 } 281 }
211 282
212 283
213 ThreadPool::Task::~Task() { 284 ThreadPool::Task::~Task() {
214 } 285 }
215 286
216 287
217 ThreadPool::Worker::Worker(ThreadPool* pool) 288 ThreadPool::Worker::Worker()
218 : pool_(pool), 289 : done_(false),
219 task_(NULL), 290 task_(NULL),
291 id_(OSThread::kInvalidThreadId),
292 started_(false),
220 owned_(false), 293 owned_(false),
221 all_next_(NULL), 294 all_next_(NULL),
222 idle_next_(NULL) { 295 idle_next_(NULL),
296 shutdown_next_(NULL) {
223 } 297 }
224 298
225 299
226 void ThreadPool::Worker::StartThread() { 300 void ThreadPool::Worker::StartThread() {
227 #if defined(DEBUG) 301 #if defined(DEBUG)
228 // Must call SetTask before StartThread. 302 // Must call SetTask before StartThread.
229 { // NOLINT 303 { // NOLINT
230 MonitorLocker ml(&monitor_); 304 MonitorLocker ml(&monitor_);
231 ASSERT(task_ != NULL); 305 ASSERT(task_ != NULL);
232 } 306 }
(...skipping 24 matching lines...) Expand all
257 // out. Give the worker one last desperate chance to live. We 331 // out. Give the worker one last desperate chance to live. We
258 // are merciful. 332 // are merciful.
259 return 1; 333 return 1;
260 } else { 334 } else {
261 return FLAG_worker_timeout_millis - waited; 335 return FLAG_worker_timeout_millis - waited;
262 } 336 }
263 } 337 }
264 } 338 }
265 339
266 340
267 void ThreadPool::Worker::Loop() { 341 bool ThreadPool::Worker::Loop() {
268 MonitorLocker ml(&monitor_); 342 MonitorLocker ml(&monitor_);
269 int64_t idle_start; 343 int64_t idle_start;
270 while (true) { 344 while (true) {
271 ASSERT(task_ != NULL); 345 ASSERT(task_ != NULL);
272 Task* task = task_; 346 Task* task = task_;
273 task_ = NULL; 347 task_ = NULL;
274 348
275 // Release monitor while handling the task. 349 // Release monitor while handling the task.
276 monitor_.Exit(); 350 monitor_.Exit();
277 task->Run(); 351 task->Run();
278 ASSERT(Isolate::Current() == NULL); 352 ASSERT(Isolate::Current() == NULL);
279 delete task; 353 delete task;
280 monitor_.Enter(); 354 monitor_.Enter();
281 355
282 ASSERT(task_ == NULL); 356 ASSERT(task_ == NULL);
283 if (IsDone()) { 357 if (IsDone()) {
284 return; 358 return false;
285 } 359 }
286 ASSERT(pool_ != NULL); 360 ASSERT(!done_);
287 pool_->SetIdle(this); 361 ThreadPool::SetIdle(this);
288 idle_start = OS::GetCurrentTimeMillis(); 362 idle_start = OS::GetCurrentTimeMillis();
289 while (true) { 363 while (true) {
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); 364 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
291 if (task_ != NULL) { 365 if (task_ != NULL) {
292 // We've found a task. Process it, regardless of whether the 366 // We've found a task. Process it, regardless of whether the
293 // worker is done_. 367 // worker is done_.
turnidge 2015/06/30 22:15:46 Note to self: figure out when this case happens an
zra 2015/07/20 22:23:39 Acknowledged.
294 break; 368 break;
295 } 369 }
296 if (IsDone()) { 370 if (IsDone()) {
297 return; 371 return false;
298 } 372 }
299 if (result == Monitor::kTimedOut && 373 if ((result == Monitor::kTimedOut) &&
300 pool_->ReleaseIdleWorker(this)) { 374 ThreadPool::ReleaseIdleWorker(this)) {
301 return; 375 return true;
302 } 376 }
303 } 377 }
304 } 378 }
305 UNREACHABLE(); 379 UNREACHABLE();
380 return false;
306 } 381 }
307 382
308 383
309 void ThreadPool::Worker::Shutdown() { 384 void ThreadPool::Worker::Shutdown() {
310 MonitorLocker ml(&monitor_); 385 MonitorLocker ml(&monitor_);
311 pool_ = NULL; // Fail fast if someone tries to access pool_. 386 done_ = true;
312 ml.Notify(); 387 ml.Notify();
313 } 388 }
314 389
315 390
316 // static 391 // static
317 void ThreadPool::Worker::Main(uword args) { 392 void ThreadPool::Worker::Main(uword args) {
318 Thread::EnsureInit(); 393 Thread::EnsureInit();
319 Worker* worker = reinterpret_cast<Worker*>(args); 394 Worker* worker = reinterpret_cast<Worker*>(args);
320 worker->Loop(); 395 bool delete_self = false;
396
397 {
398 MonitorLocker ml(&(worker->monitor_));
399 if (worker->IsDone()) {
400 // id_ hasn't been set, yet, but the ThreadPool is being shutdown.
401 // Delete the task, and return.
402 ASSERT(worker->task_);
403 delete worker->task_;
404 worker->task_ = NULL;
405 delete_self = true;
406 } else {
407 worker->id_ = OSThread::GetCurrentThreadId();
408 worker->started_ = true;
409 }
410 }
411
412 // We aren't able to delete the worker while holding the worker's monitor.
413 // Now that we have released it, and we know that ThreadPool::Shutdown
414 // won't touch it again, we can delete it and return.
415 if (delete_self) {
416 MonitorLocker eml(&ThreadPool::exit_monitor_);
417 RemoveWorkerFromShutdownList(worker);
418 delete worker;
419 eml.Notify();
420 return;
421 }
422
423 bool released = worker->Loop();
321 424
322 // It should be okay to access these unlocked here in this assert. 425 // It should be okay to access these unlocked here in this assert.
323 ASSERT(!worker->owned_ && 426 // worker->all_next_ is retained by the pool for shutdown monitoring.
324 worker->all_next_ == NULL && 427 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
325 worker->idle_next_ == NULL);
326 428
327 // The exit monitor is only used during testing. 429 if (!released) {
328 if (ThreadPool::exit_monitor_) { 430 // This worker is exiting because the thread pool is being shut down.
329 MonitorLocker ml(ThreadPool::exit_monitor_); 431 // Inform the thread pool that we are exiting. We remove this worker from
330 (*ThreadPool::exit_count_)++; 432 // shutting_down_workers_ list because there will be no need for the
331 ml.Notify(); 433 // ThreadPool to take action for this worker.
434 MonitorLocker eml(&ThreadPool::exit_monitor_);
435 worker->id_ = OSThread::kInvalidThreadId;
436 RemoveWorkerFromShutdownList(worker);
437 delete worker;
438 eml.Notify();
439 } else {
440 // This worker is going down because it was idle for too long. This case
441 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
442 delete worker;
332 } 443 }
333 delete worker;
334 #if defined(TARGET_OS_WINDOWS) 444 #if defined(TARGET_OS_WINDOWS)
335 Thread::CleanUp(); 445 Thread::CleanUp();
336 #endif 446 #endif
337 } 447 }
338 448
339 } // namespace dart 449 } // namespace dart
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698