OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
289 class SequencedWorkerPool::Inner { | 289 class SequencedWorkerPool::Inner { |
290 public: | 290 public: |
291 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
292 // by it). | 292 // by it). |
293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
294 const std::string& thread_name_prefix, | 294 const std::string& thread_name_prefix, |
295 TestingObserver* observer); | 295 TestingObserver* observer); |
296 | 296 |
297 ~Inner(); | 297 ~Inner(); |
298 | 298 |
299 SequenceToken GetSequenceToken(); | 299 static SequenceToken GetSequenceToken(); |
300 | 300 |
301 SequenceToken GetNamedSequenceToken(const std::string& name); | 301 SequenceToken GetNamedSequenceToken(const std::string& name); |
302 | 302 |
303 // This function accepts a name and an ID. If the name is null, the | 303 // This function accepts a name and an ID. If the name is null, the |
304 // token ID is used. This allows us to implement the optional name lookup | 304 // token ID is used. This allows us to implement the optional name lookup |
305 // from a single function without having to enter the lock a separate time. | 305 // from a single function without having to enter the lock a separate time. |
306 bool PostTask(const std::string* optional_token_name, | 306 bool PostTask(const std::string* optional_token_name, |
307 SequenceToken sequence_token, | 307 SequenceToken sequence_token, |
308 WorkerShutdown shutdown_behavior, | 308 WorkerShutdown shutdown_behavior, |
309 const tracked_objects::Location& from_here, | 309 const tracked_objects::Location& from_here, |
310 const Closure& task, | 310 const Closure& task, |
311 TimeDelta delay); | 311 TimeDelta delay); |
312 | 312 |
313 bool RunsTasksOnCurrentThread() const; | 313 bool RunsTasksOnCurrentThread() const; |
314 | 314 |
315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
316 | 316 |
317 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token, | |
318 WorkerShutdown shutdown_behavior); | |
319 | |
317 void CleanupForTesting(); | 320 void CleanupForTesting(); |
318 | 321 |
319 void SignalHasWorkForTesting(); | 322 void SignalHasWorkForTesting(); |
320 | 323 |
321 int GetWorkSignalCountForTesting() const; | 324 int GetWorkSignalCountForTesting() const; |
322 | 325 |
323 void Shutdown(int max_blocking_tasks_after_shutdown); | 326 void Shutdown(int max_blocking_tasks_after_shutdown); |
324 | 327 |
325 bool IsShutdownInProgress(); | 328 bool IsShutdownInProgress(); |
326 | 329 |
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
576 // Need to explicitly join with the threads before they're destroyed or else | 579 // Need to explicitly join with the threads before they're destroyed or else |
577 // they will be running when our object is half torn down. | 580 // they will be running when our object is half torn down. |
578 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) | 581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
579 it->second->Join(); | 582 it->second->Join(); |
580 threads_.clear(); | 583 threads_.clear(); |
581 | 584 |
582 if (testing_observer_) | 585 if (testing_observer_) |
583 testing_observer_->OnDestruct(); | 586 testing_observer_->OnDestruct(); |
584 } | 587 } |
585 | 588 |
589 // static | |
586 SequencedWorkerPool::SequenceToken | 590 SequencedWorkerPool::SequenceToken |
587 SequencedWorkerPool::Inner::GetSequenceToken() { | 591 SequencedWorkerPool::Inner::GetSequenceToken() { |
588 // Need to add one because StaticAtomicSequenceNumber starts at zero, which | 592 // Need to add one because StaticAtomicSequenceNumber starts at zero, which |
589 // is used as a sentinel value in SequenceTokens. | 593 // is used as a sentinel value in SequenceTokens. |
590 return SequenceToken(g_last_sequence_number_.GetNext() + 1); | 594 return SequenceToken(g_last_sequence_number_.GetNext() + 1); |
591 } | 595 } |
592 | 596 |
593 SequencedWorkerPool::SequenceToken | 597 SequencedWorkerPool::SequenceToken |
594 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { | 598 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { |
595 AutoLock lock(lock_); | 599 AutoLock lock(lock_); |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
676 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 680 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
677 SequenceToken sequence_token) const { | 681 SequenceToken sequence_token) const { |
678 AutoLock lock(lock_); | 682 AutoLock lock(lock_); |
679 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 683 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
680 if (found == threads_.end()) | 684 if (found == threads_.end()) |
681 return false; | 685 return false; |
682 return found->second->is_processing_task() && | 686 return found->second->is_processing_task() && |
683 sequence_token.Equals(found->second->task_sequence_token()); | 687 sequence_token.Equals(found->second->task_sequence_token()); |
684 } | 688 } |
685 | 689 |
690 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread( | |
691 SequenceToken sequence_token, | |
692 WorkerShutdown shutdown_behavior) { | |
693 AutoLock lock(lock_); | |
694 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | |
695 DCHECK(found != threads_.end()); | |
696 DCHECK(found->second->is_processing_task()); | |
697 DCHECK(!found->second->task_sequence_token().IsValid()); | |
698 found->second->set_running_task_info(sequence_token, shutdown_behavior); | |
699 } | |
700 | |
686 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 701 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
687 void SequencedWorkerPool::Inner::CleanupForTesting() { | 702 void SequencedWorkerPool::Inner::CleanupForTesting() { |
688 DCHECK(!RunsTasksOnCurrentThread()); | 703 DCHECK(!RunsTasksOnCurrentThread()); |
689 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 704 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
690 AutoLock lock(lock_); | 705 AutoLock lock(lock_); |
691 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 706 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
692 if (shutdown_called_) | 707 if (shutdown_called_) |
693 return; | 708 return; |
694 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | 709 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
695 return; | 710 return; |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
804 task.task.Run(); | 819 task.task.Run(); |
805 stopwatch.Stop(); | 820 stopwatch.Stop(); |
806 | 821 |
807 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( | 822 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
808 task, stopwatch); | 823 task, stopwatch); |
809 | 824 |
810 // Make sure our task is erased outside the lock for the | 825 // Make sure our task is erased outside the lock for the |
811 // same reason we do this with delete_these_oustide_lock. | 826 // same reason we do this with delete_these_oustide_lock. |
812 // Also, do it before calling reset_running_task_info() so | 827 // Also, do it before calling reset_running_task_info() so |
813 // that sequence-checking from within the task's destructor | 828 // that sequence-checking from within the task's destructor |
814 // still works. | 829 // still works. |
danakj
2015/11/17 22:03:39
The task.sequence_token_id is no longer correct af
Bernhard Bauer
2015/11/18 13:15:50
I think this is referring to the fact that we migh
danakj
2015/11/20 19:18:43
Can you add a unit test that uses SequenceChecker
Bernhard Bauer
2015/12/10 16:09:58
Done! I verified that the test fails if the task i
| |
815 task.task = Closure(); | 830 task.task = Closure(); |
816 | 831 |
817 this_worker->reset_running_task_info(); | 832 this_worker->reset_running_task_info(); |
818 } | 833 } |
819 DidRunWorkerTask(task); // Must be done inside the lock. | 834 DidRunWorkerTask(task); // Must be done inside the lock. |
820 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 835 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
821 switch (status) { | 836 switch (status) { |
822 case GET_WORK_WAIT: { | 837 case GET_WORK_WAIT: { |
823 AutoUnlock unlock(lock_); | 838 AutoUnlock unlock(lock_); |
824 delete_these_outside_lock.clear(); | 839 delete_these_outside_lock.clear(); |
(...skipping 349 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1174 SequencedWorkerPool::SequenceToken | 1189 SequencedWorkerPool::SequenceToken |
1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { | 1190 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { |
1176 Worker* worker = Worker::GetForCurrentThread(); | 1191 Worker* worker = Worker::GetForCurrentThread(); |
1177 if (!worker) | 1192 if (!worker) |
1178 return SequenceToken(); | 1193 return SequenceToken(); |
1179 | 1194 |
1180 return worker->task_sequence_token(); | 1195 return worker->task_sequence_token(); |
1181 } | 1196 } |
1182 | 1197 |
1183 // static | 1198 // static |
1184 scoped_refptr<SequencedWorkerPool> | 1199 scoped_refptr<SequencedTaskRunner> |
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1200 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() { |
1186 Worker* worker = Worker::GetForCurrentThread(); | 1201 Worker* worker = Worker::GetForCurrentThread(); |
1187 if (!worker) | 1202 if (!worker) |
1188 return nullptr; | 1203 return nullptr; |
1189 | 1204 |
1190 return worker->worker_pool(); | 1205 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool(); |
1206 SequenceToken sequence_token = worker->task_sequence_token(); | |
1207 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior(); | |
danakj
2015/11/17 22:03:39
Can you DCHECK that is_processing_task() is true?
Bernhard Bauer
2015/11/18 13:15:50
task_shutdown_behavior() already does that. Altern
danakj
2015/11/20 19:18:43
Can you add a comment at the !worker early out exp
Bernhard Bauer
2015/12/10 16:09:58
Done, modulo s/sequenced //, because a worker thre
| |
1208 if (!sequence_token.IsValid()) { | |
1209 sequence_token = Inner::GetSequenceToken(); | |
1210 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token, | |
danakj
2015/11/17 22:03:39
Why do you do this? Now the thread is left in a st
Bernhard Bauer
2015/11/18 13:15:50
I do this to make sure that a task posted to the n
danakj
2015/11/20 19:18:43
Can you encode this into a comment in here?
And c
Bernhard Bauer
2015/12/10 16:09:58
Done.
danakj
2015/12/16 21:53:32
What about just calling it twice and making sure y
| |
1211 shutdown_behavior); | |
1212 } | |
1213 | |
1214 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); | |
1215 return new SequencedWorkerPoolSequencedTaskRunner(pool, sequence_token, | |
danakj
2015/11/17 22:03:39
can you move() the pool smart pointer? no need to
Bernhard Bauer
2015/11/18 13:15:50
Done.
| |
1216 shutdown_behavior); | |
1191 } | 1217 } |
1192 | 1218 |
1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1219 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1194 const std::string& thread_name_prefix) | 1220 const std::string& thread_name_prefix) |
1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1221 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1222 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
1197 } | 1223 } |
1198 | 1224 |
1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1225 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1200 const std::string& thread_name_prefix, | 1226 const std::string& thread_name_prefix, |
1201 TestingObserver* observer) | 1227 TestingObserver* observer) |
1202 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1228 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1203 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { | 1229 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { |
1204 } | 1230 } |
1205 | 1231 |
1206 SequencedWorkerPool::~SequencedWorkerPool() {} | 1232 SequencedWorkerPool::~SequencedWorkerPool() {} |
1207 | 1233 |
1208 void SequencedWorkerPool::OnDestruct() const { | 1234 void SequencedWorkerPool::OnDestruct() const { |
1209 // Avoid deleting ourselves on a worker thread (which would | 1235 // Avoid deleting ourselves on a worker thread (which would deadlock). |
1210 // deadlock). | |
1211 if (RunsTasksOnCurrentThread()) { | 1236 if (RunsTasksOnCurrentThread()) { |
1212 constructor_task_runner_->DeleteSoon(FROM_HERE, this); | 1237 constructor_task_runner_->DeleteSoon(FROM_HERE, this); |
1213 } else { | 1238 } else { |
1214 delete this; | 1239 delete this; |
1215 } | 1240 } |
1216 } | 1241 } |
1217 | 1242 |
1243 // static | |
1218 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 1244 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
1219 return inner_->GetSequenceToken(); | 1245 return Inner::GetSequenceToken(); |
1220 } | 1246 } |
1221 | 1247 |
1222 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 1248 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
1223 const std::string& name) { | 1249 const std::string& name) { |
1224 return inner_->GetNamedSequenceToken(name); | 1250 return inner_->GetNamedSequenceToken(name); |
1225 } | 1251 } |
1226 | 1252 |
1227 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( | 1253 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( |
1228 SequenceToken token) { | 1254 SequenceToken token) { |
1229 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); | 1255 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1357 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1358 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1359 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1334 } | 1360 } |
1335 | 1361 |
1336 bool SequencedWorkerPool::IsShutdownInProgress() { | 1362 bool SequencedWorkerPool::IsShutdownInProgress() { |
1337 return inner_->IsShutdownInProgress(); | 1363 return inner_->IsShutdownInProgress(); |
1338 } | 1364 } |
1339 | 1365 |
1340 } // namespace base | 1366 } // namespace base |
OLD | NEW |