Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2491613004: Make base::Timer sequence-friendly. (Closed)
Patch Set: rebase Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | base/timer/timer.h » ('j') | base/timer/timer.cc » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
374 }; 374 };
375 375
376 enum CleanupState { 376 enum CleanupState {
377 CLEANUP_REQUESTED, 377 CLEANUP_REQUESTED,
378 CLEANUP_STARTING, 378 CLEANUP_STARTING,
379 CLEANUP_RUNNING, 379 CLEANUP_RUNNING,
380 CLEANUP_FINISHING, 380 CLEANUP_FINISHING,
381 CLEANUP_DONE, 381 CLEANUP_DONE,
382 }; 382 };
383 383
384 // Clears ScheduledTasks in |delete_these_outside_lock| while ensuring that
385 // |this_worker| has the desired task info context for ~ScheduledTask() to
386 // allow RunsTasksOnCurrentThread() like checks.
387 void DeleteTheseOutsideLockHelper(
vmpstr 2016/12/03 01:14:04 WDYT about just "DeleteWithoutLock" with "tasks_to
gab 2016/12/23 20:22:10 Done on https://codereview.chromium.org/2581213002
388 std::vector<SequencedTask>* delete_these_outside_lock,
389 Worker* this_worker);
390
384 // Helper used by PostTask() to complete the work when redirection is on. 391 // Helper used by PostTask() to complete the work when redirection is on.
385 // Returns true if the task may run at some point in the future and false if 392 // Returns true if the task may run at some point in the future and false if
386 // it will definitely not run. 393 // it will definitely not run.
387 // Coalesce upon resolution of http://crbug.com/622400. 394 // Coalesce upon resolution of http://crbug.com/622400.
388 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, 395 bool PostTaskToTaskScheduler(const SequencedTask& sequenced,
389 const TimeDelta& delay); 396 const TimeDelta& delay);
390 397
391 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| 398 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id|
392 // and |traits|. 399 // and |traits|.
393 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( 400 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner(
(...skipping 17 matching lines...) Expand all
411 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 418 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
412 // immediately, and |task| is not filled in. Likewise, |wait_time| is 419 // immediately, and |task| is not filled in. Likewise, |wait_time| is
413 // filled in the time to wait until the next task to run. In this case, the 420 // filled in the time to wait until the next task to run. In this case, the
414 // caller should wait the time. 421 // caller should wait the time.
415 // 422 //
416 // In any case, the calling code should clear the given 423 // In any case, the calling code should clear the given
417 // delete_these_outside_lock vector the next time the lock is released. 424 // delete_these_outside_lock vector the next time the lock is released.
418 // See the implementation for a more detailed description. 425 // See the implementation for a more detailed description.
419 GetWorkStatus GetWork(SequencedTask* task, 426 GetWorkStatus GetWork(SequencedTask* task,
420 TimeDelta* wait_time, 427 TimeDelta* wait_time,
421 std::vector<Closure>* delete_these_outside_lock); 428 std::vector<SequencedTask>* delete_these_outside_lock);
422 429
423 void HandleCleanup(); 430 void HandleCleanup();
424 431
425 // Peforms init and cleanup around running the given task. WillRun... 432 // Peforms init and cleanup around running the given task. WillRun...
426 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 433 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
427 // The calling code should call FinishStartingAdditionalThread once the 434 // The calling code should call FinishStartingAdditionalThread once the
428 // lock is released if the return values is nonzero. 435 // lock is released if the return values is nonzero.
429 int WillRunWorkerTask(const SequencedTask& task); 436 int WillRunWorkerTask(const SequencedTask& task);
430 void DidRunWorkerTask(const SequencedTask& task); 437 void DidRunWorkerTask(const SequencedTask& task);
431 438
(...skipping 538 matching lines...) Expand 10 before | Expand all | Expand 10 after
970 while (true) { 977 while (true) {
971 #if defined(OS_MACOSX) 978 #if defined(OS_MACOSX)
972 base::mac::ScopedNSAutoreleasePool autorelease_pool; 979 base::mac::ScopedNSAutoreleasePool autorelease_pool;
973 #endif 980 #endif
974 981
975 HandleCleanup(); 982 HandleCleanup();
976 983
977 // See GetWork for what delete_these_outside_lock is doing. 984 // See GetWork for what delete_these_outside_lock is doing.
978 SequencedTask task; 985 SequencedTask task;
979 TimeDelta wait_time; 986 TimeDelta wait_time;
980 std::vector<Closure> delete_these_outside_lock; 987 std::vector<SequencedTask> delete_these_outside_lock;
981 GetWorkStatus status = 988 GetWorkStatus status =
982 GetWork(&task, &wait_time, &delete_these_outside_lock); 989 GetWork(&task, &wait_time, &delete_these_outside_lock);
983 if (status == GET_WORK_FOUND) { 990 if (status == GET_WORK_FOUND) {
984 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); 991 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task);
985 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 992 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
986 "SequencedWorkerPool::Inner::PostTask", 993 "SequencedWorkerPool::Inner::PostTask",
987 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), 994 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
988 TRACE_EVENT_FLAG_FLOW_IN); 995 TRACE_EVENT_FLAG_FLOW_IN);
989 int new_thread_id = WillRunWorkerTask(task); 996 int new_thread_id = WillRunWorkerTask(task);
990 { 997 {
991 AutoUnlock unlock(lock_); 998 AutoUnlock unlock(lock_);
992 // There may be more work available, so wake up another 999 // There may be more work available, so wake up another
993 // worker thread. (Technically not required, since we 1000 // worker thread. (Technically not required, since we
994 // already get a signal for each new task, but it doesn't 1001 // already get a signal for each new task, but it doesn't
995 // hurt.) 1002 // hurt.)
996 SignalHasWork(); 1003 SignalHasWork();
997 delete_these_outside_lock.clear(); 1004 DeleteTheseOutsideLockHelper(&delete_these_outside_lock, this_worker);
998 1005
999 // Complete thread creation outside the lock if necessary. 1006 // Complete thread creation outside the lock if necessary.
1000 if (new_thread_id) 1007 if (new_thread_id)
1001 FinishStartingAdditionalThread(new_thread_id); 1008 FinishStartingAdditionalThread(new_thread_id);
1002 1009
1003 this_worker->set_running_task_info( 1010 this_worker->set_running_task_info(
1004 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 1011 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
1005 1012
1006 tracked_objects::TaskStopwatch stopwatch; 1013 tracked_objects::TaskStopwatch stopwatch;
1007 stopwatch.Start(); 1014 stopwatch.Start();
(...skipping 10 matching lines...) Expand all
1018 // still works. 1025 // still works.
1019 task.task = Closure(); 1026 task.task = Closure();
1020 1027
1021 this_worker->reset_running_task_info(); 1028 this_worker->reset_running_task_info();
1022 } 1029 }
1023 DidRunWorkerTask(task); // Must be done inside the lock. 1030 DidRunWorkerTask(task); // Must be done inside the lock.
1024 } else if (cleanup_state_ == CLEANUP_RUNNING) { 1031 } else if (cleanup_state_ == CLEANUP_RUNNING) {
1025 switch (status) { 1032 switch (status) {
1026 case GET_WORK_WAIT: { 1033 case GET_WORK_WAIT: {
1027 AutoUnlock unlock(lock_); 1034 AutoUnlock unlock(lock_);
1028 delete_these_outside_lock.clear(); 1035 DeleteTheseOutsideLockHelper(&delete_these_outside_lock,
1036 this_worker);
1029 } 1037 }
1030 break; 1038 break;
1031 case GET_WORK_NOT_FOUND: 1039 case GET_WORK_NOT_FOUND:
1032 CHECK(delete_these_outside_lock.empty()); 1040 CHECK(delete_these_outside_lock.empty());
1033 cleanup_state_ = CLEANUP_FINISHING; 1041 cleanup_state_ = CLEANUP_FINISHING;
1034 cleanup_cv_.Broadcast(); 1042 cleanup_cv_.Broadcast();
1035 break; 1043 break;
1036 default: 1044 default:
1037 NOTREACHED(); 1045 NOTREACHED();
1038 } 1046 }
1039 } else { 1047 } else {
1040 // When we're terminating and there's no more work, we can 1048 // When we're terminating and there's no more work, we can
1041 // shut down, other workers can complete any pending or new tasks. 1049 // shut down, other workers can complete any pending or new tasks.
1042 // We can get additional tasks posted after shutdown_called_ is set 1050 // We can get additional tasks posted after shutdown_called_ is set
1043 // but only worker threads are allowed to post tasks at that time, and 1051 // but only worker threads are allowed to post tasks at that time, and
1044 // the workers responsible for posting those tasks will be available 1052 // the workers responsible for posting those tasks will be available
1045 // to run them. Also, there may be some tasks stuck behind running 1053 // to run them. Also, there may be some tasks stuck behind running
1046 // ones with the same sequence token, but additional threads won't 1054 // ones with the same sequence token, but additional threads won't
1047 // help this case. 1055 // help this case.
1048 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { 1056 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
1049 AutoUnlock unlock(lock_); 1057 AutoUnlock unlock(lock_);
1050 delete_these_outside_lock.clear(); 1058 DeleteTheseOutsideLockHelper(&delete_these_outside_lock, this_worker);
1051 break; 1059 break;
1052 } 1060 }
1053 1061
1054 // No work was found, but there are tasks that need deletion. The 1062 // No work was found, but there are tasks that need deletion. The
1055 // deletion must happen outside of the lock. 1063 // deletion must happen outside of the lock.
1056 if (delete_these_outside_lock.size()) { 1064 if (delete_these_outside_lock.size()) {
1057 AutoUnlock unlock(lock_); 1065 AutoUnlock unlock(lock_);
1058 delete_these_outside_lock.clear(); 1066 DeleteTheseOutsideLockHelper(&delete_these_outside_lock, this_worker);
1059 1067
1060 // Since the lock has been released, |status| may no longer be 1068 // Since the lock has been released, |status| may no longer be
1061 // accurate. It might read GET_WORK_WAIT even if there are tasks 1069 // accurate. It might read GET_WORK_WAIT even if there are tasks
1062 // ready to perform work. Jump to the top of the loop to recalculate 1070 // ready to perform work. Jump to the top of the loop to recalculate
1063 // |status|. 1071 // |status|.
1064 continue; 1072 continue;
1065 } 1073 }
1066 1074
1067 waiting_thread_count_++; 1075 waiting_thread_count_++;
1068 1076
1069 switch (status) { 1077 switch (status) {
1070 case GET_WORK_NOT_FOUND: 1078 case GET_WORK_NOT_FOUND:
1071 has_work_cv_.Wait(); 1079 has_work_cv_.Wait();
1072 break; 1080 break;
1073 case GET_WORK_WAIT: 1081 case GET_WORK_WAIT:
1074 has_work_cv_.TimedWait(wait_time); 1082 has_work_cv_.TimedWait(wait_time);
1075 break; 1083 break;
1076 default: 1084 default:
1077 NOTREACHED(); 1085 NOTREACHED();
1078 } 1086 }
1079 waiting_thread_count_--; 1087 waiting_thread_count_--;
1080 } 1088 }
1089 // |delete_these_outside_lock| should have been cleared via
1090 // DeleteTheseOutsideLockHelper() above already.
1091 DCHECK(delete_these_outside_lock.empty());
1081 } 1092 }
1082 } // Release lock_. 1093 } // Release lock_.
1083 1094
1084 // We noticed we should exit. Wake up the next worker so it knows it should 1095 // We noticed we should exit. Wake up the next worker so it knows it should
1085 // exit as well (because the Shutdown() code only signals once). 1096 // exit as well (because the Shutdown() code only signals once).
1086 SignalHasWork(); 1097 SignalHasWork();
1087 1098
1088 // Possibly unblock shutdown. 1099 // Possibly unblock shutdown.
1089 can_shutdown_cv_.Signal(); 1100 can_shutdown_cv_.Signal();
1090 } 1101 }
1091 1102
1103 void SequencedWorkerPool::Inner::DeleteTheseOutsideLockHelper(
1104 std::vector<SequencedTask>* delete_these_outside_lock,
1105 Worker* this_worker) {
1106 while (!delete_these_outside_lock->empty()) {
1107 const SequencedTask& deleted_task = delete_these_outside_lock->back();
1108 this_worker->set_running_task_info(
1109 SequenceToken(deleted_task.sequence_token_id),
1110 deleted_task.shutdown_behavior);
1111 delete_these_outside_lock->pop_back();
1112 this_worker->reset_running_task_info();
vmpstr 2016/12/03 01:14:04 Does this need to happen after every iteration, or
gab 2016/12/23 20:22:10 Done on https://codereview.chromium.org/2581213002
1113 }
1114 }
1115
1092 void SequencedWorkerPool::Inner::HandleCleanup() { 1116 void SequencedWorkerPool::Inner::HandleCleanup() {
1093 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1117 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1094 1118
1095 lock_.AssertAcquired(); 1119 lock_.AssertAcquired();
1096 if (cleanup_state_ == CLEANUP_DONE) 1120 if (cleanup_state_ == CLEANUP_DONE)
1097 return; 1121 return;
1098 if (cleanup_state_ == CLEANUP_REQUESTED) { 1122 if (cleanup_state_ == CLEANUP_REQUESTED) {
1099 // We win, we get to do the cleanup as soon as the others wise up and idle. 1123 // We win, we get to do the cleanup as soon as the others wise up and idle.
1100 cleanup_state_ = CLEANUP_STARTING; 1124 cleanup_state_ = CLEANUP_STARTING;
1101 while (thread_being_created_ || 1125 while (thread_being_created_ ||
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
1149 1173
1150 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1174 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
1151 lock_.AssertAcquired(); 1175 lock_.AssertAcquired();
1152 // We assume that we never create enough tasks to wrap around. 1176 // We assume that we never create enough tasks to wrap around.
1153 return next_sequence_task_number_++; 1177 return next_sequence_task_number_++;
1154 } 1178 }
1155 1179
1156 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1180 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
1157 SequencedTask* task, 1181 SequencedTask* task,
1158 TimeDelta* wait_time, 1182 TimeDelta* wait_time,
1159 std::vector<Closure>* delete_these_outside_lock) { 1183 std::vector<SequencedTask>* delete_these_outside_lock) {
1160 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1184 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1161 1185
1162 lock_.AssertAcquired(); 1186 lock_.AssertAcquired();
1163 1187
1164 // Find the next task with a sequence token that's not currently in use. 1188 // Find the next task with a sequence token that's not currently in use.
1165 // If the token is in use, that means another thread is running something 1189 // If the token is in use, that means another thread is running something
1166 // in that sequence, and we can't run it without going out-of-order. 1190 // in that sequence, and we can't run it without going out-of-order.
1167 // 1191 //
1168 // This algorithm is simple and fair, but inefficient in some cases. For 1192 // This algorithm is simple and fair, but inefficient in some cases. For
1169 // example, say somebody schedules 1000 slow tasks with the same sequence 1193 // example, say somebody schedules 1000 slow tasks with the same sequence
(...skipping 25 matching lines...) Expand all
1195 unrunnable_tasks++; 1219 unrunnable_tasks++;
1196 ++i; 1220 ++i;
1197 continue; 1221 continue;
1198 } 1222 }
1199 1223
1200 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 1224 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
1201 // We're shutting down and the task we just found isn't blocking 1225 // We're shutting down and the task we just found isn't blocking
1202 // shutdown. Delete it and get more work. 1226 // shutdown. Delete it and get more work.
1203 // 1227 //
1204 // Note that we do not want to delete unrunnable tasks. Deleting a task 1228 // Note that we do not want to delete unrunnable tasks. Deleting a task
1205 // can have side effects (like freeing some objects) and deleting a 1229 // can have side effects (like freeing some objects) and deleting a task
1206 // task that's supposed to run after one that's currently running could 1230 // that's supposed to run after one that's currently running could cause
1207 // cause an obscure crash. 1231 // an obscure crash.
1208 // 1232 //
1209 // We really want to delete these tasks outside the lock in case the 1233 // We really want to delete these tasks outside the lock in case the
1210 // closures are holding refs to objects that want to post work from 1234 // closures are holding refs to objects that want to post work from their
1211 // their destructorss (which would deadlock). The closures are 1235 // destructors (which would deadlock). The closures are internally
1212 // internally refcounted, so we just need to keep a copy of them alive 1236 // refcounted, so we just need to keep a copy of them alive until the lock
1213 // until the lock is exited. The calling code can just clear() the 1237 // is exited. The calling code can just clear() the vector they passed to
1214 // vector they passed to us once the lock is exited to make this 1238 // us once the lock is exited to make this happen.
1215 // happen. 1239 delete_these_outside_lock->push_back(*i);
1216 delete_these_outside_lock->push_back(i->task);
1217 pending_tasks_.erase(i++); 1240 pending_tasks_.erase(i++);
1218 continue; 1241 continue;
1219 } 1242 }
1220 1243
1221 if (i->time_to_run > current_time) { 1244 if (i->time_to_run > current_time) {
1222 // The time to run has not come yet. 1245 // The time to run has not come yet.
1223 *wait_time = i->time_to_run - current_time; 1246 *wait_time = i->time_to_run - current_time;
1224 status = GET_WORK_WAIT; 1247 status = GET_WORK_WAIT;
1225 if (cleanup_state_ == CLEANUP_RUNNING) { 1248 if (cleanup_state_ == CLEANUP_RUNNING) {
1226 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 1249 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
1227 delete_these_outside_lock->push_back(i->task); 1250 delete_these_outside_lock->push_back(*i);
1228 pending_tasks_.erase(i); 1251 pending_tasks_.erase(i);
1229 } 1252 }
1230 break; 1253 break;
1231 } 1254 }
1232 1255
1233 // Found a runnable task. 1256 // Found a runnable task.
1234 *task = *i; 1257 *task = *i;
1235 pending_tasks_.erase(i); 1258 pending_tasks_.erase(i);
1236 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 1259 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
1237 blocking_shutdown_pending_task_count_--; 1260 blocking_shutdown_pending_task_count_--;
(...skipping 358 matching lines...) Expand 10 before | Expand all | Expand 10 after
1596 bool SequencedWorkerPool::IsShutdownInProgress() { 1619 bool SequencedWorkerPool::IsShutdownInProgress() {
1597 return inner_->IsShutdownInProgress(); 1620 return inner_->IsShutdownInProgress();
1598 } 1621 }
1599 1622
1600 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1623 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1601 SequenceToken sequence_token) const { 1624 SequenceToken sequence_token) const {
1602 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1625 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1603 } 1626 }
1604 1627
1605 } // namespace base 1628 } // namespace base
OLDNEW
« no previous file with comments | « no previous file | base/timer/timer.h » ('j') | base/timer/timer.cc » ('J')

Powered by Google App Engine
This is Rietveld 408576698