Chromium Code Reviews| Index: base/threading/sequenced_worker_pool.cc |
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
| index 62431aec4a7d6450a27014232b46ee3236704c48..f43eb3c758a8d9e870556e2ed602f83668bd9f73 100644 |
| --- a/base/threading/sequenced_worker_pool.cc |
| +++ b/base/threading/sequenced_worker_pool.cc |
| @@ -93,6 +93,10 @@ class SequencedWorkerPool::Inner { |
| void FlushForTesting(); |
| + void TriggerSpuriousWorkSignalForTesting(); |
| + |
| + int GetWorkSignalCountForTesting() const; |
| + |
| void Shutdown(); |
| void SetTestingObserver(TestingObserver* observer); |
| @@ -101,6 +105,10 @@ class SequencedWorkerPool::Inner { |
| void ThreadLoop(Worker* this_worker); |
| private: |
| + // Returns whether there are no more pending tasks and all threads |
| + // are idle. Must be called under lock. |
| + bool IsIdle() const; |
| + |
| // Called from within the lock, this converts the given token name into a |
| // token ID, creating a new one if necessary. |
| int LockedGetNamedTokenID(const std::string& name); |
| @@ -142,6 +150,9 @@ class SequencedWorkerPool::Inner { |
| // the lock to avoid blocking important work starting a thread in the lock. |
| void FinishStartingAdditionalThread(int thread_number); |
| + // Signal |has_work_| and increment |has_work_signal_count_|. |
| + void SignalHasWork(); |
| + |
| // Checks whether there is work left that's blocking shutdown. Must be |
| // called inside the lock. |
| bool CanShutdown() const; |
| @@ -158,8 +169,20 @@ class SequencedWorkerPool::Inner { |
| // lock. |
| mutable Lock lock_; |
| - // Condition variable used to wake up worker threads when a task is runnable. |
| - ConditionVariable cond_var_; |
| + // Condition variable that is signalled whenever a new task is |
| + // posted or when Shutdown() is called. |
| + ConditionVariable has_work_cv_; |
| + |
| + // Number of times |has_work_| has been signalled. Used for testing. |
| + int has_work_signal_count_; |
| + |
| + // Condition variable that is signalled whenever IsIdle() goes to |
| + // true. |
| + ConditionVariable is_idle_cv_; |
| + |
| + // Condition variable that is signalled whwnever CanShutdown() goes |
| + // to true. |
| + ConditionVariable can_shutdown_cv_; |
| // The maximum number of worker threads we'll create. |
| const size_t max_threads_; |
| @@ -245,7 +268,10 @@ SequencedWorkerPool::Inner::Inner( |
| : worker_pool_(worker_pool), |
| last_sequence_number_(0), |
| lock_(), |
| - cond_var_(&lock_), |
| + has_work_cv_(&lock_), |
| + has_work_signal_count_(0), |
| + is_idle_cv_(&lock_), |
| + can_shutdown_cv_(&lock_), |
| max_threads_(max_threads), |
| thread_name_prefix_(thread_name_prefix), |
| thread_being_created_(false), |
| @@ -318,7 +344,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
| if (create_thread_id) |
| FinishStartingAdditionalThread(create_thread_id); |
| else |
| - cond_var_.Signal(); |
| + SignalHasWork(); |
| return true; |
| } |
| @@ -329,12 +355,18 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| } |
| void SequencedWorkerPool::Inner::FlushForTesting() { |
| - { |
| - AutoLock lock(lock_); |
| - while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) |
| - cond_var_.Wait(); |
| - } |
| - cond_var_.Signal(); |
| + AutoLock lock(lock_); |
| + while (!IsIdle()) |
| + is_idle_cv_.Wait(); |
| +} |
| + |
| +void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() { |
| + SignalHasWork(); |
| +} |
| + |
| +int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const { |
| + AutoLock lock(lock_); |
| + return has_work_signal_count_; |
| } |
| void SequencedWorkerPool::Inner::Shutdown() { |
|
jar (doing other things)
2012/03/12 17:39:13
IMO, it would be helpful if you commented, or asse
akalin
2012/03/12 18:34:34
Done. (Added comment, and assert is in SWP::Shutdo
|
| @@ -349,9 +381,8 @@ void SequencedWorkerPool::Inner::Shutdown() { |
| return; |
| shutdown_called_ = true; |
| - // Tickle the threads. This will wake up a waiting one so it will know that |
| - // it can exit, which in turn will wake up any other waiting ones. |
| - cond_var_.Signal(); |
| + // Wake up all waiting threads. |
| + has_work_cv_.Broadcast(); |
|
jar (doing other things)
2012/03/12 17:39:13
I liked the original code better, relying on signa
akalin
2012/03/12 18:34:34
Okay, moved back to signal chain.
|
| // There are no pending or running tasks blocking shutdown, we're done. |
| if (CanShutdown()) |
| @@ -371,7 +402,7 @@ void SequencedWorkerPool::Inner::Shutdown() { |
| { |
| AutoLock lock(lock_); |
| while (!CanShutdown()) |
| - cond_var_.Wait(); |
| + can_shutdown_cv_.Wait(); |
| } |
| UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
| TimeTicks::Now() - shutdown_wait_begin); |
| @@ -384,57 +415,59 @@ void SequencedWorkerPool::Inner::SetTestingObserver( |
| } |
| void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| - { |
|
jar (doing other things)
2012/03/12 17:39:13
nit: You are dead right that this curly is not nee
akalin
2012/03/12 18:34:34
Done. (It ended up being needed anyway when addres
|
| - AutoLock lock(lock_); |
| - DCHECK(thread_being_created_); |
| - thread_being_created_ = false; |
| - std::pair<ThreadMap::iterator, bool> result = |
| - threads_.insert( |
| - std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
| - DCHECK(result.second); |
| - |
| - while (true) { |
| - // See GetWork for what delete_these_outside_lock is doing. |
| - SequencedTask task; |
| - std::vector<Closure> delete_these_outside_lock; |
| - if (GetWork(&task, &delete_these_outside_lock)) { |
| - int new_thread_id = WillRunWorkerTask(task); |
| - { |
| - AutoUnlock unlock(lock_); |
| - cond_var_.Signal(); |
|
jar (doing other things)
2012/03/12 17:39:13
This signal is basically asking that anyone else t
akalin
2012/03/12 18:34:34
Hmm. If every task posted causes a signal, then w
jar (doing other things)
2012/03/12 19:42:36
I *think* you are correct. This is a slightly (IM
akalin
2012/03/12 20:42:15
I see. Okay, I just added a comment as to why it'
|
| - delete_these_outside_lock.clear(); |
| - |
| - // Complete thread creation outside the lock if necessary. |
| - if (new_thread_id) |
| - FinishStartingAdditionalThread(new_thread_id); |
| - |
| - task.task.Run(); |
| - |
| - // Make sure our task is erased outside the lock for the same reason |
| - // we do this with delete_these_oustide_lock. |
| - task.task = Closure(); |
| - } |
| - DidRunWorkerTask(task); // Must be done inside the lock. |
| - } else { |
| - // When we're terminating and there's no more work, we can |
| - // shut down. You can't get more tasks posted once |
| - // shutdown_called_ is set. There may be some tasks stuck |
| - // behind running ones with the same sequence token, but |
| - // additional threads won't help this case. |
| - if (shutdown_called_) |
| - break; |
| - waiting_thread_count_++; |
| - cond_var_.Signal(); // For Flush() that may be waiting on the |
| - // waiting thread count to go up. |
| - cond_var_.Wait(); |
| - waiting_thread_count_--; |
| + AutoLock lock(lock_); |
| + DCHECK(thread_being_created_); |
| + // This can make CanShutdown() go to true. |
| + thread_being_created_ = false; |
| + if (CanShutdown()) |
|
jar (doing other things)
2012/03/12 17:39:13
I don't think anyone cares about CanShutdown() unl
akalin
2012/03/12 18:34:34
Done. Moved can_shutdown_cv_ signal to end of blo
|
| + can_shutdown_cv_.Signal(); |
| + std::pair<ThreadMap::iterator, bool> result = |
| + threads_.insert( |
| + std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
| + DCHECK(result.second); |
| + |
| + while (true) { |
| + // See GetWork for what delete_these_outside_lock is doing. |
| + SequencedTask task; |
| + std::vector<Closure> delete_these_outside_lock; |
| + if (GetWork(&task, &delete_these_outside_lock)) { |
| + int new_thread_id = WillRunWorkerTask(task); |
| + { |
| + AutoUnlock unlock(lock_); |
| + delete_these_outside_lock.clear(); |
| + |
| + // Complete thread creation outside the lock if necessary. |
| + if (new_thread_id) |
| + FinishStartingAdditionalThread(new_thread_id); |
| + |
| + task.task.Run(); |
| + |
| + // Make sure our task is erased outside the lock for the same reason |
| + // we do this with delete_these_oustide_lock. |
| + task.task = Closure(); |
| } |
| + DidRunWorkerTask(task); // Must be done inside the lock. |
| + } else { |
| + // When we're terminating and there's no more work, we can |
| + // shut down. You can't get more tasks posted once |
| + // shutdown_called_ is set. There may be some tasks stuck |
| + // behind running ones with the same sequence token, but |
| + // additional threads won't help this case. |
| + if (shutdown_called_) |
| + break; |
| + waiting_thread_count_++; |
| + // This is the only time that IsIdle() can go to true. |
| + if (IsIdle()) |
| + is_idle_cv_.Signal(); |
| + has_work_cv_.Wait(); |
| + waiting_thread_count_--; |
| } |
| } |
| +} |
| - // We noticed we should exit. Wake up the next worker so it knows it should |
| - // exit as well (because the Shutdown() code only signals once). |
| - cond_var_.Signal(); |
|
jar (doing other things)
2012/03/12 17:39:13
This is another fairly critical Signal() call. Thi
akalin
2012/03/12 18:34:34
This was deleted because of the change to using Br
|
| +bool SequencedWorkerPool::Inner::IsIdle() const { |
| + lock_.AssertAcquired(); |
| + return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size(); |
| } |
| int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| @@ -518,8 +551,12 @@ bool SequencedWorkerPool::Inner::GetWork( |
| *task = *i; |
| i = pending_tasks_.erase(i); |
| pending_task_count_--; |
| - if (task->shutdown_behavior == BLOCK_SHUTDOWN) |
| + if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
| + // This can make CanShutdown() go to true. |
| blocking_shutdown_pending_task_count_--; |
| + if (CanShutdown()) |
| + can_shutdown_cv_.Signal(); |
|
jar (doing other things)
2012/03/12 17:39:13
Again, I don't like seeing special case code inter
akalin
2012/03/12 18:34:34
Done (see above)
|
| + } |
| found_task = true; |
| break; |
| @@ -567,7 +604,10 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| if (task.shutdown_behavior == BLOCK_SHUTDOWN) { |
| DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
| + // This can make CanShutdown() go to true. |
| blocking_shutdown_thread_count_--; |
| + if (CanShutdown()) |
| + can_shutdown_cv_.Signal(); |
|
jar (doing other things)
2012/03/12 17:39:13
Again, please don't add special case code for this
akalin
2012/03/12 18:34:34
Done (see above)
|
| } |
| if (task.sequence_token_id) |
| @@ -640,6 +680,11 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| } |
| +void SequencedWorkerPool::Inner::SignalHasWork() { |
| + has_work_cv_.Signal(); |
| + ++has_work_signal_count_; |
|
jar (doing other things)
2012/03/12 17:39:13
You are not generally inside a lock, but are manip
akalin
2012/03/12 18:34:34
My mistake. This testing variable is kind of hack
|
| +} |
| + |
| bool SequencedWorkerPool::Inner::CanShutdown() const { |
| lock_.AssertAcquired(); |
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| @@ -665,11 +710,10 @@ void SequencedWorkerPool::OnDestruct() const { |
| // TODO(akalin): Once we can easily check if we're on a worker |
| // thread or not, use that instead of restricting destruction to |
| // only the constructor message loop. |
| - if (constructor_message_loop_->BelongsToCurrentThread()) { |
| + if (constructor_message_loop_->BelongsToCurrentThread()) |
| delete this; |
| - } else { |
| + else |
| constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
| - } |
| } |
| SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| @@ -748,6 +792,14 @@ void SequencedWorkerPool::FlushForTesting() { |
| inner_->FlushForTesting(); |
| } |
| +void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() { |
| + inner_->TriggerSpuriousWorkSignalForTesting(); |
| +} |
| + |
| +int SequencedWorkerPool::GetWorkSignalCountForTesting() const { |
| + return inner_->GetWorkSignalCountForTesting(); |
| +} |
| + |
| void SequencedWorkerPool::Shutdown() { |
| inner_->Shutdown(); |
| } |