Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(933)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1414793009: Allow SequencedTaskRunnerHandle::Get() while running unsequenced tasks. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: sync Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
(...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 class SequencedWorkerPool::Inner { 292 class SequencedWorkerPool::Inner {
293 public: 293 public:
294 // Take a raw pointer to |worker| to avoid cycles (since we're owned 294 // Take a raw pointer to |worker| to avoid cycles (since we're owned
295 // by it). 295 // by it).
296 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 296 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
297 const std::string& thread_name_prefix, 297 const std::string& thread_name_prefix,
298 TestingObserver* observer); 298 TestingObserver* observer);
299 299
300 ~Inner(); 300 ~Inner();
301 301
302 SequenceToken GetSequenceToken(); 302 static SequenceToken GetSequenceToken();
303 303
304 SequenceToken GetNamedSequenceToken(const std::string& name); 304 SequenceToken GetNamedSequenceToken(const std::string& name);
305 305
306 // This function accepts a name and an ID. If the name is null, the 306 // This function accepts a name and an ID. If the name is null, the
307 // token ID is used. This allows us to implement the optional name lookup 307 // token ID is used. This allows us to implement the optional name lookup
308 // from a single function without having to enter the lock a separate time. 308 // from a single function without having to enter the lock a separate time.
309 bool PostTask(const std::string* optional_token_name, 309 bool PostTask(const std::string* optional_token_name,
310 SequenceToken sequence_token, 310 SequenceToken sequence_token,
311 WorkerShutdown shutdown_behavior, 311 WorkerShutdown shutdown_behavior,
312 const tracked_objects::Location& from_here, 312 const tracked_objects::Location& from_here,
313 const Closure& task, 313 const Closure& task,
314 TimeDelta delay); 314 TimeDelta delay);
315 315
316 bool RunsTasksOnCurrentThread() const; 316 bool RunsTasksOnCurrentThread() const;
317 317
318 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 318 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
319 319
320 bool IsRunningSequence(SequenceToken sequence_token) const;
321
322 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
323 WorkerShutdown shutdown_behavior);
324
320 void CleanupForTesting(); 325 void CleanupForTesting();
321 326
322 void SignalHasWorkForTesting(); 327 void SignalHasWorkForTesting();
323 328
324 int GetWorkSignalCountForTesting() const; 329 int GetWorkSignalCountForTesting() const;
325 330
326 void Shutdown(int max_blocking_tasks_after_shutdown); 331 void Shutdown(int max_blocking_tasks_after_shutdown);
327 332
328 bool IsShutdownInProgress(); 333 bool IsShutdownInProgress();
329 334
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
579 // Need to explicitly join with the threads before they're destroyed or else 584 // Need to explicitly join with the threads before they're destroyed or else
580 // they will be running when our object is half torn down. 585 // they will be running when our object is half torn down.
581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 586 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
582 it->second->Join(); 587 it->second->Join();
583 threads_.clear(); 588 threads_.clear();
584 589
585 if (testing_observer_) 590 if (testing_observer_)
586 testing_observer_->OnDestruct(); 591 testing_observer_->OnDestruct();
587 } 592 }
588 593
594 // static
589 SequencedWorkerPool::SequenceToken 595 SequencedWorkerPool::SequenceToken
590 SequencedWorkerPool::Inner::GetSequenceToken() { 596 SequencedWorkerPool::Inner::GetSequenceToken() {
591 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 597 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
592 // is used as a sentinel value in SequenceTokens. 598 // is used as a sentinel value in SequenceTokens.
593 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 599 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
594 } 600 }
595 601
596 SequencedWorkerPool::SequenceToken 602 SequencedWorkerPool::SequenceToken
597 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 603 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
598 AutoLock lock(lock_); 604 AutoLock lock(lock_);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
679 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 685 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
680 SequenceToken sequence_token) const { 686 SequenceToken sequence_token) const {
681 AutoLock lock(lock_); 687 AutoLock lock(lock_);
682 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 688 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
683 if (found == threads_.end()) 689 if (found == threads_.end())
684 return false; 690 return false;
685 return found->second->is_processing_task() && 691 return found->second->is_processing_task() &&
686 sequence_token.Equals(found->second->task_sequence_token()); 692 sequence_token.Equals(found->second->task_sequence_token());
687 } 693 }
688 694
695 bool SequencedWorkerPool::Inner::IsRunningSequence(
696 SequenceToken sequence_token) const {
697 DCHECK(sequence_token.IsValid());
698 AutoLock lock(lock_);
699 return !IsSequenceTokenRunnable(sequence_token.id_);
700 }
701
702 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
703 SequenceToken sequence_token,
704 WorkerShutdown shutdown_behavior) {
705 AutoLock lock(lock_);
706 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
707 DCHECK(found != threads_.end());
708 DCHECK(found->second->is_processing_task());
709 DCHECK(!found->second->task_sequence_token().IsValid());
710 found->second->set_running_task_info(sequence_token, shutdown_behavior);
711
712 // Mark the sequence token as in use.
713 bool success = current_sequences_.insert(sequence_token.id_).second;
714 DCHECK(success);
715 }
716
689 // See https://code.google.com/p/chromium/issues/detail?id=168415 717 // See https://code.google.com/p/chromium/issues/detail?id=168415
690 void SequencedWorkerPool::Inner::CleanupForTesting() { 718 void SequencedWorkerPool::Inner::CleanupForTesting() {
691 DCHECK(!RunsTasksOnCurrentThread()); 719 DCHECK(!RunsTasksOnCurrentThread());
692 base::ThreadRestrictions::ScopedAllowWait allow_wait; 720 base::ThreadRestrictions::ScopedAllowWait allow_wait;
693 AutoLock lock(lock_); 721 AutoLock lock(lock_);
694 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 722 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
695 if (shutdown_called_) 723 if (shutdown_called_)
696 return; 724 return;
697 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 725 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
698 return; 726 return;
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
803 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 831 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
804 832
805 tracked_objects::TaskStopwatch stopwatch; 833 tracked_objects::TaskStopwatch stopwatch;
806 stopwatch.Start(); 834 stopwatch.Start();
807 task.task.Run(); 835 task.task.Run();
808 stopwatch.Stop(); 836 stopwatch.Stop();
809 837
810 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 838 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
811 task, stopwatch); 839 task, stopwatch);
812 840
841 // Update the sequence token in case it has been set from within the
842 // task, so it can be removed from the set of currently running
843 // sequences in DidRunWorkerTask() below.
844 task.sequence_token_id = this_worker->task_sequence_token().id_;
845
813 // Make sure our task is erased outside the lock for the 846 // Make sure our task is erased outside the lock for the
814 // same reason we do this with delete_these_oustide_lock. 847 // same reason we do this with delete_these_oustide_lock.
815 // Also, do it before calling reset_running_task_info() so 848 // Also, do it before calling reset_running_task_info() so
816 // that sequence-checking from within the task's destructor 849 // that sequence-checking from within the task's destructor
817 // still works. 850 // still works.
818 task.task = Closure(); 851 task.task = Closure();
819 852
820 this_worker->reset_running_task_info(); 853 this_worker->reset_running_task_info();
821 } 854 }
822 DidRunWorkerTask(task); // Must be done inside the lock. 855 DidRunWorkerTask(task); // Must be done inside the lock.
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
1186 // static 1219 // static
1187 scoped_refptr<SequencedWorkerPool> 1220 scoped_refptr<SequencedWorkerPool>
1188 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1221 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1189 Worker* worker = Worker::GetForCurrentThread(); 1222 Worker* worker = Worker::GetForCurrentThread();
1190 if (!worker) 1223 if (!worker)
1191 return nullptr; 1224 return nullptr;
1192 1225
1193 return worker->worker_pool(); 1226 return worker->worker_pool();
1194 } 1227 }
1195 1228
1229 // static
1230 scoped_refptr<SequencedTaskRunner>
1231 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1232 Worker* worker = Worker::GetForCurrentThread();
1233
1234 // If there is no worker, this thread is not a worker thread. Otherwise, it is
1235 // currently running a task (sequenced or unsequenced).
1236 if (!worker)
1237 return nullptr;
1238
1239 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1240 SequenceToken sequence_token = worker->task_sequence_token();
1241 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1242 if (!sequence_token.IsValid()) {
1243 // Create a new sequence token and bind this thread to it, to make sure that
1244 // a task posted to the SequencedTaskRunner we are going to return is not
1245 // immediately going to run on a different thread.
1246 sequence_token = Inner::GetSequenceToken();
1247 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1248 shutdown_behavior);
1249 }
1250
1251 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1252 return new SequencedWorkerPoolSequencedTaskRunner(
1253 std::move(pool), sequence_token, shutdown_behavior);
1254 }
1255
1196 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1256 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1197 const std::string& thread_name_prefix) 1257 const std::string& thread_name_prefix)
1198 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1258 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1199 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1259 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1200 } 1260 }
1201 1261
1202 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1262 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1203 const std::string& thread_name_prefix, 1263 const std::string& thread_name_prefix,
1204 TestingObserver* observer) 1264 TestingObserver* observer)
1205 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1265 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1206 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1266 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1207 } 1267 }
1208 1268
1209 SequencedWorkerPool::~SequencedWorkerPool() {} 1269 SequencedWorkerPool::~SequencedWorkerPool() {}
1210 1270
1211 void SequencedWorkerPool::OnDestruct() const { 1271 void SequencedWorkerPool::OnDestruct() const {
1212 // Avoid deleting ourselves on a worker thread (which would 1272 // Avoid deleting ourselves on a worker thread (which would deadlock).
1213 // deadlock).
1214 if (RunsTasksOnCurrentThread()) { 1273 if (RunsTasksOnCurrentThread()) {
1215 constructor_task_runner_->DeleteSoon(FROM_HERE, this); 1274 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1216 } else { 1275 } else {
1217 delete this; 1276 delete this;
1218 } 1277 }
1219 } 1278 }
1220 1279
1280 // static
1221 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1281 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1222 return inner_->GetSequenceToken(); 1282 return Inner::GetSequenceToken();
1223 } 1283 }
1224 1284
1225 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1285 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1226 const std::string& name) { 1286 const std::string& name) {
1227 return inner_->GetNamedSequenceToken(name); 1287 return inner_->GetNamedSequenceToken(name);
1228 } 1288 }
1229 1289
1230 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1290 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1231 SequenceToken token) { 1291 SequenceToken token) {
1232 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1292 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
1316 1376
1317 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1377 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1318 return inner_->RunsTasksOnCurrentThread(); 1378 return inner_->RunsTasksOnCurrentThread();
1319 } 1379 }
1320 1380
1321 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1381 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1322 SequenceToken sequence_token) const { 1382 SequenceToken sequence_token) const {
1323 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1383 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1324 } 1384 }
1325 1385
1386 bool SequencedWorkerPool::IsRunningSequence(
1387 SequenceToken sequence_token) const {
1388 return inner_->IsRunningSequence(sequence_token);
1389 }
1390
1326 void SequencedWorkerPool::FlushForTesting() { 1391 void SequencedWorkerPool::FlushForTesting() {
1327 inner_->CleanupForTesting(); 1392 inner_->CleanupForTesting();
1328 } 1393 }
1329 1394
1330 void SequencedWorkerPool::SignalHasWorkForTesting() { 1395 void SequencedWorkerPool::SignalHasWorkForTesting() {
1331 inner_->SignalHasWorkForTesting(); 1396 inner_->SignalHasWorkForTesting();
1332 } 1397 }
1333 1398
1334 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1399 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1335 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1400 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1336 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1401 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1337 } 1402 }
1338 1403
1339 bool SequencedWorkerPool::IsShutdownInProgress() { 1404 bool SequencedWorkerPool::IsShutdownInProgress() {
1340 return inner_->IsShutdownInProgress(); 1405 return inner_->IsShutdownInProgress();
1341 } 1406 }
1342 1407
1343 } // namespace base 1408 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698