| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <list> | 7 #include <list> |
| 8 #include <map> | 8 #include <map> |
| 9 #include <set> | 9 #include <set> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 289 class SequencedWorkerPool::Inner { | 289 class SequencedWorkerPool::Inner { |
| 290 public: | 290 public: |
| 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
| 292 // by it). | 292 // by it). |
| 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
| 294 const std::string& thread_name_prefix, | 294 const std::string& thread_name_prefix, |
| 295 TestingObserver* observer); | 295 TestingObserver* observer); |
| 296 | 296 |
| 297 ~Inner(); | 297 ~Inner(); |
| 298 | 298 |
| 299 SequenceToken GetSequenceToken(); | 299 static SequenceToken GetSequenceToken(); |
| 300 | 300 |
| 301 SequenceToken GetNamedSequenceToken(const std::string& name); | 301 SequenceToken GetNamedSequenceToken(const std::string& name); |
| 302 | 302 |
| 303 // This function accepts a name and an ID. If the name is null, the | 303 // This function accepts a name and an ID. If the name is null, the |
| 304 // token ID is used. This allows us to implement the optional name lookup | 304 // token ID is used. This allows us to implement the optional name lookup |
| 305 // from a single function without having to enter the lock a separate time. | 305 // from a single function without having to enter the lock a separate time. |
| 306 bool PostTask(const std::string* optional_token_name, | 306 bool PostTask(const std::string* optional_token_name, |
| 307 SequenceToken sequence_token, | 307 SequenceToken sequence_token, |
| 308 WorkerShutdown shutdown_behavior, | 308 WorkerShutdown shutdown_behavior, |
| 309 const tracked_objects::Location& from_here, | 309 const tracked_objects::Location& from_here, |
| 310 const Closure& task, | 310 const Closure& task, |
| 311 TimeDelta delay); | 311 TimeDelta delay); |
| 312 | 312 |
| 313 bool RunsTasksOnCurrentThread() const; | 313 bool RunsTasksOnCurrentThread() const; |
| 314 | 314 |
| 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| 316 | 316 |
| 317 bool IsRunningSequence(SequenceToken sequence_token) const; |
| 318 |
| 319 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token, |
| 320 WorkerShutdown shutdown_behavior); |
| 321 |
| 317 void CleanupForTesting(); | 322 void CleanupForTesting(); |
| 318 | 323 |
| 319 void SignalHasWorkForTesting(); | 324 void SignalHasWorkForTesting(); |
| 320 | 325 |
| 321 int GetWorkSignalCountForTesting() const; | 326 int GetWorkSignalCountForTesting() const; |
| 322 | 327 |
| 323 void Shutdown(int max_blocking_tasks_after_shutdown); | 328 void Shutdown(int max_blocking_tasks_after_shutdown); |
| 324 | 329 |
| 325 bool IsShutdownInProgress(); | 330 bool IsShutdownInProgress(); |
| 326 | 331 |
| (...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 576 // Need to explicitly join with the threads before they're destroyed or else | 581 // Need to explicitly join with the threads before they're destroyed or else |
| 577 // they will be running when our object is half torn down. | 582 // they will be running when our object is half torn down. |
| 578 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) | 583 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
| 579 it->second->Join(); | 584 it->second->Join(); |
| 580 threads_.clear(); | 585 threads_.clear(); |
| 581 | 586 |
| 582 if (testing_observer_) | 587 if (testing_observer_) |
| 583 testing_observer_->OnDestruct(); | 588 testing_observer_->OnDestruct(); |
| 584 } | 589 } |
| 585 | 590 |
| 591 // static |
| 586 SequencedWorkerPool::SequenceToken | 592 SequencedWorkerPool::SequenceToken |
| 587 SequencedWorkerPool::Inner::GetSequenceToken() { | 593 SequencedWorkerPool::Inner::GetSequenceToken() { |
| 588 // Need to add one because StaticAtomicSequenceNumber starts at zero, which | 594 // Need to add one because StaticAtomicSequenceNumber starts at zero, which |
| 589 // is used as a sentinel value in SequenceTokens. | 595 // is used as a sentinel value in SequenceTokens. |
| 590 return SequenceToken(g_last_sequence_number_.GetNext() + 1); | 596 return SequenceToken(g_last_sequence_number_.GetNext() + 1); |
| 591 } | 597 } |
| 592 | 598 |
| 593 SequencedWorkerPool::SequenceToken | 599 SequencedWorkerPool::SequenceToken |
| 594 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { | 600 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { |
| 595 AutoLock lock(lock_); | 601 AutoLock lock(lock_); |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 676 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 682 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 677 SequenceToken sequence_token) const { | 683 SequenceToken sequence_token) const { |
| 678 AutoLock lock(lock_); | 684 AutoLock lock(lock_); |
| 679 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 685 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 680 if (found == threads_.end()) | 686 if (found == threads_.end()) |
| 681 return false; | 687 return false; |
| 682 return found->second->is_processing_task() && | 688 return found->second->is_processing_task() && |
| 683 sequence_token.Equals(found->second->task_sequence_token()); | 689 sequence_token.Equals(found->second->task_sequence_token()); |
| 684 } | 690 } |
| 685 | 691 |
| 692 bool SequencedWorkerPool::Inner::IsRunningSequence( |
| 693 SequenceToken sequence_token) const { |
| 694 DCHECK(sequence_token.IsValid()); |
| 695 AutoLock lock(lock_); |
| 696 return !IsSequenceTokenRunnable(sequence_token.id_); |
| 697 } |
| 698 |
| 699 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread( |
| 700 SequenceToken sequence_token, |
| 701 WorkerShutdown shutdown_behavior) { |
| 702 AutoLock lock(lock_); |
| 703 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 704 DCHECK(found != threads_.end()); |
| 705 DCHECK(found->second->is_processing_task()); |
| 706 DCHECK(!found->second->task_sequence_token().IsValid()); |
| 707 found->second->set_running_task_info(sequence_token, shutdown_behavior); |
| 708 |
| 709 // Mark the sequence token as in use. |
| 710 bool success = current_sequences_.insert(sequence_token.id_).second; |
| 711 DCHECK(success); |
| 712 } |
| 713 |
| 686 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 714 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| 687 void SequencedWorkerPool::Inner::CleanupForTesting() { | 715 void SequencedWorkerPool::Inner::CleanupForTesting() { |
| 688 DCHECK(!RunsTasksOnCurrentThread()); | 716 DCHECK(!RunsTasksOnCurrentThread()); |
| 689 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 717 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 690 AutoLock lock(lock_); | 718 AutoLock lock(lock_); |
| 691 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 719 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 692 if (shutdown_called_) | 720 if (shutdown_called_) |
| 693 return; | 721 return; |
| 694 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | 722 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
| 695 return; | 723 return; |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 800 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 828 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
| 801 | 829 |
| 802 tracked_objects::TaskStopwatch stopwatch; | 830 tracked_objects::TaskStopwatch stopwatch; |
| 803 stopwatch.Start(); | 831 stopwatch.Start(); |
| 804 task.task.Run(); | 832 task.task.Run(); |
| 805 stopwatch.Stop(); | 833 stopwatch.Stop(); |
| 806 | 834 |
| 807 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( | 835 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
| 808 task, stopwatch); | 836 task, stopwatch); |
| 809 | 837 |
| 838 // Update the sequence token in case it has been set from within the |
| 839 // task, so it can be removed from the set of currently running |
| 840 // sequences in DidRunWorkerTask() below. |
| 841 task.sequence_token_id = this_worker->task_sequence_token().id_; |
| 842 |
| 810 // Make sure our task is erased outside the lock for the | 843 // Make sure our task is erased outside the lock for the |
| 811 // same reason we do this with delete_these_oustide_lock. | 844 // same reason we do this with delete_these_oustide_lock. |
| 812 // Also, do it before calling reset_running_task_info() so | 845 // Also, do it before calling reset_running_task_info() so |
| 813 // that sequence-checking from within the task's destructor | 846 // that sequence-checking from within the task's destructor |
| 814 // still works. | 847 // still works. |
| 815 task.task = Closure(); | 848 task.task = Closure(); |
| 816 | 849 |
| 817 this_worker->reset_running_task_info(); | 850 this_worker->reset_running_task_info(); |
| 818 } | 851 } |
| 819 DidRunWorkerTask(task); // Must be done inside the lock. | 852 DidRunWorkerTask(task); // Must be done inside the lock. |
| (...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1174 SequencedWorkerPool::SequenceToken | 1207 SequencedWorkerPool::SequenceToken |
| 1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { | 1208 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { |
| 1176 Worker* worker = Worker::GetForCurrentThread(); | 1209 Worker* worker = Worker::GetForCurrentThread(); |
| 1177 if (!worker) | 1210 if (!worker) |
| 1178 return SequenceToken(); | 1211 return SequenceToken(); |
| 1179 | 1212 |
| 1180 return worker->task_sequence_token(); | 1213 return worker->task_sequence_token(); |
| 1181 } | 1214 } |
| 1182 | 1215 |
| 1183 // static | 1216 // static |
| 1184 scoped_refptr<SequencedWorkerPool> | 1217 scoped_refptr<SequencedTaskRunner> |
| 1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1218 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() { |
| 1186 Worker* worker = Worker::GetForCurrentThread(); | 1219 Worker* worker = Worker::GetForCurrentThread(); |
| 1220 |
| 1221 // If there is no worker, this thread is not a worker thread. Otherwise, it is |
| 1222 // currently running a task (sequenced or unsequenced). |
| 1187 if (!worker) | 1223 if (!worker) |
| 1188 return nullptr; | 1224 return nullptr; |
| 1189 | 1225 |
| 1190 return worker->worker_pool(); | 1226 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool(); |
| 1227 SequenceToken sequence_token = worker->task_sequence_token(); |
| 1228 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior(); |
| 1229 if (!sequence_token.IsValid()) { |
| 1230 // Create a new sequence token and bind this thread to it, to make sure that |
| 1231 // a task posted to the SequencedTaskRunner we are going to return is not |
| 1232 // immediately going to run on a different thread. |
| 1233 sequence_token = Inner::GetSequenceToken(); |
| 1234 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token, |
| 1235 shutdown_behavior); |
| 1236 } |
| 1237 |
| 1238 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); |
| 1239 return new SequencedWorkerPoolSequencedTaskRunner( |
| 1240 std::move(pool), sequence_token, shutdown_behavior); |
| 1191 } | 1241 } |
| 1192 | 1242 |
| 1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1243 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1194 const std::string& thread_name_prefix) | 1244 const std::string& thread_name_prefix) |
| 1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1245 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1246 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
| 1197 } | 1247 } |
| 1198 | 1248 |
| 1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1249 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1200 const std::string& thread_name_prefix, | 1250 const std::string& thread_name_prefix, |
| 1201 TestingObserver* observer) | 1251 TestingObserver* observer) |
| 1202 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1252 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1203 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { | 1253 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { |
| 1204 } | 1254 } |
| 1205 | 1255 |
| 1206 SequencedWorkerPool::~SequencedWorkerPool() {} | 1256 SequencedWorkerPool::~SequencedWorkerPool() {} |
| 1207 | 1257 |
| 1208 void SequencedWorkerPool::OnDestruct() const { | 1258 void SequencedWorkerPool::OnDestruct() const { |
| 1209 // Avoid deleting ourselves on a worker thread (which would | 1259 // Avoid deleting ourselves on a worker thread (which would deadlock). |
| 1210 // deadlock). | |
| 1211 if (RunsTasksOnCurrentThread()) { | 1260 if (RunsTasksOnCurrentThread()) { |
| 1212 constructor_task_runner_->DeleteSoon(FROM_HERE, this); | 1261 constructor_task_runner_->DeleteSoon(FROM_HERE, this); |
| 1213 } else { | 1262 } else { |
| 1214 delete this; | 1263 delete this; |
| 1215 } | 1264 } |
| 1216 } | 1265 } |
| 1217 | 1266 |
| 1267 // static |
| 1218 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 1268 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| 1219 return inner_->GetSequenceToken(); | 1269 return Inner::GetSequenceToken(); |
| 1220 } | 1270 } |
| 1221 | 1271 |
| 1222 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 1272 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
| 1223 const std::string& name) { | 1273 const std::string& name) { |
| 1224 return inner_->GetNamedSequenceToken(name); | 1274 return inner_->GetNamedSequenceToken(name); |
| 1225 } | 1275 } |
| 1226 | 1276 |
| 1227 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( | 1277 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( |
| 1228 SequenceToken token) { | 1278 SequenceToken token) { |
| 1229 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); | 1279 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1313 | 1363 |
| 1314 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1364 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| 1315 return inner_->RunsTasksOnCurrentThread(); | 1365 return inner_->RunsTasksOnCurrentThread(); |
| 1316 } | 1366 } |
| 1317 | 1367 |
| 1318 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1368 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 1319 SequenceToken sequence_token) const { | 1369 SequenceToken sequence_token) const { |
| 1320 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1370 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 1321 } | 1371 } |
| 1322 | 1372 |
| 1373 bool SequencedWorkerPool::IsRunningSequence( |
| 1374 SequenceToken sequence_token) const { |
| 1375 return inner_->IsRunningSequence(sequence_token); |
| 1376 } |
| 1377 |
| 1323 void SequencedWorkerPool::FlushForTesting() { | 1378 void SequencedWorkerPool::FlushForTesting() { |
| 1324 inner_->CleanupForTesting(); | 1379 inner_->CleanupForTesting(); |
| 1325 } | 1380 } |
| 1326 | 1381 |
| 1327 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1382 void SequencedWorkerPool::SignalHasWorkForTesting() { |
| 1328 inner_->SignalHasWorkForTesting(); | 1383 inner_->SignalHasWorkForTesting(); |
| 1329 } | 1384 } |
| 1330 | 1385 |
| 1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1386 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1387 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
| 1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1388 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1334 } | 1389 } |
| 1335 | 1390 |
| 1336 bool SequencedWorkerPool::IsShutdownInProgress() { | 1391 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1337 return inner_->IsShutdownInProgress(); | 1392 return inner_->IsShutdownInProgress(); |
| 1338 } | 1393 } |
| 1339 | 1394 |
| 1340 } // namespace base | 1395 } // namespace base |
| OLD | NEW |