Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <list> | 9 #include <list> |
| 10 #include <map> | 10 #include <map> |
| (...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 319 SequenceToken sequence_token, | 319 SequenceToken sequence_token, |
| 320 WorkerShutdown shutdown_behavior, | 320 WorkerShutdown shutdown_behavior, |
| 321 const tracked_objects::Location& from_here, | 321 const tracked_objects::Location& from_here, |
| 322 const Closure& task, | 322 const Closure& task, |
| 323 TimeDelta delay); | 323 TimeDelta delay); |
| 324 | 324 |
| 325 bool RunsTasksOnCurrentThread() const; | 325 bool RunsTasksOnCurrentThread() const; |
| 326 | 326 |
| 327 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 327 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| 328 | 328 |
| 329 bool IsRunningSequence(SequenceToken sequence_token) const; | |
| 330 | |
| 331 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token, | |
| 332 WorkerShutdown shutdown_behavior); | |
| 333 | |
| 334 void CleanupForTesting(); | 329 void CleanupForTesting(); |
| 335 | 330 |
| 336 void SignalHasWorkForTesting(); | 331 void SignalHasWorkForTesting(); |
| 337 | 332 |
| 338 int GetWorkSignalCountForTesting() const; | 333 int GetWorkSignalCountForTesting() const; |
| 339 | 334 |
| 340 void Shutdown(int max_blocking_tasks_after_shutdown); | 335 void Shutdown(int max_blocking_tasks_after_shutdown); |
| 341 | 336 |
| 342 bool IsShutdownInProgress(); | 337 bool IsShutdownInProgress(); |
| 343 | 338 |
| (...skipping 355 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 699 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 694 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 700 SequenceToken sequence_token) const { | 695 SequenceToken sequence_token) const { |
| 701 AutoLock lock(lock_); | 696 AutoLock lock(lock_); |
| 702 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 697 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 703 if (found == threads_.end()) | 698 if (found == threads_.end()) |
| 704 return false; | 699 return false; |
| 705 return found->second->is_processing_task() && | 700 return found->second->is_processing_task() && |
| 706 sequence_token.Equals(found->second->task_sequence_token()); | 701 sequence_token.Equals(found->second->task_sequence_token()); |
| 707 } | 702 } |
| 708 | 703 |
| 709 bool SequencedWorkerPool::Inner::IsRunningSequence( | |
| 710 SequenceToken sequence_token) const { | |
| 711 DCHECK(sequence_token.IsValid()); | |
| 712 AutoLock lock(lock_); | |
| 713 return !IsSequenceTokenRunnable(sequence_token.id_); | |
| 714 } | |
| 715 | |
| 716 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread( | |
| 717 SequenceToken sequence_token, | |
| 718 WorkerShutdown shutdown_behavior) { | |
| 719 AutoLock lock(lock_); | |
| 720 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | |
| 721 DCHECK(found != threads_.end()); | |
| 722 DCHECK(found->second->is_processing_task()); | |
| 723 DCHECK(!found->second->task_sequence_token().IsValid()); | |
| 724 found->second->set_running_task_info(sequence_token, shutdown_behavior); | |
| 725 | |
| 726 // Mark the sequence token as in use. | |
| 727 bool success = current_sequences_.insert(sequence_token.id_).second; | |
| 728 DCHECK(success); | |
| 729 } | |
| 730 | |
| 731 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 704 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| 732 void SequencedWorkerPool::Inner::CleanupForTesting() { | 705 void SequencedWorkerPool::Inner::CleanupForTesting() { |
| 733 DCHECK(!RunsTasksOnCurrentThread()); | 706 DCHECK(!RunsTasksOnCurrentThread()); |
| 734 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 707 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 735 AutoLock lock(lock_); | 708 AutoLock lock(lock_); |
| 736 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 709 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 737 if (shutdown_called_) | 710 if (shutdown_called_) |
| 738 return; | 711 return; |
| 739 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | 712 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
| 740 return; | 713 return; |
| (...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 846 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 819 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
| 847 | 820 |
| 848 tracked_objects::TaskStopwatch stopwatch; | 821 tracked_objects::TaskStopwatch stopwatch; |
| 849 stopwatch.Start(); | 822 stopwatch.Start(); |
| 850 task.task.Run(); | 823 task.task.Run(); |
| 851 stopwatch.Stop(); | 824 stopwatch.Stop(); |
| 852 | 825 |
| 853 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( | 826 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
| 854 task, stopwatch); | 827 task, stopwatch); |
| 855 | 828 |
| 856 // Update the sequence token in case it has been set from within the | |
| 857 // task, so it can be removed from the set of currently running | |
| 858 // sequences in DidRunWorkerTask() below. | |
| 859 task.sequence_token_id = this_worker->task_sequence_token().id_; | |
| 860 | |
| 861 // Make sure our task is erased outside the lock for the | 829 // Make sure our task is erased outside the lock for the |
| 862 // same reason we do this with delete_these_oustide_lock. | 830 // same reason we do this with delete_these_oustide_lock. |
| 863 // Also, do it before calling reset_running_task_info() so | 831 // Also, do it before calling reset_running_task_info() so |
| 864 // that sequence-checking from within the task's destructor | 832 // that sequence-checking from within the task's destructor |
| 865 // still works. | 833 // still works. |
| 866 task.task = Closure(); | 834 task.task = Closure(); |
| 867 | 835 |
| 868 this_worker->reset_running_task_info(); | 836 this_worker->reset_running_task_info(); |
| 869 } | 837 } |
| 870 DidRunWorkerTask(task); // Must be done inside the lock. | 838 DidRunWorkerTask(task); // Must be done inside the lock. |
| (...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1234 // static | 1202 // static |
| 1235 scoped_refptr<SequencedWorkerPool> | 1203 scoped_refptr<SequencedWorkerPool> |
| 1236 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1204 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
| 1237 Worker* worker = Worker::GetForCurrentThread(); | 1205 Worker* worker = Worker::GetForCurrentThread(); |
| 1238 if (!worker) | 1206 if (!worker) |
| 1239 return nullptr; | 1207 return nullptr; |
| 1240 | 1208 |
| 1241 return worker->worker_pool(); | 1209 return worker->worker_pool(); |
| 1242 } | 1210 } |
| 1243 | 1211 |
| 1244 // static | |
| 1245 scoped_refptr<SequencedTaskRunner> | |
| 1246 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() { | |
| 1247 Worker* worker = Worker::GetForCurrentThread(); | |
| 1248 | |
| 1249 // If there is no worker, this thread is not a worker thread. Otherwise, it is | |
| 1250 // currently running a task (sequenced or unsequenced). | |
| 1251 if (!worker) | |
| 1252 return nullptr; | |
| 1253 | |
| 1254 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool(); | |
| 1255 SequenceToken sequence_token = worker->task_sequence_token(); | |
| 1256 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior(); | |
| 1257 if (!sequence_token.IsValid()) { | |
| 1258 // Create a new sequence token and bind this thread to it, to make sure that | |
| 1259 // a task posted to the SequencedTaskRunner we are going to return is not | |
| 1260 // immediately going to run on a different thread. | |
| 1261 sequence_token = Inner::GetSequenceToken(); | |
| 1262 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token, | |
| 1263 shutdown_behavior); | |
| 1264 } | |
| 1265 | |
| 1266 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); | |
| 1267 return new SequencedWorkerPoolSequencedTaskRunner( | |
| 1268 std::move(pool), sequence_token, shutdown_behavior); | |
| 1269 } | |
| 1270 | |
| 1271 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1212 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1272 const std::string& thread_name_prefix, | 1213 const std::string& thread_name_prefix, |
| 1273 base::TaskPriority task_priority) | 1214 base::TaskPriority task_priority) |
| 1274 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1215 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1275 inner_(new Inner(this, | 1216 inner_(new Inner(this, |
| 1276 max_threads, | 1217 max_threads, |
| 1277 thread_name_prefix, | 1218 thread_name_prefix, |
| 1278 task_priority, | 1219 task_priority, |
| 1279 NULL)) {} | 1220 NULL)) {} |
| 1280 | 1221 |
| (...skipping 10 matching lines...) Expand all Loading... | |
| 1291 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1232 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1292 inner_(new Inner(this, | 1233 inner_(new Inner(this, |
| 1293 max_threads, | 1234 max_threads, |
| 1294 thread_name_prefix, | 1235 thread_name_prefix, |
| 1295 task_priority, | 1236 task_priority, |
| 1296 observer)) {} | 1237 observer)) {} |
| 1297 | 1238 |
| 1298 SequencedWorkerPool::~SequencedWorkerPool() {} | 1239 SequencedWorkerPool::~SequencedWorkerPool() {} |
| 1299 | 1240 |
| 1300 void SequencedWorkerPool::OnDestruct() const { | 1241 void SequencedWorkerPool::OnDestruct() const { |
| 1301 // Avoid deleting ourselves on a worker thread (which would deadlock). | 1242 // Avoid deleting ourselves on a worker thread (which would |
|
danakj
2016/07/27 21:22:52
could leave this better wrapping
gab
2016/07/28 14:13:30
Oops, not quite sure how this happened, <ENTER> in
| |
| 1243 // deadlock). | |
| 1302 if (RunsTasksOnCurrentThread()) { | 1244 if (RunsTasksOnCurrentThread()) { |
| 1303 constructor_task_runner_->DeleteSoon(FROM_HERE, this); | 1245 constructor_task_runner_->DeleteSoon(FROM_HERE, this); |
| 1304 } else { | 1246 } else { |
| 1305 delete this; | 1247 delete this; |
| 1306 } | 1248 } |
| 1307 } | 1249 } |
| 1308 | 1250 |
| 1309 // static | 1251 // static |
| 1310 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 1252 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| 1311 return Inner::GetSequenceToken(); | 1253 return Inner::GetSequenceToken(); |
| (...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1405 | 1347 |
| 1406 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1348 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| 1407 return inner_->RunsTasksOnCurrentThread(); | 1349 return inner_->RunsTasksOnCurrentThread(); |
| 1408 } | 1350 } |
| 1409 | 1351 |
| 1410 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1352 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 1411 SequenceToken sequence_token) const { | 1353 SequenceToken sequence_token) const { |
| 1412 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1354 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 1413 } | 1355 } |
| 1414 | 1356 |
| 1415 bool SequencedWorkerPool::IsRunningSequence( | |
| 1416 SequenceToken sequence_token) const { | |
| 1417 return inner_->IsRunningSequence(sequence_token); | |
| 1418 } | |
| 1419 | |
| 1420 void SequencedWorkerPool::FlushForTesting() { | 1357 void SequencedWorkerPool::FlushForTesting() { |
| 1421 inner_->CleanupForTesting(); | 1358 inner_->CleanupForTesting(); |
| 1422 } | 1359 } |
| 1423 | 1360 |
| 1424 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1361 void SequencedWorkerPool::SignalHasWorkForTesting() { |
| 1425 inner_->SignalHasWorkForTesting(); | 1362 inner_->SignalHasWorkForTesting(); |
| 1426 } | 1363 } |
| 1427 | 1364 |
| 1428 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1365 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1429 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1366 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
| 1430 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1367 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1431 } | 1368 } |
| 1432 | 1369 |
| 1433 bool SequencedWorkerPool::IsShutdownInProgress() { | 1370 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1434 return inner_->IsShutdownInProgress(); | 1371 return inner_->IsShutdownInProgress(); |
| 1435 } | 1372 } |
| 1436 | 1373 |
| 1437 } // namespace base | 1374 } // namespace base |
| OLD | NEW |