Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <list> | 7 #include <list> |
| 8 #include <map> | 8 #include <map> |
| 9 #include <set> | 9 #include <set> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 86 bool PostTask(const std::string* optional_token_name, | 86 bool PostTask(const std::string* optional_token_name, |
| 87 SequenceToken sequence_token, | 87 SequenceToken sequence_token, |
| 88 WorkerShutdown shutdown_behavior, | 88 WorkerShutdown shutdown_behavior, |
| 89 const tracked_objects::Location& from_here, | 89 const tracked_objects::Location& from_here, |
| 90 const Closure& task); | 90 const Closure& task); |
| 91 | 91 |
| 92 bool RunsTasksOnCurrentThread() const; | 92 bool RunsTasksOnCurrentThread() const; |
| 93 | 93 |
| 94 void FlushForTesting(); | 94 void FlushForTesting(); |
| 95 | 95 |
| 96 void TriggerSpuriousWorkSignalForTesting(); | |
| 97 | |
| 98 int GetWorkSignalCountForTesting() const; | |
| 99 | |
| 96 void Shutdown(); | 100 void Shutdown(); |
| 97 | 101 |
| 98 void SetTestingObserver(TestingObserver* observer); | 102 void SetTestingObserver(TestingObserver* observer); |
| 99 | 103 |
| 100 // Runs the worker loop on the background thread. | 104 // Runs the worker loop on the background thread. |
| 101 void ThreadLoop(Worker* this_worker); | 105 void ThreadLoop(Worker* this_worker); |
| 102 | 106 |
| 103 private: | 107 private: |
| 108 // Returns whether there are no more pending tasks and all threads | |
| 109 // are idle. Must be called under lock. | |
| 110 bool IsIdle() const; | |
| 111 | |
| 104 // Called from within the lock, this converts the given token name into a | 112 // Called from within the lock, this converts the given token name into a |
| 105 // token ID, creating a new one if necessary. | 113 // token ID, creating a new one if necessary. |
| 106 int LockedGetNamedTokenID(const std::string& name); | 114 int LockedGetNamedTokenID(const std::string& name); |
| 107 | 115 |
| 108 // The calling code should clear the given delete_these_oustide_lock | 116 // The calling code should clear the given delete_these_oustide_lock |
| 109 // vector the next time the lock is released. See the implementation for | 117 // vector the next time the lock is released. See the implementation for |
| 110 // a more detailed description. | 118 // a more detailed description. |
| 111 bool GetWork(SequencedTask* task, | 119 bool GetWork(SequencedTask* task, |
| 112 std::vector<Closure>* delete_these_outside_lock); | 120 std::vector<Closure>* delete_these_outside_lock); |
| 113 | 121 |
| (...skipping 21 matching lines...) Expand all Loading... | |
| 135 // | 143 // |
| 136 // See the implementedion for more. | 144 // See the implementedion for more. |
| 137 int PrepareToStartAdditionalThreadIfHelpful(); | 145 int PrepareToStartAdditionalThreadIfHelpful(); |
| 138 | 146 |
| 139 // The second part of thread creation after | 147 // The second part of thread creation after |
| 140 // PrepareToStartAdditionalThreadIfHelpful with the thread number it | 148 // PrepareToStartAdditionalThreadIfHelpful with the thread number it |
| 141 // generated. This actually creates the thread and should be called outside | 149 // generated. This actually creates the thread and should be called outside |
| 142 // the lock to avoid blocking important work starting a thread in the lock. | 150 // the lock to avoid blocking important work starting a thread in the lock. |
| 143 void FinishStartingAdditionalThread(int thread_number); | 151 void FinishStartingAdditionalThread(int thread_number); |
| 144 | 152 |
| 153 // Signal |has_work_| and increment |has_work_signal_count_|. | |
| 154 void SignalHasWork(); | |
| 155 | |
| 145 // Checks whether there is work left that's blocking shutdown. Must be | 156 // Checks whether there is work left that's blocking shutdown. Must be |
| 146 // called inside the lock. | 157 // called inside the lock. |
| 147 bool CanShutdown() const; | 158 bool CanShutdown() const; |
| 148 | 159 |
| 149 SequencedWorkerPool* const worker_pool_; | 160 SequencedWorkerPool* const worker_pool_; |
| 150 | 161 |
| 151 // The last sequence number used. Managed by GetSequenceToken, since this | 162 // The last sequence number used. Managed by GetSequenceToken, since this |
| 152 // only does threadsafe increment operations, you do not need to hold the | 163 // only does threadsafe increment operations, you do not need to hold the |
| 153 // lock. | 164 // lock. |
| 154 volatile subtle::Atomic32 last_sequence_number_; | 165 volatile subtle::Atomic32 last_sequence_number_; |
| 155 | 166 |
| 156 // This lock protects |everything in this class|. Do not read or modify | 167 // This lock protects |everything in this class|. Do not read or modify |
| 157 // anything without holding this lock. Do not block while holding this | 168 // anything without holding this lock. Do not block while holding this |
| 158 // lock. | 169 // lock. |
| 159 mutable Lock lock_; | 170 mutable Lock lock_; |
| 160 | 171 |
| 161 // Condition variable used to wake up worker threads when a task is runnable. | 172 // Condition variable that is signalled whenever a new task is |
| 162 ConditionVariable cond_var_; | 173 // posted or when Shutdown() is called. |
| 174 ConditionVariable has_work_cv_; | |
| 175 | |
| 176 // Number of times |has_work_| has been signalled. Used for testing. | |
| 177 int has_work_signal_count_; | |
| 178 | |
| 179 // Condition variable that is signalled whenever IsIdle() goes to | |
| 180 // true. | |
| 181 ConditionVariable is_idle_cv_; | |
| 182 | |
| 183 // Condition variable that is signalled whwnever CanShutdown() goes | |
| 184 // to true. | |
| 185 ConditionVariable can_shutdown_cv_; | |
| 163 | 186 |
| 164 // The maximum number of worker threads we'll create. | 187 // The maximum number of worker threads we'll create. |
| 165 const size_t max_threads_; | 188 const size_t max_threads_; |
| 166 | 189 |
| 167 const std::string thread_name_prefix_; | 190 const std::string thread_name_prefix_; |
| 168 | 191 |
| 169 // Associates all known sequence token names with their IDs. | 192 // Associates all known sequence token names with their IDs. |
| 170 std::map<std::string, int> named_sequence_tokens_; | 193 std::map<std::string, int> named_sequence_tokens_; |
| 171 | 194 |
| 172 // Owning pointers to all threads we've created so far, indexed by | 195 // Owning pointers to all threads we've created so far, indexed by |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 238 | 261 |
| 239 // Inner definitions --------------------------------------------------------- | 262 // Inner definitions --------------------------------------------------------- |
| 240 | 263 |
| 241 SequencedWorkerPool::Inner::Inner( | 264 SequencedWorkerPool::Inner::Inner( |
| 242 SequencedWorkerPool* worker_pool, | 265 SequencedWorkerPool* worker_pool, |
| 243 size_t max_threads, | 266 size_t max_threads, |
| 244 const std::string& thread_name_prefix) | 267 const std::string& thread_name_prefix) |
| 245 : worker_pool_(worker_pool), | 268 : worker_pool_(worker_pool), |
| 246 last_sequence_number_(0), | 269 last_sequence_number_(0), |
| 247 lock_(), | 270 lock_(), |
| 248 cond_var_(&lock_), | 271 has_work_cv_(&lock_), |
| 272 has_work_signal_count_(0), | |
| 273 is_idle_cv_(&lock_), | |
| 274 can_shutdown_cv_(&lock_), | |
| 249 max_threads_(max_threads), | 275 max_threads_(max_threads), |
| 250 thread_name_prefix_(thread_name_prefix), | 276 thread_name_prefix_(thread_name_prefix), |
| 251 thread_being_created_(false), | 277 thread_being_created_(false), |
| 252 waiting_thread_count_(0), | 278 waiting_thread_count_(0), |
| 253 blocking_shutdown_thread_count_(0), | 279 blocking_shutdown_thread_count_(0), |
| 254 pending_task_count_(0), | 280 pending_task_count_(0), |
| 255 blocking_shutdown_pending_task_count_(0), | 281 blocking_shutdown_pending_task_count_(0), |
| 256 shutdown_called_(false), | 282 shutdown_called_(false), |
| 257 testing_observer_(NULL) {} | 283 testing_observer_(NULL) {} |
| 258 | 284 |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 311 blocking_shutdown_pending_task_count_++; | 337 blocking_shutdown_pending_task_count_++; |
| 312 | 338 |
| 313 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 339 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| 314 } | 340 } |
| 315 | 341 |
| 316 // Actually start the additional thread or signal an existing one now that | 342 // Actually start the additional thread or signal an existing one now that |
| 317 // we're outside the lock. | 343 // we're outside the lock. |
| 318 if (create_thread_id) | 344 if (create_thread_id) |
| 319 FinishStartingAdditionalThread(create_thread_id); | 345 FinishStartingAdditionalThread(create_thread_id); |
| 320 else | 346 else |
| 321 cond_var_.Signal(); | 347 SignalHasWork(); |
| 322 | 348 |
| 323 return true; | 349 return true; |
| 324 } | 350 } |
| 325 | 351 |
| 326 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 352 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| 327 AutoLock lock(lock_); | 353 AutoLock lock(lock_); |
| 328 return ContainsKey(threads_, PlatformThread::CurrentId()); | 354 return ContainsKey(threads_, PlatformThread::CurrentId()); |
| 329 } | 355 } |
| 330 | 356 |
| 331 void SequencedWorkerPool::Inner::FlushForTesting() { | 357 void SequencedWorkerPool::Inner::FlushForTesting() { |
| 332 { | 358 AutoLock lock(lock_); |
| 333 AutoLock lock(lock_); | 359 while (!IsIdle()) |
| 334 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) | 360 is_idle_cv_.Wait(); |
| 335 cond_var_.Wait(); | 361 } |
| 336 } | 362 |
| 337 cond_var_.Signal(); | 363 void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() { |
| 364 SignalHasWork(); | |
| 365 } | |
| 366 | |
| 367 int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const { | |
| 368 AutoLock lock(lock_); | |
| 369 return has_work_signal_count_; | |
| 338 } | 370 } |
| 339 | 371 |
| 340 void SequencedWorkerPool::Inner::Shutdown() { | 372 void SequencedWorkerPool::Inner::Shutdown() { |
|
jar (doing other things)
2012/03/12 17:39:13
IMO, it would be helpful if you commented, or asse
akalin
2012/03/12 18:34:34
Done. (Added comment, and assert is in SWP::Shutdo
| |
| 341 // Mark us as terminated and go through and drop all tasks that aren't | 373 // Mark us as terminated and go through and drop all tasks that aren't |
| 342 // required to run on shutdown. Since no new tasks will get posted once the | 374 // required to run on shutdown. Since no new tasks will get posted once the |
| 343 // terminated flag is set, this ensures that all remaining tasks are required | 375 // terminated flag is set, this ensures that all remaining tasks are required |
| 344 // for shutdown whenever the termianted_ flag is set. | 376 // for shutdown whenever the termianted_ flag is set. |
| 345 { | 377 { |
| 346 AutoLock lock(lock_); | 378 AutoLock lock(lock_); |
| 347 | 379 |
| 348 if (shutdown_called_) | 380 if (shutdown_called_) |
| 349 return; | 381 return; |
| 350 shutdown_called_ = true; | 382 shutdown_called_ = true; |
| 351 | 383 |
| 352 // Tickle the threads. This will wake up a waiting one so it will know that | 384 // Wake up all waiting threads. |
| 353 // it can exit, which in turn will wake up any other waiting ones. | 385 has_work_cv_.Broadcast(); |
|
jar (doing other things)
2012/03/12 17:39:13
I liked the original code better, relying on signa
akalin
2012/03/12 18:34:34
Okay, moved back to signal chain.
| |
| 354 cond_var_.Signal(); | |
| 355 | 386 |
| 356 // There are no pending or running tasks blocking shutdown, we're done. | 387 // There are no pending or running tasks blocking shutdown, we're done. |
| 357 if (CanShutdown()) | 388 if (CanShutdown()) |
| 358 return; | 389 return; |
| 359 } | 390 } |
| 360 | 391 |
| 361 // If we get here, we know we're either waiting on a blocking task that's | 392 // If we get here, we know we're either waiting on a blocking task that's |
| 362 // currently running, waiting on a blocking task that hasn't been scheduled | 393 // currently running, waiting on a blocking task that hasn't been scheduled |
| 363 // yet, or both. Block on the "queue empty" event to know when all tasks are | 394 // yet, or both. Block on the "queue empty" event to know when all tasks are |
|
jar (doing other things)
2012/03/12 17:39:13
This comment doesn't relate to any code IMO.
In ad
akalin
2012/03/12 18:34:34
Replaced comment (and the one below) with somethin
| |
| 364 // complete. This must be done outside the lock. | 395 // complete. This must be done outside the lock. |
| 365 if (testing_observer_) | 396 if (testing_observer_) |
| 366 testing_observer_->WillWaitForShutdown(); | 397 testing_observer_->WillWaitForShutdown(); |
| 367 | 398 |
| 368 TimeTicks shutdown_wait_begin = TimeTicks::Now(); | 399 TimeTicks shutdown_wait_begin = TimeTicks::Now(); |
| 369 | 400 |
| 370 // Wait for no more tasks. | 401 // Wait for no more tasks. |
| 371 { | 402 { |
| 372 AutoLock lock(lock_); | 403 AutoLock lock(lock_); |
| 373 while (!CanShutdown()) | 404 while (!CanShutdown()) |
| 374 cond_var_.Wait(); | 405 can_shutdown_cv_.Wait(); |
| 375 } | 406 } |
| 376 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", | 407 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
| 377 TimeTicks::Now() - shutdown_wait_begin); | 408 TimeTicks::Now() - shutdown_wait_begin); |
| 378 } | 409 } |
| 379 | 410 |
| 380 void SequencedWorkerPool::Inner::SetTestingObserver( | 411 void SequencedWorkerPool::Inner::SetTestingObserver( |
| 381 TestingObserver* observer) { | 412 TestingObserver* observer) { |
| 382 AutoLock lock(lock_); | 413 AutoLock lock(lock_); |
| 383 testing_observer_ = observer; | 414 testing_observer_ = observer; |
| 384 } | 415 } |
| 385 | 416 |
| 386 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 417 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| 387 { | 418 AutoLock lock(lock_); |
|
jar (doing other things)
2012/03/12 17:39:13
nit: You are dead right that this curly is not nee
akalin
2012/03/12 18:34:34
Done. (It ended up being needed anyway when addres
| |
| 388 AutoLock lock(lock_); | 419 DCHECK(thread_being_created_); |
| 389 DCHECK(thread_being_created_); | 420 // This can make CanShutdown() go to true. |
| 390 thread_being_created_ = false; | 421 thread_being_created_ = false; |
| 391 std::pair<ThreadMap::iterator, bool> result = | 422 if (CanShutdown()) |
|
jar (doing other things)
2012/03/12 17:39:13
I don't think anyone cares about CanShutdown() unl
akalin
2012/03/12 18:34:34
Done. Moved can_shutdown_cv_ signal to end of blo
| |
| 392 threads_.insert( | 423 can_shutdown_cv_.Signal(); |
| 393 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); | 424 std::pair<ThreadMap::iterator, bool> result = |
| 394 DCHECK(result.second); | 425 threads_.insert( |
| 426 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); | |
| 427 DCHECK(result.second); | |
| 395 | 428 |
| 396 while (true) { | 429 while (true) { |
| 397 // See GetWork for what delete_these_outside_lock is doing. | 430 // See GetWork for what delete_these_outside_lock is doing. |
| 398 SequencedTask task; | 431 SequencedTask task; |
| 399 std::vector<Closure> delete_these_outside_lock; | 432 std::vector<Closure> delete_these_outside_lock; |
| 400 if (GetWork(&task, &delete_these_outside_lock)) { | 433 if (GetWork(&task, &delete_these_outside_lock)) { |
| 401 int new_thread_id = WillRunWorkerTask(task); | 434 int new_thread_id = WillRunWorkerTask(task); |
| 402 { | 435 { |
| 403 AutoUnlock unlock(lock_); | 436 AutoUnlock unlock(lock_); |
| 404 cond_var_.Signal(); | 437 delete_these_outside_lock.clear(); |
|
jar (doing other things)
2012/03/12 17:39:13
This signal is basically asking that anyone else t
akalin
2012/03/12 18:34:34
Hmm. If every task posted causes a signal, then w
jar (doing other things)
2012/03/12 19:42:36
I *think* you are correct. This is a slightly (IM
akalin
2012/03/12 20:42:15
I see. Okay, I just added a comment as to why it'
| |
| 405 delete_these_outside_lock.clear(); | |
| 406 | 438 |
| 407 // Complete thread creation outside the lock if necessary. | 439 // Complete thread creation outside the lock if necessary. |
| 408 if (new_thread_id) | 440 if (new_thread_id) |
| 409 FinishStartingAdditionalThread(new_thread_id); | 441 FinishStartingAdditionalThread(new_thread_id); |
| 410 | 442 |
| 411 task.task.Run(); | 443 task.task.Run(); |
| 412 | 444 |
| 413 // Make sure our task is erased outside the lock for the same reason | 445 // Make sure our task is erased outside the lock for the same reason |
| 414 // we do this with delete_these_oustide_lock. | 446 // we do this with delete_these_oustide_lock. |
| 415 task.task = Closure(); | 447 task.task = Closure(); |
| 416 } | |
| 417 DidRunWorkerTask(task); // Must be done inside the lock. | |
| 418 } else { | |
| 419 // When we're terminating and there's no more work, we can | |
| 420 // shut down. You can't get more tasks posted once | |
| 421 // shutdown_called_ is set. There may be some tasks stuck | |
| 422 // behind running ones with the same sequence token, but | |
| 423 // additional threads won't help this case. | |
| 424 if (shutdown_called_) | |
| 425 break; | |
| 426 waiting_thread_count_++; | |
| 427 cond_var_.Signal(); // For Flush() that may be waiting on the | |
| 428 // waiting thread count to go up. | |
| 429 cond_var_.Wait(); | |
| 430 waiting_thread_count_--; | |
| 431 } | 448 } |
| 449 DidRunWorkerTask(task); // Must be done inside the lock. | |
| 450 } else { | |
| 451 // When we're terminating and there's no more work, we can | |
| 452 // shut down. You can't get more tasks posted once | |
| 453 // shutdown_called_ is set. There may be some tasks stuck | |
| 454 // behind running ones with the same sequence token, but | |
| 455 // additional threads won't help this case. | |
| 456 if (shutdown_called_) | |
| 457 break; | |
| 458 waiting_thread_count_++; | |
| 459 // This is the only time that IsIdle() can go to true. | |
| 460 if (IsIdle()) | |
| 461 is_idle_cv_.Signal(); | |
| 462 has_work_cv_.Wait(); | |
| 463 waiting_thread_count_--; | |
| 432 } | 464 } |
| 433 } | 465 } |
| 466 } | |
| 434 | 467 |
| 435 // We noticed we should exit. Wake up the next worker so it knows it should | 468 bool SequencedWorkerPool::Inner::IsIdle() const { |
| 436 // exit as well (because the Shutdown() code only signals once). | 469 lock_.AssertAcquired(); |
| 437 cond_var_.Signal(); | 470 return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size(); |
|
jar (doing other things)
2012/03/12 17:39:13
This is another fairly critical Signal() call. Thi
akalin
2012/03/12 18:34:34
This was deleted because of the change to using Br
| |
| 438 } | 471 } |
| 439 | 472 |
| 440 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( | 473 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| 441 const std::string& name) { | 474 const std::string& name) { |
| 442 lock_.AssertAcquired(); | 475 lock_.AssertAcquired(); |
| 443 DCHECK(!name.empty()); | 476 DCHECK(!name.empty()); |
| 444 | 477 |
| 445 std::map<std::string, int>::const_iterator found = | 478 std::map<std::string, int>::const_iterator found = |
| 446 named_sequence_tokens_.find(name); | 479 named_sequence_tokens_.find(name); |
| 447 if (found != named_sequence_tokens_.end()) | 480 if (found != named_sequence_tokens_.end()) |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 511 // vector they passed to us once the lock is exited to make this | 544 // vector they passed to us once the lock is exited to make this |
| 512 // happen. | 545 // happen. |
| 513 delete_these_outside_lock->push_back(i->task); | 546 delete_these_outside_lock->push_back(i->task); |
| 514 i = pending_tasks_.erase(i); | 547 i = pending_tasks_.erase(i); |
| 515 pending_task_count_--; | 548 pending_task_count_--; |
| 516 } else { | 549 } else { |
| 517 // Found a runnable task. | 550 // Found a runnable task. |
| 518 *task = *i; | 551 *task = *i; |
| 519 i = pending_tasks_.erase(i); | 552 i = pending_tasks_.erase(i); |
| 520 pending_task_count_--; | 553 pending_task_count_--; |
| 521 if (task->shutdown_behavior == BLOCK_SHUTDOWN) | 554 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
| 555 // This can make CanShutdown() go to true. | |
| 522 blocking_shutdown_pending_task_count_--; | 556 blocking_shutdown_pending_task_count_--; |
| 557 if (CanShutdown()) | |
| 558 can_shutdown_cv_.Signal(); | |
|
jar (doing other things)
2012/03/12 17:39:13
Again, I don't like seeing special case code inter
akalin
2012/03/12 18:34:34
Done (see above)
| |
| 559 } | |
| 523 | 560 |
| 524 found_task = true; | 561 found_task = true; |
| 525 break; | 562 break; |
| 526 } | 563 } |
| 527 } | 564 } |
| 528 | 565 |
| 529 // Track the number of tasks we had to skip over to see if we should be | 566 // Track the number of tasks we had to skip over to see if we should be |
| 530 // making this more efficient. If this number ever becomes large or is | 567 // making this more efficient. If this number ever becomes large or is |
| 531 // frequently "some", we should consider the optimization above. | 568 // frequently "some", we should consider the optimization above. |
| 532 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", | 569 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 560 // second thread (since we only create one at a time) will be blocked by | 597 // second thread (since we only create one at a time) will be blocked by |
| 561 // the execution of the first task, which could be arbitrarily long. | 598 // the execution of the first task, which could be arbitrarily long. |
| 562 return PrepareToStartAdditionalThreadIfHelpful(); | 599 return PrepareToStartAdditionalThreadIfHelpful(); |
| 563 } | 600 } |
| 564 | 601 |
| 565 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 602 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| 566 lock_.AssertAcquired(); | 603 lock_.AssertAcquired(); |
| 567 | 604 |
| 568 if (task.shutdown_behavior == BLOCK_SHUTDOWN) { | 605 if (task.shutdown_behavior == BLOCK_SHUTDOWN) { |
| 569 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 606 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
| 607 // This can make CanShutdown() go to true. | |
| 570 blocking_shutdown_thread_count_--; | 608 blocking_shutdown_thread_count_--; |
| 609 if (CanShutdown()) | |
| 610 can_shutdown_cv_.Signal(); | |
|
jar (doing other things)
2012/03/12 17:39:13
Again, please don't add special case code for this
akalin
2012/03/12 18:34:34
Done (see above)
| |
| 571 } | 611 } |
| 572 | 612 |
| 573 if (task.sequence_token_id) | 613 if (task.sequence_token_id) |
| 574 current_sequences_.erase(task.sequence_token_id); | 614 current_sequences_.erase(task.sequence_token_id); |
| 575 } | 615 } |
| 576 | 616 |
| 577 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 617 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| 578 int sequence_token_id) const { | 618 int sequence_token_id) const { |
| 579 lock_.AssertAcquired(); | 619 lock_.AssertAcquired(); |
| 580 return !sequence_token_id || | 620 return !sequence_token_id || |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 633 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 673 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| 634 int thread_number) { | 674 int thread_number) { |
| 635 // Called outside of the lock. | 675 // Called outside of the lock. |
| 636 DCHECK(thread_number > 0); | 676 DCHECK(thread_number > 0); |
| 637 | 677 |
| 638 // The worker is assigned to the list when the thread actually starts, which | 678 // The worker is assigned to the list when the thread actually starts, which |
| 639 // will manage the memory of the pointer. | 679 // will manage the memory of the pointer. |
| 640 new Worker(worker_pool_, thread_number, thread_name_prefix_); | 680 new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| 641 } | 681 } |
| 642 | 682 |
| 683 void SequencedWorkerPool::Inner::SignalHasWork() { | |
| 684 has_work_cv_.Signal(); | |
| 685 ++has_work_signal_count_; | |
|
jar (doing other things)
2012/03/12 17:39:13
You are not generally inside a lock, but are manip
akalin
2012/03/12 18:34:34
My mistake. This testing variable is kind of hack
| |
| 686 } | |
| 687 | |
| 643 bool SequencedWorkerPool::Inner::CanShutdown() const { | 688 bool SequencedWorkerPool::Inner::CanShutdown() const { |
| 644 lock_.AssertAcquired(); | 689 lock_.AssertAcquired(); |
| 645 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 690 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| 646 return !thread_being_created_ && | 691 return !thread_being_created_ && |
| 647 blocking_shutdown_thread_count_ == 0 && | 692 blocking_shutdown_thread_count_ == 0 && |
| 648 blocking_shutdown_pending_task_count_ == 0; | 693 blocking_shutdown_pending_task_count_ == 0; |
| 649 } | 694 } |
| 650 | 695 |
| 651 // SequencedWorkerPool -------------------------------------------------------- | 696 // SequencedWorkerPool -------------------------------------------------------- |
| 652 | 697 |
| 653 SequencedWorkerPool::SequencedWorkerPool( | 698 SequencedWorkerPool::SequencedWorkerPool( |
| 654 size_t max_threads, | 699 size_t max_threads, |
| 655 const std::string& thread_name_prefix) | 700 const std::string& thread_name_prefix) |
| 656 : constructor_message_loop_(MessageLoopProxy::current()), | 701 : constructor_message_loop_(MessageLoopProxy::current()), |
| 657 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), | 702 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), |
| 658 max_threads, thread_name_prefix)) { | 703 max_threads, thread_name_prefix)) { |
| 659 DCHECK(constructor_message_loop_.get()); | 704 DCHECK(constructor_message_loop_.get()); |
| 660 } | 705 } |
| 661 | 706 |
| 662 SequencedWorkerPool::~SequencedWorkerPool() {} | 707 SequencedWorkerPool::~SequencedWorkerPool() {} |
| 663 | 708 |
| 664 void SequencedWorkerPool::OnDestruct() const { | 709 void SequencedWorkerPool::OnDestruct() const { |
| 665 // TODO(akalin): Once we can easily check if we're on a worker | 710 // TODO(akalin): Once we can easily check if we're on a worker |
| 666 // thread or not, use that instead of restricting destruction to | 711 // thread or not, use that instead of restricting destruction to |
| 667 // only the constructor message loop. | 712 // only the constructor message loop. |
| 668 if (constructor_message_loop_->BelongsToCurrentThread()) { | 713 if (constructor_message_loop_->BelongsToCurrentThread()) |
| 669 delete this; | 714 delete this; |
| 670 } else { | 715 else |
| 671 constructor_message_loop_->DeleteSoon(FROM_HERE, this); | 716 constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
| 672 } | |
| 673 } | 717 } |
| 674 | 718 |
| 675 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 719 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| 676 return inner_->GetSequenceToken(); | 720 return inner_->GetSequenceToken(); |
| 677 } | 721 } |
| 678 | 722 |
| 679 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 723 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
| 680 const std::string& name) { | 724 const std::string& name) { |
| 681 return inner_->GetNamedSequenceToken(name); | 725 return inner_->GetNamedSequenceToken(name); |
| 682 } | 726 } |
| (...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 741 } | 785 } |
| 742 | 786 |
| 743 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 787 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| 744 return inner_->RunsTasksOnCurrentThread(); | 788 return inner_->RunsTasksOnCurrentThread(); |
| 745 } | 789 } |
| 746 | 790 |
| 747 void SequencedWorkerPool::FlushForTesting() { | 791 void SequencedWorkerPool::FlushForTesting() { |
| 748 inner_->FlushForTesting(); | 792 inner_->FlushForTesting(); |
| 749 } | 793 } |
| 750 | 794 |
| 795 void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() { | |
| 796 inner_->TriggerSpuriousWorkSignalForTesting(); | |
| 797 } | |
| 798 | |
| 799 int SequencedWorkerPool::GetWorkSignalCountForTesting() const { | |
| 800 return inner_->GetWorkSignalCountForTesting(); | |
| 801 } | |
| 802 | |
| 751 void SequencedWorkerPool::Shutdown() { | 803 void SequencedWorkerPool::Shutdown() { |
| 752 inner_->Shutdown(); | 804 inner_->Shutdown(); |
| 753 } | 805 } |
| 754 | 806 |
| 755 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { | 807 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { |
| 756 inner_->SetTestingObserver(observer); | 808 inner_->SetTestingObserver(observer); |
| 757 } | 809 } |
| 758 | 810 |
| 759 } // namespace base | 811 } // namespace base |
| OLD | NEW |