OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
(...skipping 365 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
376 }; | 376 }; |
377 | 377 |
378 enum CleanupState { | 378 enum CleanupState { |
379 CLEANUP_REQUESTED, | 379 CLEANUP_REQUESTED, |
380 CLEANUP_STARTING, | 380 CLEANUP_STARTING, |
381 CLEANUP_RUNNING, | 381 CLEANUP_RUNNING, |
382 CLEANUP_FINISHING, | 382 CLEANUP_FINISHING, |
383 CLEANUP_DONE, | 383 CLEANUP_DONE, |
384 }; | 384 }; |
385 | 385 |
386 // Clears ScheduledTasks in |tasks_to_delete| while ensuring that | |
387 // |this_worker| has the desired task info context during ~ScheduledTask() to | |
388 // allow sequence-checking. | |
389 void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete, | |
390 Worker* this_worker); | |
391 | |
392 // Helper used by PostTask() to complete the work when redirection is on. | 386 // Helper used by PostTask() to complete the work when redirection is on. |
393 // Returns true if the task may run at some point in the future and false if | 387 // Returns true if the task may run at some point in the future and false if |
394 // it will definitely not run. | 388 // it will definitely not run. |
395 // Coalesce upon resolution of http://crbug.com/622400. | 389 // Coalesce upon resolution of http://crbug.com/622400. |
396 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, | 390 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
397 const TimeDelta& delay); | 391 const TimeDelta& delay); |
398 | 392 |
399 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| | 393 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| |
400 // and |traits|. | 394 // and |traits|. |
401 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( | 395 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( |
(...skipping 17 matching lines...) Expand all Loading... |
419 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run | 413 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
420 // immediately, and |task| is not filled in. Likewise, |wait_time| is | 414 // immediately, and |task| is not filled in. Likewise, |wait_time| is |
421 // filled in the time to wait until the next task to run. In this case, the | 415 // filled in the time to wait until the next task to run. In this case, the |
422 // caller should wait the time. | 416 // caller should wait the time. |
423 // | 417 // |
424 // In any case, the calling code should clear the given | 418 // In any case, the calling code should clear the given |
425 // delete_these_outside_lock vector the next time the lock is released. | 419 // delete_these_outside_lock vector the next time the lock is released. |
426 // See the implementation for a more detailed description. | 420 // See the implementation for a more detailed description. |
427 GetWorkStatus GetWork(SequencedTask* task, | 421 GetWorkStatus GetWork(SequencedTask* task, |
428 TimeDelta* wait_time, | 422 TimeDelta* wait_time, |
429 std::vector<SequencedTask>* delete_these_outside_lock); | 423 std::vector<Closure>* delete_these_outside_lock); |
430 | 424 |
431 void HandleCleanup(); | 425 void HandleCleanup(); |
432 | 426 |
433 // Peforms init and cleanup around running the given task. WillRun... | 427 // Peforms init and cleanup around running the given task. WillRun... |
434 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 428 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
435 // The calling code should call FinishStartingAdditionalThread once the | 429 // The calling code should call FinishStartingAdditionalThread once the |
436 // lock is released if the return values is nonzero. | 430 // lock is released if the return values is nonzero. |
437 int WillRunWorkerTask(const SequencedTask& task); | 431 int WillRunWorkerTask(const SequencedTask& task); |
438 void DidRunWorkerTask(const SequencedTask& task); | 432 void DidRunWorkerTask(const SequencedTask& task); |
439 | 433 |
(...skipping 542 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
982 while (true) { | 976 while (true) { |
983 #if defined(OS_MACOSX) | 977 #if defined(OS_MACOSX) |
984 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 978 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
985 #endif | 979 #endif |
986 | 980 |
987 HandleCleanup(); | 981 HandleCleanup(); |
988 | 982 |
989 // See GetWork for what delete_these_outside_lock is doing. | 983 // See GetWork for what delete_these_outside_lock is doing. |
990 SequencedTask task; | 984 SequencedTask task; |
991 TimeDelta wait_time; | 985 TimeDelta wait_time; |
992 std::vector<SequencedTask> delete_these_outside_lock; | 986 std::vector<Closure> delete_these_outside_lock; |
993 GetWorkStatus status = | 987 GetWorkStatus status = |
994 GetWork(&task, &wait_time, &delete_these_outside_lock); | 988 GetWork(&task, &wait_time, &delete_these_outside_lock); |
995 if (status == GET_WORK_FOUND) { | 989 if (status == GET_WORK_FOUND) { |
996 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); | 990 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); |
997 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), | 991 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), |
998 "SequencedWorkerPool::Inner::PostTask", | 992 "SequencedWorkerPool::Inner::PostTask", |
999 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), | 993 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), |
1000 TRACE_EVENT_FLAG_FLOW_IN); | 994 TRACE_EVENT_FLAG_FLOW_IN); |
1001 int new_thread_id = WillRunWorkerTask(task); | 995 int new_thread_id = WillRunWorkerTask(task); |
1002 { | 996 { |
1003 AutoUnlock unlock(lock_); | 997 AutoUnlock unlock(lock_); |
1004 // There may be more work available, so wake up another | 998 // There may be more work available, so wake up another |
1005 // worker thread. (Technically not required, since we | 999 // worker thread. (Technically not required, since we |
1006 // already get a signal for each new task, but it doesn't | 1000 // already get a signal for each new task, but it doesn't |
1007 // hurt.) | 1001 // hurt.) |
1008 SignalHasWork(); | 1002 SignalHasWork(); |
1009 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1003 delete_these_outside_lock.clear(); |
1010 | 1004 |
1011 // Complete thread creation outside the lock if necessary. | 1005 // Complete thread creation outside the lock if necessary. |
1012 if (new_thread_id) | 1006 if (new_thread_id) |
1013 FinishStartingAdditionalThread(new_thread_id); | 1007 FinishStartingAdditionalThread(new_thread_id); |
1014 | 1008 |
1015 this_worker->set_running_task_info( | 1009 this_worker->set_running_task_info( |
1016 SequenceToken(task.sequence_token_id), task.shutdown_behavior); | 1010 SequenceToken(task.sequence_token_id), task.shutdown_behavior); |
1017 | 1011 |
1018 tracked_objects::TaskStopwatch stopwatch; | 1012 tracked_objects::TaskStopwatch stopwatch; |
1019 stopwatch.Start(); | 1013 stopwatch.Start(); |
(...skipping 10 matching lines...) Expand all Loading... |
1030 // still works. | 1024 // still works. |
1031 task.task = Closure(); | 1025 task.task = Closure(); |
1032 | 1026 |
1033 this_worker->reset_running_task_info(); | 1027 this_worker->reset_running_task_info(); |
1034 } | 1028 } |
1035 DidRunWorkerTask(task); // Must be done inside the lock. | 1029 DidRunWorkerTask(task); // Must be done inside the lock. |
1036 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 1030 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
1037 switch (status) { | 1031 switch (status) { |
1038 case GET_WORK_WAIT: { | 1032 case GET_WORK_WAIT: { |
1039 AutoUnlock unlock(lock_); | 1033 AutoUnlock unlock(lock_); |
1040 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1034 delete_these_outside_lock.clear(); |
1041 } | 1035 } |
1042 break; | 1036 break; |
1043 case GET_WORK_NOT_FOUND: | 1037 case GET_WORK_NOT_FOUND: |
1044 CHECK(delete_these_outside_lock.empty()); | 1038 CHECK(delete_these_outside_lock.empty()); |
1045 cleanup_state_ = CLEANUP_FINISHING; | 1039 cleanup_state_ = CLEANUP_FINISHING; |
1046 cleanup_cv_.Broadcast(); | 1040 cleanup_cv_.Broadcast(); |
1047 break; | 1041 break; |
1048 default: | 1042 default: |
1049 NOTREACHED(); | 1043 NOTREACHED(); |
1050 } | 1044 } |
1051 } else { | 1045 } else { |
1052 // When we're terminating and there's no more work, we can | 1046 // When we're terminating and there's no more work, we can |
1053 // shut down, other workers can complete any pending or new tasks. | 1047 // shut down, other workers can complete any pending or new tasks. |
1054 // We can get additional tasks posted after shutdown_called_ is set | 1048 // We can get additional tasks posted after shutdown_called_ is set |
1055 // but only worker threads are allowed to post tasks at that time, and | 1049 // but only worker threads are allowed to post tasks at that time, and |
1056 // the workers responsible for posting those tasks will be available | 1050 // the workers responsible for posting those tasks will be available |
1057 // to run them. Also, there may be some tasks stuck behind running | 1051 // to run them. Also, there may be some tasks stuck behind running |
1058 // ones with the same sequence token, but additional threads won't | 1052 // ones with the same sequence token, but additional threads won't |
1059 // help this case. | 1053 // help this case. |
1060 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { | 1054 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { |
1061 AutoUnlock unlock(lock_); | 1055 AutoUnlock unlock(lock_); |
1062 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1056 delete_these_outside_lock.clear(); |
1063 break; | 1057 break; |
1064 } | 1058 } |
1065 | 1059 |
1066 // No work was found, but there are tasks that need deletion. The | 1060 // No work was found, but there are tasks that need deletion. The |
1067 // deletion must happen outside of the lock. | 1061 // deletion must happen outside of the lock. |
1068 if (delete_these_outside_lock.size()) { | 1062 if (delete_these_outside_lock.size()) { |
1069 AutoUnlock unlock(lock_); | 1063 AutoUnlock unlock(lock_); |
1070 DeleteWithoutLock(&delete_these_outside_lock, this_worker); | 1064 delete_these_outside_lock.clear(); |
1071 | 1065 |
1072 // Since the lock has been released, |status| may no longer be | 1066 // Since the lock has been released, |status| may no longer be |
1073 // accurate. It might read GET_WORK_WAIT even if there are tasks | 1067 // accurate. It might read GET_WORK_WAIT even if there are tasks |
1074 // ready to perform work. Jump to the top of the loop to recalculate | 1068 // ready to perform work. Jump to the top of the loop to recalculate |
1075 // |status|. | 1069 // |status|. |
1076 continue; | 1070 continue; |
1077 } | 1071 } |
1078 | 1072 |
1079 waiting_thread_count_++; | 1073 waiting_thread_count_++; |
1080 | 1074 |
1081 switch (status) { | 1075 switch (status) { |
1082 case GET_WORK_NOT_FOUND: | 1076 case GET_WORK_NOT_FOUND: |
1083 has_work_cv_.Wait(); | 1077 has_work_cv_.Wait(); |
1084 break; | 1078 break; |
1085 case GET_WORK_WAIT: | 1079 case GET_WORK_WAIT: |
1086 has_work_cv_.TimedWait(wait_time); | 1080 has_work_cv_.TimedWait(wait_time); |
1087 break; | 1081 break; |
1088 default: | 1082 default: |
1089 NOTREACHED(); | 1083 NOTREACHED(); |
1090 } | 1084 } |
1091 waiting_thread_count_--; | 1085 waiting_thread_count_--; |
1092 } | 1086 } |
1093 // |delete_these_outside_lock| should have been cleared via | |
1094 // DeleteWithoutLock() above already. | |
1095 DCHECK(delete_these_outside_lock.empty()); | |
1096 } | 1087 } |
1097 } // Release lock_. | 1088 } // Release lock_. |
1098 | 1089 |
1099 // We noticed we should exit. Wake up the next worker so it knows it should | 1090 // We noticed we should exit. Wake up the next worker so it knows it should |
1100 // exit as well (because the Shutdown() code only signals once). | 1091 // exit as well (because the Shutdown() code only signals once). |
1101 SignalHasWork(); | 1092 SignalHasWork(); |
1102 | 1093 |
1103 // Possibly unblock shutdown. | 1094 // Possibly unblock shutdown. |
1104 can_shutdown_cv_.Signal(); | 1095 can_shutdown_cv_.Signal(); |
1105 } | 1096 } |
1106 | 1097 |
1107 void SequencedWorkerPool::Inner::DeleteWithoutLock( | |
1108 std::vector<SequencedTask>* tasks_to_delete, | |
1109 Worker* this_worker) { | |
1110 while (!tasks_to_delete->empty()) { | |
1111 const SequencedTask& deleted_task = tasks_to_delete->back(); | |
1112 this_worker->set_running_task_info( | |
1113 SequenceToken(deleted_task.sequence_token_id), | |
1114 deleted_task.shutdown_behavior); | |
1115 tasks_to_delete->pop_back(); | |
1116 } | |
1117 this_worker->reset_running_task_info(); | |
1118 } | |
1119 | |
1120 void SequencedWorkerPool::Inner::HandleCleanup() { | 1098 void SequencedWorkerPool::Inner::HandleCleanup() { |
1121 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1099 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1122 | 1100 |
1123 lock_.AssertAcquired(); | 1101 lock_.AssertAcquired(); |
1124 if (cleanup_state_ == CLEANUP_DONE) | 1102 if (cleanup_state_ == CLEANUP_DONE) |
1125 return; | 1103 return; |
1126 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1104 if (cleanup_state_ == CLEANUP_REQUESTED) { |
1127 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1105 // We win, we get to do the cleanup as soon as the others wise up and idle. |
1128 cleanup_state_ = CLEANUP_STARTING; | 1106 cleanup_state_ = CLEANUP_STARTING; |
1129 while (thread_being_created_ || | 1107 while (thread_being_created_ || |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1177 | 1155 |
1178 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1156 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
1179 lock_.AssertAcquired(); | 1157 lock_.AssertAcquired(); |
1180 // We assume that we never create enough tasks to wrap around. | 1158 // We assume that we never create enough tasks to wrap around. |
1181 return next_sequence_task_number_++; | 1159 return next_sequence_task_number_++; |
1182 } | 1160 } |
1183 | 1161 |
1184 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1162 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
1185 SequencedTask* task, | 1163 SequencedTask* task, |
1186 TimeDelta* wait_time, | 1164 TimeDelta* wait_time, |
1187 std::vector<SequencedTask>* delete_these_outside_lock) { | 1165 std::vector<Closure>* delete_these_outside_lock) { |
1188 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); | 1166 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
1189 | 1167 |
1190 lock_.AssertAcquired(); | 1168 lock_.AssertAcquired(); |
1191 | 1169 |
1192 // Find the next task with a sequence token that's not currently in use. | 1170 // Find the next task with a sequence token that's not currently in use. |
1193 // If the token is in use, that means another thread is running something | 1171 // If the token is in use, that means another thread is running something |
1194 // in that sequence, and we can't run it without going out-of-order. | 1172 // in that sequence, and we can't run it without going out-of-order. |
1195 // | 1173 // |
1196 // This algorithm is simple and fair, but inefficient in some cases. For | 1174 // This algorithm is simple and fair, but inefficient in some cases. For |
1197 // example, say somebody schedules 1000 slow tasks with the same sequence | 1175 // example, say somebody schedules 1000 slow tasks with the same sequence |
(...skipping 25 matching lines...) Expand all Loading... |
1223 unrunnable_tasks++; | 1201 unrunnable_tasks++; |
1224 ++i; | 1202 ++i; |
1225 continue; | 1203 continue; |
1226 } | 1204 } |
1227 | 1205 |
1228 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 1206 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
1229 // We're shutting down and the task we just found isn't blocking | 1207 // We're shutting down and the task we just found isn't blocking |
1230 // shutdown. Delete it and get more work. | 1208 // shutdown. Delete it and get more work. |
1231 // | 1209 // |
1232 // Note that we do not want to delete unrunnable tasks. Deleting a task | 1210 // Note that we do not want to delete unrunnable tasks. Deleting a task |
1233 // can have side effects (like freeing some objects) and deleting a task | 1211 // can have side effects (like freeing some objects) and deleting a |
1234 // that's supposed to run after one that's currently running could cause | 1212 // task that's supposed to run after one that's currently running could |
1235 // an obscure crash. | 1213 // cause an obscure crash. |
1236 // | 1214 // |
1237 // We really want to delete these tasks outside the lock in case the | 1215 // We really want to delete these tasks outside the lock in case the |
1238 // closures are holding refs to objects that want to post work from their | 1216 // closures are holding refs to objects that want to post work from |
1239 // destructors (which would deadlock). The closures are internally | 1217 // their destructorss (which would deadlock). The closures are |
1240 // refcounted, so we just need to keep a copy of them alive until the lock | 1218 // internally refcounted, so we just need to keep a copy of them alive |
1241 // is exited. The calling code can just clear() the vector they passed to | 1219 // until the lock is exited. The calling code can just clear() the |
1242 // us once the lock is exited to make this happen. | 1220 // vector they passed to us once the lock is exited to make this |
1243 delete_these_outside_lock->push_back(*i); | 1221 // happen. |
| 1222 delete_these_outside_lock->push_back(i->task); |
1244 pending_tasks_.erase(i++); | 1223 pending_tasks_.erase(i++); |
1245 continue; | 1224 continue; |
1246 } | 1225 } |
1247 | 1226 |
1248 if (i->time_to_run > current_time) { | 1227 if (i->time_to_run > current_time) { |
1249 // The time to run has not come yet. | 1228 // The time to run has not come yet. |
1250 *wait_time = i->time_to_run - current_time; | 1229 *wait_time = i->time_to_run - current_time; |
1251 status = GET_WORK_WAIT; | 1230 status = GET_WORK_WAIT; |
1252 if (cleanup_state_ == CLEANUP_RUNNING) { | 1231 if (cleanup_state_ == CLEANUP_RUNNING) { |
1253 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | 1232 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. |
1254 delete_these_outside_lock->push_back(*i); | 1233 delete_these_outside_lock->push_back(i->task); |
1255 pending_tasks_.erase(i); | 1234 pending_tasks_.erase(i); |
1256 } | 1235 } |
1257 break; | 1236 break; |
1258 } | 1237 } |
1259 | 1238 |
1260 // Found a runnable task. | 1239 // Found a runnable task. |
1261 *task = *i; | 1240 *task = *i; |
1262 pending_tasks_.erase(i); | 1241 pending_tasks_.erase(i); |
1263 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { | 1242 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
1264 blocking_shutdown_pending_task_count_--; | 1243 blocking_shutdown_pending_task_count_--; |
(...skipping 360 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1625 bool SequencedWorkerPool::IsShutdownInProgress() { | 1604 bool SequencedWorkerPool::IsShutdownInProgress() { |
1626 return inner_->IsShutdownInProgress(); | 1605 return inner_->IsShutdownInProgress(); |
1627 } | 1606 } |
1628 | 1607 |
1629 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1608 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1630 SequenceToken sequence_token) const { | 1609 SequenceToken sequence_token) const { |
1631 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1610 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1632 } | 1611 } |
1633 | 1612 |
1634 } // namespace base | 1613 } // namespace base |
OLD | NEW |