Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1414793009: Allow SequencedTaskRunnerHandle::Get() while running unsequenced tasks. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: moar test Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 class SequencedWorkerPool::Inner { 289 class SequencedWorkerPool::Inner {
290 public: 290 public:
291 // Take a raw pointer to |worker| to avoid cycles (since we're owned 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned
292 // by it). 292 // by it).
293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
294 const std::string& thread_name_prefix, 294 const std::string& thread_name_prefix,
295 TestingObserver* observer); 295 TestingObserver* observer);
296 296
297 ~Inner(); 297 ~Inner();
298 298
299 SequenceToken GetSequenceToken(); 299 static SequenceToken GetSequenceToken();
300 300
301 SequenceToken GetNamedSequenceToken(const std::string& name); 301 SequenceToken GetNamedSequenceToken(const std::string& name);
302 302
303 // This function accepts a name and an ID. If the name is null, the 303 // This function accepts a name and an ID. If the name is null, the
304 // token ID is used. This allows us to implement the optional name lookup 304 // token ID is used. This allows us to implement the optional name lookup
305 // from a single function without having to enter the lock a separate time. 305 // from a single function without having to enter the lock a separate time.
306 bool PostTask(const std::string* optional_token_name, 306 bool PostTask(const std::string* optional_token_name,
307 SequenceToken sequence_token, 307 SequenceToken sequence_token,
308 WorkerShutdown shutdown_behavior, 308 WorkerShutdown shutdown_behavior,
309 const tracked_objects::Location& from_here, 309 const tracked_objects::Location& from_here,
310 const Closure& task, 310 const Closure& task,
311 TimeDelta delay); 311 TimeDelta delay);
312 312
313 bool RunsTasksOnCurrentThread() const; 313 bool RunsTasksOnCurrentThread() const;
314 314
315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
316 316
317 bool IsRunningSequence(SequenceToken sequence_token) const;
318
319 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
320 WorkerShutdown shutdown_behavior);
321
317 void CleanupForTesting(); 322 void CleanupForTesting();
318 323
319 void SignalHasWorkForTesting(); 324 void SignalHasWorkForTesting();
320 325
321 int GetWorkSignalCountForTesting() const; 326 int GetWorkSignalCountForTesting() const;
322 327
323 void Shutdown(int max_blocking_tasks_after_shutdown); 328 void Shutdown(int max_blocking_tasks_after_shutdown);
324 329
325 bool IsShutdownInProgress(); 330 bool IsShutdownInProgress();
326 331
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 // Need to explicitly join with the threads before they're destroyed or else 581 // Need to explicitly join with the threads before they're destroyed or else
577 // they will be running when our object is half torn down. 582 // they will be running when our object is half torn down.
578 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 583 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
579 it->second->Join(); 584 it->second->Join();
580 threads_.clear(); 585 threads_.clear();
581 586
582 if (testing_observer_) 587 if (testing_observer_)
583 testing_observer_->OnDestruct(); 588 testing_observer_->OnDestruct();
584 } 589 }
585 590
591 // static
586 SequencedWorkerPool::SequenceToken 592 SequencedWorkerPool::SequenceToken
587 SequencedWorkerPool::Inner::GetSequenceToken() { 593 SequencedWorkerPool::Inner::GetSequenceToken() {
588 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 594 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
589 // is used as a sentinel value in SequenceTokens. 595 // is used as a sentinel value in SequenceTokens.
590 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 596 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
591 } 597 }
592 598
593 SequencedWorkerPool::SequenceToken 599 SequencedWorkerPool::SequenceToken
594 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 600 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
595 AutoLock lock(lock_); 601 AutoLock lock(lock_);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
676 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 682 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
677 SequenceToken sequence_token) const { 683 SequenceToken sequence_token) const {
678 AutoLock lock(lock_); 684 AutoLock lock(lock_);
679 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 685 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
680 if (found == threads_.end()) 686 if (found == threads_.end())
681 return false; 687 return false;
682 return found->second->is_processing_task() && 688 return found->second->is_processing_task() &&
683 sequence_token.Equals(found->second->task_sequence_token()); 689 sequence_token.Equals(found->second->task_sequence_token());
684 } 690 }
685 691
692 bool SequencedWorkerPool::Inner::IsRunningSequence(
693 SequenceToken sequence_token) const {
694 DCHECK(sequence_token.IsValid());
695 AutoLock lock(lock_);
696 return !IsSequenceTokenRunnable(sequence_token.id_);
697 }
698
699 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
700 SequenceToken sequence_token,
701 WorkerShutdown shutdown_behavior) {
702 AutoLock lock(lock_);
703 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
704 DCHECK(found != threads_.end());
705 DCHECK(found->second->is_processing_task());
706 DCHECK(!found->second->task_sequence_token().IsValid());
707 found->second->set_running_task_info(sequence_token, shutdown_behavior);
708
709 // Mark the sequence token as in use.
710 bool success = current_sequences_.insert(sequence_token.id_).second;
711 DCHECK(success);
712 }
713
686 // See https://code.google.com/p/chromium/issues/detail?id=168415 714 // See https://code.google.com/p/chromium/issues/detail?id=168415
687 void SequencedWorkerPool::Inner::CleanupForTesting() { 715 void SequencedWorkerPool::Inner::CleanupForTesting() {
688 DCHECK(!RunsTasksOnCurrentThread()); 716 DCHECK(!RunsTasksOnCurrentThread());
689 base::ThreadRestrictions::ScopedAllowWait allow_wait; 717 base::ThreadRestrictions::ScopedAllowWait allow_wait;
690 AutoLock lock(lock_); 718 AutoLock lock(lock_);
691 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 719 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
692 if (shutdown_called_) 720 if (shutdown_called_)
693 return; 721 return;
694 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 722 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
695 return; 723 return;
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
800 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 828 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
801 829
802 tracked_objects::TaskStopwatch stopwatch; 830 tracked_objects::TaskStopwatch stopwatch;
803 stopwatch.Start(); 831 stopwatch.Start();
804 task.task.Run(); 832 task.task.Run();
805 stopwatch.Stop(); 833 stopwatch.Stop();
806 834
807 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 835 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
808 task, stopwatch); 836 task, stopwatch);
809 837
838 // Update the sequence token in case it has been set from within the
839 // task, so it can be removed from the set of currently running
840 // sequences in DidRunWorkerTask() below.
841 task.sequence_token_id = this_worker->task_sequence_token().id_;
842
810 // Make sure our task is erased outside the lock for the 843 // Make sure our task is erased outside the lock for the
811 // same reason we do this with delete_these_oustide_lock. 844 // same reason we do this with delete_these_oustide_lock.
812 // Also, do it before calling reset_running_task_info() so 845 // Also, do it before calling reset_running_task_info() so
813 // that sequence-checking from within the task's destructor 846 // that sequence-checking from within the task's destructor
814 // still works. 847 // still works.
815 task.task = Closure(); 848 task.task = Closure();
816 849
817 this_worker->reset_running_task_info(); 850 this_worker->reset_running_task_info();
818 } 851 }
819 DidRunWorkerTask(task); // Must be done inside the lock. 852 DidRunWorkerTask(task); // Must be done inside the lock.
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after
1174 SequencedWorkerPool::SequenceToken 1207 SequencedWorkerPool::SequenceToken
1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1208 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1176 Worker* worker = Worker::GetForCurrentThread(); 1209 Worker* worker = Worker::GetForCurrentThread();
1177 if (!worker) 1210 if (!worker)
1178 return SequenceToken(); 1211 return SequenceToken();
1179 1212
1180 return worker->task_sequence_token(); 1213 return worker->task_sequence_token();
1181 } 1214 }
1182 1215
1183 // static 1216 // static
1184 scoped_refptr<SequencedWorkerPool> 1217 scoped_refptr<SequencedTaskRunner>
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1218 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1186 Worker* worker = Worker::GetForCurrentThread(); 1219 Worker* worker = Worker::GetForCurrentThread();
1220
1221 // If there is no worker, this thread is not a worker thread. Otherwise, it is
1222 // currently running a task (sequenced or unsequenced).
1187 if (!worker) 1223 if (!worker)
1188 return nullptr; 1224 return nullptr;
1189 1225
1190 return worker->worker_pool(); 1226 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1227 SequenceToken sequence_token = worker->task_sequence_token();
1228 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1229 if (!sequence_token.IsValid()) {
1230 // Create a new sequence token and bind this thread to it, to make sure that
1231 // a task posted to the SequencedTaskRunner we are going to return is not
1232 // immediately going to run on a different thread.
1233 sequence_token = Inner::GetSequenceToken();
1234 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1235 shutdown_behavior);
1236 }
1237
1238 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1239 return new SequencedWorkerPoolSequencedTaskRunner(
1240 std::move(pool), sequence_token, shutdown_behavior);
1191 } 1241 }
1192 1242
1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1243 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1194 const std::string& thread_name_prefix) 1244 const std::string& thread_name_prefix)
1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1245 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1246 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1197 } 1247 }
1198 1248
1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1249 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1200 const std::string& thread_name_prefix, 1250 const std::string& thread_name_prefix,
1201 TestingObserver* observer) 1251 TestingObserver* observer)
1202 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1252 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1203 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1253 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1204 } 1254 }
1205 1255
1206 SequencedWorkerPool::~SequencedWorkerPool() {} 1256 SequencedWorkerPool::~SequencedWorkerPool() {}
1207 1257
1208 void SequencedWorkerPool::OnDestruct() const { 1258 void SequencedWorkerPool::OnDestruct() const {
1209 // Avoid deleting ourselves on a worker thread (which would 1259 // Avoid deleting ourselves on a worker thread (which would deadlock).
1210 // deadlock).
1211 if (RunsTasksOnCurrentThread()) { 1260 if (RunsTasksOnCurrentThread()) {
1212 constructor_task_runner_->DeleteSoon(FROM_HERE, this); 1261 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1213 } else { 1262 } else {
1214 delete this; 1263 delete this;
1215 } 1264 }
1216 } 1265 }
1217 1266
1267 // static
1218 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1268 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1219 return inner_->GetSequenceToken(); 1269 return Inner::GetSequenceToken();
1220 } 1270 }
1221 1271
1222 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1272 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1223 const std::string& name) { 1273 const std::string& name) {
1224 return inner_->GetNamedSequenceToken(name); 1274 return inner_->GetNamedSequenceToken(name);
1225 } 1275 }
1226 1276
1227 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1277 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1228 SequenceToken token) { 1278 SequenceToken token) {
1229 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1279 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
1313 1363
1314 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1364 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1315 return inner_->RunsTasksOnCurrentThread(); 1365 return inner_->RunsTasksOnCurrentThread();
1316 } 1366 }
1317 1367
1318 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1368 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1319 SequenceToken sequence_token) const { 1369 SequenceToken sequence_token) const {
1320 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1370 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1321 } 1371 }
1322 1372
1373 bool SequencedWorkerPool::IsRunningSequence(
1374 SequenceToken sequence_token) const {
1375 return inner_->IsRunningSequence(sequence_token);
1376 }
1377
1323 void SequencedWorkerPool::FlushForTesting() { 1378 void SequencedWorkerPool::FlushForTesting() {
1324 inner_->CleanupForTesting(); 1379 inner_->CleanupForTesting();
1325 } 1380 }
1326 1381
1327 void SequencedWorkerPool::SignalHasWorkForTesting() { 1382 void SequencedWorkerPool::SignalHasWorkForTesting() {
1328 inner_->SignalHasWorkForTesting(); 1383 inner_->SignalHasWorkForTesting();
1329 } 1384 }
1330 1385
1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1386 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1387 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1388 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1334 } 1389 }
1335 1390
1336 bool SequencedWorkerPool::IsShutdownInProgress() { 1391 bool SequencedWorkerPool::IsShutdownInProgress() {
1337 return inner_->IsShutdownInProgress(); 1392 return inner_->IsShutdownInProgress();
1338 } 1393 }
1339 1394
1340 } // namespace base 1395 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698