| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <list> | 9 #include <list> |
| 10 #include <map> | 10 #include <map> |
| (...skipping 365 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 376 }; | 376 }; |
| 377 | 377 |
| 378 enum CleanupState { | 378 enum CleanupState { |
| 379 CLEANUP_REQUESTED, | 379 CLEANUP_REQUESTED, |
| 380 CLEANUP_STARTING, | 380 CLEANUP_STARTING, |
| 381 CLEANUP_RUNNING, | 381 CLEANUP_RUNNING, |
| 382 CLEANUP_FINISHING, | 382 CLEANUP_FINISHING, |
| 383 CLEANUP_DONE, | 383 CLEANUP_DONE, |
| 384 }; | 384 }; |
| 385 | 385 |
| 386 // Clears ScheduledTasks in |tasks_to_delete| while ensuring that | |
| 387 // |this_worker| has the desired task info context during ~ScheduledTask() to | |
| 388 // allow sequence-checking. | |
| 389 void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete, | |
| 390 Worker* this_worker); | |
| 391 | |
| 392 // Helper used by PostTask() to complete the work when redirection is on. | 386 // Helper used by PostTask() to complete the work when redirection is on. |
| 393 // Returns true if the task may run at some point in the future and false if | 387 // Returns true if the task may run at some point in the future and false if |
| 394 // it will definitely not run. | 388 // it will definitely not run. |
| 395 // Coalesce upon resolution of http://crbug.com/622400. | 389 // Coalesce upon resolution of http://crbug.com/622400. |
| 396 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, | 390 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
| 397 const TimeDelta& delay); | 391 const TimeDelta& delay); |
| 398 | 392 |
| 399 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| | 393 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| |
| 400 // and |traits|. | 394 // and |traits|. |
| 401 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( | 395 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( |
| (...skipping 17 matching lines...) Expand all Loading... |
| 419 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run | 413 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
| 420 // immediately, and |task| is not filled in. Likewise, |wait_time| is | 414 // immediately, and |task| is not filled in. Likewise, |wait_time| is |
| 421 // filled in the time to wait until the next task to run. In this case, the | 415 // filled in the time to wait until the next task to run. In this case, the |
| 422 // caller should wait the time. | 416 // caller should wait the time. |
| 423 // | 417 // |
| 424 // In any case, the calling code should clear the given | 418 // In any case, the calling code should clear the given |
| 425 // delete_these_outside_lock vector the next time the lock is released. | 419 // delete_these_outside_lock vector the next time the lock is released. |
| 426 // See the implementation for a more detailed description. | 420 // See the implementation for a more detailed description. |
| 427 GetWorkStatus GetWork(SequencedTask* task, | 421 GetWorkStatus GetWork(SequencedTask* task, |
| 428 TimeDelta* wait_time, | 422 TimeDelta* wait_time, |
| 429 std::vector<SequencedTask>* delete_these_outside_lock); | 423 std::vector<Closure>* delete_these_outside_lock); |
| 430 | 424 |
| 431 void HandleCleanup(); | 425 void HandleCleanup(); |
| 432 | 426 |
| 433 // Peforms init and cleanup around running the given task. WillRun... | 427 // Peforms init and cleanup around running the given task. WillRun... |
| 434 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 428 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
| 435 // The calling code should call FinishStartingAdditionalThread once the | 429 // The calling code should call FinishStartingAdditionalThread once the |
| 436 // lock is released if the return values is nonzero. | 430 // lock is released if the return values is nonzero. |
| 437 int WillRunWorkerTask(const SequencedTask& task); | 431 int WillRunWorkerTask(const SequencedTask& task); |
| 438 void DidRunWorkerTask(const SequencedTask& task); | 432 void DidRunWorkerTask(const SequencedTask& task); |
| 439 | 433 |
| (...skipping 542 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 982 while (true) { | 976 while (true) { |
| 983 #if defined(OS_MACOSX) | 977 #if defined(OS_MACOSX) |
| 984 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 978 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
| 985 #endif | 979 #endif |
| 986 | 980 |
| 987 HandleCleanup(); | 981 HandleCleanup(); |
| 988 | 982 |
| 989 // See GetWork for what delete_these_outside_lock is doing. | 983 // See GetWork for what delete_these_outside_lock is doing. |
| 990 SequencedTask task; | 984 SequencedTask task; |
| 991 TimeDelta wait_time; | 985 TimeDelta wait_time; |
| 992 std::vector<SequencedTask> delete_these_outside_lock; | 986 std::vector<Closure> delete_these_outside_lock; |
| 993 GetWorkStatus status = | 987 GetWorkStatus status = |
| 994 GetWork(&task, &wait_time, &delete_these_outside_lock); | 988 GetWork(&task, &wait_time, &delete_these_outside_lock); |
| 995 if (status == GET_WORK_FOUND) { | 989 if (status == GET_WORK_FOUND) { |
| 996 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); | 990 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); |
| 997 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), | 991 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), |
| 998 "SequencedWorkerPool::Inner::PostTask", | 992 "SequencedWorkerPool::Inner::PostTask", |
| 999 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), | 993 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), |
| 1000 TRACE_EVENT_FLAG_FLOW_IN); | 994 TRACE_EVENT_FLAG_FLOW_IN); |
| 1001 int new_thread_id = WillRunWorkerTask(task); | 995 int new_thread_id = WillRunWorkerTask(task); |
| 1002 { | 996 { |
| 1003 AutoUnlock unlock(lock_); | 997 AutoUnlock unlock(lock_); |
| 1004 // There may be more work available, so wake up another | 998 // There may be more work available, so wake up another |
| 1005 // worker thread. (Technically not required, since we | 999 // worker thread. (Technically not required, since we |
| 1006 // already get a signal for each new task, but it doesn't | 1000 // already get a signal for each new task, but it doesn't |
| 1007 // hurt.) | 1001 // hurt.) |
| 1008 SignalHasWork(); | 1002 SignalHasWork(); |
| 1009 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1003 delete_these_outside_lock.clear(); |
| 1010 | 1004 |
| 1011 // Complete thread creation outside the lock if necessary. | 1005 // Complete thread creation outside the lock if necessary. |
| 1012 if (new_thread_id) | 1006 if (new_thread_id) |
| 1013 FinishStartingAdditionalThread(new_thread_id); | 1007 FinishStartingAdditionalThread(new_thread_id); |
| 1014 | 1008 |
| 1015 this_worker->set_running_task_info( | 1009 this_worker->set_running_task_info( |
| 1016 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 1010 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
| 1017 | 1011 |
| 1018 tracked_objects::TaskStopwatch stopwatch; | 1012 tracked_objects::TaskStopwatch stopwatch; |
| 1019 stopwatch.Start(); | 1013 stopwatch.Start(); |
| (...skipping 10 matching lines...) Expand all Loading... |
| 1030 // still works. | 1024 // still works. |
| 1031 task.task = Closure(); | 1025 task.task = Closure(); |
| 1032 | 1026 |
| 1033 this_worker->reset_running_task_info(); | 1027 this_worker->reset_running_task_info(); |
| 1034 } | 1028 } |
| 1035 DidRunWorkerTask(task); // Must be done inside the lock. | 1029 DidRunWorkerTask(task); // Must be done inside the lock. |
| 1036 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 1030 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
| 1037 switch (status) { | 1031 switch (status) { |
| 1038 case GET_WORK_WAIT: { | 1032 case GET_WORK_WAIT: { |
| 1039 AutoUnlock unlock(lock_); | 1033 AutoUnlock unlock(lock_); |
| 1040 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1034 delete_these_outside_lock.clear(); |
| 1041 } | 1035 } |
| 1042 break; | 1036 break; |
| 1043 case GET_WORK_NOT_FOUND: | 1037 case GET_WORK_NOT_FOUND: |
| 1044 CHECK(delete_these_outside_lock.empty()); | 1038 CHECK(delete_these_outside_lock.empty()); |
| 1045 cleanup_state_ = CLEANUP_FINISHING; | 1039 cleanup_state_ = CLEANUP_FINISHING; |
| 1046 cleanup_cv_.Broadcast(); | 1040 cleanup_cv_.Broadcast(); |
| 1047 break; | 1041 break; |
| 1048 default: | 1042 default: |
| 1049 NOTREACHED(); | 1043 NOTREACHED(); |
| 1050 } | 1044 } |
| 1051 } else { | 1045 } else { |
| 1052 // When we're terminating and there's no more work, we can | 1046 // When we're terminating and there's no more work, we can |
| 1053 // shut down, other workers can complete any pending or new tasks. | 1047 // shut down, other workers can complete any pending or new tasks. |
| 1054 // We can get additional tasks posted after shutdown_called_ is set | 1048 // We can get additional tasks posted after shutdown_called_ is set |
| 1055 // but only worker threads are allowed to post tasks at that time, and | 1049 // but only worker threads are allowed to post tasks at that time, and |
| 1056 // the workers responsible for posting those tasks will be available | 1050 // the workers responsible for posting those tasks will be available |
| 1057 // to run them. Also, there may be some tasks stuck behind running | 1051 // to run them. Also, there may be some tasks stuck behind running |
| 1058 // ones with the same sequence token, but additional threads won't | 1052 // ones with the same sequence token, but additional threads won't |
| 1059 // help this case. | 1053 // help this case. |
| 1060 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { | 1054 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { |
| 1061 AutoUnlock unlock(lock_); | 1055 AutoUnlock unlock(lock_); |
| 1062 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1056 delete_these_outside_lock.clear(); |
| 1063 break; | 1057 break; |
| 1064 } | 1058 } |
| 1065 | 1059 |
| 1066 // No work was found, but there are tasks that need deletion. The | 1060 // No work was found, but there are tasks that need deletion. The |
| 1067 // deletion must happen outside of the lock. | 1061 // deletion must happen outside of the lock. |
| 1068 if (delete_these_outside_lock.size()) { | 1062 if (delete_these_outside_lock.size()) { |
| 1069 AutoUnlock unlock(lock_); | 1063 AutoUnlock unlock(lock_); |
| 1070 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1064 delete_these_outside_lock.clear(); |
| 1071 | 1065 |
| 1072 // Since the lock has been released, |status| may no longer be | 1066 // Since the lock has been released, |status| may no longer be |
| 1073 // accurate. It might read GET_WORK_WAIT even if there are tasks | 1067 // accurate. It might read GET_WORK_WAIT even if there are tasks |
| 1074 // ready to perform work. Jump to the top of the loop to recalculate | 1068 // ready to perform work. Jump to the top of the loop to recalculate |
| 1075 // |status|. | 1069 // |status|. |
| 1076 continue; | 1070 continue; |
| 1077 } | 1071 } |
| 1078 | 1072 |
| 1079 waiting_thread_count_++; | 1073 waiting_thread_count_++; |
| 1080 | 1074 |
| 1081 switch (status) { | 1075 switch (status) { |
| 1082 case GET_WORK_NOT_FOUND: | 1076 case GET_WORK_NOT_FOUND: |
| 1083 has_work_cv_.Wait(); | 1077 has_work_cv_.Wait(); |
| 1084 break; | 1078 break; |
| 1085 case GET_WORK_WAIT: | 1079 case GET_WORK_WAIT: |
| 1086 has_work_cv_.TimedWait(wait_time); | 1080 has_work_cv_.TimedWait(wait_time); |
| 1087 break; | 1081 break; |
| 1088 default: | 1082 default: |
| 1089 NOTREACHED(); | 1083 NOTREACHED(); |
| 1090 } | 1084 } |
| 1091 waiting_thread_count_--; | 1085 waiting_thread_count_--; |
| 1092 } | 1086 } |
| 1093 // |delete_these_outside_lock| should have been cleared via | |
| 1094 // DeleteWithoutLock() above already. | |
| 1095 DCHECK(delete_these_outside_lock.empty()); | |
| 1096 } | 1087 } |
| 1097 } // Release lock_. | 1088 } // Release lock_. |
| 1098 | 1089 |
| 1099 // We noticed we should exit. Wake up the next worker so it knows it should | 1090 // We noticed we should exit. Wake up the next worker so it knows it should |
| 1100 // exit as well (because the Shutdown() code only signals once). | 1091 // exit as well (because the Shutdown() code only signals once). |
| 1101 SignalHasWork(); | 1092 SignalHasWork(); |
| 1102 | 1093 |
| 1103 // Possibly unblock shutdown. | 1094 // Possibly unblock shutdown. |
| 1104 can_shutdown_cv_.Signal(); | 1095 can_shutdown_cv_.Signal(); |
| 1105 } | 1096 } |
| 1106 | 1097 |
| 1107 void SequencedWorkerPool::Inner::DeleteWithoutLock( | |
| 1108 std::vector<SequencedTask>* tasks_to_delete, | |
| 1109 Worker* this_worker) { | |
| 1110 while (!tasks_to_delete->empty()) { | |
| 1111 const SequencedTask& deleted_task = tasks_to_delete->back(); | |
| 1112 this_worker->set_running_task_info( | |
| 1113 SequenceToken(deleted_task.sequence_token_id), | |
| 1114 deleted_task.shutdown_behavior); | |
| 1115 tasks_to_delete->pop_back(); | |
| 1116 } | |
| 1117 this_worker->reset_running_task_info(); | |
| 1118 } | |
| 1119 | |
| 1120 void SequencedWorkerPool::Inner::HandleCleanup() { | 1098 void SequencedWorkerPool::Inner::HandleCleanup() { |
| 1121 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1099 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1122 | 1100 |
| 1123 lock_.AssertAcquired(); | 1101 lock_.AssertAcquired(); |
| 1124 if (cleanup_state_ == CLEANUP_DONE) | 1102 if (cleanup_state_ == CLEANUP_DONE) |
| 1125 return; | 1103 return; |
| 1126 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1104 if (cleanup_state_ == CLEANUP_REQUESTED) { |
| 1127 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1105 // We win, we get to do the cleanup as soon as the others wise up and idle. |
| 1128 cleanup_state_ = CLEANUP_STARTING; | 1106 cleanup_state_ = CLEANUP_STARTING; |
| 1129 while (thread_being_created_ || | 1107 while (thread_being_created_ || |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1177 | 1155 |
| 1178 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1156 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
| 1179 lock_.AssertAcquired(); | 1157 lock_.AssertAcquired(); |
| 1180 // We assume that we never create enough tasks to wrap around. | 1158 // We assume that we never create enough tasks to wrap around. |
| 1181 return next_sequence_task_number_++; | 1159 return next_sequence_task_number_++; |
| 1182 } | 1160 } |
| 1183 | 1161 |
| 1184 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1162 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| 1185 SequencedTask* task, | 1163 SequencedTask* task, |
| 1186 TimeDelta* wait_time, | 1164 TimeDelta* wait_time, |
| 1187 std::vector<SequencedTask>* delete_these_outside_lock) { | 1165 std::vector<Closure>* delete_these_outside_lock) { |
| 1188 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1166 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1189 | 1167 |
| 1190 lock_.AssertAcquired(); | 1168 lock_.AssertAcquired(); |
| 1191 | 1169 |
| 1192 // Find the next task with a sequence token that's not currently in use. | 1170 // Find the next task with a sequence token that's not currently in use. |
| 1193 // If the token is in use, that means another thread is running something | 1171 // If the token is in use, that means another thread is running something |
| 1194 // in that sequence, and we can't run it without going out-of-order. | 1172 // in that sequence, and we can't run it without going out-of-order. |
| 1195 // | 1173 // |
| 1196 // This algorithm is simple and fair, but inefficient in some cases. For | 1174 // This algorithm is simple and fair, but inefficient in some cases. For |
| 1197 // example, say somebody schedules 1000 slow tasks with the same sequence | 1175 // example, say somebody schedules 1000 slow tasks with the same sequence |
| (...skipping 25 matching lines...) Expand all Loading... |
| 1223 unrunnable_tasks++; | 1201 unrunnable_tasks++; |
| 1224 ++i; | 1202 ++i; |
| 1225 continue; | 1203 continue; |
| 1226 } | 1204 } |
| 1227 | 1205 |
| 1228 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 1206 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
| 1229 // We're shutting down and the task we just found isn't blocking | 1207 // We're shutting down and the task we just found isn't blocking |
| 1230 // shutdown. Delete it and get more work. | 1208 // shutdown. Delete it and get more work. |
| 1231 // | 1209 // |
| 1232 // Note that we do not want to delete unrunnable tasks. Deleting a task | 1210 // Note that we do not want to delete unrunnable tasks. Deleting a task |
| 1233 // can have side effects (like freeing some objects) and deleting a task | 1211 // can have side effects (like freeing some objects) and deleting a |
| 1234 // that's supposed to run after one that's currently running could cause | 1212 // task that's supposed to run after one that's currently running could |
| 1235 // an obscure crash. | 1213 // cause an obscure crash. |
| 1236 // | 1214 // |
| 1237 // We really want to delete these tasks outside the lock in case the | 1215 // We really want to delete these tasks outside the lock in case the |
| 1238 // closures are holding refs to objects that want to post work from their | 1216 // closures are holding refs to objects that want to post work from |
| 1239 // destructors (which would deadlock). The closures are internally | 1217 // their destructorss (which would deadlock). The closures are |
| 1240 // refcounted, so we just need to keep a copy of them alive until the lock | 1218 // internally refcounted, so we just need to keep a copy of them alive |
| 1241 // is exited. The calling code can just clear() the vector they passed to | 1219 // until the lock is exited. The calling code can just clear() the |
| 1242 // us once the lock is exited to make this happen. | 1220 // vector they passed to us once the lock is exited to make this |
| 1243 delete_these_outside_lock->push_back(*i); | 1221 // happen. |
| 1222 delete_these_outside_lock->push_back(i->task); |
| 1244 pending_tasks_.erase(i++); | 1223 pending_tasks_.erase(i++); |
| 1245 continue; | 1224 continue; |
| 1246 } | 1225 } |
| 1247 | 1226 |
| 1248 if (i->time_to_run > current_time) { | 1227 if (i->time_to_run > current_time) { |
| 1249 // The time to run has not come yet. | 1228 // The time to run has not come yet. |
| 1250 *wait_time = i->time_to_run - current_time; | 1229 *wait_time = i->time_to_run - current_time; |
| 1251 status = GET_WORK_WAIT; | 1230 status = GET_WORK_WAIT; |
| 1252 if (cleanup_state_ == CLEANUP_RUNNING) { | 1231 if (cleanup_state_ == CLEANUP_RUNNING) { |
| 1253 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | 1232 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. |
| 1254 delete_these_outside_lock->push_back(*i); | 1233 delete_these_outside_lock->push_back(i->task); |
| 1255 pending_tasks_.erase(i); | 1234 pending_tasks_.erase(i); |
| 1256 } | 1235 } |
| 1257 break; | 1236 break; |
| 1258 } | 1237 } |
| 1259 | 1238 |
| 1260 // Found a runnable task. | 1239 // Found a runnable task. |
| 1261 *task = *i; | 1240 *task = *i; |
| 1262 pending_tasks_.erase(i); | 1241 pending_tasks_.erase(i); |
| 1263 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { | 1242 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
| 1264 blocking_shutdown_pending_task_count_--; | 1243 blocking_shutdown_pending_task_count_--; |
| (...skipping 360 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1625 bool SequencedWorkerPool::IsShutdownInProgress() { | 1604 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1626 return inner_->IsShutdownInProgress(); | 1605 return inner_->IsShutdownInProgress(); |
| 1627 } | 1606 } |
| 1628 | 1607 |
| 1629 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1608 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 1630 SequenceToken sequence_token) const { | 1609 SequenceToken sequence_token) const { |
| 1631 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1610 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 1632 } | 1611 } |
| 1633 | 1612 |
| 1634 } // namespace base | 1613 } // namespace base |
| OLD | NEW |