Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(450)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1414793009: Allow SequencedTaskRunnerHandle::Get() while running unsequenced tasks. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: x Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 class SequencedWorkerPool::Inner { 289 class SequencedWorkerPool::Inner {
290 public: 290 public:
291 // Take a raw pointer to |worker| to avoid cycles (since we're owned 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned
292 // by it). 292 // by it).
293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
294 const std::string& thread_name_prefix, 294 const std::string& thread_name_prefix,
295 TestingObserver* observer); 295 TestingObserver* observer);
296 296
297 ~Inner(); 297 ~Inner();
298 298
299 SequenceToken GetSequenceToken(); 299 static SequenceToken GetSequenceToken();
300 300
301 SequenceToken GetNamedSequenceToken(const std::string& name); 301 SequenceToken GetNamedSequenceToken(const std::string& name);
302 302
303 // This function accepts a name and an ID. If the name is null, the 303 // This function accepts a name and an ID. If the name is null, the
304 // token ID is used. This allows us to implement the optional name lookup 304 // token ID is used. This allows us to implement the optional name lookup
305 // from a single function without having to enter the lock a separate time. 305 // from a single function without having to enter the lock a separate time.
306 bool PostTask(const std::string* optional_token_name, 306 bool PostTask(const std::string* optional_token_name,
307 SequenceToken sequence_token, 307 SequenceToken sequence_token,
308 WorkerShutdown shutdown_behavior, 308 WorkerShutdown shutdown_behavior,
309 const tracked_objects::Location& from_here, 309 const tracked_objects::Location& from_here,
310 const Closure& task, 310 const Closure& task,
311 TimeDelta delay); 311 TimeDelta delay);
312 312
313 bool RunsTasksOnCurrentThread() const; 313 bool RunsTasksOnCurrentThread() const;
314 314
315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
316 316
317 void SetRunningTaskInfoForCurrentThread(
318 SequenceToken sequence_token, WorkerShutdown shutdown_behavior);
319
317 void CleanupForTesting(); 320 void CleanupForTesting();
318 321
319 void SignalHasWorkForTesting(); 322 void SignalHasWorkForTesting();
320 323
321 int GetWorkSignalCountForTesting() const; 324 int GetWorkSignalCountForTesting() const;
322 325
323 void Shutdown(int max_blocking_tasks_after_shutdown); 326 void Shutdown(int max_blocking_tasks_after_shutdown);
324 327
325 bool IsShutdownInProgress(); 328 bool IsShutdownInProgress();
326 329
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 // Need to explicitly join with the threads before they're destroyed or else 579 // Need to explicitly join with the threads before they're destroyed or else
577 // they will be running when our object is half torn down. 580 // they will be running when our object is half torn down.
578 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
579 it->second->Join(); 582 it->second->Join();
580 threads_.clear(); 583 threads_.clear();
581 584
582 if (testing_observer_) 585 if (testing_observer_)
583 testing_observer_->OnDestruct(); 586 testing_observer_->OnDestruct();
584 } 587 }
585 588
589 // static
586 SequencedWorkerPool::SequenceToken 590 SequencedWorkerPool::SequenceToken
587 SequencedWorkerPool::Inner::GetSequenceToken() { 591 SequencedWorkerPool::Inner::GetSequenceToken() {
588 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 592 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
589 // is used as a sentinel value in SequenceTokens. 593 // is used as a sentinel value in SequenceTokens.
590 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 594 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
591 } 595 }
592 596
593 SequencedWorkerPool::SequenceToken 597 SequencedWorkerPool::SequenceToken
594 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 598 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
595 AutoLock lock(lock_); 599 AutoLock lock(lock_);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
676 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 680 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
677 SequenceToken sequence_token) const { 681 SequenceToken sequence_token) const {
678 AutoLock lock(lock_); 682 AutoLock lock(lock_);
679 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 683 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
680 if (found == threads_.end()) 684 if (found == threads_.end())
681 return false; 685 return false;
682 return found->second->is_processing_task() && 686 return found->second->is_processing_task() &&
683 sequence_token.Equals(found->second->task_sequence_token()); 687 sequence_token.Equals(found->second->task_sequence_token());
684 } 688 }
685 689
690 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
691 SequenceToken sequence_token, WorkerShutdown shutdown_behavior) {
692 AutoLock lock(lock_);
693 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
694 DCHECK(found != threads_.end());
695 DCHECK(found->second->is_processing_task());
696 DCHECK(!found->second->task_sequence_token().IsValid());
697 found->second->set_running_task_info(sequence_token, shutdown_behavior);
698 }
699
686 // See https://code.google.com/p/chromium/issues/detail?id=168415 700 // See https://code.google.com/p/chromium/issues/detail?id=168415
687 void SequencedWorkerPool::Inner::CleanupForTesting() { 701 void SequencedWorkerPool::Inner::CleanupForTesting() {
688 DCHECK(!RunsTasksOnCurrentThread()); 702 DCHECK(!RunsTasksOnCurrentThread());
689 base::ThreadRestrictions::ScopedAllowWait allow_wait; 703 base::ThreadRestrictions::ScopedAllowWait allow_wait;
690 AutoLock lock(lock_); 704 AutoLock lock(lock_);
691 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 705 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
692 if (shutdown_called_) 706 if (shutdown_called_)
693 return; 707 return;
694 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 708 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
695 return; 709 return;
(...skipping 478 matching lines...) Expand 10 before | Expand all | Expand 10 after
1174 SequencedWorkerPool::SequenceToken 1188 SequencedWorkerPool::SequenceToken
1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1189 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1176 Worker* worker = Worker::GetForCurrentThread(); 1190 Worker* worker = Worker::GetForCurrentThread();
1177 if (!worker) 1191 if (!worker)
1178 return SequenceToken(); 1192 return SequenceToken();
1179 1193
1180 return worker->task_sequence_token(); 1194 return worker->task_sequence_token();
1181 } 1195 }
1182 1196
1183 // static 1197 // static
1184 scoped_refptr<SequencedWorkerPool> 1198 scoped_refptr<SequencedTaskRunner>
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1199 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1186 Worker* worker = Worker::GetForCurrentThread(); 1200 Worker* worker = Worker::GetForCurrentThread();
1187 if (!worker) 1201 if (!worker)
1188 return nullptr; 1202 return nullptr;
1189 1203
1190 return worker->worker_pool(); 1204 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1205 SequenceToken sequence_token = worker->task_sequence_token();
1206 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1207 if (!sequence_token.IsValid()) {
1208 sequence_token = Inner::GetSequenceToken();
1209 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1210 shutdown_behavior);
1211 }
1212 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1213 return new SequencedWorkerPoolSequencedTaskRunner(pool, sequence_token,
1214 shutdown_behavior);
1191 } 1215 }
1192 1216
1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1217 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1194 const std::string& thread_name_prefix) 1218 const std::string& thread_name_prefix)
1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1219 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1220 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1197 } 1221 }
1198 1222
1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1223 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1200 const std::string& thread_name_prefix, 1224 const std::string& thread_name_prefix,
1201 TestingObserver* observer) 1225 TestingObserver* observer)
1202 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1226 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1203 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1227 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1204 } 1228 }
1205 1229
1206 SequencedWorkerPool::~SequencedWorkerPool() {} 1230 SequencedWorkerPool::~SequencedWorkerPool() {}
1207 1231
1208 void SequencedWorkerPool::OnDestruct() const { 1232 void SequencedWorkerPool::OnDestruct() const {
1209 // Avoid deleting ourselves on a worker thread (which would 1233 // Avoid deleting ourselves on a worker thread (which would deadlock).
1210 // deadlock).
1211 if (RunsTasksOnCurrentThread()) { 1234 if (RunsTasksOnCurrentThread()) {
1212 constructor_task_runner_->DeleteSoon(FROM_HERE, this); 1235 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1213 } else { 1236 } else {
1214 delete this; 1237 delete this;
1215 } 1238 }
1216 } 1239 }
1217 1240
1241 // static
1218 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1242 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1219 return inner_->GetSequenceToken(); 1243 return Inner::GetSequenceToken();
1220 } 1244 }
1221 1245
1222 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1246 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1223 const std::string& name) { 1247 const std::string& name) {
1224 return inner_->GetNamedSequenceToken(name); 1248 return inner_->GetNamedSequenceToken(name);
1225 } 1249 }
1226 1250
1227 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1251 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1228 SequenceToken token) { 1252 SequenceToken token) {
1229 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1253 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1355 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1356 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1357 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1334 } 1358 }
1335 1359
1336 bool SequencedWorkerPool::IsShutdownInProgress() { 1360 bool SequencedWorkerPool::IsShutdownInProgress() {
1337 return inner_->IsShutdownInProgress(); 1361 return inner_->IsShutdownInProgress();
1338 } 1362 }
1339 1363
1340 } // namespace base 1364 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698