Chromium Code Reviews| Index: base/threading/sequenced_worker_pool.cc |
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
| index 62431aec4a7d6450a27014232b46ee3236704c48..13f9c925d0a6fdbbc3019b4abbf01ded28cd1ddc 100644 |
| --- a/base/threading/sequenced_worker_pool.cc |
| +++ b/base/threading/sequenced_worker_pool.cc |
| @@ -93,6 +93,10 @@ class SequencedWorkerPool::Inner { |
| void FlushForTesting(); |
| + void TriggerSpuriousWorkSignalForTesting(); |
| + |
| + int GetWorkSignalCountForTesting() const; |
| + |
| void Shutdown(); |
| void SetTestingObserver(TestingObserver* observer); |
| @@ -101,6 +105,10 @@ class SequencedWorkerPool::Inner { |
| void ThreadLoop(Worker* this_worker); |
| private: |
| + // Returns whether there are no more pending tasks and all threads |
| + // are idle. Must be called under lock. |
| + bool IsIdle() const; |
| + |
| // Called from within the lock, this converts the given token name into a |
| // token ID, creating a new one if necessary. |
| int LockedGetNamedTokenID(const std::string& name); |
| @@ -142,6 +150,9 @@ class SequencedWorkerPool::Inner { |
| // the lock to avoid blocking important work starting a thread in the lock. |
| void FinishStartingAdditionalThread(int thread_number); |
| + // Signal |has_work_| and increment |has_work_signal_count_|. |
| + void SignalHasWork(); |
| + |
| // Checks whether there is work left that's blocking shutdown. Must be |
| // called inside the lock. |
| bool CanShutdown() const; |
| @@ -158,8 +169,20 @@ class SequencedWorkerPool::Inner { |
| // lock. |
| mutable Lock lock_; |
| - // Condition variable used to wake up worker threads when a task is runnable. |
| - ConditionVariable cond_var_; |
| + // Condition variable that is signalled whenever a new task is |
| + // posted or when Shutdown() is called. |
| + ConditionVariable has_work_cv_; |
| + |
| + // Number of times |has_work_| has been signalled. Used for testing. |
| + int has_work_signal_count_; |
| + |
| + // Condition variable that is signalled whenever IsIdle() goes to |
| + // true. |
| + ConditionVariable is_idle_cv_; |
| + |
| + // Condition variable that is signalled whwnever CanShutdown() goes |
| + // to true. |
| + ConditionVariable can_shutdown_cv_; |
| // The maximum number of worker threads we'll create. |
| const size_t max_threads_; |
| @@ -245,7 +268,10 @@ SequencedWorkerPool::Inner::Inner( |
| : worker_pool_(worker_pool), |
| last_sequence_number_(0), |
| lock_(), |
| - cond_var_(&lock_), |
| + has_work_cv_(&lock_), |
| + has_work_signal_count_(0), |
| + is_idle_cv_(&lock_), |
| + can_shutdown_cv_(&lock_), |
| max_threads_(max_threads), |
| thread_name_prefix_(thread_name_prefix), |
| thread_being_created_(false), |
| @@ -318,7 +344,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
| if (create_thread_id) |
| FinishStartingAdditionalThread(create_thread_id); |
| else |
| - cond_var_.Signal(); |
| + SignalHasWork(); |
| return true; |
| } |
| @@ -329,12 +355,18 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| } |
| void SequencedWorkerPool::Inner::FlushForTesting() { |
| - { |
| - AutoLock lock(lock_); |
| - while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) |
| - cond_var_.Wait(); |
| - } |
| - cond_var_.Signal(); |
| + AutoLock lock(lock_); |
| + while (!IsIdle()) |
| + is_idle_cv_.Wait(); |
| +} |
| + |
| +void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() { |
| + SignalHasWork(); |
| +} |
| + |
| +int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const { |
|
jar (doing other things)
2012/03/12 19:42:36
nit: It bugs me a little that we've added this mus
akalin
2012/03/12 20:42:16
I think I can remove this variable and instead add
|
| + AutoLock lock(lock_); |
| + return has_work_signal_count_; |
| } |
| void SequencedWorkerPool::Inner::Shutdown() { |
| @@ -351,27 +383,25 @@ void SequencedWorkerPool::Inner::Shutdown() { |
| // Tickle the threads. This will wake up a waiting one so it will know that |
| // it can exit, which in turn will wake up any other waiting ones. |
| - cond_var_.Signal(); |
| + has_work_cv_.Signal(); |
| // There are no pending or running tasks blocking shutdown, we're done. |
| if (CanShutdown()) |
| return; |
| } |
| - // If we get here, we know we're either waiting on a blocking task that's |
| - // currently running, waiting on a blocking task that hasn't been scheduled |
| - // yet, or both. Block on the "queue empty" event to know when all tasks are |
| - // complete. This must be done outside the lock. |
| + // If we're here, then something is blocking shutdown. So wait for |
| + // CanShutdown() to go to true. |
| + |
| if (testing_observer_) |
| testing_observer_->WillWaitForShutdown(); |
| TimeTicks shutdown_wait_begin = TimeTicks::Now(); |
| - // Wait for no more tasks. |
| { |
| AutoLock lock(lock_); |
| while (!CanShutdown()) |
| - cond_var_.Wait(); |
| + can_shutdown_cv_.Wait(); |
| } |
| UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
| TimeTicks::Now() - shutdown_wait_begin); |
| @@ -401,7 +431,9 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| int new_thread_id = WillRunWorkerTask(task); |
| { |
| AutoUnlock unlock(lock_); |
| - cond_var_.Signal(); |
| + // There may be more work available, so wake up another |
| + // worker thread. |
| + has_work_cv_.Signal(); |
| delete_these_outside_lock.clear(); |
| // Complete thread creation outside the lock if necessary. |
| @@ -424,17 +456,27 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| if (shutdown_called_) |
| break; |
| waiting_thread_count_++; |
| - cond_var_.Signal(); // For Flush() that may be waiting on the |
| - // waiting thread count to go up. |
| - cond_var_.Wait(); |
| + // This is the only time that IsIdle() can go to true. |
| + if (IsIdle()) |
| + is_idle_cv_.Signal(); |
| + has_work_cv_.Wait(); |
| waiting_thread_count_--; |
| } |
| } |
| + |
| + // Unblock shutdown if possible. |
| + if (CanShutdown()) |
| + can_shutdown_cv_.Signal(); |
|
jar (doing other things)
2012/03/12 19:42:36
Placing this here is probably more optimal than I
akalin
2012/03/12 20:42:16
Yeah, I think that's simpler. I moved it out.
|
| } |
|
jar (doing other things)
2012/03/12 19:42:36
nit: Add comment: // Release lock_.
akalin
2012/03/12 20:42:16
Done.
|
| // We noticed we should exit. Wake up the next worker so it knows it should |
| // exit as well (because the Shutdown() code only signals once). |
| - cond_var_.Signal(); |
| + has_work_cv_.Signal(); |
| +} |
| + |
| +bool SequencedWorkerPool::Inner::IsIdle() const { |
| + lock_.AssertAcquired(); |
| + return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size(); |
| } |
| int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| @@ -518,8 +560,9 @@ bool SequencedWorkerPool::Inner::GetWork( |
| *task = *i; |
| i = pending_tasks_.erase(i); |
| pending_task_count_--; |
| - if (task->shutdown_behavior == BLOCK_SHUTDOWN) |
| + if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
| blocking_shutdown_pending_task_count_--; |
| + } |
| found_task = true; |
| break; |
| @@ -640,6 +683,14 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| } |
| +void SequencedWorkerPool::Inner::SignalHasWork() { |
| + has_work_cv_.Signal(); |
| + { |
| + AutoLock lock(lock_); |
| + ++has_work_signal_count_; |
| + } |
| +} |
| + |
| bool SequencedWorkerPool::Inner::CanShutdown() const { |
| lock_.AssertAcquired(); |
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| @@ -665,11 +716,10 @@ void SequencedWorkerPool::OnDestruct() const { |
| // TODO(akalin): Once we can easily check if we're on a worker |
| // thread or not, use that instead of restricting destruction to |
| // only the constructor message loop. |
| - if (constructor_message_loop_->BelongsToCurrentThread()) { |
| + if (constructor_message_loop_->BelongsToCurrentThread()) |
| delete this; |
| - } else { |
| + else |
| constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
| - } |
| } |
| SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| @@ -748,7 +798,16 @@ void SequencedWorkerPool::FlushForTesting() { |
| inner_->FlushForTesting(); |
| } |
| +void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() { |
| + inner_->TriggerSpuriousWorkSignalForTesting(); |
| +} |
| + |
| +int SequencedWorkerPool::GetWorkSignalCountForTesting() const { |
| + return inner_->GetWorkSignalCountForTesting(); |
| +} |
| + |
| void SequencedWorkerPool::Shutdown() { |
| + DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
| inner_->Shutdown(); |
| } |