Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(22)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1414793009: Allow SequencedTaskRunnerHandle::Get() while running unsequenced tasks. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: added comment Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 class SequencedWorkerPool::Inner { 289 class SequencedWorkerPool::Inner {
290 public: 290 public:
291 // Take a raw pointer to |worker| to avoid cycles (since we're owned 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned
292 // by it). 292 // by it).
293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
294 const std::string& thread_name_prefix, 294 const std::string& thread_name_prefix,
295 TestingObserver* observer); 295 TestingObserver* observer);
296 296
297 ~Inner(); 297 ~Inner();
298 298
299 SequenceToken GetSequenceToken(); 299 static SequenceToken GetSequenceToken();
300 300
301 SequenceToken GetNamedSequenceToken(const std::string& name); 301 SequenceToken GetNamedSequenceToken(const std::string& name);
302 302
303 // This function accepts a name and an ID. If the name is null, the 303 // This function accepts a name and an ID. If the name is null, the
304 // token ID is used. This allows us to implement the optional name lookup 304 // token ID is used. This allows us to implement the optional name lookup
305 // from a single function without having to enter the lock a separate time. 305 // from a single function without having to enter the lock a separate time.
306 bool PostTask(const std::string* optional_token_name, 306 bool PostTask(const std::string* optional_token_name,
307 SequenceToken sequence_token, 307 SequenceToken sequence_token,
308 WorkerShutdown shutdown_behavior, 308 WorkerShutdown shutdown_behavior,
309 const tracked_objects::Location& from_here, 309 const tracked_objects::Location& from_here,
310 const Closure& task, 310 const Closure& task,
311 TimeDelta delay); 311 TimeDelta delay);
312 312
313 bool RunsTasksOnCurrentThread() const; 313 bool RunsTasksOnCurrentThread() const;
314 314
315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
316 316
317 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
318 WorkerShutdown shutdown_behavior);
319
317 void CleanupForTesting(); 320 void CleanupForTesting();
318 321
319 void SignalHasWorkForTesting(); 322 void SignalHasWorkForTesting();
320 323
321 int GetWorkSignalCountForTesting() const; 324 int GetWorkSignalCountForTesting() const;
322 325
323 void Shutdown(int max_blocking_tasks_after_shutdown); 326 void Shutdown(int max_blocking_tasks_after_shutdown);
324 327
325 bool IsShutdownInProgress(); 328 bool IsShutdownInProgress();
326 329
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 // Need to explicitly join with the threads before they're destroyed or else 579 // Need to explicitly join with the threads before they're destroyed or else
577 // they will be running when our object is half torn down. 580 // they will be running when our object is half torn down.
578 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
579 it->second->Join(); 582 it->second->Join();
580 threads_.clear(); 583 threads_.clear();
581 584
582 if (testing_observer_) 585 if (testing_observer_)
583 testing_observer_->OnDestruct(); 586 testing_observer_->OnDestruct();
584 } 587 }
585 588
589 // static
586 SequencedWorkerPool::SequenceToken 590 SequencedWorkerPool::SequenceToken
587 SequencedWorkerPool::Inner::GetSequenceToken() { 591 SequencedWorkerPool::Inner::GetSequenceToken() {
588 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 592 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
589 // is used as a sentinel value in SequenceTokens. 593 // is used as a sentinel value in SequenceTokens.
590 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 594 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
591 } 595 }
592 596
593 SequencedWorkerPool::SequenceToken 597 SequencedWorkerPool::SequenceToken
594 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 598 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
595 AutoLock lock(lock_); 599 AutoLock lock(lock_);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
676 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 680 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
677 SequenceToken sequence_token) const { 681 SequenceToken sequence_token) const {
678 AutoLock lock(lock_); 682 AutoLock lock(lock_);
679 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 683 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
680 if (found == threads_.end()) 684 if (found == threads_.end())
681 return false; 685 return false;
682 return found->second->is_processing_task() && 686 return found->second->is_processing_task() &&
683 sequence_token.Equals(found->second->task_sequence_token()); 687 sequence_token.Equals(found->second->task_sequence_token());
684 } 688 }
685 689
690 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
691 SequenceToken sequence_token,
692 WorkerShutdown shutdown_behavior) {
693 AutoLock lock(lock_);
694 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
695 DCHECK(found != threads_.end());
696 DCHECK(found->second->is_processing_task());
697 DCHECK(!found->second->task_sequence_token().IsValid());
698 found->second->set_running_task_info(sequence_token, shutdown_behavior);
699
700 // Mark the sequence token as in use.
701 bool success = current_sequences_.insert(sequence_token.id_).second;
702 DCHECK(success);
703 }
704
686 // See https://code.google.com/p/chromium/issues/detail?id=168415 705 // See https://code.google.com/p/chromium/issues/detail?id=168415
687 void SequencedWorkerPool::Inner::CleanupForTesting() { 706 void SequencedWorkerPool::Inner::CleanupForTesting() {
688 DCHECK(!RunsTasksOnCurrentThread()); 707 DCHECK(!RunsTasksOnCurrentThread());
689 base::ThreadRestrictions::ScopedAllowWait allow_wait; 708 base::ThreadRestrictions::ScopedAllowWait allow_wait;
690 AutoLock lock(lock_); 709 AutoLock lock(lock_);
691 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 710 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
692 if (shutdown_called_) 711 if (shutdown_called_)
693 return; 712 return;
694 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 713 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
695 return; 714 return;
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
800 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 819 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
801 820
802 tracked_objects::TaskStopwatch stopwatch; 821 tracked_objects::TaskStopwatch stopwatch;
803 stopwatch.Start(); 822 stopwatch.Start();
804 task.task.Run(); 823 task.task.Run();
805 stopwatch.Stop(); 824 stopwatch.Stop();
806 825
807 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 826 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
808 task, stopwatch); 827 task, stopwatch);
809 828
829 // Update the sequence token in case it has been set from within the
830 // task, so it can be removed from the set of currently running
831 // sequences in DidRunWorkerTask() below.
832 task.sequence_token_id = this_worker->task_sequence_token().id_;
danakj 2015/11/20 19:18:43 Is this line captured in any unit tests?
Bernhard Bauer 2015/12/10 16:09:58 Yes, SequencedWorkerPoolTest.GetSequencedTaskRunne
833
810 // Make sure our task is erased outside the lock for the 834 // Make sure our task is erased outside the lock for the
811 // same reason we do this with delete_these_oustide_lock. 835 // same reason we do this with delete_these_oustide_lock.
812 // Also, do it before calling reset_running_task_info() so 836 // Also, do it before calling reset_running_task_info() so
813 // that sequence-checking from within the task's destructor 837 // that sequence-checking from within the task's destructor
814 // still works. 838 // still works.
815 task.task = Closure(); 839 task.task = Closure();
816 840
817 this_worker->reset_running_task_info(); 841 this_worker->reset_running_task_info();
818 } 842 }
819 DidRunWorkerTask(task); // Must be done inside the lock. 843 DidRunWorkerTask(task); // Must be done inside the lock.
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after
1174 SequencedWorkerPool::SequenceToken 1198 SequencedWorkerPool::SequenceToken
1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1199 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1176 Worker* worker = Worker::GetForCurrentThread(); 1200 Worker* worker = Worker::GetForCurrentThread();
1177 if (!worker) 1201 if (!worker)
1178 return SequenceToken(); 1202 return SequenceToken();
1179 1203
1180 return worker->task_sequence_token(); 1204 return worker->task_sequence_token();
1181 } 1205 }
1182 1206
1183 // static 1207 // static
1184 scoped_refptr<SequencedWorkerPool> 1208 scoped_refptr<SequencedTaskRunner>
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1209 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1186 Worker* worker = Worker::GetForCurrentThread(); 1210 Worker* worker = Worker::GetForCurrentThread();
1187 if (!worker) 1211 if (!worker)
1188 return nullptr; 1212 return nullptr;
1189 1213
1190 return worker->worker_pool(); 1214 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1215 SequenceToken sequence_token = worker->task_sequence_token();
1216 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1217 if (!sequence_token.IsValid()) {
1218 sequence_token = Inner::GetSequenceToken();
1219 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1220 shutdown_behavior);
1221 }
1222
1223 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1224 return new SequencedWorkerPoolSequencedTaskRunner(
1225 std::move(pool), sequence_token, shutdown_behavior);
1191 } 1226 }
1192 1227
1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1228 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1194 const std::string& thread_name_prefix) 1229 const std::string& thread_name_prefix)
1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1230 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1231 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1197 } 1232 }
1198 1233
1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1234 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1200 const std::string& thread_name_prefix, 1235 const std::string& thread_name_prefix,
1201 TestingObserver* observer) 1236 TestingObserver* observer)
1202 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1237 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1203 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1238 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1204 } 1239 }
1205 1240
1206 SequencedWorkerPool::~SequencedWorkerPool() {} 1241 SequencedWorkerPool::~SequencedWorkerPool() {}
1207 1242
1208 void SequencedWorkerPool::OnDestruct() const { 1243 void SequencedWorkerPool::OnDestruct() const {
1209 // Avoid deleting ourselves on a worker thread (which would 1244 // Avoid deleting ourselves on a worker thread (which would deadlock).
1210 // deadlock).
1211 if (RunsTasksOnCurrentThread()) { 1245 if (RunsTasksOnCurrentThread()) {
1212 constructor_task_runner_->DeleteSoon(FROM_HERE, this); 1246 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1213 } else { 1247 } else {
1214 delete this; 1248 delete this;
1215 } 1249 }
1216 } 1250 }
1217 1251
1252 // static
1218 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1253 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1219 return inner_->GetSequenceToken(); 1254 return Inner::GetSequenceToken();
1220 } 1255 }
1221 1256
1222 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1257 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1223 const std::string& name) { 1258 const std::string& name) {
1224 return inner_->GetNamedSequenceToken(name); 1259 return inner_->GetNamedSequenceToken(name);
1225 } 1260 }
1226 1261
1227 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1262 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1228 SequenceToken token) { 1263 SequenceToken token) {
1229 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1264 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1366 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1367 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1368 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1334 } 1369 }
1335 1370
1336 bool SequencedWorkerPool::IsShutdownInProgress() { 1371 bool SequencedWorkerPool::IsShutdownInProgress() {
1337 return inner_->IsShutdownInProgress(); 1372 return inner_->IsShutdownInProgress();
1338 } 1373 }
1339 1374
1340 } // namespace base 1375 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698