| Index: mojo/edk/system/wait_set_dispatcher_unittest.cc
|
| diff --git a/mojo/edk/system/wait_set_dispatcher_unittest.cc b/mojo/edk/system/wait_set_dispatcher_unittest.cc
|
| index 57d61cfdb9484828f4acaa7d044958c9b4603dd7..ea544e39d9023da3e92b84fc02f8b07e40bcb0db 100644
|
| --- a/mojo/edk/system/wait_set_dispatcher_unittest.cc
|
| +++ b/mojo/edk/system/wait_set_dispatcher_unittest.cc
|
| @@ -9,18 +9,26 @@
|
|
|
| #include "mojo/edk/system/wait_set_dispatcher.h"
|
|
|
| +#include <map>
|
| #include <thread>
|
| +#include <utility>
|
| #include <vector>
|
|
|
| +#include "base/logging.h"
|
| #include "mojo/edk/platform/test_stopwatch.h"
|
| #include "mojo/edk/platform/thread_utils.h"
|
| #include "mojo/edk/system/mock_simple_dispatcher.h"
|
| +#include "mojo/edk/system/test/random.h"
|
| #include "mojo/edk/system/test/timeouts.h"
|
| +#include "mojo/edk/util/mutex.h"
|
| #include "testing/gtest/include/gtest/gtest.h"
|
|
|
| using mojo::platform::test::Stopwatch;
|
| using mojo::platform::ThreadSleep;
|
| using mojo::util::MakeRefCounted;
|
| +using mojo::util::Mutex;
|
| +using mojo::util::MutexLocker;
|
| +using mojo::util::RefPtr;
|
|
|
| namespace mojo {
|
| namespace system {
|
| @@ -639,7 +647,184 @@ TEST(WaitSetDispatcherTest, BasicThreaded3) {
|
| EXPECT_EQ(MOJO_RESULT_OK, d->Close());
|
| }
|
|
|
| -// TODO(vtl): Stress tests.
|
| +// The set-up for this test is as follows:
|
| +// - We'll just use the "readable" handle signal everywhere.
|
| +// - There's one wait set.
|
| +// - It contains a single "quit" entry for a "quit" dispatcher ("owned" by
|
| +// the main thread).
|
| +// - There are a number of waiter threads waiting on it.
|
| +// - Upon being awoken, a waiter thread looks at the results.
|
| +// - If one of them was for "quit", the waiter thread ends.
|
| +// - Otherwise, it resets the signal for the things it was awoken for.
|
| +// - There are a bunch of "worker" threads.
|
| +// - Each worker thread operates in a tight loop.
|
| +// - In each iteration, it checks if the quit dispatcher is signaled; if
|
| +// it is, the worker thread ends.
|
| +// - Otherwise, it might create a dispatcher (which it owns) and add entry
|
| +// for it.
|
| +// - It might also signal an entry that it previously added.
|
| +// - It might also remove an entry that it previously added.
|
| +// - The main thread just sleeps for some desired amount of time, and then
|
| +// signals the quit dispatcher and joins all of the above threads.
|
| +TEST(WaitSetDispatcherTest, ThreadedStress) {
|
| + static constexpr auto kTestRunTime = static_cast<MojoDeadline>(1000 * 1000u);
|
| + static constexpr size_t kNumWaiters = 4;
|
| + static constexpr size_t kNumWorkers = 8;
|
| + static constexpr size_t kMaxEntriesPerWorker = 100;
|
| +
|
| + static constexpr auto kNone = MOJO_HANDLE_SIGNAL_NONE;
|
| + static constexpr auto kSignal = MOJO_HANDLE_SIGNAL_READABLE;
|
| + static constexpr uint64_t kQuitCookie = 0;
|
| +
|
| + Mutex mu;
|
| + // The next cookie to "allocate". Guarded by |mu|.
|
| + uint64_t next_cookie = kQuitCookie + 1;
|
| + // Cookie -> dispatcher map. Guarded by |mu|.
|
| + std::map<uint64_t, RefPtr<test::MockSimpleDispatcher>>
|
| + cookie_to_dispatcher_map;
|
| +
|
| + auto wait_set =
|
| + WaitSetDispatcher::Create(WaitSetDispatcher::kDefaultCreateOptions);
|
| + // The quit dispatcher and entry.
|
| + auto quit = MakeRefCounted<test::MockSimpleDispatcher>(kNone, kSignal);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + wait_set->WaitSetAdd(NullUserPointer(), quit.Clone(), kSignal,
|
| + kQuitCookie));
|
| +
|
| + std::vector<std::thread> threads;
|
| +
|
| + // Add waiter threads.
|
| + for (size_t i = 0; i < kNumWaiters; i++) {
|
| + threads.push_back(std::thread([&mu, &cookie_to_dispatcher_map, &wait_set,
|
| + i]() {
|
| + uint64_t total_wakeups = 0;
|
| +
|
| + for (;;) {
|
| + uint32_t num_results = 10u;
|
| + MojoWaitSetResult results[10] = {};
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + wait_set->WaitSetWait(
|
| + MOJO_DEADLINE_INDEFINITE, MakeUserPointer(&num_results),
|
| + MakeUserPointer(results), NullUserPointer()));
|
| + EXPECT_GE(num_results, 1u);
|
| + total_wakeups++;
|
| +
|
| + // First, see if we were woken up for a quit cookie.
|
| + for (uint32_t j = 0; j < num_results; j++) {
|
| + if (results[j].cookie == kQuitCookie) {
|
| + VLOG(1) << "Waiter thread #" << i
|
| + << ": total_wakeups = " << total_wakeups;
|
| + return;
|
| + }
|
| + }
|
| +
|
| + // Otherwise, get the dispatcher for each cookie and reset its signals.
|
| + MutexLocker locker(&mu);
|
| + for (uint32_t j = 0; j < num_results; j++) {
|
| + auto it = cookie_to_dispatcher_map.find(results[j].cookie);
|
| + if (it == cookie_to_dispatcher_map.end()) {
|
| + // This is not an error, since it may have been removed/destroyed by
|
| + // a worker thread.
|
| + continue;
|
| + }
|
| + it->second->SetSatisfiedSignals(kNone);
|
| + }
|
| + }
|
| + }));
|
| + }
|
| +
|
| + // Add worker threads.
|
| + for (size_t i = 0; i < kNumWorkers; i++) {
|
| + threads.push_back(std::thread([&mu, &next_cookie, &cookie_to_dispatcher_map,
|
| + &wait_set, &quit, i]() {
|
| + uint64_t total_adds = 0;
|
| + uint64_t total_triggers = 0;
|
| + uint64_t total_removes = 0;
|
| +
|
| + // These are parallel vectors.
|
| + std::vector<RefPtr<test::MockSimpleDispatcher>> dispatchers;
|
| + std::vector<uint64_t> cookies;
|
| +
|
| + for (;;) {
|
| + // If |quit| is signaled, quit.
|
| + if ((quit->GetHandleSignalsState().satisfied_signals & kSignal))
|
| + break;
|
| +
|
| + // Should we add an entry (i.e., a dispatcher)? Make the probability be
|
| + // 1 - (current number) / (maximum number).
|
| + if (test::RandomInt(1, static_cast<int>(kMaxEntriesPerWorker)) >
|
| + static_cast<int>(dispatchers.size())) {
|
| + total_adds++;
|
| +
|
| + auto new_dispatcher =
|
| + MakeRefCounted<test::MockSimpleDispatcher>(kNone, kSignal);
|
| + uint64_t new_cookie;
|
| + {
|
| + MutexLocker locker(&mu);
|
| + new_cookie = next_cookie++;
|
| + cookie_to_dispatcher_map[new_cookie] = new_dispatcher;
|
| + dispatchers.push_back(new_dispatcher);
|
| + cookies.push_back(new_cookie);
|
| + }
|
| + EXPECT_NE(new_cookie, kQuitCookie);
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_OK,
|
| + wait_set->WaitSetAdd(NullUserPointer(), std::move(new_dispatcher),
|
| + kSignal, new_cookie));
|
| + }
|
| +
|
| + // Should we trigger an entry? Make the probability be (current number)
|
| + // / (maximum number).
|
| + int j = test::RandomInt(0, static_cast<int>(kMaxEntriesPerWorker - 1));
|
| + if (j < static_cast<int>(dispatchers.size())) {
|
| + total_triggers++;
|
| +
|
| + // Just use |j| as an index into |dispatchers|/|cookies|.
|
| + dispatchers[j]->SetSatisfiedSignals(kSignal);
|
| + }
|
| +
|
| + // Should we remove an entry? Make the probability be (current number) /
|
| + // (maximum number).
|
| + j = test::RandomInt(0, static_cast<int>(kMaxEntriesPerWorker - 1));
|
| + if (j < static_cast<int>(dispatchers.size())) {
|
| + total_removes++;
|
| +
|
| + EXPECT_NE(cookies[j], kQuitCookie);
|
| + EXPECT_EQ(MOJO_RESULT_OK, wait_set->WaitSetRemove(cookies[j]));
|
| + {
|
| + MutexLocker locker(&mu);
|
| + cookie_to_dispatcher_map.erase(cookies[j]);
|
| + }
|
| + EXPECT_EQ(MOJO_RESULT_OK, dispatchers[j]->Close());
|
| + dispatchers.erase(dispatchers.begin() + j);
|
| + cookies.erase(cookies.begin() + j);
|
| + }
|
| + }
|
| +
|
| + // Remove remaining entries.
|
| + for (auto cookie : cookies)
|
| + EXPECT_EQ(MOJO_RESULT_OK, wait_set->WaitSetRemove(cookie));
|
| +
|
| + // Close all our dispatchers.
|
| + for (auto& dispatcher : dispatchers)
|
| + EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
|
| +
|
| + VLOG(1) << "Worker thread #" << i << ": total_adds = " << total_adds
|
| + << ", total_triggers = " << total_triggers
|
| + << ", total_removes = " << total_removes;
|
| + }));
|
| + }
|
| +
|
| + // Main thread work: just sleep and then signal |quit|.
|
| + ThreadSleep(kTestRunTime);
|
| + quit->SetSatisfiedSignals(kSignal);
|
| +
|
| + for (auto& t : threads)
|
| + t.join();
|
| +
|
| + EXPECT_EQ(MOJO_RESULT_OK, quit->Close());
|
| + EXPECT_EQ(MOJO_RESULT_OK, wait_set->Close());
|
| +}
|
|
|
| // TODO(vtl): Test options validation for "create" and "add" (not that there's
|
| // much to test).
|
|
|