Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(164)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1414793009: Allow SequencedTaskRunnerHandle::Get() while running unsequenced tasks. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: review Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 class SequencedWorkerPool::Inner { 289 class SequencedWorkerPool::Inner {
290 public: 290 public:
291 // Take a raw pointer to |worker| to avoid cycles (since we're owned 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned
292 // by it). 292 // by it).
293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
294 const std::string& thread_name_prefix, 294 const std::string& thread_name_prefix,
295 TestingObserver* observer); 295 TestingObserver* observer);
296 296
297 ~Inner(); 297 ~Inner();
298 298
299 SequenceToken GetSequenceToken(); 299 static SequenceToken GetSequenceToken();
300 300
301 SequenceToken GetNamedSequenceToken(const std::string& name); 301 SequenceToken GetNamedSequenceToken(const std::string& name);
302 302
303 // This function accepts a name and an ID. If the name is null, the 303 // This function accepts a name and an ID. If the name is null, the
304 // token ID is used. This allows us to implement the optional name lookup 304 // token ID is used. This allows us to implement the optional name lookup
305 // from a single function without having to enter the lock a separate time. 305 // from a single function without having to enter the lock a separate time.
306 bool PostTask(const std::string* optional_token_name, 306 bool PostTask(const std::string* optional_token_name,
307 SequenceToken sequence_token, 307 SequenceToken sequence_token,
308 WorkerShutdown shutdown_behavior, 308 WorkerShutdown shutdown_behavior,
309 const tracked_objects::Location& from_here, 309 const tracked_objects::Location& from_here,
310 const Closure& task, 310 const Closure& task,
311 TimeDelta delay); 311 TimeDelta delay);
312 312
313 bool RunsTasksOnCurrentThread() const; 313 bool RunsTasksOnCurrentThread() const;
314 314
315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
316 316
317 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
318 WorkerShutdown shutdown_behavior);
319
317 void CleanupForTesting(); 320 void CleanupForTesting();
318 321
319 void SignalHasWorkForTesting(); 322 void SignalHasWorkForTesting();
320 323
321 int GetWorkSignalCountForTesting() const; 324 int GetWorkSignalCountForTesting() const;
322 325
323 void Shutdown(int max_blocking_tasks_after_shutdown); 326 void Shutdown(int max_blocking_tasks_after_shutdown);
324 327
325 bool IsShutdownInProgress(); 328 bool IsShutdownInProgress();
326 329
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 // Need to explicitly join with the threads before they're destroyed or else 579 // Need to explicitly join with the threads before they're destroyed or else
577 // they will be running when our object is half torn down. 580 // they will be running when our object is half torn down.
578 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
579 it->second->Join(); 582 it->second->Join();
580 threads_.clear(); 583 threads_.clear();
581 584
582 if (testing_observer_) 585 if (testing_observer_)
583 testing_observer_->OnDestruct(); 586 testing_observer_->OnDestruct();
584 } 587 }
585 588
589 // static
586 SequencedWorkerPool::SequenceToken 590 SequencedWorkerPool::SequenceToken
587 SequencedWorkerPool::Inner::GetSequenceToken() { 591 SequencedWorkerPool::Inner::GetSequenceToken() {
588 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 592 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
589 // is used as a sentinel value in SequenceTokens. 593 // is used as a sentinel value in SequenceTokens.
590 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 594 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
591 } 595 }
592 596
593 SequencedWorkerPool::SequenceToken 597 SequencedWorkerPool::SequenceToken
594 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 598 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
595 AutoLock lock(lock_); 599 AutoLock lock(lock_);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
676 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 680 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
677 SequenceToken sequence_token) const { 681 SequenceToken sequence_token) const {
678 AutoLock lock(lock_); 682 AutoLock lock(lock_);
679 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 683 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
680 if (found == threads_.end()) 684 if (found == threads_.end())
681 return false; 685 return false;
682 return found->second->is_processing_task() && 686 return found->second->is_processing_task() &&
683 sequence_token.Equals(found->second->task_sequence_token()); 687 sequence_token.Equals(found->second->task_sequence_token());
684 } 688 }
685 689
690 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
691 SequenceToken sequence_token,
692 WorkerShutdown shutdown_behavior) {
693 AutoLock lock(lock_);
694 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
695 DCHECK(found != threads_.end());
696 DCHECK(found->second->is_processing_task());
697 DCHECK(!found->second->task_sequence_token().IsValid());
698 found->second->set_running_task_info(sequence_token, shutdown_behavior);
699
700 // Mark the sequence token as in use.
701 bool success = current_sequences_.insert(sequence_token.id_).second;
702 DCHECK(success);
703 }
704
686 // See https://code.google.com/p/chromium/issues/detail?id=168415 705 // See https://code.google.com/p/chromium/issues/detail?id=168415
687 void SequencedWorkerPool::Inner::CleanupForTesting() { 706 void SequencedWorkerPool::Inner::CleanupForTesting() {
688 DCHECK(!RunsTasksOnCurrentThread()); 707 DCHECK(!RunsTasksOnCurrentThread());
689 base::ThreadRestrictions::ScopedAllowWait allow_wait; 708 base::ThreadRestrictions::ScopedAllowWait allow_wait;
690 AutoLock lock(lock_); 709 AutoLock lock(lock_);
691 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 710 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
692 if (shutdown_called_) 711 if (shutdown_called_)
693 return; 712 return;
694 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 713 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
695 return; 714 return;
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
800 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 819 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
801 820
802 tracked_objects::TaskStopwatch stopwatch; 821 tracked_objects::TaskStopwatch stopwatch;
803 stopwatch.Start(); 822 stopwatch.Start();
804 task.task.Run(); 823 task.task.Run();
805 stopwatch.Stop(); 824 stopwatch.Stop();
806 825
807 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 826 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
808 task, stopwatch); 827 task, stopwatch);
809 828
829 // Update the sequence token in case it has been set from within the
830 // task.
831 task.sequence_token_id = this_worker->task_sequence_token().id_;
gab 2015/11/18 22:37:33 Why does this matter? Isn't the task's sequence_to
Bernhard Bauer 2015/11/19 10:15:12 DidRunWorkerTask() below removes the sequence toke
gab 2015/11/19 14:34:36 I see, can you highlight that in the comment as we
Bernhard Bauer 2015/11/19 17:01:55 Done.
832
810 // Make sure our task is erased outside the lock for the 833 // Make sure our task is erased outside the lock for the
811 // same reason we do this with delete_these_oustide_lock. 834 // same reason we do this with delete_these_oustide_lock.
812 // Also, do it before calling reset_running_task_info() so 835 // Also, do it before calling reset_running_task_info() so
813 // that sequence-checking from within the task's destructor 836 // that sequence-checking from within the task's destructor
814 // still works. 837 // still works.
815 task.task = Closure(); 838 task.task = Closure();
816 839
817 this_worker->reset_running_task_info(); 840 this_worker->reset_running_task_info();
818 } 841 }
819 DidRunWorkerTask(task); // Must be done inside the lock. 842 DidRunWorkerTask(task); // Must be done inside the lock.
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after
1174 SequencedWorkerPool::SequenceToken 1197 SequencedWorkerPool::SequenceToken
1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1198 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1176 Worker* worker = Worker::GetForCurrentThread(); 1199 Worker* worker = Worker::GetForCurrentThread();
1177 if (!worker) 1200 if (!worker)
1178 return SequenceToken(); 1201 return SequenceToken();
1179 1202
1180 return worker->task_sequence_token(); 1203 return worker->task_sequence_token();
1181 } 1204 }
1182 1205
1183 // static 1206 // static
1184 scoped_refptr<SequencedWorkerPool> 1207 scoped_refptr<SequencedTaskRunner>
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1208 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1186 Worker* worker = Worker::GetForCurrentThread(); 1209 Worker* worker = Worker::GetForCurrentThread();
1187 if (!worker) 1210 if (!worker)
1188 return nullptr; 1211 return nullptr;
1189 1212
1190 return worker->worker_pool(); 1213 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1214 SequenceToken sequence_token = worker->task_sequence_token();
1215 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1216 if (!sequence_token.IsValid()) {
1217 sequence_token = Inner::GetSequenceToken();
1218 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1219 shutdown_behavior);
1220 }
1221
1222 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1223 return new SequencedWorkerPoolSequencedTaskRunner(
1224 std::move(pool), sequence_token, shutdown_behavior);
1191 } 1225 }
1192 1226
1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1227 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1194 const std::string& thread_name_prefix) 1228 const std::string& thread_name_prefix)
1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1229 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1230 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1197 } 1231 }
1198 1232
1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1233 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1200 const std::string& thread_name_prefix, 1234 const std::string& thread_name_prefix,
1201 TestingObserver* observer) 1235 TestingObserver* observer)
1202 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1236 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1203 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1237 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1204 } 1238 }
1205 1239
1206 SequencedWorkerPool::~SequencedWorkerPool() {} 1240 SequencedWorkerPool::~SequencedWorkerPool() {}
1207 1241
1208 void SequencedWorkerPool::OnDestruct() const { 1242 void SequencedWorkerPool::OnDestruct() const {
1209 // Avoid deleting ourselves on a worker thread (which would 1243 // Avoid deleting ourselves on a worker thread (which would deadlock).
1210 // deadlock).
1211 if (RunsTasksOnCurrentThread()) { 1244 if (RunsTasksOnCurrentThread()) {
1212 constructor_task_runner_->DeleteSoon(FROM_HERE, this); 1245 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1213 } else { 1246 } else {
1214 delete this; 1247 delete this;
1215 } 1248 }
1216 } 1249 }
1217 1250
1251 // static
1218 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1252 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1219 return inner_->GetSequenceToken(); 1253 return Inner::GetSequenceToken();
1220 } 1254 }
1221 1255
1222 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1256 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1223 const std::string& name) { 1257 const std::string& name) {
1224 return inner_->GetNamedSequenceToken(name); 1258 return inner_->GetNamedSequenceToken(name);
1225 } 1259 }
1226 1260
1227 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1261 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1228 SequenceToken token) { 1262 SequenceToken token) {
1229 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1263 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1365 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1366 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1367 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1334 } 1368 }
1335 1369
1336 bool SequencedWorkerPool::IsShutdownInProgress() { 1370 bool SequencedWorkerPool::IsShutdownInProgress() {
1337 return inner_->IsShutdownInProgress(); 1371 return inner_->IsShutdownInProgress();
1338 } 1372 }
1339 1373
1340 } // namespace base 1374 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698