OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
(...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
292 class SequencedWorkerPool::Inner { | 292 class SequencedWorkerPool::Inner { |
293 public: | 293 public: |
294 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 294 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
295 // by it). | 295 // by it). |
296 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 296 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
297 const std::string& thread_name_prefix, | 297 const std::string& thread_name_prefix, |
298 TestingObserver* observer); | 298 TestingObserver* observer); |
299 | 299 |
300 ~Inner(); | 300 ~Inner(); |
301 | 301 |
302 SequenceToken GetSequenceToken(); | 302 static SequenceToken GetSequenceToken(); |
303 | 303 |
304 SequenceToken GetNamedSequenceToken(const std::string& name); | 304 SequenceToken GetNamedSequenceToken(const std::string& name); |
305 | 305 |
306 // This function accepts a name and an ID. If the name is null, the | 306 // This function accepts a name and an ID. If the name is null, the |
307 // token ID is used. This allows us to implement the optional name lookup | 307 // token ID is used. This allows us to implement the optional name lookup |
308 // from a single function without having to enter the lock a separate time. | 308 // from a single function without having to enter the lock a separate time. |
309 bool PostTask(const std::string* optional_token_name, | 309 bool PostTask(const std::string* optional_token_name, |
310 SequenceToken sequence_token, | 310 SequenceToken sequence_token, |
311 WorkerShutdown shutdown_behavior, | 311 WorkerShutdown shutdown_behavior, |
312 const tracked_objects::Location& from_here, | 312 const tracked_objects::Location& from_here, |
313 const Closure& task, | 313 const Closure& task, |
314 TimeDelta delay); | 314 TimeDelta delay); |
315 | 315 |
316 bool RunsTasksOnCurrentThread() const; | 316 bool RunsTasksOnCurrentThread() const; |
317 | 317 |
318 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 318 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
319 | 319 |
| 320 bool IsRunningSequence(SequenceToken sequence_token) const; |
| 321 |
| 322 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token, |
| 323 WorkerShutdown shutdown_behavior); |
| 324 |
320 void CleanupForTesting(); | 325 void CleanupForTesting(); |
321 | 326 |
322 void SignalHasWorkForTesting(); | 327 void SignalHasWorkForTesting(); |
323 | 328 |
324 int GetWorkSignalCountForTesting() const; | 329 int GetWorkSignalCountForTesting() const; |
325 | 330 |
326 void Shutdown(int max_blocking_tasks_after_shutdown); | 331 void Shutdown(int max_blocking_tasks_after_shutdown); |
327 | 332 |
328 bool IsShutdownInProgress(); | 333 bool IsShutdownInProgress(); |
329 | 334 |
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
579 // Need to explicitly join with the threads before they're destroyed or else | 584 // Need to explicitly join with the threads before they're destroyed or else |
580 // they will be running when our object is half torn down. | 585 // they will be running when our object is half torn down. |
581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) | 586 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
582 it->second->Join(); | 587 it->second->Join(); |
583 threads_.clear(); | 588 threads_.clear(); |
584 | 589 |
585 if (testing_observer_) | 590 if (testing_observer_) |
586 testing_observer_->OnDestruct(); | 591 testing_observer_->OnDestruct(); |
587 } | 592 } |
588 | 593 |
| 594 // static |
589 SequencedWorkerPool::SequenceToken | 595 SequencedWorkerPool::SequenceToken |
590 SequencedWorkerPool::Inner::GetSequenceToken() { | 596 SequencedWorkerPool::Inner::GetSequenceToken() { |
591 // Need to add one because StaticAtomicSequenceNumber starts at zero, which | 597 // Need to add one because StaticAtomicSequenceNumber starts at zero, which |
592 // is used as a sentinel value in SequenceTokens. | 598 // is used as a sentinel value in SequenceTokens. |
593 return SequenceToken(g_last_sequence_number_.GetNext() + 1); | 599 return SequenceToken(g_last_sequence_number_.GetNext() + 1); |
594 } | 600 } |
595 | 601 |
596 SequencedWorkerPool::SequenceToken | 602 SequencedWorkerPool::SequenceToken |
597 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { | 603 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { |
598 AutoLock lock(lock_); | 604 AutoLock lock(lock_); |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
679 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 685 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
680 SequenceToken sequence_token) const { | 686 SequenceToken sequence_token) const { |
681 AutoLock lock(lock_); | 687 AutoLock lock(lock_); |
682 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 688 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
683 if (found == threads_.end()) | 689 if (found == threads_.end()) |
684 return false; | 690 return false; |
685 return found->second->is_processing_task() && | 691 return found->second->is_processing_task() && |
686 sequence_token.Equals(found->second->task_sequence_token()); | 692 sequence_token.Equals(found->second->task_sequence_token()); |
687 } | 693 } |
688 | 694 |
| 695 bool SequencedWorkerPool::Inner::IsRunningSequence( |
| 696 SequenceToken sequence_token) const { |
| 697 DCHECK(sequence_token.IsValid()); |
| 698 AutoLock lock(lock_); |
| 699 return !IsSequenceTokenRunnable(sequence_token.id_); |
| 700 } |
| 701 |
| 702 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread( |
| 703 SequenceToken sequence_token, |
| 704 WorkerShutdown shutdown_behavior) { |
| 705 AutoLock lock(lock_); |
| 706 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 707 DCHECK(found != threads_.end()); |
| 708 DCHECK(found->second->is_processing_task()); |
| 709 DCHECK(!found->second->task_sequence_token().IsValid()); |
| 710 found->second->set_running_task_info(sequence_token, shutdown_behavior); |
| 711 |
| 712 // Mark the sequence token as in use. |
| 713 bool success = current_sequences_.insert(sequence_token.id_).second; |
| 714 DCHECK(success); |
| 715 } |
| 716 |
689 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 717 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
690 void SequencedWorkerPool::Inner::CleanupForTesting() { | 718 void SequencedWorkerPool::Inner::CleanupForTesting() { |
691 DCHECK(!RunsTasksOnCurrentThread()); | 719 DCHECK(!RunsTasksOnCurrentThread()); |
692 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 720 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
693 AutoLock lock(lock_); | 721 AutoLock lock(lock_); |
694 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 722 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
695 if (shutdown_called_) | 723 if (shutdown_called_) |
696 return; | 724 return; |
697 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | 725 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
698 return; | 726 return; |
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
803 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 831 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
804 | 832 |
805 tracked_objects::TaskStopwatch stopwatch; | 833 tracked_objects::TaskStopwatch stopwatch; |
806 stopwatch.Start(); | 834 stopwatch.Start(); |
807 task.task.Run(); | 835 task.task.Run(); |
808 stopwatch.Stop(); | 836 stopwatch.Stop(); |
809 | 837 |
810 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( | 838 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
811 task, stopwatch); | 839 task, stopwatch); |
812 | 840 |
| 841 // Update the sequence token in case it has been set from within the |
| 842 // task, so it can be removed from the set of currently running |
| 843 // sequences in DidRunWorkerTask() below. |
| 844 task.sequence_token_id = this_worker->task_sequence_token().id_; |
| 845 |
813 // Make sure our task is erased outside the lock for the | 846 // Make sure our task is erased outside the lock for the |
814 // same reason we do this with delete_these_oustide_lock. | 847 // same reason we do this with delete_these_oustide_lock. |
815 // Also, do it before calling reset_running_task_info() so | 848 // Also, do it before calling reset_running_task_info() so |
816 // that sequence-checking from within the task's destructor | 849 // that sequence-checking from within the task's destructor |
817 // still works. | 850 // still works. |
818 task.task = Closure(); | 851 task.task = Closure(); |
819 | 852 |
820 this_worker->reset_running_task_info(); | 853 this_worker->reset_running_task_info(); |
821 } | 854 } |
822 DidRunWorkerTask(task); // Must be done inside the lock. | 855 DidRunWorkerTask(task); // Must be done inside the lock. |
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1186 // static | 1219 // static |
1187 scoped_refptr<SequencedWorkerPool> | 1220 scoped_refptr<SequencedWorkerPool> |
1188 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1221 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
1189 Worker* worker = Worker::GetForCurrentThread(); | 1222 Worker* worker = Worker::GetForCurrentThread(); |
1190 if (!worker) | 1223 if (!worker) |
1191 return nullptr; | 1224 return nullptr; |
1192 | 1225 |
1193 return worker->worker_pool(); | 1226 return worker->worker_pool(); |
1194 } | 1227 } |
1195 | 1228 |
| 1229 // static |
| 1230 scoped_refptr<SequencedTaskRunner> |
| 1231 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() { |
| 1232 Worker* worker = Worker::GetForCurrentThread(); |
| 1233 |
| 1234 // If there is no worker, this thread is not a worker thread. Otherwise, it is |
| 1235 // currently running a task (sequenced or unsequenced). |
| 1236 if (!worker) |
| 1237 return nullptr; |
| 1238 |
| 1239 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool(); |
| 1240 SequenceToken sequence_token = worker->task_sequence_token(); |
| 1241 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior(); |
| 1242 if (!sequence_token.IsValid()) { |
| 1243 // Create a new sequence token and bind this thread to it, to make sure that |
| 1244 // a task posted to the SequencedTaskRunner we are going to return is not |
| 1245 // immediately going to run on a different thread. |
| 1246 sequence_token = Inner::GetSequenceToken(); |
| 1247 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token, |
| 1248 shutdown_behavior); |
| 1249 } |
| 1250 |
| 1251 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); |
| 1252 return new SequencedWorkerPoolSequencedTaskRunner( |
| 1253 std::move(pool), sequence_token, shutdown_behavior); |
| 1254 } |
| 1255 |
1196 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1256 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1197 const std::string& thread_name_prefix) | 1257 const std::string& thread_name_prefix) |
1198 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1258 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1199 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1259 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
1200 } | 1260 } |
1201 | 1261 |
1202 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1262 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1203 const std::string& thread_name_prefix, | 1263 const std::string& thread_name_prefix, |
1204 TestingObserver* observer) | 1264 TestingObserver* observer) |
1205 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1265 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1206 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { | 1266 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { |
1207 } | 1267 } |
1208 | 1268 |
1209 SequencedWorkerPool::~SequencedWorkerPool() {} | 1269 SequencedWorkerPool::~SequencedWorkerPool() {} |
1210 | 1270 |
1211 void SequencedWorkerPool::OnDestruct() const { | 1271 void SequencedWorkerPool::OnDestruct() const { |
1212 // Avoid deleting ourselves on a worker thread (which would | 1272 // Avoid deleting ourselves on a worker thread (which would deadlock). |
1213 // deadlock). | |
1214 if (RunsTasksOnCurrentThread()) { | 1273 if (RunsTasksOnCurrentThread()) { |
1215 constructor_task_runner_->DeleteSoon(FROM_HERE, this); | 1274 constructor_task_runner_->DeleteSoon(FROM_HERE, this); |
1216 } else { | 1275 } else { |
1217 delete this; | 1276 delete this; |
1218 } | 1277 } |
1219 } | 1278 } |
1220 | 1279 |
| 1280 // static |
1221 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 1281 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
1222 return inner_->GetSequenceToken(); | 1282 return Inner::GetSequenceToken(); |
1223 } | 1283 } |
1224 | 1284 |
1225 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 1285 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
1226 const std::string& name) { | 1286 const std::string& name) { |
1227 return inner_->GetNamedSequenceToken(name); | 1287 return inner_->GetNamedSequenceToken(name); |
1228 } | 1288 } |
1229 | 1289 |
1230 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( | 1290 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( |
1231 SequenceToken token) { | 1291 SequenceToken token) { |
1232 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); | 1292 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); |
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1316 | 1376 |
1317 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1377 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
1318 return inner_->RunsTasksOnCurrentThread(); | 1378 return inner_->RunsTasksOnCurrentThread(); |
1319 } | 1379 } |
1320 | 1380 |
1321 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1381 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1322 SequenceToken sequence_token) const { | 1382 SequenceToken sequence_token) const { |
1323 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1383 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1324 } | 1384 } |
1325 | 1385 |
| 1386 bool SequencedWorkerPool::IsRunningSequence( |
| 1387 SequenceToken sequence_token) const { |
| 1388 return inner_->IsRunningSequence(sequence_token); |
| 1389 } |
| 1390 |
1326 void SequencedWorkerPool::FlushForTesting() { | 1391 void SequencedWorkerPool::FlushForTesting() { |
1327 inner_->CleanupForTesting(); | 1392 inner_->CleanupForTesting(); |
1328 } | 1393 } |
1329 | 1394 |
1330 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1395 void SequencedWorkerPool::SignalHasWorkForTesting() { |
1331 inner_->SignalHasWorkForTesting(); | 1396 inner_->SignalHasWorkForTesting(); |
1332 } | 1397 } |
1333 | 1398 |
1334 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1399 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1335 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1400 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
1336 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1401 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1337 } | 1402 } |
1338 | 1403 |
1339 bool SequencedWorkerPool::IsShutdownInProgress() { | 1404 bool SequencedWorkerPool::IsShutdownInProgress() { |
1340 return inner_->IsShutdownInProgress(); | 1405 return inner_->IsShutdownInProgress(); |
1341 } | 1406 } |
1342 | 1407 |
1343 } // namespace base | 1408 } // namespace base |
OLD | NEW |