OLD | NEW |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Multi-threaded tests of ConditionVariable class. | 5 // Multi-threaded tests of ConditionVariable class. |
6 | 6 |
7 #include <time.h> | 7 #include <time.h> |
8 #include <algorithm> | 8 #include <algorithm> |
9 #include <vector> | 9 #include <vector> |
10 | 10 |
11 #include "base/synchronization/condition_variable.h" | |
12 #include "base/lock.h" | |
13 #include "base/logging.h" | 11 #include "base/logging.h" |
14 #include "base/scoped_ptr.h" | 12 #include "base/scoped_ptr.h" |
15 #include "base/spin_wait.h" | 13 #include "base/spin_wait.h" |
| 14 #include "base/synchronization/condition_variable.h" |
| 15 #include "base/synchronization/lock.h" |
16 #include "base/threading/platform_thread.h" | 16 #include "base/threading/platform_thread.h" |
17 #include "base/threading/thread_collision_warner.h" | 17 #include "base/threading/thread_collision_warner.h" |
18 #include "base/time.h" | 18 #include "base/time.h" |
19 #include "testing/gtest/include/gtest/gtest.h" | 19 #include "testing/gtest/include/gtest/gtest.h" |
20 #include "testing/platform_test.h" | 20 #include "testing/platform_test.h" |
21 | 21 |
22 namespace base { | 22 namespace base { |
23 | 23 |
24 namespace { | 24 namespace { |
25 //------------------------------------------------------------------------------ | 25 //------------------------------------------------------------------------------ |
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
191 // Test serial task servicing, as well as two parallel task servicing methods. | 191 // Test serial task servicing, as well as two parallel task servicing methods. |
192 TEST_F(ConditionVariableTest, MultiThreadConsumerTest) { | 192 TEST_F(ConditionVariableTest, MultiThreadConsumerTest) { |
193 const int kThreadCount = 10; | 193 const int kThreadCount = 10; |
194 WorkQueue queue(kThreadCount); // Start the threads. | 194 WorkQueue queue(kThreadCount); // Start the threads. |
195 | 195 |
196 const int kTaskCount = 10; // Number of tasks in each mini-test here. | 196 const int kTaskCount = 10; // Number of tasks in each mini-test here. |
197 | 197 |
198 Time start_time; // Used to time task processing. | 198 Time start_time; // Used to time task processing. |
199 | 199 |
200 { | 200 { |
201 AutoLock auto_lock(*queue.lock()); | 201 base::AutoLock auto_lock(*queue.lock()); |
202 while (!queue.EveryIdWasAllocated()) | 202 while (!queue.EveryIdWasAllocated()) |
203 queue.all_threads_have_ids()->Wait(); | 203 queue.all_threads_have_ids()->Wait(); |
204 } | 204 } |
205 | 205 |
206 // If threads aren't in a wait state, they may start to gobble up tasks in | 206 // If threads aren't in a wait state, they may start to gobble up tasks in |
207 // parallel, short-circuiting (breaking) this test. | 207 // parallel, short-circuiting (breaking) this test. |
208 queue.SpinUntilAllThreadsAreWaiting(); | 208 queue.SpinUntilAllThreadsAreWaiting(); |
209 | 209 |
210 { | 210 { |
211 // Since we have no tasks yet, all threads should be waiting by now. | 211 // Since we have no tasks yet, all threads should be waiting by now. |
212 AutoLock auto_lock(*queue.lock()); | 212 base::AutoLock auto_lock(*queue.lock()); |
213 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); | 213 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); |
214 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); | 214 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); |
215 EXPECT_EQ(0, queue.task_count()); | 215 EXPECT_EQ(0, queue.task_count()); |
216 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); | 216 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); |
217 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 217 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
218 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); | 218 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); |
219 | 219 |
220 // Set up to make one worker do 30ms tasks sequentially. | 220 // Set up to make one worker do 30ms tasks sequentially. |
221 queue.ResetHistory(); | 221 queue.ResetHistory(); |
222 queue.SetTaskCount(kTaskCount); | 222 queue.SetTaskCount(kTaskCount); |
223 queue.SetWorkTime(kThirtyMs); | 223 queue.SetWorkTime(kThirtyMs); |
224 queue.SetAllowHelp(false); | 224 queue.SetAllowHelp(false); |
225 | 225 |
226 start_time = Time::Now(); | 226 start_time = Time::Now(); |
227 } | 227 } |
228 | 228 |
229 queue.work_is_available()->Signal(); // Start up one thread. | 229 queue.work_is_available()->Signal(); // Start up one thread. |
230 // Wait till we at least start to handle tasks (and we're not all waiting). | 230 // Wait till we at least start to handle tasks (and we're not all waiting). |
231 queue.SpinUntilTaskCountLessThan(kTaskCount); | 231 queue.SpinUntilTaskCountLessThan(kTaskCount); |
232 | 232 |
233 { | 233 { |
234 // Wait until all 10 work tasks have at least been assigned. | 234 // Wait until all 10 work tasks have at least been assigned. |
235 AutoLock auto_lock(*queue.lock()); | 235 base::AutoLock auto_lock(*queue.lock()); |
236 while (queue.task_count()) | 236 while (queue.task_count()) |
237 queue.no_more_tasks()->Wait(); | 237 queue.no_more_tasks()->Wait(); |
238 // The last of the tasks *might* still be running, but... all but one should | 238 // The last of the tasks *might* still be running, but... all but one should |
239 // be done by now, since tasks are being done serially. | 239 // be done by now, since tasks are being done serially. |
240 EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1), | 240 EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1), |
241 (Time::Now() - start_time).InMilliseconds()); | 241 (Time::Now() - start_time).InMilliseconds()); |
242 | 242 |
243 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); | 243 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); |
244 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); | 244 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); |
245 EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread()); | 245 EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread()); |
246 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 246 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
247 EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks()); | 247 EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks()); |
248 } | 248 } |
249 | 249 |
250 // Wait to be sure all tasks are done. | 250 // Wait to be sure all tasks are done. |
251 queue.SpinUntilAllThreadsAreWaiting(); | 251 queue.SpinUntilAllThreadsAreWaiting(); |
252 | 252 |
253 { | 253 { |
254 // Check that all work was done by one thread id. | 254 // Check that all work was done by one thread id. |
255 AutoLock auto_lock(*queue.lock()); | 255 base::AutoLock auto_lock(*queue.lock()); |
256 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); | 256 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); |
257 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); | 257 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); |
258 EXPECT_EQ(0, queue.task_count()); | 258 EXPECT_EQ(0, queue.task_count()); |
259 EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread()); | 259 EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread()); |
260 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 260 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
261 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); | 261 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); |
262 | 262 |
263 // Set up to make each task include getting help from another worker, so | 263 // Set up to make each task include getting help from another worker, so |
264 // so that the work gets done in paralell. | 264 // so that the work gets done in paralell. |
265 queue.ResetHistory(); | 265 queue.ResetHistory(); |
266 queue.SetTaskCount(kTaskCount); | 266 queue.SetTaskCount(kTaskCount); |
267 queue.SetWorkTime(kThirtyMs); | 267 queue.SetWorkTime(kThirtyMs); |
268 queue.SetAllowHelp(true); | 268 queue.SetAllowHelp(true); |
269 | 269 |
270 start_time = Time::Now(); | 270 start_time = Time::Now(); |
271 } | 271 } |
272 | 272 |
273 queue.work_is_available()->Signal(); // But each worker can signal another. | 273 queue.work_is_available()->Signal(); // But each worker can signal another. |
274 // Wait till we at least start to handle tasks (and we're not all waiting). | 274 // Wait till we at least start to handle tasks (and we're not all waiting). |
275 queue.SpinUntilTaskCountLessThan(kTaskCount); | 275 queue.SpinUntilTaskCountLessThan(kTaskCount); |
276 // Wait to allow the all workers to get done. | 276 // Wait to allow the all workers to get done. |
277 queue.SpinUntilAllThreadsAreWaiting(); | 277 queue.SpinUntilAllThreadsAreWaiting(); |
278 | 278 |
279 { | 279 { |
280 // Wait until all work tasks have at least been assigned. | 280 // Wait until all work tasks have at least been assigned. |
281 AutoLock auto_lock(*queue.lock()); | 281 base::AutoLock auto_lock(*queue.lock()); |
282 while (queue.task_count()) | 282 while (queue.task_count()) |
283 queue.no_more_tasks()->Wait(); | 283 queue.no_more_tasks()->Wait(); |
284 | 284 |
285 // To avoid racy assumptions, we'll just assert that at least 2 threads | 285 // To avoid racy assumptions, we'll just assert that at least 2 threads |
286 // did work. We know that the first worker should have gone to sleep, and | 286 // did work. We know that the first worker should have gone to sleep, and |
287 // hence a second worker should have gotten an assignment. | 287 // hence a second worker should have gotten an assignment. |
288 EXPECT_LE(2, queue.GetNumThreadsTakingAssignments()); | 288 EXPECT_LE(2, queue.GetNumThreadsTakingAssignments()); |
289 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); | 289 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); |
290 | 290 |
291 // Try to ask all workers to help, and only a few will do the work. | 291 // Try to ask all workers to help, and only a few will do the work. |
292 queue.ResetHistory(); | 292 queue.ResetHistory(); |
293 queue.SetTaskCount(3); | 293 queue.SetTaskCount(3); |
294 queue.SetWorkTime(kThirtyMs); | 294 queue.SetWorkTime(kThirtyMs); |
295 queue.SetAllowHelp(false); | 295 queue.SetAllowHelp(false); |
296 } | 296 } |
297 queue.work_is_available()->Broadcast(); // Make them all try. | 297 queue.work_is_available()->Broadcast(); // Make them all try. |
298 // Wait till we at least start to handle tasks (and we're not all waiting). | 298 // Wait till we at least start to handle tasks (and we're not all waiting). |
299 queue.SpinUntilTaskCountLessThan(3); | 299 queue.SpinUntilTaskCountLessThan(3); |
300 // Wait to allow the 3 workers to get done. | 300 // Wait to allow the 3 workers to get done. |
301 queue.SpinUntilAllThreadsAreWaiting(); | 301 queue.SpinUntilAllThreadsAreWaiting(); |
302 | 302 |
303 { | 303 { |
304 AutoLock auto_lock(*queue.lock()); | 304 base::AutoLock auto_lock(*queue.lock()); |
305 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); | 305 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); |
306 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); | 306 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); |
307 EXPECT_EQ(0, queue.task_count()); | 307 EXPECT_EQ(0, queue.task_count()); |
308 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); | 308 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); |
309 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 309 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
310 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); | 310 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); |
311 | 311 |
312 // Set up to make each task get help from another worker. | 312 // Set up to make each task get help from another worker. |
313 queue.ResetHistory(); | 313 queue.ResetHistory(); |
314 queue.SetTaskCount(3); | 314 queue.SetTaskCount(3); |
315 queue.SetWorkTime(kThirtyMs); | 315 queue.SetWorkTime(kThirtyMs); |
316 queue.SetAllowHelp(true); // Allow (unnecessary) help requests. | 316 queue.SetAllowHelp(true); // Allow (unnecessary) help requests. |
317 } | 317 } |
318 queue.work_is_available()->Broadcast(); // Signal all threads. | 318 queue.work_is_available()->Broadcast(); // Signal all threads. |
319 // Wait till we at least start to handle tasks (and we're not all waiting). | 319 // Wait till we at least start to handle tasks (and we're not all waiting). |
320 queue.SpinUntilTaskCountLessThan(3); | 320 queue.SpinUntilTaskCountLessThan(3); |
321 // Wait to allow the 3 workers to get done. | 321 // Wait to allow the 3 workers to get done. |
322 queue.SpinUntilAllThreadsAreWaiting(); | 322 queue.SpinUntilAllThreadsAreWaiting(); |
323 | 323 |
324 { | 324 { |
325 AutoLock auto_lock(*queue.lock()); | 325 base::AutoLock auto_lock(*queue.lock()); |
326 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); | 326 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); |
327 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); | 327 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); |
328 EXPECT_EQ(0, queue.task_count()); | 328 EXPECT_EQ(0, queue.task_count()); |
329 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); | 329 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); |
330 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 330 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
331 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); | 331 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); |
332 | 332 |
333 // Set up to make each task get help from another worker. | 333 // Set up to make each task get help from another worker. |
334 queue.ResetHistory(); | 334 queue.ResetHistory(); |
335 queue.SetTaskCount(20); // 2 tasks per thread. | 335 queue.SetTaskCount(20); // 2 tasks per thread. |
336 queue.SetWorkTime(kThirtyMs); | 336 queue.SetWorkTime(kThirtyMs); |
337 queue.SetAllowHelp(true); | 337 queue.SetAllowHelp(true); |
338 } | 338 } |
339 queue.work_is_available()->Signal(); // But each worker can signal another. | 339 queue.work_is_available()->Signal(); // But each worker can signal another. |
340 // Wait till we at least start to handle tasks (and we're not all waiting). | 340 // Wait till we at least start to handle tasks (and we're not all waiting). |
341 queue.SpinUntilTaskCountLessThan(20); | 341 queue.SpinUntilTaskCountLessThan(20); |
342 // Wait to allow the 10 workers to get done. | 342 // Wait to allow the 10 workers to get done. |
343 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. | 343 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. |
344 | 344 |
345 { | 345 { |
346 AutoLock auto_lock(*queue.lock()); | 346 base::AutoLock auto_lock(*queue.lock()); |
347 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); | 347 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); |
348 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); | 348 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); |
349 EXPECT_EQ(0, queue.task_count()); | 349 EXPECT_EQ(0, queue.task_count()); |
350 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); | 350 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); |
351 | 351 |
352 // Same as last test, but with Broadcast(). | 352 // Same as last test, but with Broadcast(). |
353 queue.ResetHistory(); | 353 queue.ResetHistory(); |
354 queue.SetTaskCount(20); // 2 tasks per thread. | 354 queue.SetTaskCount(20); // 2 tasks per thread. |
355 queue.SetWorkTime(kThirtyMs); | 355 queue.SetWorkTime(kThirtyMs); |
356 queue.SetAllowHelp(true); | 356 queue.SetAllowHelp(true); |
357 } | 357 } |
358 queue.work_is_available()->Broadcast(); | 358 queue.work_is_available()->Broadcast(); |
359 // Wait till we at least start to handle tasks (and we're not all waiting). | 359 // Wait till we at least start to handle tasks (and we're not all waiting). |
360 queue.SpinUntilTaskCountLessThan(20); | 360 queue.SpinUntilTaskCountLessThan(20); |
361 // Wait to allow the 10 workers to get done. | 361 // Wait to allow the 10 workers to get done. |
362 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. | 362 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. |
363 | 363 |
364 { | 364 { |
365 AutoLock auto_lock(*queue.lock()); | 365 base::AutoLock auto_lock(*queue.lock()); |
366 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); | 366 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); |
367 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); | 367 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); |
368 EXPECT_EQ(0, queue.task_count()); | 368 EXPECT_EQ(0, queue.task_count()); |
369 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); | 369 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); |
370 | 370 |
371 queue.SetShutdown(); | 371 queue.SetShutdown(); |
372 } | 372 } |
373 queue.work_is_available()->Broadcast(); // Force check for shutdown. | 373 queue.work_is_available()->Broadcast(); // Force check for shutdown. |
374 | 374 |
375 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), | 375 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), |
376 queue.ThreadSafeCheckShutdown(kThreadCount)); | 376 queue.ThreadSafeCheckShutdown(kThreadCount)); |
377 } | 377 } |
378 | 378 |
379 TEST_F(ConditionVariableTest, LargeFastTaskTest) { | 379 TEST_F(ConditionVariableTest, LargeFastTaskTest) { |
380 const int kThreadCount = 200; | 380 const int kThreadCount = 200; |
381 WorkQueue queue(kThreadCount); // Start the threads. | 381 WorkQueue queue(kThreadCount); // Start the threads. |
382 | 382 |
383 Lock private_lock; // Used locally for master to wait. | 383 Lock private_lock; // Used locally for master to wait. |
384 AutoLock private_held_lock(private_lock); | 384 base::AutoLock private_held_lock(private_lock); |
385 ConditionVariable private_cv(&private_lock); | 385 ConditionVariable private_cv(&private_lock); |
386 | 386 |
387 { | 387 { |
388 AutoLock auto_lock(*queue.lock()); | 388 base::AutoLock auto_lock(*queue.lock()); |
389 while (!queue.EveryIdWasAllocated()) | 389 while (!queue.EveryIdWasAllocated()) |
390 queue.all_threads_have_ids()->Wait(); | 390 queue.all_threads_have_ids()->Wait(); |
391 } | 391 } |
392 | 392 |
393 // Wait a bit more to allow threads to reach their wait state. | 393 // Wait a bit more to allow threads to reach their wait state. |
394 queue.SpinUntilAllThreadsAreWaiting(); | 394 queue.SpinUntilAllThreadsAreWaiting(); |
395 | 395 |
396 { | 396 { |
397 // Since we have no tasks, all threads should be waiting by now. | 397 // Since we have no tasks, all threads should be waiting by now. |
398 AutoLock auto_lock(*queue.lock()); | 398 base::AutoLock auto_lock(*queue.lock()); |
399 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); | 399 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); |
400 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); | 400 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); |
401 EXPECT_EQ(0, queue.task_count()); | 401 EXPECT_EQ(0, queue.task_count()); |
402 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); | 402 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); |
403 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); | 403 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); |
404 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); | 404 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); |
405 | 405 |
406 // Set up to make all workers do (an average of) 20 tasks. | 406 // Set up to make all workers do (an average of) 20 tasks. |
407 queue.ResetHistory(); | 407 queue.ResetHistory(); |
408 queue.SetTaskCount(20 * kThreadCount); | 408 queue.SetTaskCount(20 * kThreadCount); |
409 queue.SetWorkTime(kFortyFiveMs); | 409 queue.SetWorkTime(kFortyFiveMs); |
410 queue.SetAllowHelp(false); | 410 queue.SetAllowHelp(false); |
411 } | 411 } |
412 queue.work_is_available()->Broadcast(); // Start up all threads. | 412 queue.work_is_available()->Broadcast(); // Start up all threads. |
413 // Wait until we've handed out all tasks. | 413 // Wait until we've handed out all tasks. |
414 { | 414 { |
415 AutoLock auto_lock(*queue.lock()); | 415 base::AutoLock auto_lock(*queue.lock()); |
416 while (queue.task_count() != 0) | 416 while (queue.task_count() != 0) |
417 queue.no_more_tasks()->Wait(); | 417 queue.no_more_tasks()->Wait(); |
418 } | 418 } |
419 | 419 |
420 // Wait till the last of the tasks complete. | 420 // Wait till the last of the tasks complete. |
421 queue.SpinUntilAllThreadsAreWaiting(); | 421 queue.SpinUntilAllThreadsAreWaiting(); |
422 | 422 |
423 { | 423 { |
424 // With Broadcast(), every thread should have participated. | 424 // With Broadcast(), every thread should have participated. |
425 // but with racing.. they may not all have done equal numbers of tasks. | 425 // but with racing.. they may not all have done equal numbers of tasks. |
426 AutoLock auto_lock(*queue.lock()); | 426 base::AutoLock auto_lock(*queue.lock()); |
427 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); | 427 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); |
428 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); | 428 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); |
429 EXPECT_EQ(0, queue.task_count()); | 429 EXPECT_EQ(0, queue.task_count()); |
430 EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread()); | 430 EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread()); |
431 EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks()); | 431 EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks()); |
432 | 432 |
433 // Set up to make all workers do (an average of) 4 tasks. | 433 // Set up to make all workers do (an average of) 4 tasks. |
434 queue.ResetHistory(); | 434 queue.ResetHistory(); |
435 queue.SetTaskCount(kThreadCount * 4); | 435 queue.SetTaskCount(kThreadCount * 4); |
436 queue.SetWorkTime(kFortyFiveMs); | 436 queue.SetWorkTime(kFortyFiveMs); |
437 queue.SetAllowHelp(true); // Might outperform Broadcast(). | 437 queue.SetAllowHelp(true); // Might outperform Broadcast(). |
438 } | 438 } |
439 queue.work_is_available()->Signal(); // Start up one thread. | 439 queue.work_is_available()->Signal(); // Start up one thread. |
440 | 440 |
441 // Wait until we've handed out all tasks | 441 // Wait until we've handed out all tasks |
442 { | 442 { |
443 AutoLock auto_lock(*queue.lock()); | 443 base::AutoLock auto_lock(*queue.lock()); |
444 while (queue.task_count() != 0) | 444 while (queue.task_count() != 0) |
445 queue.no_more_tasks()->Wait(); | 445 queue.no_more_tasks()->Wait(); |
446 } | 446 } |
447 | 447 |
448 // Wait till the last of the tasks complete. | 448 // Wait till the last of the tasks complete. |
449 queue.SpinUntilAllThreadsAreWaiting(); | 449 queue.SpinUntilAllThreadsAreWaiting(); |
450 | 450 |
451 { | 451 { |
452 // With Signal(), every thread should have participated. | 452 // With Signal(), every thread should have participated. |
453 // but with racing.. they may not all have done four tasks. | 453 // but with racing.. they may not all have done four tasks. |
454 AutoLock auto_lock(*queue.lock()); | 454 base::AutoLock auto_lock(*queue.lock()); |
455 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); | 455 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); |
456 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); | 456 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); |
457 EXPECT_EQ(0, queue.task_count()); | 457 EXPECT_EQ(0, queue.task_count()); |
458 EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread()); | 458 EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread()); |
459 EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks()); | 459 EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks()); |
460 | 460 |
461 queue.SetShutdown(); | 461 queue.SetShutdown(); |
462 } | 462 } |
463 queue.work_is_available()->Broadcast(); // Force check for shutdown. | 463 queue.work_is_available()->Broadcast(); // Force check for shutdown. |
464 | 464 |
(...skipping 28 matching lines...) Expand all Loading... |
493 | 493 |
494 for (int i = 0; i < thread_count_; ++i) { | 494 for (int i = 0; i < thread_count_; ++i) { |
495 PlatformThreadHandle pth; | 495 PlatformThreadHandle pth; |
496 EXPECT_TRUE(PlatformThread::Create(0, this, &pth)); | 496 EXPECT_TRUE(PlatformThread::Create(0, this, &pth)); |
497 thread_handles_[i] = pth; | 497 thread_handles_[i] = pth; |
498 } | 498 } |
499 } | 499 } |
500 | 500 |
501 WorkQueue::~WorkQueue() { | 501 WorkQueue::~WorkQueue() { |
502 { | 502 { |
503 AutoLock auto_lock(lock_); | 503 base::AutoLock auto_lock(lock_); |
504 SetShutdown(); | 504 SetShutdown(); |
505 } | 505 } |
506 work_is_available_.Broadcast(); // Tell them all to terminate. | 506 work_is_available_.Broadcast(); // Tell them all to terminate. |
507 | 507 |
508 for (int i = 0; i < thread_count_; ++i) { | 508 for (int i = 0; i < thread_count_; ++i) { |
509 PlatformThread::Join(thread_handles_[i]); | 509 PlatformThread::Join(thread_handles_[i]); |
510 } | 510 } |
511 EXPECT_EQ(0, waiting_thread_count_); | 511 EXPECT_EQ(0, waiting_thread_count_); |
512 } | 512 } |
513 | 513 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
551 lock_.AssertAcquired(); | 551 lock_.AssertAcquired(); |
552 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); | 552 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); |
553 return shutdown_; | 553 return shutdown_; |
554 } | 554 } |
555 | 555 |
556 // Because this method is called from the test's main thread we need to actually | 556 // Because this method is called from the test's main thread we need to actually |
557 // take the lock. Threads will call the thread_shutting_down() method with the | 557 // take the lock. Threads will call the thread_shutting_down() method with the |
558 // lock already acquired. | 558 // lock already acquired. |
559 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) { | 559 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) { |
560 bool all_shutdown; | 560 bool all_shutdown; |
561 AutoLock auto_lock(lock_); | 561 base::AutoLock auto_lock(lock_); |
562 { | 562 { |
563 // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock. | 563 // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock. |
564 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); | 564 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); |
565 all_shutdown = (shutdown_task_count_ == thread_count); | 565 all_shutdown = (shutdown_task_count_ == thread_count); |
566 } | 566 } |
567 return all_shutdown; | 567 return all_shutdown; |
568 } | 568 } |
569 | 569 |
570 void WorkQueue::thread_shutting_down() { | 570 void WorkQueue::thread_shutting_down() { |
571 lock_.AssertAcquired(); | 571 lock_.AssertAcquired(); |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
650 } | 650 } |
651 | 651 |
652 void WorkQueue::SetShutdown() { | 652 void WorkQueue::SetShutdown() { |
653 lock_.AssertAcquired(); | 653 lock_.AssertAcquired(); |
654 shutdown_ = true; | 654 shutdown_ = true; |
655 } | 655 } |
656 | 656 |
657 void WorkQueue::SpinUntilAllThreadsAreWaiting() { | 657 void WorkQueue::SpinUntilAllThreadsAreWaiting() { |
658 while (true) { | 658 while (true) { |
659 { | 659 { |
660 AutoLock auto_lock(lock_); | 660 base::AutoLock auto_lock(lock_); |
661 if (waiting_thread_count_ == thread_count_) | 661 if (waiting_thread_count_ == thread_count_) |
662 break; | 662 break; |
663 } | 663 } |
664 PlatformThread::Sleep(30); | 664 PlatformThread::Sleep(30); |
665 } | 665 } |
666 } | 666 } |
667 | 667 |
668 void WorkQueue::SpinUntilTaskCountLessThan(int task_count) { | 668 void WorkQueue::SpinUntilTaskCountLessThan(int task_count) { |
669 while (true) { | 669 while (true) { |
670 { | 670 { |
671 AutoLock auto_lock(lock_); | 671 base::AutoLock auto_lock(lock_); |
672 if (task_count_ < task_count) | 672 if (task_count_ < task_count) |
673 break; | 673 break; |
674 } | 674 } |
675 PlatformThread::Sleep(30); | 675 PlatformThread::Sleep(30); |
676 } | 676 } |
677 } | 677 } |
678 | 678 |
679 | 679 |
680 //------------------------------------------------------------------------------ | 680 //------------------------------------------------------------------------------ |
681 // Define the standard worker task. Several tests will spin out many of these | 681 // Define the standard worker task. Several tests will spin out many of these |
682 // threads. | 682 // threads. |
683 //------------------------------------------------------------------------------ | 683 //------------------------------------------------------------------------------ |
684 | 684 |
685 // The multithread tests involve several threads with a task to perform as | 685 // The multithread tests involve several threads with a task to perform as |
686 // directed by an instance of the class WorkQueue. | 686 // directed by an instance of the class WorkQueue. |
687 // The task is to: | 687 // The task is to: |
688 // a) Check to see if there are more tasks (there is a task counter). | 688 // a) Check to see if there are more tasks (there is a task counter). |
689 // a1) Wait on condition variable if there are no tasks currently. | 689 // a1) Wait on condition variable if there are no tasks currently. |
690 // b) Call a function to see what should be done. | 690 // b) Call a function to see what should be done. |
691 // c) Do some computation based on the number of milliseconds returned in (b). | 691 // c) Do some computation based on the number of milliseconds returned in (b). |
692 // d) go back to (a). | 692 // d) go back to (a). |
693 | 693 |
694 // WorkQueue::ThreadMain() implements the above task for all threads. | 694 // WorkQueue::ThreadMain() implements the above task for all threads. |
695 // It calls the controlling object to tell the creator about progress, and to | 695 // It calls the controlling object to tell the creator about progress, and to |
696 // ask about tasks. | 696 // ask about tasks. |
697 | 697 |
698 void WorkQueue::ThreadMain() { | 698 void WorkQueue::ThreadMain() { |
699 int thread_id; | 699 int thread_id; |
700 { | 700 { |
701 AutoLock auto_lock(lock_); | 701 base::AutoLock auto_lock(lock_); |
702 thread_id = GetThreadId(); | 702 thread_id = GetThreadId(); |
703 if (EveryIdWasAllocated()) | 703 if (EveryIdWasAllocated()) |
704 all_threads_have_ids()->Signal(); // Tell creator we're ready. | 704 all_threads_have_ids()->Signal(); // Tell creator we're ready. |
705 } | 705 } |
706 | 706 |
707 Lock private_lock; // Used to waste time on "our work". | 707 Lock private_lock; // Used to waste time on "our work". |
708 while (1) { // This is the main consumer loop. | 708 while (1) { // This is the main consumer loop. |
709 TimeDelta work_time; | 709 TimeDelta work_time; |
710 bool could_use_help; | 710 bool could_use_help; |
711 { | 711 { |
712 AutoLock auto_lock(lock_); | 712 base::AutoLock auto_lock(lock_); |
713 while (0 == task_count() && !shutdown()) { | 713 while (0 == task_count() && !shutdown()) { |
714 ++waiting_thread_count_; | 714 ++waiting_thread_count_; |
715 work_is_available()->Wait(); | 715 work_is_available()->Wait(); |
716 --waiting_thread_count_; | 716 --waiting_thread_count_; |
717 } | 717 } |
718 if (shutdown()) { | 718 if (shutdown()) { |
719 // Ack the notification of a shutdown message back to the controller. | 719 // Ack the notification of a shutdown message back to the controller. |
720 thread_shutting_down(); | 720 thread_shutting_down(); |
721 return; // Terminate. | 721 return; // Terminate. |
722 } | 722 } |
723 // Get our task duration from the queue. | 723 // Get our task duration from the queue. |
724 work_time = GetAnAssignment(thread_id); | 724 work_time = GetAnAssignment(thread_id); |
725 could_use_help = (task_count() > 0) && allow_help_requests(); | 725 could_use_help = (task_count() > 0) && allow_help_requests(); |
726 } // Release lock | 726 } // Release lock |
727 | 727 |
728 // Do work (outside of locked region. | 728 // Do work (outside of locked region. |
729 if (could_use_help) | 729 if (could_use_help) |
730 work_is_available()->Signal(); // Get help from other threads. | 730 work_is_available()->Signal(); // Get help from other threads. |
731 | 731 |
732 if (work_time > TimeDelta::FromMilliseconds(0)) { | 732 if (work_time > TimeDelta::FromMilliseconds(0)) { |
733 // We could just sleep(), but we'll instead further exercise the | 733 // We could just sleep(), but we'll instead further exercise the |
734 // condition variable class, and do a timed wait. | 734 // condition variable class, and do a timed wait. |
735 AutoLock auto_lock(private_lock); | 735 base::AutoLock auto_lock(private_lock); |
736 ConditionVariable private_cv(&private_lock); | 736 ConditionVariable private_cv(&private_lock); |
737 private_cv.TimedWait(work_time); // Unsynchronized waiting. | 737 private_cv.TimedWait(work_time); // Unsynchronized waiting. |
738 } | 738 } |
739 | 739 |
740 { | 740 { |
741 AutoLock auto_lock(lock_); | 741 base::AutoLock auto_lock(lock_); |
742 // Send notification that we completed our "work." | 742 // Send notification that we completed our "work." |
743 WorkIsCompleted(thread_id); | 743 WorkIsCompleted(thread_id); |
744 } | 744 } |
745 } | 745 } |
746 } | 746 } |
747 | 747 |
748 } // namespace | 748 } // namespace |
749 | 749 |
750 } // namespace base | 750 } // namespace base |
OLD | NEW |