Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index 05490a95e268e11fb36122700411afa777df7d2b..3cc50f404c214ed75f1f540cf598f89d4d83e530 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -299,7 +299,7 @@ class SequencedWorkerPool::Inner { |
~Inner(); |
- SequenceToken GetSequenceToken(); |
+ static SequenceToken GetSequenceToken(); |
SequenceToken GetNamedSequenceToken(const std::string& name); |
@@ -317,6 +317,11 @@ class SequencedWorkerPool::Inner { |
bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
+ bool IsRunningSequence(SequenceToken sequence_token) const; |
+ |
+ void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token, |
+ WorkerShutdown shutdown_behavior); |
+ |
void CleanupForTesting(); |
void SignalHasWorkForTesting(); |
@@ -586,6 +591,7 @@ SequencedWorkerPool::Inner::~Inner() { |
testing_observer_->OnDestruct(); |
} |
+// static |
SequencedWorkerPool::SequenceToken |
SequencedWorkerPool::Inner::GetSequenceToken() { |
// Need to add one because StaticAtomicSequenceNumber starts at zero, which |
@@ -686,6 +692,28 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
sequence_token.Equals(found->second->task_sequence_token()); |
} |
+bool SequencedWorkerPool::Inner::IsRunningSequence( |
+ SequenceToken sequence_token) const { |
+ DCHECK(sequence_token.IsValid()); |
+ AutoLock lock(lock_); |
+ return !IsSequenceTokenRunnable(sequence_token.id_); |
+} |
+ |
+void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread( |
+ SequenceToken sequence_token, |
+ WorkerShutdown shutdown_behavior) { |
+ AutoLock lock(lock_); |
+ ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
+ DCHECK(found != threads_.end()); |
+ DCHECK(found->second->is_processing_task()); |
+ DCHECK(!found->second->task_sequence_token().IsValid()); |
+ found->second->set_running_task_info(sequence_token, shutdown_behavior); |
+ |
+ // Mark the sequence token as in use. |
+ bool success = current_sequences_.insert(sequence_token.id_).second; |
+ DCHECK(success); |
+} |
+ |
// See https://code.google.com/p/chromium/issues/detail?id=168415 |
void SequencedWorkerPool::Inner::CleanupForTesting() { |
DCHECK(!RunsTasksOnCurrentThread()); |
@@ -810,6 +838,11 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
task, stopwatch); |
+ // Update the sequence token in case it has been set from within the |
+ // task, so it can be removed from the set of currently running |
+ // sequences in DidRunWorkerTask() below. |
+ task.sequence_token_id = this_worker->task_sequence_token().id_; |
+ |
// Make sure our task is erased outside the lock for the |
// same reason we do this with delete_these_oustide_lock. |
// Also, do it before calling reset_running_task_info() so |
@@ -1193,6 +1226,33 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
return worker->worker_pool(); |
} |
+// static |
+scoped_refptr<SequencedTaskRunner> |
+SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() { |
+ Worker* worker = Worker::GetForCurrentThread(); |
+ |
+ // If there is no worker, this thread is not a worker thread. Otherwise, it is |
+ // currently running a task (sequenced or unsequenced). |
+ if (!worker) |
+ return nullptr; |
+ |
+ scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool(); |
+ SequenceToken sequence_token = worker->task_sequence_token(); |
+ WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior(); |
+ if (!sequence_token.IsValid()) { |
+ // Create a new sequence token and bind this thread to it, to make sure that |
+ // a task posted to the SequencedTaskRunner we are going to return is not |
+ // immediately going to run on a different thread. |
+ sequence_token = Inner::GetSequenceToken(); |
+ pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token, |
+ shutdown_behavior); |
+ } |
+ |
+ DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); |
+ return new SequencedWorkerPoolSequencedTaskRunner( |
+ std::move(pool), sequence_token, shutdown_behavior); |
+} |
+ |
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
const std::string& thread_name_prefix) |
: constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
@@ -1209,8 +1269,7 @@ SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
SequencedWorkerPool::~SequencedWorkerPool() {} |
void SequencedWorkerPool::OnDestruct() const { |
- // Avoid deleting ourselves on a worker thread (which would |
- // deadlock). |
+ // Avoid deleting ourselves on a worker thread (which would deadlock). |
if (RunsTasksOnCurrentThread()) { |
constructor_task_runner_->DeleteSoon(FROM_HERE, this); |
} else { |
@@ -1218,8 +1277,9 @@ void SequencedWorkerPool::OnDestruct() const { |
} |
} |
+// static |
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
- return inner_->GetSequenceToken(); |
+ return Inner::GetSequenceToken(); |
} |
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
@@ -1323,6 +1383,11 @@ bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
} |
+bool SequencedWorkerPool::IsRunningSequence( |
+ SequenceToken sequence_token) const { |
+ return inner_->IsRunningSequence(sequence_token); |
+} |
+ |
void SequencedWorkerPool::FlushForTesting() { |
inner_->CleanupForTesting(); |
} |