Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(108)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1414793009: Allow SequencedTaskRunnerHandle::Get() while running unsequenced tasks. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 class SequencedWorkerPool::Inner { 289 class SequencedWorkerPool::Inner {
290 public: 290 public:
291 // Take a raw pointer to |worker| to avoid cycles (since we're owned 291 // Take a raw pointer to |worker| to avoid cycles (since we're owned
292 // by it). 292 // by it).
293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 293 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
294 const std::string& thread_name_prefix, 294 const std::string& thread_name_prefix,
295 TestingObserver* observer); 295 TestingObserver* observer);
296 296
297 ~Inner(); 297 ~Inner();
298 298
299 SequenceToken GetSequenceToken(); 299 static SequenceToken GetSequenceToken();
300 300
301 SequenceToken GetNamedSequenceToken(const std::string& name); 301 SequenceToken GetNamedSequenceToken(const std::string& name);
302 302
303 // This function accepts a name and an ID. If the name is null, the 303 // This function accepts a name and an ID. If the name is null, the
304 // token ID is used. This allows us to implement the optional name lookup 304 // token ID is used. This allows us to implement the optional name lookup
305 // from a single function without having to enter the lock a separate time. 305 // from a single function without having to enter the lock a separate time.
306 bool PostTask(const std::string* optional_token_name, 306 bool PostTask(const std::string* optional_token_name,
307 SequenceToken sequence_token, 307 SequenceToken sequence_token,
308 WorkerShutdown shutdown_behavior, 308 WorkerShutdown shutdown_behavior,
309 const tracked_objects::Location& from_here, 309 const tracked_objects::Location& from_here,
310 const Closure& task, 310 const Closure& task,
311 TimeDelta delay); 311 TimeDelta delay);
312 312
313 bool RunsTasksOnCurrentThread() const; 313 bool RunsTasksOnCurrentThread() const;
314 314
315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 315 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
316 316
317 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
318 WorkerShutdown shutdown_behavior);
319
317 void CleanupForTesting(); 320 void CleanupForTesting();
318 321
319 void SignalHasWorkForTesting(); 322 void SignalHasWorkForTesting();
320 323
321 int GetWorkSignalCountForTesting() const; 324 int GetWorkSignalCountForTesting() const;
322 325
323 void Shutdown(int max_blocking_tasks_after_shutdown); 326 void Shutdown(int max_blocking_tasks_after_shutdown);
324 327
325 bool IsShutdownInProgress(); 328 bool IsShutdownInProgress();
326 329
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 // Need to explicitly join with the threads before they're destroyed or else 579 // Need to explicitly join with the threads before they're destroyed or else
577 // they will be running when our object is half torn down. 580 // they will be running when our object is half torn down.
578 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 581 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
579 it->second->Join(); 582 it->second->Join();
580 threads_.clear(); 583 threads_.clear();
581 584
582 if (testing_observer_) 585 if (testing_observer_)
583 testing_observer_->OnDestruct(); 586 testing_observer_->OnDestruct();
584 } 587 }
585 588
589 // static
586 SequencedWorkerPool::SequenceToken 590 SequencedWorkerPool::SequenceToken
587 SequencedWorkerPool::Inner::GetSequenceToken() { 591 SequencedWorkerPool::Inner::GetSequenceToken() {
588 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 592 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
589 // is used as a sentinel value in SequenceTokens. 593 // is used as a sentinel value in SequenceTokens.
590 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 594 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
591 } 595 }
592 596
593 SequencedWorkerPool::SequenceToken 597 SequencedWorkerPool::SequenceToken
594 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 598 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
595 AutoLock lock(lock_); 599 AutoLock lock(lock_);
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
676 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 680 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
677 SequenceToken sequence_token) const { 681 SequenceToken sequence_token) const {
678 AutoLock lock(lock_); 682 AutoLock lock(lock_);
679 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 683 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
680 if (found == threads_.end()) 684 if (found == threads_.end())
681 return false; 685 return false;
682 return found->second->is_processing_task() && 686 return found->second->is_processing_task() &&
683 sequence_token.Equals(found->second->task_sequence_token()); 687 sequence_token.Equals(found->second->task_sequence_token());
684 } 688 }
685 689
690 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
691 SequenceToken sequence_token,
692 WorkerShutdown shutdown_behavior) {
693 AutoLock lock(lock_);
694 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
695 DCHECK(found != threads_.end());
696 DCHECK(found->second->is_processing_task());
697 DCHECK(!found->second->task_sequence_token().IsValid());
gab 2015/11/10 19:54:18 I don't see code unsetting this when the task is d
Bernhard Bauer 2015/11/10 20:31:47 This is set before the next task is run, so it wil
698 found->second->set_running_task_info(sequence_token, shutdown_behavior);
699 }
700
686 // See https://code.google.com/p/chromium/issues/detail?id=168415 701 // See https://code.google.com/p/chromium/issues/detail?id=168415
687 void SequencedWorkerPool::Inner::CleanupForTesting() { 702 void SequencedWorkerPool::Inner::CleanupForTesting() {
688 DCHECK(!RunsTasksOnCurrentThread()); 703 DCHECK(!RunsTasksOnCurrentThread());
689 base::ThreadRestrictions::ScopedAllowWait allow_wait; 704 base::ThreadRestrictions::ScopedAllowWait allow_wait;
690 AutoLock lock(lock_); 705 AutoLock lock(lock_);
691 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 706 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
692 if (shutdown_called_) 707 if (shutdown_called_)
693 return; 708 return;
694 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 709 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
695 return; 710 return;
(...skipping 478 matching lines...) Expand 10 before | Expand all | Expand 10 after
1174 SequencedWorkerPool::SequenceToken 1189 SequencedWorkerPool::SequenceToken
1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1190 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1176 Worker* worker = Worker::GetForCurrentThread(); 1191 Worker* worker = Worker::GetForCurrentThread();
1177 if (!worker) 1192 if (!worker)
1178 return SequenceToken(); 1193 return SequenceToken();
1179 1194
1180 return worker->task_sequence_token(); 1195 return worker->task_sequence_token();
1181 } 1196 }
1182 1197
1183 // static 1198 // static
1184 scoped_refptr<SequencedWorkerPool> 1199 scoped_refptr<SequencedTaskRunner>
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1200 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1186 Worker* worker = Worker::GetForCurrentThread(); 1201 Worker* worker = Worker::GetForCurrentThread();
1187 if (!worker) 1202 if (!worker)
1188 return nullptr; 1203 return nullptr;
1189 1204
1190 return worker->worker_pool(); 1205 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
gab 2015/11/10 19:54:19 Move |pool| to 1213 (above DCHECK) as it's not nee
Bernhard Bauer 2015/11/10 20:31:47 I do need it in line 1210, so I could only move it
1206 SequenceToken sequence_token = worker->task_sequence_token();
1207 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1208 if (!sequence_token.IsValid()) {
1209 sequence_token = Inner::GetSequenceToken();
1210 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1211 shutdown_behavior);
1212 }
1213 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1214 return new SequencedWorkerPoolSequencedTaskRunner(pool, sequence_token,
1215 shutdown_behavior);
1191 } 1216 }
1192 1217
1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1218 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1194 const std::string& thread_name_prefix) 1219 const std::string& thread_name_prefix)
1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1220 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1221 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1197 } 1222 }
1198 1223
1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1224 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1200 const std::string& thread_name_prefix, 1225 const std::string& thread_name_prefix,
1201 TestingObserver* observer) 1226 TestingObserver* observer)
1202 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1227 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1203 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1228 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1204 } 1229 }
1205 1230
1206 SequencedWorkerPool::~SequencedWorkerPool() {} 1231 SequencedWorkerPool::~SequencedWorkerPool() {}
1207 1232
1208 void SequencedWorkerPool::OnDestruct() const { 1233 void SequencedWorkerPool::OnDestruct() const {
1209 // Avoid deleting ourselves on a worker thread (which would 1234 // Avoid deleting ourselves on a worker thread (which would deadlock).
1210 // deadlock).
1211 if (RunsTasksOnCurrentThread()) { 1235 if (RunsTasksOnCurrentThread()) {
1212 constructor_task_runner_->DeleteSoon(FROM_HERE, this); 1236 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1213 } else { 1237 } else {
1214 delete this; 1238 delete this;
1215 } 1239 }
1216 } 1240 }
1217 1241
1242 // static
1218 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1243 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1219 return inner_->GetSequenceToken(); 1244 return Inner::GetSequenceToken();
1220 } 1245 }
1221 1246
1222 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1247 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1223 const std::string& name) { 1248 const std::string& name) {
1224 return inner_->GetNamedSequenceToken(name); 1249 return inner_->GetNamedSequenceToken(name);
1225 } 1250 }
1226 1251
1227 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1252 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1228 SequenceToken token) { 1253 SequenceToken token) {
1229 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1254 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1356 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1357 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1358 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1334 } 1359 }
1335 1360
1336 bool SequencedWorkerPool::IsShutdownInProgress() { 1361 bool SequencedWorkerPool::IsShutdownInProgress() {
1337 return inner_->IsShutdownInProgress(); 1362 return inner_->IsShutdownInProgress();
1338 } 1363 }
1339 1364
1340 } // namespace base 1365 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698