| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <list> | 9 #include <list> |
| 10 #include <map> | 10 #include <map> |
| (...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 292 class SequencedWorkerPool::Inner { | 292 class SequencedWorkerPool::Inner { |
| 293 public: | 293 public: |
| 294 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 294 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
| 295 // by it). | 295 // by it). |
| 296 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 296 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
| 297 const std::string& thread_name_prefix, | 297 const std::string& thread_name_prefix, |
| 298 TestingObserver* observer); | 298 TestingObserver* observer); |
| 299 | 299 |
| 300 ~Inner(); | 300 ~Inner(); |
| 301 | 301 |
| 302 SequenceToken GetSequenceToken(); | 302 static SequenceToken GetSequenceToken(); |
| 303 | 303 |
| 304 SequenceToken GetNamedSequenceToken(const std::string& name); | 304 SequenceToken GetNamedSequenceToken(const std::string& name); |
| 305 | 305 |
| 306 // This function accepts a name and an ID. If the name is null, the | 306 // This function accepts a name and an ID. If the name is null, the |
| 307 // token ID is used. This allows us to implement the optional name lookup | 307 // token ID is used. This allows us to implement the optional name lookup |
| 308 // from a single function without having to enter the lock a separate time. | 308 // from a single function without having to enter the lock a separate time. |
| 309 bool PostTask(const std::string* optional_token_name, | 309 bool PostTask(const std::string* optional_token_name, |
| 310 SequenceToken sequence_token, | 310 SequenceToken sequence_token, |
| 311 WorkerShutdown shutdown_behavior, | 311 WorkerShutdown shutdown_behavior, |
| 312 const tracked_objects::Location& from_here, | 312 const tracked_objects::Location& from_here, |
| 313 const Closure& task, | 313 const Closure& task, |
| 314 TimeDelta delay); | 314 TimeDelta delay); |
| 315 | 315 |
| 316 bool RunsTasksOnCurrentThread() const; | 316 bool RunsTasksOnCurrentThread() const; |
| 317 | 317 |
| 318 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 318 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| 319 | 319 |
| 320 bool IsRunningSequence(SequenceToken sequence_token) const; |
| 321 |
| 322 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token, |
| 323 WorkerShutdown shutdown_behavior); |
| 324 |
| 320 void CleanupForTesting(); | 325 void CleanupForTesting(); |
| 321 | 326 |
| 322 void SignalHasWorkForTesting(); | 327 void SignalHasWorkForTesting(); |
| 323 | 328 |
| 324 int GetWorkSignalCountForTesting() const; | 329 int GetWorkSignalCountForTesting() const; |
| 325 | 330 |
| 326 void Shutdown(int max_blocking_tasks_after_shutdown); | 331 void Shutdown(int max_blocking_tasks_after_shutdown); |
| 327 | 332 |
| 328 bool IsShutdownInProgress(); | 333 bool IsShutdownInProgress(); |
| 329 | 334 |
| (...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 579 // Need to explicitly join with the threads before they're destroyed or else | 584 // Need to explicitly join with the threads before they're destroyed or else |
| 580 // they will be running when our object is half torn down. | 585 // they will be running when our object is half torn down. |
| 581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) | 586 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
| 582 it->second->Join(); | 587 it->second->Join(); |
| 583 threads_.clear(); | 588 threads_.clear(); |
| 584 | 589 |
| 585 if (testing_observer_) | 590 if (testing_observer_) |
| 586 testing_observer_->OnDestruct(); | 591 testing_observer_->OnDestruct(); |
| 587 } | 592 } |
| 588 | 593 |
| 594 // static |
| 589 SequencedWorkerPool::SequenceToken | 595 SequencedWorkerPool::SequenceToken |
| 590 SequencedWorkerPool::Inner::GetSequenceToken() { | 596 SequencedWorkerPool::Inner::GetSequenceToken() { |
| 591 // Need to add one because StaticAtomicSequenceNumber starts at zero, which | 597 // Need to add one because StaticAtomicSequenceNumber starts at zero, which |
| 592 // is used as a sentinel value in SequenceTokens. | 598 // is used as a sentinel value in SequenceTokens. |
| 593 return SequenceToken(g_last_sequence_number_.GetNext() + 1); | 599 return SequenceToken(g_last_sequence_number_.GetNext() + 1); |
| 594 } | 600 } |
| 595 | 601 |
| 596 SequencedWorkerPool::SequenceToken | 602 SequencedWorkerPool::SequenceToken |
| 597 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { | 603 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { |
| 598 AutoLock lock(lock_); | 604 AutoLock lock(lock_); |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 679 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 685 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 680 SequenceToken sequence_token) const { | 686 SequenceToken sequence_token) const { |
| 681 AutoLock lock(lock_); | 687 AutoLock lock(lock_); |
| 682 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 688 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 683 if (found == threads_.end()) | 689 if (found == threads_.end()) |
| 684 return false; | 690 return false; |
| 685 return found->second->is_processing_task() && | 691 return found->second->is_processing_task() && |
| 686 sequence_token.Equals(found->second->task_sequence_token()); | 692 sequence_token.Equals(found->second->task_sequence_token()); |
| 687 } | 693 } |
| 688 | 694 |
| 695 bool SequencedWorkerPool::Inner::IsRunningSequence( |
| 696 SequenceToken sequence_token) const { |
| 697 DCHECK(sequence_token.IsValid()); |
| 698 AutoLock lock(lock_); |
| 699 return !IsSequenceTokenRunnable(sequence_token.id_); |
| 700 } |
| 701 |
| 702 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread( |
| 703 SequenceToken sequence_token, |
| 704 WorkerShutdown shutdown_behavior) { |
| 705 AutoLock lock(lock_); |
| 706 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 707 DCHECK(found != threads_.end()); |
| 708 DCHECK(found->second->is_processing_task()); |
| 709 DCHECK(!found->second->task_sequence_token().IsValid()); |
| 710 found->second->set_running_task_info(sequence_token, shutdown_behavior); |
| 711 |
| 712 // Mark the sequence token as in use. |
| 713 bool success = current_sequences_.insert(sequence_token.id_).second; |
| 714 DCHECK(success); |
| 715 } |
| 716 |
| 689 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 717 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| 690 void SequencedWorkerPool::Inner::CleanupForTesting() { | 718 void SequencedWorkerPool::Inner::CleanupForTesting() { |
| 691 DCHECK(!RunsTasksOnCurrentThread()); | 719 DCHECK(!RunsTasksOnCurrentThread()); |
| 692 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 720 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 693 AutoLock lock(lock_); | 721 AutoLock lock(lock_); |
| 694 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 722 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 695 if (shutdown_called_) | 723 if (shutdown_called_) |
| 696 return; | 724 return; |
| 697 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | 725 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
| 698 return; | 726 return; |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 803 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 831 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
| 804 | 832 |
| 805 tracked_objects::TaskStopwatch stopwatch; | 833 tracked_objects::TaskStopwatch stopwatch; |
| 806 stopwatch.Start(); | 834 stopwatch.Start(); |
| 807 task.task.Run(); | 835 task.task.Run(); |
| 808 stopwatch.Stop(); | 836 stopwatch.Stop(); |
| 809 | 837 |
| 810 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( | 838 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
| 811 task, stopwatch); | 839 task, stopwatch); |
| 812 | 840 |
| 841 // Update the sequence token in case it has been set from within the |
| 842 // task, so it can be removed from the set of currently running |
| 843 // sequences in DidRunWorkerTask() below. |
| 844 task.sequence_token_id = this_worker->task_sequence_token().id_; |
| 845 |
| 813 // Make sure our task is erased outside the lock for the | 846 // Make sure our task is erased outside the lock for the |
| 814 // same reason we do this with delete_these_oustide_lock. | 847 // same reason we do this with delete_these_oustide_lock. |
| 815 // Also, do it before calling reset_running_task_info() so | 848 // Also, do it before calling reset_running_task_info() so |
| 816 // that sequence-checking from within the task's destructor | 849 // that sequence-checking from within the task's destructor |
| 817 // still works. | 850 // still works. |
| 818 task.task = Closure(); | 851 task.task = Closure(); |
| 819 | 852 |
| 820 this_worker->reset_running_task_info(); | 853 this_worker->reset_running_task_info(); |
| 821 } | 854 } |
| 822 DidRunWorkerTask(task); // Must be done inside the lock. | 855 DidRunWorkerTask(task); // Must be done inside the lock. |
| (...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1186 // static | 1219 // static |
| 1187 scoped_refptr<SequencedWorkerPool> | 1220 scoped_refptr<SequencedWorkerPool> |
| 1188 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1221 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
| 1189 Worker* worker = Worker::GetForCurrentThread(); | 1222 Worker* worker = Worker::GetForCurrentThread(); |
| 1190 if (!worker) | 1223 if (!worker) |
| 1191 return nullptr; | 1224 return nullptr; |
| 1192 | 1225 |
| 1193 return worker->worker_pool(); | 1226 return worker->worker_pool(); |
| 1194 } | 1227 } |
| 1195 | 1228 |
| 1229 // static |
| 1230 scoped_refptr<SequencedTaskRunner> |
| 1231 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() { |
| 1232 Worker* worker = Worker::GetForCurrentThread(); |
| 1233 |
| 1234 // If there is no worker, this thread is not a worker thread. Otherwise, it is |
| 1235 // currently running a task (sequenced or unsequenced). |
| 1236 if (!worker) |
| 1237 return nullptr; |
| 1238 |
| 1239 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool(); |
| 1240 SequenceToken sequence_token = worker->task_sequence_token(); |
| 1241 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior(); |
| 1242 if (!sequence_token.IsValid()) { |
| 1243 // Create a new sequence token and bind this thread to it, to make sure that |
| 1244 // a task posted to the SequencedTaskRunner we are going to return is not |
| 1245 // immediately going to run on a different thread. |
| 1246 sequence_token = Inner::GetSequenceToken(); |
| 1247 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token, |
| 1248 shutdown_behavior); |
| 1249 } |
| 1250 |
| 1251 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); |
| 1252 return new SequencedWorkerPoolSequencedTaskRunner( |
| 1253 std::move(pool), sequence_token, shutdown_behavior); |
| 1254 } |
| 1255 |
| 1196 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1256 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1197 const std::string& thread_name_prefix) | 1257 const std::string& thread_name_prefix) |
| 1198 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1258 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1199 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1259 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
| 1200 } | 1260 } |
| 1201 | 1261 |
| 1202 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1262 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1203 const std::string& thread_name_prefix, | 1263 const std::string& thread_name_prefix, |
| 1204 TestingObserver* observer) | 1264 TestingObserver* observer) |
| 1205 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1265 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1206 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { | 1266 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { |
| 1207 } | 1267 } |
| 1208 | 1268 |
| 1209 SequencedWorkerPool::~SequencedWorkerPool() {} | 1269 SequencedWorkerPool::~SequencedWorkerPool() {} |
| 1210 | 1270 |
| 1211 void SequencedWorkerPool::OnDestruct() const { | 1271 void SequencedWorkerPool::OnDestruct() const { |
| 1212 // Avoid deleting ourselves on a worker thread (which would | 1272 // Avoid deleting ourselves on a worker thread (which would deadlock). |
| 1213 // deadlock). | |
| 1214 if (RunsTasksOnCurrentThread()) { | 1273 if (RunsTasksOnCurrentThread()) { |
| 1215 constructor_task_runner_->DeleteSoon(FROM_HERE, this); | 1274 constructor_task_runner_->DeleteSoon(FROM_HERE, this); |
| 1216 } else { | 1275 } else { |
| 1217 delete this; | 1276 delete this; |
| 1218 } | 1277 } |
| 1219 } | 1278 } |
| 1220 | 1279 |
| 1280 // static |
| 1221 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 1281 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| 1222 return inner_->GetSequenceToken(); | 1282 return Inner::GetSequenceToken(); |
| 1223 } | 1283 } |
| 1224 | 1284 |
| 1225 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 1285 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
| 1226 const std::string& name) { | 1286 const std::string& name) { |
| 1227 return inner_->GetNamedSequenceToken(name); | 1287 return inner_->GetNamedSequenceToken(name); |
| 1228 } | 1288 } |
| 1229 | 1289 |
| 1230 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( | 1290 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( |
| 1231 SequenceToken token) { | 1291 SequenceToken token) { |
| 1232 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); | 1292 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1316 | 1376 |
| 1317 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1377 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| 1318 return inner_->RunsTasksOnCurrentThread(); | 1378 return inner_->RunsTasksOnCurrentThread(); |
| 1319 } | 1379 } |
| 1320 | 1380 |
| 1321 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1381 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 1322 SequenceToken sequence_token) const { | 1382 SequenceToken sequence_token) const { |
| 1323 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1383 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 1324 } | 1384 } |
| 1325 | 1385 |
| 1386 bool SequencedWorkerPool::IsRunningSequence( |
| 1387 SequenceToken sequence_token) const { |
| 1388 return inner_->IsRunningSequence(sequence_token); |
| 1389 } |
| 1390 |
| 1326 void SequencedWorkerPool::FlushForTesting() { | 1391 void SequencedWorkerPool::FlushForTesting() { |
| 1327 inner_->CleanupForTesting(); | 1392 inner_->CleanupForTesting(); |
| 1328 } | 1393 } |
| 1329 | 1394 |
| 1330 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1395 void SequencedWorkerPool::SignalHasWorkForTesting() { |
| 1331 inner_->SignalHasWorkForTesting(); | 1396 inner_->SignalHasWorkForTesting(); |
| 1332 } | 1397 } |
| 1333 | 1398 |
| 1334 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1399 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1335 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1400 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
| 1336 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1401 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1337 } | 1402 } |
| 1338 | 1403 |
| 1339 bool SequencedWorkerPool::IsShutdownInProgress() { | 1404 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1340 return inner_->IsShutdownInProgress(); | 1405 return inner_->IsShutdownInProgress(); |
| 1341 } | 1406 } |
| 1342 | 1407 |
| 1343 } // namespace base | 1408 } // namespace base |
| OLD | NEW |