Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1005)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2190073002: Revert of Revert "Allow SequencedTaskRunnerHandle::Get() while running unsequenced tasks." (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
319 SequenceToken sequence_token, 319 SequenceToken sequence_token,
320 WorkerShutdown shutdown_behavior, 320 WorkerShutdown shutdown_behavior,
321 const tracked_objects::Location& from_here, 321 const tracked_objects::Location& from_here,
322 const Closure& task, 322 const Closure& task,
323 TimeDelta delay); 323 TimeDelta delay);
324 324
325 bool RunsTasksOnCurrentThread() const; 325 bool RunsTasksOnCurrentThread() const;
326 326
327 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 327 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
328 328
329 bool IsRunningSequence(SequenceToken sequence_token) const;
330
331 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
332 WorkerShutdown shutdown_behavior);
333
329 void CleanupForTesting(); 334 void CleanupForTesting();
330 335
331 void SignalHasWorkForTesting(); 336 void SignalHasWorkForTesting();
332 337
333 int GetWorkSignalCountForTesting() const; 338 int GetWorkSignalCountForTesting() const;
334 339
335 void Shutdown(int max_blocking_tasks_after_shutdown); 340 void Shutdown(int max_blocking_tasks_after_shutdown);
336 341
337 bool IsShutdownInProgress(); 342 bool IsShutdownInProgress();
338 343
(...skipping 355 matching lines...) Expand 10 before | Expand all | Expand 10 after
694 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 699 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
695 SequenceToken sequence_token) const { 700 SequenceToken sequence_token) const {
696 AutoLock lock(lock_); 701 AutoLock lock(lock_);
697 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 702 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
698 if (found == threads_.end()) 703 if (found == threads_.end())
699 return false; 704 return false;
700 return found->second->is_processing_task() && 705 return found->second->is_processing_task() &&
701 sequence_token.Equals(found->second->task_sequence_token()); 706 sequence_token.Equals(found->second->task_sequence_token());
702 } 707 }
703 708
709 bool SequencedWorkerPool::Inner::IsRunningSequence(
710 SequenceToken sequence_token) const {
711 DCHECK(sequence_token.IsValid());
712 AutoLock lock(lock_);
713 return !IsSequenceTokenRunnable(sequence_token.id_);
714 }
715
716 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
717 SequenceToken sequence_token,
718 WorkerShutdown shutdown_behavior) {
719 AutoLock lock(lock_);
720 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
721 DCHECK(found != threads_.end());
722 DCHECK(found->second->is_processing_task());
723 DCHECK(!found->second->task_sequence_token().IsValid());
724 found->second->set_running_task_info(sequence_token, shutdown_behavior);
725
726 // Mark the sequence token as in use.
727 bool success = current_sequences_.insert(sequence_token.id_).second;
728 DCHECK(success);
729 }
730
704 // See https://code.google.com/p/chromium/issues/detail?id=168415 731 // See https://code.google.com/p/chromium/issues/detail?id=168415
705 void SequencedWorkerPool::Inner::CleanupForTesting() { 732 void SequencedWorkerPool::Inner::CleanupForTesting() {
706 DCHECK(!RunsTasksOnCurrentThread()); 733 DCHECK(!RunsTasksOnCurrentThread());
707 base::ThreadRestrictions::ScopedAllowWait allow_wait; 734 base::ThreadRestrictions::ScopedAllowWait allow_wait;
708 AutoLock lock(lock_); 735 AutoLock lock(lock_);
709 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 736 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
710 if (shutdown_called_) 737 if (shutdown_called_)
711 return; 738 return;
712 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 739 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
713 return; 740 return;
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after
819 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 846 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
820 847
821 tracked_objects::TaskStopwatch stopwatch; 848 tracked_objects::TaskStopwatch stopwatch;
822 stopwatch.Start(); 849 stopwatch.Start();
823 task.task.Run(); 850 task.task.Run();
824 stopwatch.Stop(); 851 stopwatch.Stop();
825 852
826 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 853 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
827 task, stopwatch); 854 task, stopwatch);
828 855
856 // Update the sequence token in case it has been set from within the
857 // task, so it can be removed from the set of currently running
858 // sequences in DidRunWorkerTask() below.
859 task.sequence_token_id = this_worker->task_sequence_token().id_;
860
829 // Make sure our task is erased outside the lock for the 861 // Make sure our task is erased outside the lock for the
830 // same reason we do this with delete_these_oustide_lock. 862 // same reason we do this with delete_these_oustide_lock.
831 // Also, do it before calling reset_running_task_info() so 863 // Also, do it before calling reset_running_task_info() so
832 // that sequence-checking from within the task's destructor 864 // that sequence-checking from within the task's destructor
833 // still works. 865 // still works.
834 task.task = Closure(); 866 task.task = Closure();
835 867
836 this_worker->reset_running_task_info(); 868 this_worker->reset_running_task_info();
837 } 869 }
838 DidRunWorkerTask(task); // Must be done inside the lock. 870 DidRunWorkerTask(task); // Must be done inside the lock.
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
1202 // static 1234 // static
1203 scoped_refptr<SequencedWorkerPool> 1235 scoped_refptr<SequencedWorkerPool>
1204 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1236 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1205 Worker* worker = Worker::GetForCurrentThread(); 1237 Worker* worker = Worker::GetForCurrentThread();
1206 if (!worker) 1238 if (!worker)
1207 return nullptr; 1239 return nullptr;
1208 1240
1209 return worker->worker_pool(); 1241 return worker->worker_pool();
1210 } 1242 }
1211 1243
1244 // static
1245 scoped_refptr<SequencedTaskRunner>
1246 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1247 Worker* worker = Worker::GetForCurrentThread();
1248
1249 // If there is no worker, this thread is not a worker thread. Otherwise, it is
1250 // currently running a task (sequenced or unsequenced).
1251 if (!worker)
1252 return nullptr;
1253
1254 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1255 SequenceToken sequence_token = worker->task_sequence_token();
1256 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1257 if (!sequence_token.IsValid()) {
1258 // Create a new sequence token and bind this thread to it, to make sure that
1259 // a task posted to the SequencedTaskRunner we are going to return is not
1260 // immediately going to run on a different thread.
1261 sequence_token = Inner::GetSequenceToken();
1262 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1263 shutdown_behavior);
1264 }
1265
1266 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1267 return new SequencedWorkerPoolSequencedTaskRunner(
1268 std::move(pool), sequence_token, shutdown_behavior);
1269 }
1270
1212 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1271 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1213 const std::string& thread_name_prefix, 1272 const std::string& thread_name_prefix,
1214 base::TaskPriority task_priority) 1273 base::TaskPriority task_priority)
1215 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1274 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1216 inner_(new Inner(this, 1275 inner_(new Inner(this,
1217 max_threads, 1276 max_threads,
1218 thread_name_prefix, 1277 thread_name_prefix,
1219 task_priority, 1278 task_priority,
1220 NULL)) {} 1279 NULL)) {}
1221 1280
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
1346 1405
1347 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1406 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1348 return inner_->RunsTasksOnCurrentThread(); 1407 return inner_->RunsTasksOnCurrentThread();
1349 } 1408 }
1350 1409
1351 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1410 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1352 SequenceToken sequence_token) const { 1411 SequenceToken sequence_token) const {
1353 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1412 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1354 } 1413 }
1355 1414
1415 bool SequencedWorkerPool::IsRunningSequence(
1416 SequenceToken sequence_token) const {
1417 return inner_->IsRunningSequence(sequence_token);
1418 }
1419
1356 void SequencedWorkerPool::FlushForTesting() { 1420 void SequencedWorkerPool::FlushForTesting() {
1357 inner_->CleanupForTesting(); 1421 inner_->CleanupForTesting();
1358 } 1422 }
1359 1423
1360 void SequencedWorkerPool::SignalHasWorkForTesting() { 1424 void SequencedWorkerPool::SignalHasWorkForTesting() {
1361 inner_->SignalHasWorkForTesting(); 1425 inner_->SignalHasWorkForTesting();
1362 } 1426 }
1363 1427
1364 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1428 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1365 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1429 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1366 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1430 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1367 } 1431 }
1368 1432
1369 bool SequencedWorkerPool::IsShutdownInProgress() { 1433 bool SequencedWorkerPool::IsShutdownInProgress() {
1370 return inner_->IsShutdownInProgress(); 1434 return inner_->IsShutdownInProgress();
1371 } 1435 }
1372 1436
1373 } // namespace base 1437 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698