| OLD | NEW |
| 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Multi-threaded tests of ConditionVariable class. | 5 // Multi-threaded tests of ConditionVariable class. |
| 6 | 6 |
| 7 #include <time.h> | 7 #include <time.h> |
| 8 #include <algorithm> | 8 #include <algorithm> |
| 9 #include <vector> | 9 #include <vector> |
| 10 | 10 |
| 11 #include "base/synchronization/condition_variable.h" | |
| 12 #include "base/lock.h" | |
| 13 #include "base/logging.h" | 11 #include "base/logging.h" |
| 14 #include "base/scoped_ptr.h" | 12 #include "base/scoped_ptr.h" |
| 15 #include "base/spin_wait.h" | 13 #include "base/spin_wait.h" |
| 14 #include "base/synchronization/condition_variable.h" |
| 15 #include "base/synchronization/lock.h" |
| 16 #include "base/threading/platform_thread.h" | 16 #include "base/threading/platform_thread.h" |
| 17 #include "base/threading/thread_collision_warner.h" | 17 #include "base/threading/thread_collision_warner.h" |
| 18 #include "base/time.h" | 18 #include "base/time.h" |
| 19 #include "testing/gtest/include/gtest/gtest.h" | 19 #include "testing/gtest/include/gtest/gtest.h" |
| 20 #include "testing/platform_test.h" | 20 #include "testing/platform_test.h" |
| 21 | 21 |
| 22 namespace base { | 22 namespace base { |
| 23 | 23 |
| 24 namespace { | 24 namespace { |
| 25 //------------------------------------------------------------------------------ | 25 //------------------------------------------------------------------------------ |
| (...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 191 // Test serial task servicing, as well as two parallel task servicing methods. | 191 // Test serial task servicing, as well as two parallel task servicing methods. |
| 192 TEST_F(ConditionVariableTest, MultiThreadConsumerTest) { | 192 TEST_F(ConditionVariableTest, MultiThreadConsumerTest) { |
| 193 const int kThreadCount = 10; | 193 const int kThreadCount = 10; |
| 194 WorkQueue queue(kThreadCount); // Start the threads. | 194 WorkQueue queue(kThreadCount); // Start the threads. |
| 195 | 195 |
| 196 const int kTaskCount = 10; // Number of tasks in each mini-test here. | 196 const int kTaskCount = 10; // Number of tasks in each mini-test here. |
| 197 | 197 |
| 198 Time start_time; // Used to time task processing. | 198 Time start_time; // Used to time task processing. |
| 199 | 199 |
| 200 { | 200 { |
| 201 AutoLock auto_lock(*queue.lock()); | 201 base::AutoLock auto_lock(*queue.lock()); |
| 202 while (!queue.EveryIdWasAllocated()) | 202 while (!queue.EveryIdWasAllocated()) |
| 203 queue.all_threads_have_ids()->Wait(); | 203 queue.all_threads_have_ids()->Wait(); |
| 204 } | 204 } |
| 205 | 205 |
| 206 // If threads aren't in a wait state, they may start to gobble up tasks in | 206 // If threads aren't in a wait state, they may start to gobble up tasks in |
| 207 // parallel, short-circuiting (breaking) this test. | 207 // parallel, short-circuiting (breaking) this test. |
| 208 queue.SpinUntilAllThreadsAreWaiting(); | 208 queue.SpinUntilAllThreadsAreWaiting(); |
| 209 | 209 |
| 210 { | 210 { |
| 211 // Since we have no tasks yet, all threads should be waiting by now. | 211 // Since we have no tasks yet, all threads should be waiting by now. |
| 212 AutoLock auto_lock(*queue.lock()); | 212 base::AutoLock auto_lock(*queue.lock()); |
| 213 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); | 213 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); |
| 214 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); | 214 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); |
| 215 EXPECT_EQ(0, queue.task_count()); | 215 EXPECT_EQ(0, queue.task_count()); |
| 216 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); | 216 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); |
| 217 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 217 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
| 218 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); | 218 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); |
| 219 | 219 |
| 220 // Set up to make one worker do 30ms tasks sequentially. | 220 // Set up to make one worker do 30ms tasks sequentially. |
| 221 queue.ResetHistory(); | 221 queue.ResetHistory(); |
| 222 queue.SetTaskCount(kTaskCount); | 222 queue.SetTaskCount(kTaskCount); |
| 223 queue.SetWorkTime(kThirtyMs); | 223 queue.SetWorkTime(kThirtyMs); |
| 224 queue.SetAllowHelp(false); | 224 queue.SetAllowHelp(false); |
| 225 | 225 |
| 226 start_time = Time::Now(); | 226 start_time = Time::Now(); |
| 227 } | 227 } |
| 228 | 228 |
| 229 queue.work_is_available()->Signal(); // Start up one thread. | 229 queue.work_is_available()->Signal(); // Start up one thread. |
| 230 // Wait till we at least start to handle tasks (and we're not all waiting). | 230 // Wait till we at least start to handle tasks (and we're not all waiting). |
| 231 queue.SpinUntilTaskCountLessThan(kTaskCount); | 231 queue.SpinUntilTaskCountLessThan(kTaskCount); |
| 232 | 232 |
| 233 { | 233 { |
| 234 // Wait until all 10 work tasks have at least been assigned. | 234 // Wait until all 10 work tasks have at least been assigned. |
| 235 AutoLock auto_lock(*queue.lock()); | 235 base::AutoLock auto_lock(*queue.lock()); |
| 236 while (queue.task_count()) | 236 while (queue.task_count()) |
| 237 queue.no_more_tasks()->Wait(); | 237 queue.no_more_tasks()->Wait(); |
| 238 // The last of the tasks *might* still be running, but... all but one should | 238 // The last of the tasks *might* still be running, but... all but one should |
| 239 // be done by now, since tasks are being done serially. | 239 // be done by now, since tasks are being done serially. |
| 240 EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1), | 240 EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1), |
| 241 (Time::Now() - start_time).InMilliseconds()); | 241 (Time::Now() - start_time).InMilliseconds()); |
| 242 | 242 |
| 243 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); | 243 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); |
| 244 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); | 244 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); |
| 245 EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread()); | 245 EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread()); |
| 246 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 246 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
| 247 EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks()); | 247 EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks()); |
| 248 } | 248 } |
| 249 | 249 |
| 250 // Wait to be sure all tasks are done. | 250 // Wait to be sure all tasks are done. |
| 251 queue.SpinUntilAllThreadsAreWaiting(); | 251 queue.SpinUntilAllThreadsAreWaiting(); |
| 252 | 252 |
| 253 { | 253 { |
| 254 // Check that all work was done by one thread id. | 254 // Check that all work was done by one thread id. |
| 255 AutoLock auto_lock(*queue.lock()); | 255 base::AutoLock auto_lock(*queue.lock()); |
| 256 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); | 256 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); |
| 257 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); | 257 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); |
| 258 EXPECT_EQ(0, queue.task_count()); | 258 EXPECT_EQ(0, queue.task_count()); |
| 259 EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread()); | 259 EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread()); |
| 260 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 260 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
| 261 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); | 261 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); |
| 262 | 262 |
| 263 // Set up to make each task include getting help from another worker, so | 263 // Set up to make each task include getting help from another worker, so |
| 264 // so that the work gets done in paralell. | 264 // so that the work gets done in paralell. |
| 265 queue.ResetHistory(); | 265 queue.ResetHistory(); |
| 266 queue.SetTaskCount(kTaskCount); | 266 queue.SetTaskCount(kTaskCount); |
| 267 queue.SetWorkTime(kThirtyMs); | 267 queue.SetWorkTime(kThirtyMs); |
| 268 queue.SetAllowHelp(true); | 268 queue.SetAllowHelp(true); |
| 269 | 269 |
| 270 start_time = Time::Now(); | 270 start_time = Time::Now(); |
| 271 } | 271 } |
| 272 | 272 |
| 273 queue.work_is_available()->Signal(); // But each worker can signal another. | 273 queue.work_is_available()->Signal(); // But each worker can signal another. |
| 274 // Wait till we at least start to handle tasks (and we're not all waiting). | 274 // Wait till we at least start to handle tasks (and we're not all waiting). |
| 275 queue.SpinUntilTaskCountLessThan(kTaskCount); | 275 queue.SpinUntilTaskCountLessThan(kTaskCount); |
| 276 // Wait to allow the all workers to get done. | 276 // Wait to allow the all workers to get done. |
| 277 queue.SpinUntilAllThreadsAreWaiting(); | 277 queue.SpinUntilAllThreadsAreWaiting(); |
| 278 | 278 |
| 279 { | 279 { |
| 280 // Wait until all work tasks have at least been assigned. | 280 // Wait until all work tasks have at least been assigned. |
| 281 AutoLock auto_lock(*queue.lock()); | 281 base::AutoLock auto_lock(*queue.lock()); |
| 282 while (queue.task_count()) | 282 while (queue.task_count()) |
| 283 queue.no_more_tasks()->Wait(); | 283 queue.no_more_tasks()->Wait(); |
| 284 | 284 |
| 285 // To avoid racy assumptions, we'll just assert that at least 2 threads | 285 // To avoid racy assumptions, we'll just assert that at least 2 threads |
| 286 // did work. We know that the first worker should have gone to sleep, and | 286 // did work. We know that the first worker should have gone to sleep, and |
| 287 // hence a second worker should have gotten an assignment. | 287 // hence a second worker should have gotten an assignment. |
| 288 EXPECT_LE(2, queue.GetNumThreadsTakingAssignments()); | 288 EXPECT_LE(2, queue.GetNumThreadsTakingAssignments()); |
| 289 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); | 289 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); |
| 290 | 290 |
| 291 // Try to ask all workers to help, and only a few will do the work. | 291 // Try to ask all workers to help, and only a few will do the work. |
| 292 queue.ResetHistory(); | 292 queue.ResetHistory(); |
| 293 queue.SetTaskCount(3); | 293 queue.SetTaskCount(3); |
| 294 queue.SetWorkTime(kThirtyMs); | 294 queue.SetWorkTime(kThirtyMs); |
| 295 queue.SetAllowHelp(false); | 295 queue.SetAllowHelp(false); |
| 296 } | 296 } |
| 297 queue.work_is_available()->Broadcast(); // Make them all try. | 297 queue.work_is_available()->Broadcast(); // Make them all try. |
| 298 // Wait till we at least start to handle tasks (and we're not all waiting). | 298 // Wait till we at least start to handle tasks (and we're not all waiting). |
| 299 queue.SpinUntilTaskCountLessThan(3); | 299 queue.SpinUntilTaskCountLessThan(3); |
| 300 // Wait to allow the 3 workers to get done. | 300 // Wait to allow the 3 workers to get done. |
| 301 queue.SpinUntilAllThreadsAreWaiting(); | 301 queue.SpinUntilAllThreadsAreWaiting(); |
| 302 | 302 |
| 303 { | 303 { |
| 304 AutoLock auto_lock(*queue.lock()); | 304 base::AutoLock auto_lock(*queue.lock()); |
| 305 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); | 305 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); |
| 306 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); | 306 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); |
| 307 EXPECT_EQ(0, queue.task_count()); | 307 EXPECT_EQ(0, queue.task_count()); |
| 308 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); | 308 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); |
| 309 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 309 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
| 310 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); | 310 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); |
| 311 | 311 |
| 312 // Set up to make each task get help from another worker. | 312 // Set up to make each task get help from another worker. |
| 313 queue.ResetHistory(); | 313 queue.ResetHistory(); |
| 314 queue.SetTaskCount(3); | 314 queue.SetTaskCount(3); |
| 315 queue.SetWorkTime(kThirtyMs); | 315 queue.SetWorkTime(kThirtyMs); |
| 316 queue.SetAllowHelp(true); // Allow (unnecessary) help requests. | 316 queue.SetAllowHelp(true); // Allow (unnecessary) help requests. |
| 317 } | 317 } |
| 318 queue.work_is_available()->Broadcast(); // Signal all threads. | 318 queue.work_is_available()->Broadcast(); // Signal all threads. |
| 319 // Wait till we at least start to handle tasks (and we're not all waiting). | 319 // Wait till we at least start to handle tasks (and we're not all waiting). |
| 320 queue.SpinUntilTaskCountLessThan(3); | 320 queue.SpinUntilTaskCountLessThan(3); |
| 321 // Wait to allow the 3 workers to get done. | 321 // Wait to allow the 3 workers to get done. |
| 322 queue.SpinUntilAllThreadsAreWaiting(); | 322 queue.SpinUntilAllThreadsAreWaiting(); |
| 323 | 323 |
| 324 { | 324 { |
| 325 AutoLock auto_lock(*queue.lock()); | 325 base::AutoLock auto_lock(*queue.lock()); |
| 326 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); | 326 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); |
| 327 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); | 327 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); |
| 328 EXPECT_EQ(0, queue.task_count()); | 328 EXPECT_EQ(0, queue.task_count()); |
| 329 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); | 329 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); |
| 330 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 330 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
| 331 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); | 331 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); |
| 332 | 332 |
| 333 // Set up to make each task get help from another worker. | 333 // Set up to make each task get help from another worker. |
| 334 queue.ResetHistory(); | 334 queue.ResetHistory(); |
| 335 queue.SetTaskCount(20); // 2 tasks per thread. | 335 queue.SetTaskCount(20); // 2 tasks per thread. |
| 336 queue.SetWorkTime(kThirtyMs); | 336 queue.SetWorkTime(kThirtyMs); |
| 337 queue.SetAllowHelp(true); | 337 queue.SetAllowHelp(true); |
| 338 } | 338 } |
| 339 queue.work_is_available()->Signal(); // But each worker can signal another. | 339 queue.work_is_available()->Signal(); // But each worker can signal another. |
| 340 // Wait till we at least start to handle tasks (and we're not all waiting). | 340 // Wait till we at least start to handle tasks (and we're not all waiting). |
| 341 queue.SpinUntilTaskCountLessThan(20); | 341 queue.SpinUntilTaskCountLessThan(20); |
| 342 // Wait to allow the 10 workers to get done. | 342 // Wait to allow the 10 workers to get done. |
| 343 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. | 343 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. |
| 344 | 344 |
| 345 { | 345 { |
| 346 AutoLock auto_lock(*queue.lock()); | 346 base::AutoLock auto_lock(*queue.lock()); |
| 347 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); | 347 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); |
| 348 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); | 348 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); |
| 349 EXPECT_EQ(0, queue.task_count()); | 349 EXPECT_EQ(0, queue.task_count()); |
| 350 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); | 350 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); |
| 351 | 351 |
| 352 // Same as last test, but with Broadcast(). | 352 // Same as last test, but with Broadcast(). |
| 353 queue.ResetHistory(); | 353 queue.ResetHistory(); |
| 354 queue.SetTaskCount(20); // 2 tasks per thread. | 354 queue.SetTaskCount(20); // 2 tasks per thread. |
| 355 queue.SetWorkTime(kThirtyMs); | 355 queue.SetWorkTime(kThirtyMs); |
| 356 queue.SetAllowHelp(true); | 356 queue.SetAllowHelp(true); |
| 357 } | 357 } |
| 358 queue.work_is_available()->Broadcast(); | 358 queue.work_is_available()->Broadcast(); |
| 359 // Wait till we at least start to handle tasks (and we're not all waiting). | 359 // Wait till we at least start to handle tasks (and we're not all waiting). |
| 360 queue.SpinUntilTaskCountLessThan(20); | 360 queue.SpinUntilTaskCountLessThan(20); |
| 361 // Wait to allow the 10 workers to get done. | 361 // Wait to allow the 10 workers to get done. |
| 362 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. | 362 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. |
| 363 | 363 |
| 364 { | 364 { |
| 365 AutoLock auto_lock(*queue.lock()); | 365 base::AutoLock auto_lock(*queue.lock()); |
| 366 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); | 366 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); |
| 367 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); | 367 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); |
| 368 EXPECT_EQ(0, queue.task_count()); | 368 EXPECT_EQ(0, queue.task_count()); |
| 369 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); | 369 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); |
| 370 | 370 |
| 371 queue.SetShutdown(); | 371 queue.SetShutdown(); |
| 372 } | 372 } |
| 373 queue.work_is_available()->Broadcast(); // Force check for shutdown. | 373 queue.work_is_available()->Broadcast(); // Force check for shutdown. |
| 374 | 374 |
| 375 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), | 375 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), |
| 376 queue.ThreadSafeCheckShutdown(kThreadCount)); | 376 queue.ThreadSafeCheckShutdown(kThreadCount)); |
| 377 } | 377 } |
| 378 | 378 |
| 379 TEST_F(ConditionVariableTest, LargeFastTaskTest) { | 379 TEST_F(ConditionVariableTest, LargeFastTaskTest) { |
| 380 const int kThreadCount = 200; | 380 const int kThreadCount = 200; |
| 381 WorkQueue queue(kThreadCount); // Start the threads. | 381 WorkQueue queue(kThreadCount); // Start the threads. |
| 382 | 382 |
| 383 Lock private_lock; // Used locally for master to wait. | 383 Lock private_lock; // Used locally for master to wait. |
| 384 AutoLock private_held_lock(private_lock); | 384 base::AutoLock private_held_lock(private_lock); |
| 385 ConditionVariable private_cv(&private_lock); | 385 ConditionVariable private_cv(&private_lock); |
| 386 | 386 |
| 387 { | 387 { |
| 388 AutoLock auto_lock(*queue.lock()); | 388 base::AutoLock auto_lock(*queue.lock()); |
| 389 while (!queue.EveryIdWasAllocated()) | 389 while (!queue.EveryIdWasAllocated()) |
| 390 queue.all_threads_have_ids()->Wait(); | 390 queue.all_threads_have_ids()->Wait(); |
| 391 } | 391 } |
| 392 | 392 |
| 393 // Wait a bit more to allow threads to reach their wait state. | 393 // Wait a bit more to allow threads to reach their wait state. |
| 394 queue.SpinUntilAllThreadsAreWaiting(); | 394 queue.SpinUntilAllThreadsAreWaiting(); |
| 395 | 395 |
| 396 { | 396 { |
| 397 // Since we have no tasks, all threads should be waiting by now. | 397 // Since we have no tasks, all threads should be waiting by now. |
| 398 AutoLock auto_lock(*queue.lock()); | 398 base::AutoLock auto_lock(*queue.lock()); |
| 399 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); | 399 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); |
| 400 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); | 400 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); |
| 401 EXPECT_EQ(0, queue.task_count()); | 401 EXPECT_EQ(0, queue.task_count()); |
| 402 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); | 402 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); |
| 403 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 403 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
| 404 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); | 404 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); |
| 405 | 405 |
| 406 // Set up to make all workers do (an average of) 20 tasks. | 406 // Set up to make all workers do (an average of) 20 tasks. |
| 407 queue.ResetHistory(); | 407 queue.ResetHistory(); |
| 408 queue.SetTaskCount(20 * kThreadCount); | 408 queue.SetTaskCount(20 * kThreadCount); |
| 409 queue.SetWorkTime(kFortyFiveMs); | 409 queue.SetWorkTime(kFortyFiveMs); |
| 410 queue.SetAllowHelp(false); | 410 queue.SetAllowHelp(false); |
| 411 } | 411 } |
| 412 queue.work_is_available()->Broadcast(); // Start up all threads. | 412 queue.work_is_available()->Broadcast(); // Start up all threads. |
| 413 // Wait until we've handed out all tasks. | 413 // Wait until we've handed out all tasks. |
| 414 { | 414 { |
| 415 AutoLock auto_lock(*queue.lock()); | 415 base::AutoLock auto_lock(*queue.lock()); |
| 416 while (queue.task_count() != 0) | 416 while (queue.task_count() != 0) |
| 417 queue.no_more_tasks()->Wait(); | 417 queue.no_more_tasks()->Wait(); |
| 418 } | 418 } |
| 419 | 419 |
| 420 // Wait till the last of the tasks complete. | 420 // Wait till the last of the tasks complete. |
| 421 queue.SpinUntilAllThreadsAreWaiting(); | 421 queue.SpinUntilAllThreadsAreWaiting(); |
| 422 | 422 |
| 423 { | 423 { |
| 424 // With Broadcast(), every thread should have participated. | 424 // With Broadcast(), every thread should have participated. |
| 425 // but with racing.. they may not all have done equal numbers of tasks. | 425 // but with racing.. they may not all have done equal numbers of tasks. |
| 426 AutoLock auto_lock(*queue.lock()); | 426 base::AutoLock auto_lock(*queue.lock()); |
| 427 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); | 427 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); |
| 428 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); | 428 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); |
| 429 EXPECT_EQ(0, queue.task_count()); | 429 EXPECT_EQ(0, queue.task_count()); |
| 430 EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread()); | 430 EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread()); |
| 431 EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks()); | 431 EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks()); |
| 432 | 432 |
| 433 // Set up to make all workers do (an average of) 4 tasks. | 433 // Set up to make all workers do (an average of) 4 tasks. |
| 434 queue.ResetHistory(); | 434 queue.ResetHistory(); |
| 435 queue.SetTaskCount(kThreadCount * 4); | 435 queue.SetTaskCount(kThreadCount * 4); |
| 436 queue.SetWorkTime(kFortyFiveMs); | 436 queue.SetWorkTime(kFortyFiveMs); |
| 437 queue.SetAllowHelp(true); // Might outperform Broadcast(). | 437 queue.SetAllowHelp(true); // Might outperform Broadcast(). |
| 438 } | 438 } |
| 439 queue.work_is_available()->Signal(); // Start up one thread. | 439 queue.work_is_available()->Signal(); // Start up one thread. |
| 440 | 440 |
| 441 // Wait until we've handed out all tasks | 441 // Wait until we've handed out all tasks |
| 442 { | 442 { |
| 443 AutoLock auto_lock(*queue.lock()); | 443 base::AutoLock auto_lock(*queue.lock()); |
| 444 while (queue.task_count() != 0) | 444 while (queue.task_count() != 0) |
| 445 queue.no_more_tasks()->Wait(); | 445 queue.no_more_tasks()->Wait(); |
| 446 } | 446 } |
| 447 | 447 |
| 448 // Wait till the last of the tasks complete. | 448 // Wait till the last of the tasks complete. |
| 449 queue.SpinUntilAllThreadsAreWaiting(); | 449 queue.SpinUntilAllThreadsAreWaiting(); |
| 450 | 450 |
| 451 { | 451 { |
| 452 // With Signal(), every thread should have participated. | 452 // With Signal(), every thread should have participated. |
| 453 // but with racing.. they may not all have done four tasks. | 453 // but with racing.. they may not all have done four tasks. |
| 454 AutoLock auto_lock(*queue.lock()); | 454 base::AutoLock auto_lock(*queue.lock()); |
| 455 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); | 455 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); |
| 456 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); | 456 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); |
| 457 EXPECT_EQ(0, queue.task_count()); | 457 EXPECT_EQ(0, queue.task_count()); |
| 458 EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread()); | 458 EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread()); |
| 459 EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks()); | 459 EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks()); |
| 460 | 460 |
| 461 queue.SetShutdown(); | 461 queue.SetShutdown(); |
| 462 } | 462 } |
| 463 queue.work_is_available()->Broadcast(); // Force check for shutdown. | 463 queue.work_is_available()->Broadcast(); // Force check for shutdown. |
| 464 | 464 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 493 | 493 |
| 494 for (int i = 0; i < thread_count_; ++i) { | 494 for (int i = 0; i < thread_count_; ++i) { |
| 495 PlatformThreadHandle pth; | 495 PlatformThreadHandle pth; |
| 496 EXPECT_TRUE(PlatformThread::Create(0, this, &pth)); | 496 EXPECT_TRUE(PlatformThread::Create(0, this, &pth)); |
| 497 thread_handles_[i] = pth; | 497 thread_handles_[i] = pth; |
| 498 } | 498 } |
| 499 } | 499 } |
| 500 | 500 |
| 501 WorkQueue::~WorkQueue() { | 501 WorkQueue::~WorkQueue() { |
| 502 { | 502 { |
| 503 AutoLock auto_lock(lock_); | 503 base::AutoLock auto_lock(lock_); |
| 504 SetShutdown(); | 504 SetShutdown(); |
| 505 } | 505 } |
| 506 work_is_available_.Broadcast(); // Tell them all to terminate. | 506 work_is_available_.Broadcast(); // Tell them all to terminate. |
| 507 | 507 |
| 508 for (int i = 0; i < thread_count_; ++i) { | 508 for (int i = 0; i < thread_count_; ++i) { |
| 509 PlatformThread::Join(thread_handles_[i]); | 509 PlatformThread::Join(thread_handles_[i]); |
| 510 } | 510 } |
| 511 EXPECT_EQ(0, waiting_thread_count_); | 511 EXPECT_EQ(0, waiting_thread_count_); |
| 512 } | 512 } |
| 513 | 513 |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 551 lock_.AssertAcquired(); | 551 lock_.AssertAcquired(); |
| 552 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); | 552 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); |
| 553 return shutdown_; | 553 return shutdown_; |
| 554 } | 554 } |
| 555 | 555 |
| 556 // Because this method is called from the test's main thread we need to actually | 556 // Because this method is called from the test's main thread we need to actually |
| 557 // take the lock. Threads will call the thread_shutting_down() method with the | 557 // take the lock. Threads will call the thread_shutting_down() method with the |
| 558 // lock already acquired. | 558 // lock already acquired. |
| 559 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) { | 559 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) { |
| 560 bool all_shutdown; | 560 bool all_shutdown; |
| 561 AutoLock auto_lock(lock_); | 561 base::AutoLock auto_lock(lock_); |
| 562 { | 562 { |
| 563 // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock. | 563 // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock. |
| 564 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); | 564 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); |
| 565 all_shutdown = (shutdown_task_count_ == thread_count); | 565 all_shutdown = (shutdown_task_count_ == thread_count); |
| 566 } | 566 } |
| 567 return all_shutdown; | 567 return all_shutdown; |
| 568 } | 568 } |
| 569 | 569 |
| 570 void WorkQueue::thread_shutting_down() { | 570 void WorkQueue::thread_shutting_down() { |
| 571 lock_.AssertAcquired(); | 571 lock_.AssertAcquired(); |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 650 } | 650 } |
| 651 | 651 |
| 652 void WorkQueue::SetShutdown() { | 652 void WorkQueue::SetShutdown() { |
| 653 lock_.AssertAcquired(); | 653 lock_.AssertAcquired(); |
| 654 shutdown_ = true; | 654 shutdown_ = true; |
| 655 } | 655 } |
| 656 | 656 |
| 657 void WorkQueue::SpinUntilAllThreadsAreWaiting() { | 657 void WorkQueue::SpinUntilAllThreadsAreWaiting() { |
| 658 while (true) { | 658 while (true) { |
| 659 { | 659 { |
| 660 AutoLock auto_lock(lock_); | 660 base::AutoLock auto_lock(lock_); |
| 661 if (waiting_thread_count_ == thread_count_) | 661 if (waiting_thread_count_ == thread_count_) |
| 662 break; | 662 break; |
| 663 } | 663 } |
| 664 PlatformThread::Sleep(30); | 664 PlatformThread::Sleep(30); |
| 665 } | 665 } |
| 666 } | 666 } |
| 667 | 667 |
| 668 void WorkQueue::SpinUntilTaskCountLessThan(int task_count) { | 668 void WorkQueue::SpinUntilTaskCountLessThan(int task_count) { |
| 669 while (true) { | 669 while (true) { |
| 670 { | 670 { |
| 671 AutoLock auto_lock(lock_); | 671 base::AutoLock auto_lock(lock_); |
| 672 if (task_count_ < task_count) | 672 if (task_count_ < task_count) |
| 673 break; | 673 break; |
| 674 } | 674 } |
| 675 PlatformThread::Sleep(30); | 675 PlatformThread::Sleep(30); |
| 676 } | 676 } |
| 677 } | 677 } |
| 678 | 678 |
| 679 | 679 |
| 680 //------------------------------------------------------------------------------ | 680 //------------------------------------------------------------------------------ |
| 681 // Define the standard worker task. Several tests will spin out many of these | 681 // Define the standard worker task. Several tests will spin out many of these |
| 682 // threads. | 682 // threads. |
| 683 //------------------------------------------------------------------------------ | 683 //------------------------------------------------------------------------------ |
| 684 | 684 |
| 685 // The multithread tests involve several threads with a task to perform as | 685 // The multithread tests involve several threads with a task to perform as |
| 686 // directed by an instance of the class WorkQueue. | 686 // directed by an instance of the class WorkQueue. |
| 687 // The task is to: | 687 // The task is to: |
| 688 // a) Check to see if there are more tasks (there is a task counter). | 688 // a) Check to see if there are more tasks (there is a task counter). |
| 689 // a1) Wait on condition variable if there are no tasks currently. | 689 // a1) Wait on condition variable if there are no tasks currently. |
| 690 // b) Call a function to see what should be done. | 690 // b) Call a function to see what should be done. |
| 691 // c) Do some computation based on the number of milliseconds returned in (b). | 691 // c) Do some computation based on the number of milliseconds returned in (b). |
| 692 // d) go back to (a). | 692 // d) go back to (a). |
| 693 | 693 |
| 694 // WorkQueue::ThreadMain() implements the above task for all threads. | 694 // WorkQueue::ThreadMain() implements the above task for all threads. |
| 695 // It calls the controlling object to tell the creator about progress, and to | 695 // It calls the controlling object to tell the creator about progress, and to |
| 696 // ask about tasks. | 696 // ask about tasks. |
| 697 | 697 |
| 698 void WorkQueue::ThreadMain() { | 698 void WorkQueue::ThreadMain() { |
| 699 int thread_id; | 699 int thread_id; |
| 700 { | 700 { |
| 701 AutoLock auto_lock(lock_); | 701 base::AutoLock auto_lock(lock_); |
| 702 thread_id = GetThreadId(); | 702 thread_id = GetThreadId(); |
| 703 if (EveryIdWasAllocated()) | 703 if (EveryIdWasAllocated()) |
| 704 all_threads_have_ids()->Signal(); // Tell creator we're ready. | 704 all_threads_have_ids()->Signal(); // Tell creator we're ready. |
| 705 } | 705 } |
| 706 | 706 |
| 707 Lock private_lock; // Used to waste time on "our work". | 707 Lock private_lock; // Used to waste time on "our work". |
| 708 while (1) { // This is the main consumer loop. | 708 while (1) { // This is the main consumer loop. |
| 709 TimeDelta work_time; | 709 TimeDelta work_time; |
| 710 bool could_use_help; | 710 bool could_use_help; |
| 711 { | 711 { |
| 712 AutoLock auto_lock(lock_); | 712 base::AutoLock auto_lock(lock_); |
| 713 while (0 == task_count() && !shutdown()) { | 713 while (0 == task_count() && !shutdown()) { |
| 714 ++waiting_thread_count_; | 714 ++waiting_thread_count_; |
| 715 work_is_available()->Wait(); | 715 work_is_available()->Wait(); |
| 716 --waiting_thread_count_; | 716 --waiting_thread_count_; |
| 717 } | 717 } |
| 718 if (shutdown()) { | 718 if (shutdown()) { |
| 719 // Ack the notification of a shutdown message back to the controller. | 719 // Ack the notification of a shutdown message back to the controller. |
| 720 thread_shutting_down(); | 720 thread_shutting_down(); |
| 721 return; // Terminate. | 721 return; // Terminate. |
| 722 } | 722 } |
| 723 // Get our task duration from the queue. | 723 // Get our task duration from the queue. |
| 724 work_time = GetAnAssignment(thread_id); | 724 work_time = GetAnAssignment(thread_id); |
| 725 could_use_help = (task_count() > 0) && allow_help_requests(); | 725 could_use_help = (task_count() > 0) && allow_help_requests(); |
| 726 } // Release lock | 726 } // Release lock |
| 727 | 727 |
| 728 // Do work (outside of locked region. | 728 // Do work (outside of locked region. |
| 729 if (could_use_help) | 729 if (could_use_help) |
| 730 work_is_available()->Signal(); // Get help from other threads. | 730 work_is_available()->Signal(); // Get help from other threads. |
| 731 | 731 |
| 732 if (work_time > TimeDelta::FromMilliseconds(0)) { | 732 if (work_time > TimeDelta::FromMilliseconds(0)) { |
| 733 // We could just sleep(), but we'll instead further exercise the | 733 // We could just sleep(), but we'll instead further exercise the |
| 734 // condition variable class, and do a timed wait. | 734 // condition variable class, and do a timed wait. |
| 735 AutoLock auto_lock(private_lock); | 735 base::AutoLock auto_lock(private_lock); |
| 736 ConditionVariable private_cv(&private_lock); | 736 ConditionVariable private_cv(&private_lock); |
| 737 private_cv.TimedWait(work_time); // Unsynchronized waiting. | 737 private_cv.TimedWait(work_time); // Unsynchronized waiting. |
| 738 } | 738 } |
| 739 | 739 |
| 740 { | 740 { |
| 741 AutoLock auto_lock(lock_); | 741 base::AutoLock auto_lock(lock_); |
| 742 // Send notification that we completed our "work." | 742 // Send notification that we completed our "work." |
| 743 WorkIsCompleted(thread_id); | 743 WorkIsCompleted(thread_id); |
| 744 } | 744 } |
| 745 } | 745 } |
| 746 } | 746 } |
| 747 | 747 |
| 748 } // namespace | 748 } // namespace |
| 749 | 749 |
| 750 } // namespace base | 750 } // namespace base |
| OLD | NEW |