| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <list> | 9 #include <list> |
| 10 #include <map> | 10 #include <map> |
| (...skipping 365 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 376 }; | 376 }; |
| 377 | 377 |
| 378 enum CleanupState { | 378 enum CleanupState { |
| 379 CLEANUP_REQUESTED, | 379 CLEANUP_REQUESTED, |
| 380 CLEANUP_STARTING, | 380 CLEANUP_STARTING, |
| 381 CLEANUP_RUNNING, | 381 CLEANUP_RUNNING, |
| 382 CLEANUP_FINISHING, | 382 CLEANUP_FINISHING, |
| 383 CLEANUP_DONE, | 383 CLEANUP_DONE, |
| 384 }; | 384 }; |
| 385 | 385 |
| 386 // Clears ScheduledTasks in |tasks_to_delete| while ensuring that |
| 387 // |this_worker| has the desired task info context during ~ScheduledTask() to |
| 388 // allow sequence-checking. |
| 389 void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete, |
| 390 Worker* this_worker); |
| 391 |
| 386 // Helper used by PostTask() to complete the work when redirection is on. | 392 // Helper used by PostTask() to complete the work when redirection is on. |
| 387 // Returns true if the task may run at some point in the future and false if | 393 // Returns true if the task may run at some point in the future and false if |
| 388 // it will definitely not run. | 394 // it will definitely not run. |
| 389 // Coalesce upon resolution of http://crbug.com/622400. | 395 // Coalesce upon resolution of http://crbug.com/622400. |
| 390 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, | 396 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
| 391 const TimeDelta& delay); | 397 const TimeDelta& delay); |
| 392 | 398 |
| 393 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| | 399 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| |
| 394 // and |traits|. | 400 // and |traits|. |
| 395 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( | 401 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( |
| (...skipping 17 matching lines...) Expand all Loading... |
| 413 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run | 419 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
| 414 // immediately, and |task| is not filled in. Likewise, |wait_time| is | 420 // immediately, and |task| is not filled in. Likewise, |wait_time| is |
| 415 // filled in the time to wait until the next task to run. In this case, the | 421 // filled in the time to wait until the next task to run. In this case, the |
| 416 // caller should wait the time. | 422 // caller should wait the time. |
| 417 // | 423 // |
| 418 // In any case, the calling code should clear the given | 424 // In any case, the calling code should clear the given |
| 419 // delete_these_outside_lock vector the next time the lock is released. | 425 // delete_these_outside_lock vector the next time the lock is released. |
| 420 // See the implementation for a more detailed description. | 426 // See the implementation for a more detailed description. |
| 421 GetWorkStatus GetWork(SequencedTask* task, | 427 GetWorkStatus GetWork(SequencedTask* task, |
| 422 TimeDelta* wait_time, | 428 TimeDelta* wait_time, |
| 423 std::vector<Closure>* delete_these_outside_lock); | 429 std::vector<SequencedTask>* delete_these_outside_lock); |
| 424 | 430 |
| 425 void HandleCleanup(); | 431 void HandleCleanup(); |
| 426 | 432 |
| 427 // Peforms init and cleanup around running the given task. WillRun... | 433 // Peforms init and cleanup around running the given task. WillRun... |
| 428 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 434 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
| 429 // The calling code should call FinishStartingAdditionalThread once the | 435 // The calling code should call FinishStartingAdditionalThread once the |
| 430 // lock is released if the return values is nonzero. | 436 // lock is released if the return values is nonzero. |
| 431 int WillRunWorkerTask(const SequencedTask& task); | 437 int WillRunWorkerTask(const SequencedTask& task); |
| 432 void DidRunWorkerTask(const SequencedTask& task); | 438 void DidRunWorkerTask(const SequencedTask& task); |
| 433 | 439 |
| (...skipping 542 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 976 while (true) { | 982 while (true) { |
| 977 #if defined(OS_MACOSX) | 983 #if defined(OS_MACOSX) |
| 978 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 984 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
| 979 #endif | 985 #endif |
| 980 | 986 |
| 981 HandleCleanup(); | 987 HandleCleanup(); |
| 982 | 988 |
| 983 // See GetWork for what delete_these_outside_lock is doing. | 989 // See GetWork for what delete_these_outside_lock is doing. |
| 984 SequencedTask task; | 990 SequencedTask task; |
| 985 TimeDelta wait_time; | 991 TimeDelta wait_time; |
| 986 std::vector<Closure> delete_these_outside_lock; | 992 std::vector<SequencedTask> delete_these_outside_lock; |
| 987 GetWorkStatus status = | 993 GetWorkStatus status = |
| 988 GetWork(&task, &wait_time, &delete_these_outside_lock); | 994 GetWork(&task, &wait_time, &delete_these_outside_lock); |
| 989 if (status == GET_WORK_FOUND) { | 995 if (status == GET_WORK_FOUND) { |
| 990 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); | 996 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); |
| 991 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), | 997 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), |
| 992 "SequencedWorkerPool::Inner::PostTask", | 998 "SequencedWorkerPool::Inner::PostTask", |
| 993 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), | 999 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), |
| 994 TRACE_EVENT_FLAG_FLOW_IN); | 1000 TRACE_EVENT_FLAG_FLOW_IN); |
| 995 int new_thread_id = WillRunWorkerTask(task); | 1001 int new_thread_id = WillRunWorkerTask(task); |
| 996 { | 1002 { |
| 997 AutoUnlock unlock(lock_); | 1003 AutoUnlock unlock(lock_); |
| 998 // There may be more work available, so wake up another | 1004 // There may be more work available, so wake up another |
| 999 // worker thread. (Technically not required, since we | 1005 // worker thread. (Technically not required, since we |
| 1000 // already get a signal for each new task, but it doesn't | 1006 // already get a signal for each new task, but it doesn't |
| 1001 // hurt.) | 1007 // hurt.) |
| 1002 SignalHasWork(); | 1008 SignalHasWork(); |
| 1003 delete_these_outside_lock.clear(); | 1009 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
| 1004 | 1010 |
| 1005 // Complete thread creation outside the lock if necessary. | 1011 // Complete thread creation outside the lock if necessary. |
| 1006 if (new_thread_id) | 1012 if (new_thread_id) |
| 1007 FinishStartingAdditionalThread(new_thread_id); | 1013 FinishStartingAdditionalThread(new_thread_id); |
| 1008 | 1014 |
| 1009 this_worker->set_running_task_info( | 1015 this_worker->set_running_task_info( |
| 1010 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 1016 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
| 1011 | 1017 |
| 1012 tracked_objects::TaskStopwatch stopwatch; | 1018 tracked_objects::TaskStopwatch stopwatch; |
| 1013 stopwatch.Start(); | 1019 stopwatch.Start(); |
| (...skipping 10 matching lines...) Expand all Loading... |
| 1024 // still works. | 1030 // still works. |
| 1025 task.task = Closure(); | 1031 task.task = Closure(); |
| 1026 | 1032 |
| 1027 this_worker->reset_running_task_info(); | 1033 this_worker->reset_running_task_info(); |
| 1028 } | 1034 } |
| 1029 DidRunWorkerTask(task); // Must be done inside the lock. | 1035 DidRunWorkerTask(task); // Must be done inside the lock. |
| 1030 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 1036 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
| 1031 switch (status) { | 1037 switch (status) { |
| 1032 case GET_WORK_WAIT: { | 1038 case GET_WORK_WAIT: { |
| 1033 AutoUnlock unlock(lock_); | 1039 AutoUnlock unlock(lock_); |
| 1034 delete_these_outside_lock.clear(); | 1040 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
| 1035 } | 1041 } |
| 1036 break; | 1042 break; |
| 1037 case GET_WORK_NOT_FOUND: | 1043 case GET_WORK_NOT_FOUND: |
| 1038 CHECK(delete_these_outside_lock.empty()); | 1044 CHECK(delete_these_outside_lock.empty()); |
| 1039 cleanup_state_ = CLEANUP_FINISHING; | 1045 cleanup_state_ = CLEANUP_FINISHING; |
| 1040 cleanup_cv_.Broadcast(); | 1046 cleanup_cv_.Broadcast(); |
| 1041 break; | 1047 break; |
| 1042 default: | 1048 default: |
| 1043 NOTREACHED(); | 1049 NOTREACHED(); |
| 1044 } | 1050 } |
| 1045 } else { | 1051 } else { |
| 1046 // When we're terminating and there's no more work, we can | 1052 // When we're terminating and there's no more work, we can |
| 1047 // shut down, other workers can complete any pending or new tasks. | 1053 // shut down, other workers can complete any pending or new tasks. |
| 1048 // We can get additional tasks posted after shutdown_called_ is set | 1054 // We can get additional tasks posted after shutdown_called_ is set |
| 1049 // but only worker threads are allowed to post tasks at that time, and | 1055 // but only worker threads are allowed to post tasks at that time, and |
| 1050 // the workers responsible for posting those tasks will be available | 1056 // the workers responsible for posting those tasks will be available |
| 1051 // to run them. Also, there may be some tasks stuck behind running | 1057 // to run them. Also, there may be some tasks stuck behind running |
| 1052 // ones with the same sequence token, but additional threads won't | 1058 // ones with the same sequence token, but additional threads won't |
| 1053 // help this case. | 1059 // help this case. |
| 1054 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { | 1060 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { |
| 1055 AutoUnlock unlock(lock_); | 1061 AutoUnlock unlock(lock_); |
| 1056 delete_these_outside_lock.clear(); | 1062 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
| 1057 break; | 1063 break; |
| 1058 } | 1064 } |
| 1059 | 1065 |
| 1060 // No work was found, but there are tasks that need deletion. The | 1066 // No work was found, but there are tasks that need deletion. The |
| 1061 // deletion must happen outside of the lock. | 1067 // deletion must happen outside of the lock. |
| 1062 if (delete_these_outside_lock.size()) { | 1068 if (delete_these_outside_lock.size()) { |
| 1063 AutoUnlock unlock(lock_); | 1069 AutoUnlock unlock(lock_); |
| 1064 delete_these_outside_lock.clear(); | 1070 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
| 1065 | 1071 |
| 1066 // Since the lock has been released, |status| may no longer be | 1072 // Since the lock has been released, |status| may no longer be |
| 1067 // accurate. It might read GET_WORK_WAIT even if there are tasks | 1073 // accurate. It might read GET_WORK_WAIT even if there are tasks |
| 1068 // ready to perform work. Jump to the top of the loop to recalculate | 1074 // ready to perform work. Jump to the top of the loop to recalculate |
| 1069 // |status|. | 1075 // |status|. |
| 1070 continue; | 1076 continue; |
| 1071 } | 1077 } |
| 1072 | 1078 |
| 1073 waiting_thread_count_++; | 1079 waiting_thread_count_++; |
| 1074 | 1080 |
| 1075 switch (status) { | 1081 switch (status) { |
| 1076 case GET_WORK_NOT_FOUND: | 1082 case GET_WORK_NOT_FOUND: |
| 1077 has_work_cv_.Wait(); | 1083 has_work_cv_.Wait(); |
| 1078 break; | 1084 break; |
| 1079 case GET_WORK_WAIT: | 1085 case GET_WORK_WAIT: |
| 1080 has_work_cv_.TimedWait(wait_time); | 1086 has_work_cv_.TimedWait(wait_time); |
| 1081 break; | 1087 break; |
| 1082 default: | 1088 default: |
| 1083 NOTREACHED(); | 1089 NOTREACHED(); |
| 1084 } | 1090 } |
| 1085 waiting_thread_count_--; | 1091 waiting_thread_count_--; |
| 1086 } | 1092 } |
| 1093 // |delete_these_outside_lock| should have been cleared via |
| 1094 // DeleteWithoutLock() above already. |
| 1095 DCHECK(delete_these_outside_lock.empty()); |
| 1087 } | 1096 } |
| 1088 } // Release lock_. | 1097 } // Release lock_. |
| 1089 | 1098 |
| 1090 // We noticed we should exit. Wake up the next worker so it knows it should | 1099 // We noticed we should exit. Wake up the next worker so it knows it should |
| 1091 // exit as well (because the Shutdown() code only signals once). | 1100 // exit as well (because the Shutdown() code only signals once). |
| 1092 SignalHasWork(); | 1101 SignalHasWork(); |
| 1093 | 1102 |
| 1094 // Possibly unblock shutdown. | 1103 // Possibly unblock shutdown. |
| 1095 can_shutdown_cv_.Signal(); | 1104 can_shutdown_cv_.Signal(); |
| 1096 } | 1105 } |
| 1097 | 1106 |
| 1107 void SequencedWorkerPool::Inner::DeleteWithoutLock( |
| 1108 std::vector<SequencedTask>* tasks_to_delete, |
| 1109 Worker* this_worker) { |
| 1110 while (!tasks_to_delete->empty()) { |
| 1111 const SequencedTask& deleted_task = tasks_to_delete->back(); |
| 1112 this_worker->set_running_task_info( |
| 1113 SequenceToken(deleted_task.sequence_token_id), |
| 1114 deleted_task.shutdown_behavior); |
| 1115 tasks_to_delete->pop_back(); |
| 1116 } |
| 1117 this_worker->reset_running_task_info(); |
| 1118 } |
| 1119 |
| 1098 void SequencedWorkerPool::Inner::HandleCleanup() { | 1120 void SequencedWorkerPool::Inner::HandleCleanup() { |
| 1099 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1121 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1100 | 1122 |
| 1101 lock_.AssertAcquired(); | 1123 lock_.AssertAcquired(); |
| 1102 if (cleanup_state_ == CLEANUP_DONE) | 1124 if (cleanup_state_ == CLEANUP_DONE) |
| 1103 return; | 1125 return; |
| 1104 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1126 if (cleanup_state_ == CLEANUP_REQUESTED) { |
| 1105 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1127 // We win, we get to do the cleanup as soon as the others wise up and idle. |
| 1106 cleanup_state_ = CLEANUP_STARTING; | 1128 cleanup_state_ = CLEANUP_STARTING; |
| 1107 while (thread_being_created_ || | 1129 while (thread_being_created_ || |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1155 | 1177 |
| 1156 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1178 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
| 1157 lock_.AssertAcquired(); | 1179 lock_.AssertAcquired(); |
| 1158 // We assume that we never create enough tasks to wrap around. | 1180 // We assume that we never create enough tasks to wrap around. |
| 1159 return next_sequence_task_number_++; | 1181 return next_sequence_task_number_++; |
| 1160 } | 1182 } |
| 1161 | 1183 |
| 1162 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1184 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| 1163 SequencedTask* task, | 1185 SequencedTask* task, |
| 1164 TimeDelta* wait_time, | 1186 TimeDelta* wait_time, |
| 1165 std::vector<Closure>* delete_these_outside_lock) { | 1187 std::vector<SequencedTask>* delete_these_outside_lock) { |
| 1166 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1188 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1167 | 1189 |
| 1168 lock_.AssertAcquired(); | 1190 lock_.AssertAcquired(); |
| 1169 | 1191 |
| 1170 // Find the next task with a sequence token that's not currently in use. | 1192 // Find the next task with a sequence token that's not currently in use. |
| 1171 // If the token is in use, that means another thread is running something | 1193 // If the token is in use, that means another thread is running something |
| 1172 // in that sequence, and we can't run it without going out-of-order. | 1194 // in that sequence, and we can't run it without going out-of-order. |
| 1173 // | 1195 // |
| 1174 // This algorithm is simple and fair, but inefficient in some cases. For | 1196 // This algorithm is simple and fair, but inefficient in some cases. For |
| 1175 // example, say somebody schedules 1000 slow tasks with the same sequence | 1197 // example, say somebody schedules 1000 slow tasks with the same sequence |
| (...skipping 25 matching lines...) Expand all Loading... |
| 1201 unrunnable_tasks++; | 1223 unrunnable_tasks++; |
| 1202 ++i; | 1224 ++i; |
| 1203 continue; | 1225 continue; |
| 1204 } | 1226 } |
| 1205 | 1227 |
| 1206 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 1228 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
| 1207 // We're shutting down and the task we just found isn't blocking | 1229 // We're shutting down and the task we just found isn't blocking |
| 1208 // shutdown. Delete it and get more work. | 1230 // shutdown. Delete it and get more work. |
| 1209 // | 1231 // |
| 1210 // Note that we do not want to delete unrunnable tasks. Deleting a task | 1232 // Note that we do not want to delete unrunnable tasks. Deleting a task |
| 1211 // can have side effects (like freeing some objects) and deleting a | 1233 // can have side effects (like freeing some objects) and deleting a task |
| 1212 // task that's supposed to run after one that's currently running could | 1234 // that's supposed to run after one that's currently running could cause |
| 1213 // cause an obscure crash. | 1235 // an obscure crash. |
| 1214 // | 1236 // |
| 1215 // We really want to delete these tasks outside the lock in case the | 1237 // We really want to delete these tasks outside the lock in case the |
| 1216 // closures are holding refs to objects that want to post work from | 1238 // closures are holding refs to objects that want to post work from their |
| 1217 // their destructorss (which would deadlock). The closures are | 1239 // destructors (which would deadlock). The closures are internally |
| 1218 // internally refcounted, so we just need to keep a copy of them alive | 1240 // refcounted, so we just need to keep a copy of them alive until the lock |
| 1219 // until the lock is exited. The calling code can just clear() the | 1241 // is exited. The calling code can just clear() the vector they passed to |
| 1220 // vector they passed to us once the lock is exited to make this | 1242 // us once the lock is exited to make this happen. |
| 1221 // happen. | 1243 delete_these_outside_lock->push_back(*i); |
| 1222 delete_these_outside_lock->push_back(i->task); | |
| 1223 pending_tasks_.erase(i++); | 1244 pending_tasks_.erase(i++); |
| 1224 continue; | 1245 continue; |
| 1225 } | 1246 } |
| 1226 | 1247 |
| 1227 if (i->time_to_run > current_time) { | 1248 if (i->time_to_run > current_time) { |
| 1228 // The time to run has not come yet. | 1249 // The time to run has not come yet. |
| 1229 *wait_time = i->time_to_run - current_time; | 1250 *wait_time = i->time_to_run - current_time; |
| 1230 status = GET_WORK_WAIT; | 1251 status = GET_WORK_WAIT; |
| 1231 if (cleanup_state_ == CLEANUP_RUNNING) { | 1252 if (cleanup_state_ == CLEANUP_RUNNING) { |
| 1232 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | 1253 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. |
| 1233 delete_these_outside_lock->push_back(i->task); | 1254 delete_these_outside_lock->push_back(*i); |
| 1234 pending_tasks_.erase(i); | 1255 pending_tasks_.erase(i); |
| 1235 } | 1256 } |
| 1236 break; | 1257 break; |
| 1237 } | 1258 } |
| 1238 | 1259 |
| 1239 // Found a runnable task. | 1260 // Found a runnable task. |
| 1240 *task = *i; | 1261 *task = *i; |
| 1241 pending_tasks_.erase(i); | 1262 pending_tasks_.erase(i); |
| 1242 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { | 1263 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
| 1243 blocking_shutdown_pending_task_count_--; | 1264 blocking_shutdown_pending_task_count_--; |
| (...skipping 360 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1604 bool SequencedWorkerPool::IsShutdownInProgress() { | 1625 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1605 return inner_->IsShutdownInProgress(); | 1626 return inner_->IsShutdownInProgress(); |
| 1606 } | 1627 } |
| 1607 | 1628 |
| 1608 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1629 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 1609 SequenceToken sequence_token) const { | 1630 SequenceToken sequence_token) const { |
| 1610 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1631 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 1611 } | 1632 } |
| 1612 | 1633 |
| 1613 } // namespace base | 1634 } // namespace base |
| OLD | NEW |