OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
374 }; | 374 }; |
375 | 375 |
376 enum CleanupState { | 376 enum CleanupState { |
377 CLEANUP_REQUESTED, | 377 CLEANUP_REQUESTED, |
378 CLEANUP_STARTING, | 378 CLEANUP_STARTING, |
379 CLEANUP_RUNNING, | 379 CLEANUP_RUNNING, |
380 CLEANUP_FINISHING, | 380 CLEANUP_FINISHING, |
381 CLEANUP_DONE, | 381 CLEANUP_DONE, |
382 }; | 382 }; |
383 | 383 |
384 // Clears ScheduledTasks in |delete_these_outside_lock| while ensuring that | |
385 // |this_worker| has the desired task info context for ~ScheduledTask() to | |
386 // allow RunsTasksOnCurrentThread() like checks. | |
387 void DeleteTheseOutsideLockHelper( | |
vmpstr
2016/12/03 01:14:04
WDYT about just "DeleteWithoutLock" with "tasks_to
gab
2016/12/23 20:22:10
Done on https://codereview.chromium.org/2581213002
| |
388 std::vector<SequencedTask>* delete_these_outside_lock, | |
389 Worker* this_worker); | |
390 | |
384 // Helper used by PostTask() to complete the work when redirection is on. | 391 // Helper used by PostTask() to complete the work when redirection is on. |
385 // Returns true if the task may run at some point in the future and false if | 392 // Returns true if the task may run at some point in the future and false if |
386 // it will definitely not run. | 393 // it will definitely not run. |
387 // Coalesce upon resolution of http://crbug.com/622400. | 394 // Coalesce upon resolution of http://crbug.com/622400. |
388 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, | 395 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
389 const TimeDelta& delay); | 396 const TimeDelta& delay); |
390 | 397 |
391 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| | 398 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| |
392 // and |traits|. | 399 // and |traits|. |
393 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( | 400 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( |
(...skipping 17 matching lines...) Expand all Loading... | |
411 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run | 418 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
412 // immediately, and |task| is not filled in. Likewise, |wait_time| is | 419 // immediately, and |task| is not filled in. Likewise, |wait_time| is |
413 // filled in the time to wait until the next task to run. In this case, the | 420 // filled in the time to wait until the next task to run. In this case, the |
414 // caller should wait the time. | 421 // caller should wait the time. |
415 // | 422 // |
416 // In any case, the calling code should clear the given | 423 // In any case, the calling code should clear the given |
417 // delete_these_outside_lock vector the next time the lock is released. | 424 // delete_these_outside_lock vector the next time the lock is released. |
418 // See the implementation for a more detailed description. | 425 // See the implementation for a more detailed description. |
419 GetWorkStatus GetWork(SequencedTask* task, | 426 GetWorkStatus GetWork(SequencedTask* task, |
420 TimeDelta* wait_time, | 427 TimeDelta* wait_time, |
421 std::vector<Closure>* delete_these_outside_lock); | 428 std::vector<SequencedTask>* delete_these_outside_lock); |
422 | 429 |
423 void HandleCleanup(); | 430 void HandleCleanup(); |
424 | 431 |
425 // Peforms init and cleanup around running the given task. WillRun... | 432 // Peforms init and cleanup around running the given task. WillRun... |
426 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 433 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
427 // The calling code should call FinishStartingAdditionalThread once the | 434 // The calling code should call FinishStartingAdditionalThread once the |
428 // lock is released if the return values is nonzero. | 435 // lock is released if the return values is nonzero. |
429 int WillRunWorkerTask(const SequencedTask& task); | 436 int WillRunWorkerTask(const SequencedTask& task); |
430 void DidRunWorkerTask(const SequencedTask& task); | 437 void DidRunWorkerTask(const SequencedTask& task); |
431 | 438 |
(...skipping 538 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
970 while (true) { | 977 while (true) { |
971 #if defined(OS_MACOSX) | 978 #if defined(OS_MACOSX) |
972 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 979 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
973 #endif | 980 #endif |
974 | 981 |
975 HandleCleanup(); | 982 HandleCleanup(); |
976 | 983 |
977 // See GetWork for what delete_these_outside_lock is doing. | 984 // See GetWork for what delete_these_outside_lock is doing. |
978 SequencedTask task; | 985 SequencedTask task; |
979 TimeDelta wait_time; | 986 TimeDelta wait_time; |
980 std::vector<Closure> delete_these_outside_lock; | 987 std::vector<SequencedTask> delete_these_outside_lock; |
981 GetWorkStatus status = | 988 GetWorkStatus status = |
982 GetWork(&task, &wait_time, &delete_these_outside_lock); | 989 GetWork(&task, &wait_time, &delete_these_outside_lock); |
983 if (status == GET_WORK_FOUND) { | 990 if (status == GET_WORK_FOUND) { |
984 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); | 991 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); |
985 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), | 992 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), |
986 "SequencedWorkerPool::Inner::PostTask", | 993 "SequencedWorkerPool::Inner::PostTask", |
987 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), | 994 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), |
988 TRACE_EVENT_FLAG_FLOW_IN); | 995 TRACE_EVENT_FLAG_FLOW_IN); |
989 int new_thread_id = WillRunWorkerTask(task); | 996 int new_thread_id = WillRunWorkerTask(task); |
990 { | 997 { |
991 AutoUnlock unlock(lock_); | 998 AutoUnlock unlock(lock_); |
992 // There may be more work available, so wake up another | 999 // There may be more work available, so wake up another |
993 // worker thread. (Technically not required, since we | 1000 // worker thread. (Technically not required, since we |
994 // already get a signal for each new task, but it doesn't | 1001 // already get a signal for each new task, but it doesn't |
995 // hurt.) | 1002 // hurt.) |
996 SignalHasWork(); | 1003 SignalHasWork(); |
997 delete_these_outside_lock.clear(); | 1004 DeleteTheseOutsideLockHelper(&delete_these_outside_lock, this_worker); |
998 | 1005 |
999 // Complete thread creation outside the lock if necessary. | 1006 // Complete thread creation outside the lock if necessary. |
1000 if (new_thread_id) | 1007 if (new_thread_id) |
1001 FinishStartingAdditionalThread(new_thread_id); | 1008 FinishStartingAdditionalThread(new_thread_id); |
1002 | 1009 |
1003 this_worker->set_running_task_info( | 1010 this_worker->set_running_task_info( |
1004 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 1011 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
1005 | 1012 |
1006 tracked_objects::TaskStopwatch stopwatch; | 1013 tracked_objects::TaskStopwatch stopwatch; |
1007 stopwatch.Start(); | 1014 stopwatch.Start(); |
(...skipping 10 matching lines...) Expand all Loading... | |
1018 // still works. | 1025 // still works. |
1019 task.task = Closure(); | 1026 task.task = Closure(); |
1020 | 1027 |
1021 this_worker->reset_running_task_info(); | 1028 this_worker->reset_running_task_info(); |
1022 } | 1029 } |
1023 DidRunWorkerTask(task); // Must be done inside the lock. | 1030 DidRunWorkerTask(task); // Must be done inside the lock. |
1024 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 1031 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
1025 switch (status) { | 1032 switch (status) { |
1026 case GET_WORK_WAIT: { | 1033 case GET_WORK_WAIT: { |
1027 AutoUnlock unlock(lock_); | 1034 AutoUnlock unlock(lock_); |
1028 delete_these_outside_lock.clear(); | 1035 DeleteTheseOutsideLockHelper(&delete_these_outside_lock, |
1036 this_worker); | |
1029 } | 1037 } |
1030 break; | 1038 break; |
1031 case GET_WORK_NOT_FOUND: | 1039 case GET_WORK_NOT_FOUND: |
1032 CHECK(delete_these_outside_lock.empty()); | 1040 CHECK(delete_these_outside_lock.empty()); |
1033 cleanup_state_ = CLEANUP_FINISHING; | 1041 cleanup_state_ = CLEANUP_FINISHING; |
1034 cleanup_cv_.Broadcast(); | 1042 cleanup_cv_.Broadcast(); |
1035 break; | 1043 break; |
1036 default: | 1044 default: |
1037 NOTREACHED(); | 1045 NOTREACHED(); |
1038 } | 1046 } |
1039 } else { | 1047 } else { |
1040 // When we're terminating and there's no more work, we can | 1048 // When we're terminating and there's no more work, we can |
1041 // shut down, other workers can complete any pending or new tasks. | 1049 // shut down, other workers can complete any pending or new tasks. |
1042 // We can get additional tasks posted after shutdown_called_ is set | 1050 // We can get additional tasks posted after shutdown_called_ is set |
1043 // but only worker threads are allowed to post tasks at that time, and | 1051 // but only worker threads are allowed to post tasks at that time, and |
1044 // the workers responsible for posting those tasks will be available | 1052 // the workers responsible for posting those tasks will be available |
1045 // to run them. Also, there may be some tasks stuck behind running | 1053 // to run them. Also, there may be some tasks stuck behind running |
1046 // ones with the same sequence token, but additional threads won't | 1054 // ones with the same sequence token, but additional threads won't |
1047 // help this case. | 1055 // help this case. |
1048 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { | 1056 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { |
1049 AutoUnlock unlock(lock_); | 1057 AutoUnlock unlock(lock_); |
1050 delete_these_outside_lock.clear(); | 1058 DeleteTheseOutsideLockHelper(&delete_these_outside_lock, this_worker); |
1051 break; | 1059 break; |
1052 } | 1060 } |
1053 | 1061 |
1054 // No work was found, but there are tasks that need deletion. The | 1062 // No work was found, but there are tasks that need deletion. The |
1055 // deletion must happen outside of the lock. | 1063 // deletion must happen outside of the lock. |
1056 if (delete_these_outside_lock.size()) { | 1064 if (delete_these_outside_lock.size()) { |
1057 AutoUnlock unlock(lock_); | 1065 AutoUnlock unlock(lock_); |
1058 delete_these_outside_lock.clear(); | 1066 DeleteTheseOutsideLockHelper(&delete_these_outside_lock, this_worker); |
1059 | 1067 |
1060 // Since the lock has been released, |status| may no longer be | 1068 // Since the lock has been released, |status| may no longer be |
1061 // accurate. It might read GET_WORK_WAIT even if there are tasks | 1069 // accurate. It might read GET_WORK_WAIT even if there are tasks |
1062 // ready to perform work. Jump to the top of the loop to recalculate | 1070 // ready to perform work. Jump to the top of the loop to recalculate |
1063 // |status|. | 1071 // |status|. |
1064 continue; | 1072 continue; |
1065 } | 1073 } |
1066 | 1074 |
1067 waiting_thread_count_++; | 1075 waiting_thread_count_++; |
1068 | 1076 |
1069 switch (status) { | 1077 switch (status) { |
1070 case GET_WORK_NOT_FOUND: | 1078 case GET_WORK_NOT_FOUND: |
1071 has_work_cv_.Wait(); | 1079 has_work_cv_.Wait(); |
1072 break; | 1080 break; |
1073 case GET_WORK_WAIT: | 1081 case GET_WORK_WAIT: |
1074 has_work_cv_.TimedWait(wait_time); | 1082 has_work_cv_.TimedWait(wait_time); |
1075 break; | 1083 break; |
1076 default: | 1084 default: |
1077 NOTREACHED(); | 1085 NOTREACHED(); |
1078 } | 1086 } |
1079 waiting_thread_count_--; | 1087 waiting_thread_count_--; |
1080 } | 1088 } |
1089 // |delete_these_outside_lock| should have been cleared via | |
1090 // DeleteTheseOutsideLockHelper() above already. | |
1091 DCHECK(delete_these_outside_lock.empty()); | |
1081 } | 1092 } |
1082 } // Release lock_. | 1093 } // Release lock_. |
1083 | 1094 |
1084 // We noticed we should exit. Wake up the next worker so it knows it should | 1095 // We noticed we should exit. Wake up the next worker so it knows it should |
1085 // exit as well (because the Shutdown() code only signals once). | 1096 // exit as well (because the Shutdown() code only signals once). |
1086 SignalHasWork(); | 1097 SignalHasWork(); |
1087 | 1098 |
1088 // Possibly unblock shutdown. | 1099 // Possibly unblock shutdown. |
1089 can_shutdown_cv_.Signal(); | 1100 can_shutdown_cv_.Signal(); |
1090 } | 1101 } |
1091 | 1102 |
1103 void SequencedWorkerPool::Inner::DeleteTheseOutsideLockHelper( | |
1104 std::vector<SequencedTask>* delete_these_outside_lock, | |
1105 Worker* this_worker) { | |
1106 while (!delete_these_outside_lock->empty()) { | |
1107 const SequencedTask& deleted_task = delete_these_outside_lock->back(); | |
1108 this_worker->set_running_task_info( | |
1109 SequenceToken(deleted_task.sequence_token_id), | |
1110 deleted_task.shutdown_behavior); | |
1111 delete_these_outside_lock->pop_back(); | |
1112 this_worker->reset_running_task_info(); | |
vmpstr
2016/12/03 01:14:04
Does this need to happen after every iteration, or
gab
2016/12/23 20:22:10
Done on https://codereview.chromium.org/2581213002
| |
1113 } | |
1114 } | |
1115 | |
1092 void SequencedWorkerPool::Inner::HandleCleanup() { | 1116 void SequencedWorkerPool::Inner::HandleCleanup() { |
1093 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1117 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1094 | 1118 |
1095 lock_.AssertAcquired(); | 1119 lock_.AssertAcquired(); |
1096 if (cleanup_state_ == CLEANUP_DONE) | 1120 if (cleanup_state_ == CLEANUP_DONE) |
1097 return; | 1121 return; |
1098 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1122 if (cleanup_state_ == CLEANUP_REQUESTED) { |
1099 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1123 // We win, we get to do the cleanup as soon as the others wise up and idle. |
1100 cleanup_state_ = CLEANUP_STARTING; | 1124 cleanup_state_ = CLEANUP_STARTING; |
1101 while (thread_being_created_ || | 1125 while (thread_being_created_ || |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1149 | 1173 |
1150 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1174 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
1151 lock_.AssertAcquired(); | 1175 lock_.AssertAcquired(); |
1152 // We assume that we never create enough tasks to wrap around. | 1176 // We assume that we never create enough tasks to wrap around. |
1153 return next_sequence_task_number_++; | 1177 return next_sequence_task_number_++; |
1154 } | 1178 } |
1155 | 1179 |
1156 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1180 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
1157 SequencedTask* task, | 1181 SequencedTask* task, |
1158 TimeDelta* wait_time, | 1182 TimeDelta* wait_time, |
1159 std::vector<Closure>* delete_these_outside_lock) { | 1183 std::vector<SequencedTask>* delete_these_outside_lock) { |
1160 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1184 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1161 | 1185 |
1162 lock_.AssertAcquired(); | 1186 lock_.AssertAcquired(); |
1163 | 1187 |
1164 // Find the next task with a sequence token that's not currently in use. | 1188 // Find the next task with a sequence token that's not currently in use. |
1165 // If the token is in use, that means another thread is running something | 1189 // If the token is in use, that means another thread is running something |
1166 // in that sequence, and we can't run it without going out-of-order. | 1190 // in that sequence, and we can't run it without going out-of-order. |
1167 // | 1191 // |
1168 // This algorithm is simple and fair, but inefficient in some cases. For | 1192 // This algorithm is simple and fair, but inefficient in some cases. For |
1169 // example, say somebody schedules 1000 slow tasks with the same sequence | 1193 // example, say somebody schedules 1000 slow tasks with the same sequence |
(...skipping 25 matching lines...) Expand all Loading... | |
1195 unrunnable_tasks++; | 1219 unrunnable_tasks++; |
1196 ++i; | 1220 ++i; |
1197 continue; | 1221 continue; |
1198 } | 1222 } |
1199 | 1223 |
1200 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 1224 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
1201 // We're shutting down and the task we just found isn't blocking | 1225 // We're shutting down and the task we just found isn't blocking |
1202 // shutdown. Delete it and get more work. | 1226 // shutdown. Delete it and get more work. |
1203 // | 1227 // |
1204 // Note that we do not want to delete unrunnable tasks. Deleting a task | 1228 // Note that we do not want to delete unrunnable tasks. Deleting a task |
1205 // can have side effects (like freeing some objects) and deleting a | 1229 // can have side effects (like freeing some objects) and deleting a task |
1206 // task that's supposed to run after one that's currently running could | 1230 // that's supposed to run after one that's currently running could cause |
1207 // cause an obscure crash. | 1231 // an obscure crash. |
1208 // | 1232 // |
1209 // We really want to delete these tasks outside the lock in case the | 1233 // We really want to delete these tasks outside the lock in case the |
1210 // closures are holding refs to objects that want to post work from | 1234 // closures are holding refs to objects that want to post work from their |
1211 // their destructorss (which would deadlock). The closures are | 1235 // destructors (which would deadlock). The closures are internally |
1212 // internally refcounted, so we just need to keep a copy of them alive | 1236 // refcounted, so we just need to keep a copy of them alive until the lock |
1213 // until the lock is exited. The calling code can just clear() the | 1237 // is exited. The calling code can just clear() the vector they passed to |
1214 // vector they passed to us once the lock is exited to make this | 1238 // us once the lock is exited to make this happen. |
1215 // happen. | 1239 delete_these_outside_lock->push_back(*i); |
1216 delete_these_outside_lock->push_back(i->task); | |
1217 pending_tasks_.erase(i++); | 1240 pending_tasks_.erase(i++); |
1218 continue; | 1241 continue; |
1219 } | 1242 } |
1220 | 1243 |
1221 if (i->time_to_run > current_time) { | 1244 if (i->time_to_run > current_time) { |
1222 // The time to run has not come yet. | 1245 // The time to run has not come yet. |
1223 *wait_time = i->time_to_run - current_time; | 1246 *wait_time = i->time_to_run - current_time; |
1224 status = GET_WORK_WAIT; | 1247 status = GET_WORK_WAIT; |
1225 if (cleanup_state_ == CLEANUP_RUNNING) { | 1248 if (cleanup_state_ == CLEANUP_RUNNING) { |
1226 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | 1249 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. |
1227 delete_these_outside_lock->push_back(i->task); | 1250 delete_these_outside_lock->push_back(*i); |
1228 pending_tasks_.erase(i); | 1251 pending_tasks_.erase(i); |
1229 } | 1252 } |
1230 break; | 1253 break; |
1231 } | 1254 } |
1232 | 1255 |
1233 // Found a runnable task. | 1256 // Found a runnable task. |
1234 *task = *i; | 1257 *task = *i; |
1235 pending_tasks_.erase(i); | 1258 pending_tasks_.erase(i); |
1236 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { | 1259 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
1237 blocking_shutdown_pending_task_count_--; | 1260 blocking_shutdown_pending_task_count_--; |
(...skipping 358 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1596 bool SequencedWorkerPool::IsShutdownInProgress() { | 1619 bool SequencedWorkerPool::IsShutdownInProgress() { |
1597 return inner_->IsShutdownInProgress(); | 1620 return inner_->IsShutdownInProgress(); |
1598 } | 1621 } |
1599 | 1622 |
1600 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1623 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1601 SequenceToken sequence_token) const { | 1624 SequenceToken sequence_token) const { |
1602 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1625 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1603 } | 1626 } |
1604 | 1627 |
1605 } // namespace base | 1628 } // namespace base |
OLD | NEW |