| Index: base/threading/sequenced_worker_pool.cc
|
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
|
| index 6d47068fc9d44c37bc3454cfc4bdd0ae60805262..35028547ae41f2d6dfbff2f13209ce7d3bc8b414 100644
|
| --- a/base/threading/sequenced_worker_pool.cc
|
| +++ b/base/threading/sequenced_worker_pool.cc
|
| @@ -1,199 +1,46 @@
|
| -// Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| +// Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
|
|
| #include "base/threading/sequenced_worker_pool.h"
|
|
|
| -#include <deque>
|
| -#include <set>
|
| -
|
| -#include "base/atomicops.h"
|
| -#include "base/bind.h"
|
| -#include "base/memory/scoped_ptr.h"
|
| +#include "base/compiler_specific.h"
|
| +#include "base/logging.h"
|
| #include "base/metrics/histogram.h"
|
| #include "base/stringprintf.h"
|
| -#include "base/synchronization/condition_variable.h"
|
| #include "base/threading/simple_thread.h"
|
| -#include "base/threading/thread.h"
|
| +#include "base/time.h"
|
|
|
| namespace base {
|
|
|
| -namespace {
|
| -
|
| -struct SequencedTask {
|
| - int sequence_token_id;
|
| - SequencedWorkerPool::WorkerShutdown shutdown_behavior;
|
| - tracked_objects::Location location;
|
| - base::Closure task;
|
| -};
|
| -
|
| -} // namespace
|
| -
|
| // Worker ---------------------------------------------------------------------
|
|
|
| -class SequencedWorkerPool::Worker : public base::SimpleThread {
|
| +class SequencedWorkerPool::Worker : public SimpleThread {
|
| public:
|
| - Worker(SequencedWorkerPool::Inner* inner,
|
| + // Hold a ref to |worker_pool|, since we want to keep it around even
|
| + // if it doesn't join our thread. Note that this (deliberately)
|
| + // leaks on shutdown.
|
| + Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
|
| int thread_number,
|
| const std::string& thread_name_prefix);
|
| - ~Worker();
|
| + virtual ~Worker();
|
|
|
| // SimpleThread implementation. This actually runs the background thread.
|
| - virtual void Run();
|
| + virtual void Run() OVERRIDE;
|
|
|
| private:
|
| - SequencedWorkerPool::Inner* inner_;
|
| - SequencedWorkerPool::WorkerShutdown current_shutdown_mode_;
|
| + const scoped_refptr<SequencedWorkerPool> worker_pool_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Worker);
|
| };
|
|
|
| -
|
| -// Inner ----------------------------------------------------------------------
|
| -
|
| -class SequencedWorkerPool::Inner
|
| - : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> {
|
| - public:
|
| - Inner(size_t max_threads, const std::string& thread_name_prefix);
|
| - virtual ~Inner();
|
| -
|
| - SequenceToken GetSequenceToken();
|
| -
|
| - SequenceToken GetNamedSequenceToken(const std::string& name);
|
| -
|
| - // This function accepts a name and an ID. If the name is null, the
|
| - // token ID is used. This allows us to implement the optional name lookup
|
| - // from a single function without having to enter the lock a separate time.
|
| - bool PostTask(const std::string* optional_token_name,
|
| - int sequence_token_id,
|
| - SequencedWorkerPool::WorkerShutdown shutdown_behavior,
|
| - const tracked_objects::Location& from_here,
|
| - const base::Closure& task);
|
| -
|
| - void Flush();
|
| -
|
| - void Shutdown();
|
| -
|
| - void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer);
|
| -
|
| - // Runs the worker loop on the background thread.
|
| - void ThreadLoop(Worker* this_worker);
|
| -
|
| - private:
|
| - // Called from within the lock, this converts the given token name into a
|
| - // token ID, creating a new one if necessary.
|
| - int LockedGetNamedTokenID(const std::string& name);
|
| -
|
| - // The calling code should clear the given delete_these_oustide_lock
|
| - // vector the next time the lock is released. See the implementation for
|
| - // a more detailed description.
|
| - bool GetWork(SequencedTask* task,
|
| - std::vector<base::Closure>* delete_these_outside_lock);
|
| -
|
| - // Peforms init and cleanup around running the given task. WillRun...
|
| - // returns the value from PrepareToStartAdditionalThreadIfNecessary.
|
| - // The calling code should call FinishStartingAdditionalThread once the
|
| - // lock is released if the return values is nonzero.
|
| - int WillRunWorkerTask(const SequencedTask& task);
|
| - void DidRunWorkerTask(const SequencedTask& task);
|
| -
|
| - // Returns true if there are no threads currently running the given
|
| - // sequence token.
|
| - bool IsSequenceTokenRunnable(int sequence_token_id) const;
|
| -
|
| - // Checks if all threads are busy and the addition of one more could run an
|
| - // additional task waiting in the queue. This must be called from within
|
| - // the lock.
|
| - //
|
| - // If another thread is helpful, this will mark the thread as being in the
|
| - // process of starting and returns the index of the new thread which will be
|
| - // 0 or more. The caller should then call FinishStartingAdditionalThread to
|
| - // complete initialization once the lock is released.
|
| - //
|
| - // If another thread is not necessary, returne 0;
|
| - //
|
| - // See the implementedion for more.
|
| - int PrepareToStartAdditionalThreadIfHelpful();
|
| -
|
| - // The second part of thread creation after
|
| - // PrepareToStartAdditionalThreadIfHelpful with the thread number it
|
| - // generated. This actually creates the thread and should be called outside
|
| - // the lock to avoid blocking important work starting a thread in the lock.
|
| - void FinishStartingAdditionalThread(int thread_number);
|
| -
|
| - // Checks whether there is work left that's blocking shutdown. Must be
|
| - // called inside the lock.
|
| - bool CanShutdown() const;
|
| -
|
| - // The last sequence number used. Managed by GetSequenceToken, since this
|
| - // only does threadsafe increment operations, you do not need to hold the
|
| - // lock.
|
| - volatile base::subtle::Atomic32 last_sequence_number_;
|
| -
|
| - // This lock protects |everything in this class|. Do not read or modify
|
| - // anything without holding this lock. Do not block while holding this
|
| - // lock.
|
| - base::Lock lock_;
|
| -
|
| - // Condition variable used to wake up worker threads when a task is runnable.
|
| - base::ConditionVariable cond_var_;
|
| -
|
| - // The maximum number of worker threads we'll create.
|
| - size_t max_threads_;
|
| -
|
| - std::string thread_name_prefix_;
|
| -
|
| - // Associates all known sequence token names with their IDs.
|
| - std::map<std::string, int> named_sequence_tokens_;
|
| -
|
| - // Owning pointers to all threads we've created so far. Since we lazily
|
| - // create threads, this may be less than max_threads_ and will be initially
|
| - // empty.
|
| - std::vector<linked_ptr<Worker> > threads_;
|
| -
|
| - // Set to true when we're in the process of creating another thread.
|
| - // See PrepareToStartAdditionalThreadIfHelpful for more.
|
| - bool thread_being_created_;
|
| -
|
| - // Number of threads currently waiting for work.
|
| - size_t waiting_thread_count_;
|
| -
|
| - // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
|
| - // flag set.
|
| - size_t blocking_shutdown_thread_count_;
|
| -
|
| - // In-order list of all pending tasks. These are tasks waiting for a thread
|
| - // to run on or that are blocked on a previous task in their sequence.
|
| - //
|
| - // We maintain the pending_task_count_ separately for metrics because
|
| - // list.size() can be linear time.
|
| - std::list<SequencedTask> pending_tasks_;
|
| - size_t pending_task_count_;
|
| -
|
| - // Number of tasks in the pending_tasks_ list that are marked as blocking
|
| - // shutdown.
|
| - size_t blocking_shutdown_pending_task_count_;
|
| -
|
| - // Lists all sequence tokens currently executing.
|
| - std::set<int> current_sequences_;
|
| -
|
| - // Set when the app is terminating and no further tasks should be allowed,
|
| - // though we may still be running existing tasks.
|
| - bool terminating_;
|
| -
|
| - // Set when Shutdown is called to do some assertions.
|
| - bool shutdown_called_;
|
| -
|
| - SequencedWorkerPool::TestingObserver* testing_observer_;
|
| -};
|
| -
|
| -SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner,
|
| - int thread_number,
|
| - const std::string& prefix)
|
| - : base::SimpleThread(
|
| +SequencedWorkerPool::Worker::Worker(
|
| + const scoped_refptr<SequencedWorkerPool>& worker_pool,
|
| + int thread_number,
|
| + const std::string& prefix)
|
| + : SimpleThread(
|
| prefix + StringPrintf("Worker%d", thread_number).c_str()),
|
| - inner_(inner),
|
| - current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) {
|
| + worker_pool_(worker_pool) {
|
| Start();
|
| }
|
|
|
| @@ -206,11 +53,20 @@ void SequencedWorkerPool::Worker::Run() {
|
| // using DelegateSimpleThread and have Inner implement the Delegate to avoid
|
| // having these worker objects at all, but that method lacks the ability to
|
| // send thread-specific information easily to the thread loop.
|
| - inner_->ThreadLoop(this);
|
| + worker_pool_->ThreadLoop(this);
|
| }
|
|
|
| -SequencedWorkerPool::Inner::Inner(size_t max_threads,
|
| - const std::string& thread_name_prefix)
|
| +// SequencedWorkerPool --------------------------------------------------------
|
| +
|
| +SequencedWorkerPool::SequencedTask::SequencedTask()
|
| + : sequence_token_id(0),
|
| + shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
|
| +
|
| +SequencedWorkerPool::SequencedTask::~SequencedTask() {}
|
| +
|
| +SequencedWorkerPool::SequencedWorkerPool(
|
| + size_t max_threads,
|
| + const std::string& thread_name_prefix)
|
| : last_sequence_number_(0),
|
| lock_(),
|
| cond_var_(&lock_),
|
| @@ -221,12 +77,11 @@ SequencedWorkerPool::Inner::Inner(size_t max_threads,
|
| blocking_shutdown_thread_count_(0),
|
| pending_task_count_(0),
|
| blocking_shutdown_pending_task_count_(0),
|
| - terminating_(false),
|
| shutdown_called_(false),
|
| testing_observer_(NULL) {
|
| }
|
|
|
| -SequencedWorkerPool::Inner::~Inner() {
|
| +SequencedWorkerPool::~SequencedWorkerPool() {
|
| // You must call Shutdown() before destroying the pool.
|
| DCHECK(shutdown_called_);
|
|
|
| @@ -237,82 +92,79 @@ SequencedWorkerPool::Inner::~Inner() {
|
| threads_.clear();
|
| }
|
|
|
| -SequencedWorkerPool::SequenceToken
|
| -SequencedWorkerPool::Inner::GetSequenceToken() {
|
| - base::subtle::Atomic32 result =
|
| - base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
|
| +SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
|
| + subtle::Atomic32 result =
|
| + subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
|
| return SequenceToken(static_cast<int>(result));
|
| }
|
|
|
| -SequencedWorkerPool::SequenceToken
|
| -SequencedWorkerPool::Inner::GetNamedSequenceToken(
|
| +SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
|
| const std::string& name) {
|
| - base::AutoLock lock(lock_);
|
| + AutoLock lock(lock_);
|
| return SequenceToken(LockedGetNamedTokenID(name));
|
| }
|
|
|
| -bool SequencedWorkerPool::Inner::PostTask(
|
| - const std::string* optional_token_name,
|
| - int sequence_token_id,
|
| - SequencedWorkerPool::WorkerShutdown shutdown_behavior,
|
| +bool SequencedWorkerPool::PostWorkerTask(
|
| const tracked_objects::Location& from_here,
|
| - const base::Closure& task) {
|
| - SequencedTask sequenced;
|
| - sequenced.sequence_token_id = sequence_token_id;
|
| - sequenced.shutdown_behavior = shutdown_behavior;
|
| - sequenced.location = from_here;
|
| - sequenced.task = task;
|
| -
|
| - int create_thread_id = 0;
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - if (terminating_)
|
| - return false;
|
| -
|
| - // Now that we have the lock, apply the named token rules.
|
| - if (optional_token_name)
|
| - sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
|
| + const Closure& task) {
|
| + return PostTaskHelper(NULL, SequenceToken(), BLOCK_SHUTDOWN,
|
| + from_here, task);
|
| +}
|
|
|
| - pending_tasks_.push_back(sequenced);
|
| - pending_task_count_++;
|
| - if (shutdown_behavior == BLOCK_SHUTDOWN)
|
| - blocking_shutdown_pending_task_count_++;
|
| +bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + WorkerShutdown shutdown_behavior) {
|
| + return PostTaskHelper(NULL, SequenceToken(), shutdown_behavior,
|
| + from_here, task);
|
| +}
|
|
|
| - create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
|
| - }
|
| +bool SequencedWorkerPool::PostSequencedWorkerTask(
|
| + SequenceToken sequence_token,
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& task) {
|
| + return PostTaskHelper(NULL, sequence_token, BLOCK_SHUTDOWN,
|
| + from_here, task);
|
| +}
|
|
|
| - // Actually start the additional thread or signal an existing one now that
|
| - // we're outside the lock.
|
| - if (create_thread_id)
|
| - FinishStartingAdditionalThread(create_thread_id);
|
| - else
|
| - cond_var_.Signal();
|
| +bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
|
| + const std::string& token_name,
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& task) {
|
| + DCHECK(!token_name.empty());
|
| + return PostTaskHelper(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
|
| + from_here, task);
|
| +}
|
|
|
| - return true;
|
| +bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
|
| + SequenceToken sequence_token,
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& task,
|
| + WorkerShutdown shutdown_behavior) {
|
| + return PostTaskHelper(NULL, sequence_token, shutdown_behavior,
|
| + from_here, task);
|
| }
|
|
|
| -void SequencedWorkerPool::Inner::Flush() {
|
| +void SequencedWorkerPool::FlushForTesting() {
|
| {
|
| - base::AutoLock lock(lock_);
|
| + AutoLock lock(lock_);
|
| while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
|
| cond_var_.Wait();
|
| }
|
| cond_var_.Signal();
|
| }
|
|
|
| -void SequencedWorkerPool::Inner::Shutdown() {
|
| - if (shutdown_called_)
|
| - return;
|
| - shutdown_called_ = true;
|
| -
|
| +void SequencedWorkerPool::Shutdown() {
|
| // Mark us as terminated and go through and drop all tasks that aren't
|
| // required to run on shutdown. Since no new tasks will get posted once the
|
| // terminated flag is set, this ensures that all remaining tasks are required
|
| // for shutdown whenever the termianted_ flag is set.
|
| {
|
| - base::AutoLock lock(lock_);
|
| - DCHECK(!terminating_);
|
| - terminating_ = true;
|
| + AutoLock lock(lock_);
|
| +
|
| + if (shutdown_called_)
|
| + return;
|
| + shutdown_called_ = true;
|
|
|
| // Tickle the threads. This will wake up a waiting one so it will know that
|
| // it can exit, which in turn will wake up any other waiting ones.
|
| @@ -330,27 +182,66 @@ void SequencedWorkerPool::Inner::Shutdown() {
|
| if (testing_observer_)
|
| testing_observer_->WillWaitForShutdown();
|
|
|
| - base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now();
|
| + TimeTicks shutdown_wait_begin = TimeTicks::Now();
|
|
|
| // Wait for no more tasks.
|
| {
|
| - base::AutoLock lock(lock_);
|
| + AutoLock lock(lock_);
|
| while (!CanShutdown())
|
| cond_var_.Wait();
|
| }
|
| UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
|
| - base::TimeTicks::Now() - shutdown_wait_begin);
|
| + TimeTicks::Now() - shutdown_wait_begin);
|
| }
|
|
|
| -void SequencedWorkerPool::Inner::SetTestingObserver(
|
| - SequencedWorkerPool::TestingObserver* observer) {
|
| - base::AutoLock lock(lock_);
|
| +void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
|
| + AutoLock lock(lock_);
|
| testing_observer_ = observer;
|
| }
|
|
|
| -void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| +bool SequencedWorkerPool::PostTaskHelper(
|
| + const std::string* optional_token_name,
|
| + SequenceToken sequence_token,
|
| + WorkerShutdown shutdown_behavior,
|
| + const tracked_objects::Location& from_here,
|
| + const Closure& task) {
|
| + SequencedTask sequenced;
|
| + sequenced.sequence_token_id = sequence_token.id_;
|
| + sequenced.shutdown_behavior = shutdown_behavior;
|
| + sequenced.location = from_here;
|
| + sequenced.task = task;
|
| +
|
| + int create_thread_id = 0;
|
| + {
|
| + AutoLock lock(lock_);
|
| + if (shutdown_called_)
|
| + return false;
|
| +
|
| + // Now that we have the lock, apply the named token rules.
|
| + if (optional_token_name)
|
| + sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
|
| +
|
| + pending_tasks_.push_back(sequenced);
|
| + pending_task_count_++;
|
| + if (shutdown_behavior == BLOCK_SHUTDOWN)
|
| + blocking_shutdown_pending_task_count_++;
|
| +
|
| + create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
|
| + }
|
| +
|
| + // Actually start the additional thread or signal an existing one now that
|
| + // we're outside the lock.
|
| + if (create_thread_id)
|
| + FinishStartingAdditionalThread(create_thread_id);
|
| + else
|
| + cond_var_.Signal();
|
| +
|
| + return true;
|
| +}
|
| +
|
| +void SequencedWorkerPool::ThreadLoop(Worker* this_worker) {
|
| {
|
| - base::AutoLock lock(lock_);
|
| + AutoLock lock(lock_);
|
| DCHECK(thread_being_created_);
|
| thread_being_created_ = false;
|
| threads_.push_back(linked_ptr<Worker>(this_worker));
|
| @@ -358,11 +249,11 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| while (true) {
|
| // See GetWork for what delete_these_outside_lock is doing.
|
| SequencedTask task;
|
| - std::vector<base::Closure> delete_these_outside_lock;
|
| + std::vector<Closure> delete_these_outside_lock;
|
| if (GetWork(&task, &delete_these_outside_lock)) {
|
| int new_thread_id = WillRunWorkerTask(task);
|
| {
|
| - base::AutoUnlock unlock(lock_);
|
| + AutoUnlock unlock(lock_);
|
| cond_var_.Signal();
|
| delete_these_outside_lock.clear();
|
|
|
| @@ -374,15 +265,16 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
|
|
| // Make sure our task is erased outside the lock for the same reason
|
| // we do this with delete_these_oustide_lock.
|
| - task.task = base::Closure();
|
| + task.task = Closure();
|
| }
|
| DidRunWorkerTask(task); // Must be done inside the lock.
|
| } else {
|
| - // When we're terminating and there's no more work, we can shut down.
|
| - // You can't get more tasks posted once terminating_ is set. There may
|
| - // be some tasks stuck behind running ones with the same sequence
|
| - // token, but additional threads won't help this case.
|
| - if (terminating_)
|
| + // When we're terminating and there's no more work, we can
|
| + // shut down. You can't get more tasks posted once
|
| + // shutdown_called_ is set. There may be some tasks stuck
|
| + // behind running ones with the same sequence token, but
|
| + // additional threads won't help this case.
|
| + if (shutdown_called_)
|
| break;
|
| waiting_thread_count_++;
|
| cond_var_.Signal(); // For Flush() that may be waiting on the
|
| @@ -398,7 +290,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| cond_var_.Signal();
|
| }
|
|
|
| -int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
|
| +int SequencedWorkerPool::LockedGetNamedTokenID(
|
| const std::string& name) {
|
| lock_.AssertAcquired();
|
| DCHECK(!name.empty());
|
| @@ -414,9 +306,9 @@ int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
|
| return result.id_;
|
| }
|
|
|
| -bool SequencedWorkerPool::Inner::GetWork(
|
| +bool SequencedWorkerPool::GetWork(
|
| SequencedTask* task,
|
| - std::vector<base::Closure>* delete_these_outside_lock) {
|
| + std::vector<Closure>* delete_these_outside_lock) {
|
| lock_.AssertAcquired();
|
|
|
| DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
|
| @@ -455,7 +347,7 @@ bool SequencedWorkerPool::Inner::GetWork(
|
| continue;
|
| }
|
|
|
| - if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
|
| + if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
|
| // We're shutting down and the task we just found isn't blocking
|
| // shutdown. Delete it and get more work.
|
| //
|
| @@ -495,14 +387,14 @@ bool SequencedWorkerPool::Inner::GetWork(
|
| return found_task;
|
| }
|
|
|
| -int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
|
| +int SequencedWorkerPool::WillRunWorkerTask(const SequencedTask& task) {
|
| lock_.AssertAcquired();
|
|
|
| // Mark the task's sequence number as in use.
|
| if (task.sequence_token_id)
|
| current_sequences_.insert(task.sequence_token_id);
|
|
|
| - if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN)
|
| + if (task.shutdown_behavior == BLOCK_SHUTDOWN)
|
| blocking_shutdown_thread_count_++;
|
|
|
| // We just picked up a task. Since StartAdditionalThreadIfHelpful only
|
| @@ -523,10 +415,10 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
|
| return PrepareToStartAdditionalThreadIfHelpful();
|
| }
|
|
|
| -void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
|
| +void SequencedWorkerPool::DidRunWorkerTask(const SequencedTask& task) {
|
| lock_.AssertAcquired();
|
|
|
| - if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) {
|
| + if (task.shutdown_behavior == BLOCK_SHUTDOWN) {
|
| DCHECK_GT(blocking_shutdown_thread_count_, 0u);
|
| blocking_shutdown_thread_count_--;
|
| }
|
| @@ -535,7 +427,7 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
|
| current_sequences_.erase(task.sequence_token_id);
|
| }
|
|
|
| -bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
|
| +bool SequencedWorkerPool::IsSequenceTokenRunnable(
|
| int sequence_token_id) const {
|
| lock_.AssertAcquired();
|
| return !sequence_token_id ||
|
| @@ -543,7 +435,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
|
| current_sequences_.end();
|
| }
|
|
|
| -int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
|
| +int SequencedWorkerPool::PrepareToStartAdditionalThreadIfHelpful() {
|
| + lock_.AssertAcquired();
|
| // How thread creation works:
|
| //
|
| // We'de like to avoid creating threads with the lock held. However, we
|
| @@ -567,13 +460,13 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
|
| // 2. The first task post causes us to start a worker. Other tasks do not
|
| // cause a worker to start since one is pending.
|
| // 3. Main thread initiates shutdown.
|
| - // 4. No more threads are created since the terminating_ flag is set.
|
| + // 4. No more threads are created since the shutdown_called_ flag is set.
|
| //
|
| // The result is that one may expect that max_threads_ workers to be created
|
| // given the workload, but in reality fewer may be created because the
|
| // sequence of thread creation on the background threads is racing with the
|
| // shutdown call.
|
| - if (!terminating_ &&
|
| + if (!shutdown_called_ &&
|
| !thread_being_created_ &&
|
| threads_.size() < max_threads_ &&
|
| waiting_thread_count_ == 0) {
|
| @@ -590,7 +483,7 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
|
| return 0;
|
| }
|
|
|
| -void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
|
| +void SequencedWorkerPool::FinishStartingAdditionalThread(
|
| int thread_number) {
|
| // Called outside of the lock.
|
| DCHECK(thread_number > 0);
|
| @@ -600,7 +493,7 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
|
| new Worker(this, thread_number, thread_name_prefix_);
|
| }
|
|
|
| -bool SequencedWorkerPool::Inner::CanShutdown() const {
|
| +bool SequencedWorkerPool::CanShutdown() const {
|
| lock_.AssertAcquired();
|
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
|
| return !thread_being_created_ &&
|
| @@ -608,73 +501,4 @@ bool SequencedWorkerPool::Inner::CanShutdown() const {
|
| blocking_shutdown_pending_task_count_ == 0;
|
| }
|
|
|
| -// SequencedWorkerPool --------------------------------------------------------
|
| -
|
| -SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
|
| - const std::string& thread_name_prefix)
|
| - : inner_(new Inner(max_threads, thread_name_prefix)) {
|
| -}
|
| -
|
| -SequencedWorkerPool::~SequencedWorkerPool() {
|
| -}
|
| -
|
| -SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
|
| - return inner_->GetSequenceToken();
|
| -}
|
| -
|
| -SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
|
| - const std::string& name) {
|
| - return inner_->GetNamedSequenceToken(name);
|
| -}
|
| -
|
| -bool SequencedWorkerPool::PostWorkerTask(
|
| - const tracked_objects::Location& from_here,
|
| - const base::Closure& task) {
|
| - return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task);
|
| -}
|
| -
|
| -bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
|
| - const tracked_objects::Location& from_here,
|
| - const base::Closure& task,
|
| - WorkerShutdown shutdown_behavior) {
|
| - return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task);
|
| -}
|
| -
|
| -bool SequencedWorkerPool::PostSequencedWorkerTask(
|
| - SequenceToken sequence_token,
|
| - const tracked_objects::Location& from_here,
|
| - const base::Closure& task) {
|
| - return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN,
|
| - from_here, task);
|
| -}
|
| -
|
| -bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
|
| - const std::string& token_name,
|
| - const tracked_objects::Location& from_here,
|
| - const base::Closure& task) {
|
| - DCHECK(!token_name.empty());
|
| - return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task);
|
| -}
|
| -
|
| -bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
|
| - SequenceToken sequence_token,
|
| - const tracked_objects::Location& from_here,
|
| - const base::Closure& task,
|
| - WorkerShutdown shutdown_behavior) {
|
| - return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior,
|
| - from_here, task);
|
| -}
|
| -
|
| -void SequencedWorkerPool::FlushForTesting() {
|
| - inner_->Flush();
|
| -}
|
| -
|
| -void SequencedWorkerPool::Shutdown() {
|
| - inner_->Shutdown();
|
| -}
|
| -
|
| -void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
|
| - inner_->SetTestingObserver(observer);
|
| -}
|
| -
|
| } // namespace base
|
|
|