OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
(...skipping 365 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
376 }; | 376 }; |
377 | 377 |
378 enum CleanupState { | 378 enum CleanupState { |
379 CLEANUP_REQUESTED, | 379 CLEANUP_REQUESTED, |
380 CLEANUP_STARTING, | 380 CLEANUP_STARTING, |
381 CLEANUP_RUNNING, | 381 CLEANUP_RUNNING, |
382 CLEANUP_FINISHING, | 382 CLEANUP_FINISHING, |
383 CLEANUP_DONE, | 383 CLEANUP_DONE, |
384 }; | 384 }; |
385 | 385 |
| 386 // Clears ScheduledTasks in |tasks_to_delete| while ensuring that |
| 387 // |this_worker| has the desired task info context during ~ScheduledTask() to |
| 388 // allow sequence-checking. |
| 389 void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete, |
| 390 Worker* this_worker); |
| 391 |
386 // Helper used by PostTask() to complete the work when redirection is on. | 392 // Helper used by PostTask() to complete the work when redirection is on. |
387 // Returns true if the task may run at some point in the future and false if | 393 // Returns true if the task may run at some point in the future and false if |
388 // it will definitely not run. | 394 // it will definitely not run. |
389 // Coalesce upon resolution of http://crbug.com/622400. | 395 // Coalesce upon resolution of http://crbug.com/622400. |
390 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, | 396 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
391 const TimeDelta& delay); | 397 const TimeDelta& delay); |
392 | 398 |
393 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| | 399 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| |
394 // and |traits|. | 400 // and |traits|. |
395 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( | 401 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( |
(...skipping 17 matching lines...) Expand all Loading... |
413 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run | 419 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
414 // immediately, and |task| is not filled in. Likewise, |wait_time| is | 420 // immediately, and |task| is not filled in. Likewise, |wait_time| is |
415 // filled in the time to wait until the next task to run. In this case, the | 421 // filled in the time to wait until the next task to run. In this case, the |
416 // caller should wait the time. | 422 // caller should wait the time. |
417 // | 423 // |
418 // In any case, the calling code should clear the given | 424 // In any case, the calling code should clear the given |
419 // delete_these_outside_lock vector the next time the lock is released. | 425 // delete_these_outside_lock vector the next time the lock is released. |
420 // See the implementation for a more detailed description. | 426 // See the implementation for a more detailed description. |
421 GetWorkStatus GetWork(SequencedTask* task, | 427 GetWorkStatus GetWork(SequencedTask* task, |
422 TimeDelta* wait_time, | 428 TimeDelta* wait_time, |
423 std::vector<Closure>* delete_these_outside_lock); | 429 std::vector<SequencedTask>* delete_these_outside_lock); |
424 | 430 |
425 void HandleCleanup(); | 431 void HandleCleanup(); |
426 | 432 |
427 // Peforms init and cleanup around running the given task. WillRun... | 433 // Peforms init and cleanup around running the given task. WillRun... |
428 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 434 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
429 // The calling code should call FinishStartingAdditionalThread once the | 435 // The calling code should call FinishStartingAdditionalThread once the |
430 // lock is released if the return values is nonzero. | 436 // lock is released if the return values is nonzero. |
431 int WillRunWorkerTask(const SequencedTask& task); | 437 int WillRunWorkerTask(const SequencedTask& task); |
432 void DidRunWorkerTask(const SequencedTask& task); | 438 void DidRunWorkerTask(const SequencedTask& task); |
433 | 439 |
(...skipping 542 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
976 while (true) { | 982 while (true) { |
977 #if defined(OS_MACOSX) | 983 #if defined(OS_MACOSX) |
978 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 984 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
979 #endif | 985 #endif |
980 | 986 |
981 HandleCleanup(); | 987 HandleCleanup(); |
982 | 988 |
983 // See GetWork for what delete_these_outside_lock is doing. | 989 // See GetWork for what delete_these_outside_lock is doing. |
984 SequencedTask task; | 990 SequencedTask task; |
985 TimeDelta wait_time; | 991 TimeDelta wait_time; |
986 std::vector<Closure> delete_these_outside_lock; | 992 std::vector<SequencedTask> delete_these_outside_lock; |
987 GetWorkStatus status = | 993 GetWorkStatus status = |
988 GetWork(&task, &wait_time, &delete_these_outside_lock); | 994 GetWork(&task, &wait_time, &delete_these_outside_lock); |
989 if (status == GET_WORK_FOUND) { | 995 if (status == GET_WORK_FOUND) { |
990 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); | 996 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); |
991 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), | 997 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), |
992 "SequencedWorkerPool::Inner::PostTask", | 998 "SequencedWorkerPool::Inner::PostTask", |
993 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), | 999 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), |
994 TRACE_EVENT_FLAG_FLOW_IN); | 1000 TRACE_EVENT_FLAG_FLOW_IN); |
995 int new_thread_id = WillRunWorkerTask(task); | 1001 int new_thread_id = WillRunWorkerTask(task); |
996 { | 1002 { |
997 AutoUnlock unlock(lock_); | 1003 AutoUnlock unlock(lock_); |
998 // There may be more work available, so wake up another | 1004 // There may be more work available, so wake up another |
999 // worker thread. (Technically not required, since we | 1005 // worker thread. (Technically not required, since we |
1000 // already get a signal for each new task, but it doesn't | 1006 // already get a signal for each new task, but it doesn't |
1001 // hurt.) | 1007 // hurt.) |
1002 SignalHasWork(); | 1008 SignalHasWork(); |
1003 delete_these_outside_lock.clear(); | 1009 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
1004 | 1010 |
1005 // Complete thread creation outside the lock if necessary. | 1011 // Complete thread creation outside the lock if necessary. |
1006 if (new_thread_id) | 1012 if (new_thread_id) |
1007 FinishStartingAdditionalThread(new_thread_id); | 1013 FinishStartingAdditionalThread(new_thread_id); |
1008 | 1014 |
1009 this_worker->set_running_task_info( | 1015 this_worker->set_running_task_info( |
1010 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 1016 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
1011 | 1017 |
1012 tracked_objects::TaskStopwatch stopwatch; | 1018 tracked_objects::TaskStopwatch stopwatch; |
1013 stopwatch.Start(); | 1019 stopwatch.Start(); |
(...skipping 10 matching lines...) Expand all Loading... |
1024 // still works. | 1030 // still works. |
1025 task.task = Closure(); | 1031 task.task = Closure(); |
1026 | 1032 |
1027 this_worker->reset_running_task_info(); | 1033 this_worker->reset_running_task_info(); |
1028 } | 1034 } |
1029 DidRunWorkerTask(task); // Must be done inside the lock. | 1035 DidRunWorkerTask(task); // Must be done inside the lock. |
1030 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 1036 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
1031 switch (status) { | 1037 switch (status) { |
1032 case GET_WORK_WAIT: { | 1038 case GET_WORK_WAIT: { |
1033 AutoUnlock unlock(lock_); | 1039 AutoUnlock unlock(lock_); |
1034 delete_these_outside_lock.clear(); | 1040 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
1035 } | 1041 } |
1036 break; | 1042 break; |
1037 case GET_WORK_NOT_FOUND: | 1043 case GET_WORK_NOT_FOUND: |
1038 CHECK(delete_these_outside_lock.empty()); | 1044 CHECK(delete_these_outside_lock.empty()); |
1039 cleanup_state_ = CLEANUP_FINISHING; | 1045 cleanup_state_ = CLEANUP_FINISHING; |
1040 cleanup_cv_.Broadcast(); | 1046 cleanup_cv_.Broadcast(); |
1041 break; | 1047 break; |
1042 default: | 1048 default: |
1043 NOTREACHED(); | 1049 NOTREACHED(); |
1044 } | 1050 } |
1045 } else { | 1051 } else { |
1046 // When we're terminating and there's no more work, we can | 1052 // When we're terminating and there's no more work, we can |
1047 // shut down, other workers can complete any pending or new tasks. | 1053 // shut down, other workers can complete any pending or new tasks. |
1048 // We can get additional tasks posted after shutdown_called_ is set | 1054 // We can get additional tasks posted after shutdown_called_ is set |
1049 // but only worker threads are allowed to post tasks at that time, and | 1055 // but only worker threads are allowed to post tasks at that time, and |
1050 // the workers responsible for posting those tasks will be available | 1056 // the workers responsible for posting those tasks will be available |
1051 // to run them. Also, there may be some tasks stuck behind running | 1057 // to run them. Also, there may be some tasks stuck behind running |
1052 // ones with the same sequence token, but additional threads won't | 1058 // ones with the same sequence token, but additional threads won't |
1053 // help this case. | 1059 // help this case. |
1054 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { | 1060 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { |
1055 AutoUnlock unlock(lock_); | 1061 AutoUnlock unlock(lock_); |
1056 delete_these_outside_lock.clear(); | 1062 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
1057 break; | 1063 break; |
1058 } | 1064 } |
1059 | 1065 |
1060 // No work was found, but there are tasks that need deletion. The | 1066 // No work was found, but there are tasks that need deletion. The |
1061 // deletion must happen outside of the lock. | 1067 // deletion must happen outside of the lock. |
1062 if (delete_these_outside_lock.size()) { | 1068 if (delete_these_outside_lock.size()) { |
1063 AutoUnlock unlock(lock_); | 1069 AutoUnlock unlock(lock_); |
1064 delete_these_outside_lock.clear(); | 1070 DeleteWithoutLock(&delete_these_outside_lock, this_worker); |
1065 | 1071 |
1066 // Since the lock has been released, |status| may no longer be | 1072 // Since the lock has been released, |status| may no longer be |
1067 // accurate. It might read GET_WORK_WAIT even if there are tasks | 1073 // accurate. It might read GET_WORK_WAIT even if there are tasks |
1068 // ready to perform work. Jump to the top of the loop to recalculate | 1074 // ready to perform work. Jump to the top of the loop to recalculate |
1069 // |status|. | 1075 // |status|. |
1070 continue; | 1076 continue; |
1071 } | 1077 } |
1072 | 1078 |
1073 waiting_thread_count_++; | 1079 waiting_thread_count_++; |
1074 | 1080 |
1075 switch (status) { | 1081 switch (status) { |
1076 case GET_WORK_NOT_FOUND: | 1082 case GET_WORK_NOT_FOUND: |
1077 has_work_cv_.Wait(); | 1083 has_work_cv_.Wait(); |
1078 break; | 1084 break; |
1079 case GET_WORK_WAIT: | 1085 case GET_WORK_WAIT: |
1080 has_work_cv_.TimedWait(wait_time); | 1086 has_work_cv_.TimedWait(wait_time); |
1081 break; | 1087 break; |
1082 default: | 1088 default: |
1083 NOTREACHED(); | 1089 NOTREACHED(); |
1084 } | 1090 } |
1085 waiting_thread_count_--; | 1091 waiting_thread_count_--; |
1086 } | 1092 } |
| 1093 // |delete_these_outside_lock| should have been cleared via |
| 1094 // DeleteWithoutLock() above already. |
| 1095 DCHECK(delete_these_outside_lock.empty()); |
1087 } | 1096 } |
1088 } // Release lock_. | 1097 } // Release lock_. |
1089 | 1098 |
1090 // We noticed we should exit. Wake up the next worker so it knows it should | 1099 // We noticed we should exit. Wake up the next worker so it knows it should |
1091 // exit as well (because the Shutdown() code only signals once). | 1100 // exit as well (because the Shutdown() code only signals once). |
1092 SignalHasWork(); | 1101 SignalHasWork(); |
1093 | 1102 |
1094 // Possibly unblock shutdown. | 1103 // Possibly unblock shutdown. |
1095 can_shutdown_cv_.Signal(); | 1104 can_shutdown_cv_.Signal(); |
1096 } | 1105 } |
1097 | 1106 |
| 1107 void SequencedWorkerPool::Inner::DeleteWithoutLock( |
| 1108 std::vector<SequencedTask>* tasks_to_delete, |
| 1109 Worker* this_worker) { |
| 1110 while (!tasks_to_delete->empty()) { |
| 1111 const SequencedTask& deleted_task = tasks_to_delete->back(); |
| 1112 this_worker->set_running_task_info( |
| 1113 SequenceToken(deleted_task.sequence_token_id), |
| 1114 deleted_task.shutdown_behavior); |
| 1115 tasks_to_delete->pop_back(); |
| 1116 } |
| 1117 this_worker->reset_running_task_info(); |
| 1118 } |
| 1119 |
1098 void SequencedWorkerPool::Inner::HandleCleanup() { | 1120 void SequencedWorkerPool::Inner::HandleCleanup() { |
1099 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1121 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1100 | 1122 |
1101 lock_.AssertAcquired(); | 1123 lock_.AssertAcquired(); |
1102 if (cleanup_state_ == CLEANUP_DONE) | 1124 if (cleanup_state_ == CLEANUP_DONE) |
1103 return; | 1125 return; |
1104 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1126 if (cleanup_state_ == CLEANUP_REQUESTED) { |
1105 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1127 // We win, we get to do the cleanup as soon as the others wise up and idle. |
1106 cleanup_state_ = CLEANUP_STARTING; | 1128 cleanup_state_ = CLEANUP_STARTING; |
1107 while (thread_being_created_ || | 1129 while (thread_being_created_ || |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1155 | 1177 |
1156 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1178 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
1157 lock_.AssertAcquired(); | 1179 lock_.AssertAcquired(); |
1158 // We assume that we never create enough tasks to wrap around. | 1180 // We assume that we never create enough tasks to wrap around. |
1159 return next_sequence_task_number_++; | 1181 return next_sequence_task_number_++; |
1160 } | 1182 } |
1161 | 1183 |
1162 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1184 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
1163 SequencedTask* task, | 1185 SequencedTask* task, |
1164 TimeDelta* wait_time, | 1186 TimeDelta* wait_time, |
1165 std::vector<Closure>* delete_these_outside_lock) { | 1187 std::vector<SequencedTask>* delete_these_outside_lock) { |
1166 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1188 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1167 | 1189 |
1168 lock_.AssertAcquired(); | 1190 lock_.AssertAcquired(); |
1169 | 1191 |
1170 // Find the next task with a sequence token that's not currently in use. | 1192 // Find the next task with a sequence token that's not currently in use. |
1171 // If the token is in use, that means another thread is running something | 1193 // If the token is in use, that means another thread is running something |
1172 // in that sequence, and we can't run it without going out-of-order. | 1194 // in that sequence, and we can't run it without going out-of-order. |
1173 // | 1195 // |
1174 // This algorithm is simple and fair, but inefficient in some cases. For | 1196 // This algorithm is simple and fair, but inefficient in some cases. For |
1175 // example, say somebody schedules 1000 slow tasks with the same sequence | 1197 // example, say somebody schedules 1000 slow tasks with the same sequence |
(...skipping 25 matching lines...) Expand all Loading... |
1201 unrunnable_tasks++; | 1223 unrunnable_tasks++; |
1202 ++i; | 1224 ++i; |
1203 continue; | 1225 continue; |
1204 } | 1226 } |
1205 | 1227 |
1206 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 1228 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
1207 // We're shutting down and the task we just found isn't blocking | 1229 // We're shutting down and the task we just found isn't blocking |
1208 // shutdown. Delete it and get more work. | 1230 // shutdown. Delete it and get more work. |
1209 // | 1231 // |
1210 // Note that we do not want to delete unrunnable tasks. Deleting a task | 1232 // Note that we do not want to delete unrunnable tasks. Deleting a task |
1211 // can have side effects (like freeing some objects) and deleting a | 1233 // can have side effects (like freeing some objects) and deleting a task |
1212 // task that's supposed to run after one that's currently running could | 1234 // that's supposed to run after one that's currently running could cause |
1213 // cause an obscure crash. | 1235 // an obscure crash. |
1214 // | 1236 // |
1215 // We really want to delete these tasks outside the lock in case the | 1237 // We really want to delete these tasks outside the lock in case the |
1216 // closures are holding refs to objects that want to post work from | 1238 // closures are holding refs to objects that want to post work from their |
1217 // their destructorss (which would deadlock). The closures are | 1239 // destructors (which would deadlock). The closures are internally |
1218 // internally refcounted, so we just need to keep a copy of them alive | 1240 // refcounted, so we just need to keep a copy of them alive until the lock |
1219 // until the lock is exited. The calling code can just clear() the | 1241 // is exited. The calling code can just clear() the vector they passed to |
1220 // vector they passed to us once the lock is exited to make this | 1242 // us once the lock is exited to make this happen. |
1221 // happen. | 1243 delete_these_outside_lock->push_back(*i); |
1222 delete_these_outside_lock->push_back(i->task); | |
1223 pending_tasks_.erase(i++); | 1244 pending_tasks_.erase(i++); |
1224 continue; | 1245 continue; |
1225 } | 1246 } |
1226 | 1247 |
1227 if (i->time_to_run > current_time) { | 1248 if (i->time_to_run > current_time) { |
1228 // The time to run has not come yet. | 1249 // The time to run has not come yet. |
1229 *wait_time = i->time_to_run - current_time; | 1250 *wait_time = i->time_to_run - current_time; |
1230 status = GET_WORK_WAIT; | 1251 status = GET_WORK_WAIT; |
1231 if (cleanup_state_ == CLEANUP_RUNNING) { | 1252 if (cleanup_state_ == CLEANUP_RUNNING) { |
1232 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | 1253 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. |
1233 delete_these_outside_lock->push_back(i->task); | 1254 delete_these_outside_lock->push_back(*i); |
1234 pending_tasks_.erase(i); | 1255 pending_tasks_.erase(i); |
1235 } | 1256 } |
1236 break; | 1257 break; |
1237 } | 1258 } |
1238 | 1259 |
1239 // Found a runnable task. | 1260 // Found a runnable task. |
1240 *task = *i; | 1261 *task = *i; |
1241 pending_tasks_.erase(i); | 1262 pending_tasks_.erase(i); |
1242 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { | 1263 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
1243 blocking_shutdown_pending_task_count_--; | 1264 blocking_shutdown_pending_task_count_--; |
(...skipping 360 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1604 bool SequencedWorkerPool::IsShutdownInProgress() { | 1625 bool SequencedWorkerPool::IsShutdownInProgress() { |
1605 return inner_->IsShutdownInProgress(); | 1626 return inner_->IsShutdownInProgress(); |
1606 } | 1627 } |
1607 | 1628 |
1608 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1629 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1609 SequenceToken sequence_token) const { | 1630 SequenceToken sequence_token) const { |
1610 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1631 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1611 } | 1632 } |
1612 | 1633 |
1613 } // namespace base | 1634 } // namespace base |
OLD | NEW |