Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(370)

Unified Diff: third_party/grpc/test/cpp/qps/driver.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/test/cpp/qps/driver.cc
diff --git a/third_party/grpc/test/cpp/qps/driver.cc b/third_party/grpc/test/cpp/qps/driver.cc
new file mode 100644
index 0000000000000000000000000000000000000000..1c7fdf8796090053dbb3d27c75e4f87e029f6a16
--- /dev/null
+++ b/third_party/grpc/test/cpp/qps/driver.cc
@@ -0,0 +1,399 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <deque>
+#include <list>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+
+#include "src/core/support/env.h"
+#include "src/proto/grpc/testing/services.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/qps_worker.h"
+
+using std::list;
+using std::thread;
+using std::unique_ptr;
+using std::deque;
+using std::vector;
+
+namespace grpc {
+namespace testing {
+static std::string get_host(const std::string& worker) {
+ char* host;
+ char* port;
+
+ gpr_split_host_port(worker.c_str(), &host, &port);
+ const string s(host);
+
+ gpr_free(host);
+ gpr_free(port);
+ return s;
+}
+
+static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
+ const deque<string>& workers) {
+ std::unordered_map<string, std::deque<int>> hosts;
+ for (auto it = workers.begin(); it != workers.end(); it++) {
+ const string host = get_host(*it);
+ if (hosts.find(host) == hosts.end()) {
+ auto stub = WorkerService::NewStub(
+ CreateChannel(*it, InsecureChannelCredentials()));
+ grpc::ClientContext ctx;
+ CoreRequest dummy;
+ CoreResponse cores;
+ grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
+ assert(s.ok());
+ std::deque<int> dq;
+ for (int i = 0; i < cores.cores(); i++) {
+ dq.push_back(i);
+ }
+ hosts[host] = dq;
+ }
+ }
+ return hosts;
+}
+
+static deque<string> get_workers(const string& name) {
+ char* env = gpr_getenv(name.c_str());
+ if (!env) return deque<string>();
+
+ deque<string> out;
+ char* p = env;
+ for (;;) {
+ char* comma = strchr(p, ',');
+ if (comma) {
+ out.emplace_back(p, comma);
+ p = comma + 1;
+ } else {
+ out.emplace_back(p);
+ gpr_free(env);
+ return out;
+ }
+ }
+}
+
+// Namespace for classes and functions used only in RunScenario
+// Using this rather than local definitions to workaround gcc-4.4 limitations
+// regarding using templates without linkage
+namespace runsc {
+
+// ClientContext allocator
+template <class T>
+static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {
+ contexts->emplace_back();
+ auto context = &contexts->back();
+ context->set_deadline(deadline);
+ return context;
+}
+
+struct ServerData {
+ unique_ptr<WorkerService::Stub> stub;
+ unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
+};
+
+struct ClientData {
+ unique_ptr<WorkerService::Stub> stub;
+ unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
+};
+} // namespace runsc
+
+std::unique_ptr<ScenarioResult> RunScenario(
+ const ClientConfig& initial_client_config, size_t num_clients,
+ const ServerConfig& initial_server_config, size_t num_servers,
+ int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) {
+ // ClientContext allocations (all are destroyed at scope exit)
+ list<ClientContext> contexts;
+
+ // To be added to the result, containing the final configuration used for
+ // client and config (including host, etc.)
+ ClientConfig result_client_config;
+ const ServerConfig result_server_config = initial_server_config;
+
+ // Get client, server lists
+ auto workers = get_workers("QPS_WORKERS");
+ ClientConfig client_config = initial_client_config;
+
+ // Spawn some local workers if desired
+ vector<unique_ptr<QpsWorker>> local_workers;
+ for (int i = 0; i < abs(spawn_local_worker_count); i++) {
+ // act as if we're a new test -- gets a good rng seed
+ static bool called_init = false;
+ if (!called_init) {
+ char args_buf[100];
+ strcpy(args_buf, "some-benchmark");
+ char* args[] = {args_buf};
+ grpc_test_init(1, args);
+ called_init = true;
+ }
+
+ int driver_port = grpc_pick_unused_port_or_die();
+ local_workers.emplace_back(new QpsWorker(driver_port));
+ char addr[256];
+ sprintf(addr, "localhost:%d", driver_port);
+ if (spawn_local_worker_count < 0) {
+ workers.push_front(addr);
+ } else {
+ workers.push_back(addr);
+ }
+ }
+
+ // Setup the hosts and core counts
+ auto hosts_cores = get_hosts_and_cores(workers);
+
+ // if num_clients is set to <=0, do dynamic sizing: all workers
+ // except for servers are clients
+ if (num_clients <= 0) {
+ num_clients = workers.size() - num_servers;
+ }
+
+ // TODO(ctiller): support running multiple configurations, and binpack
+ // client/server pairs
+ // to available workers
+ GPR_ASSERT(workers.size() >= num_clients + num_servers);
+
+ // Trim to just what we need
+ workers.resize(num_clients + num_servers);
+
+ gpr_timespec deadline =
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(warmup_seconds + benchmark_seconds + 20);
+
+ // Start servers
+ using runsc::ServerData;
+ // servers is array rather than std::vector to avoid gcc-4.4 issues
+ // where class contained in std::vector must have a copy constructor
+ auto* servers = new ServerData[num_servers];
+ for (size_t i = 0; i < num_servers; i++) {
+ gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", workers[i].c_str(),
+ i);
+ servers[i].stub = WorkerService::NewStub(
+ CreateChannel(workers[i], InsecureChannelCredentials()));
+
+ ServerConfig server_config = initial_server_config;
+ char* host;
+ char* driver_port;
+ char* cli_target;
+ gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
+ string host_str(host);
+ int server_core_limit = initial_server_config.core_limit();
+ int client_core_limit = initial_client_config.core_limit();
+
+ if (server_core_limit == 0 && client_core_limit > 0) {
+ // In this case, limit the server cores if it matches the
+ // same host as one or more clients
+ const auto& dq = hosts_cores.at(host_str);
+ bool match = false;
+ int limit = dq.size();
+ for (size_t cli = 0; cli < num_clients; cli++) {
+ if (host_str == get_host(workers[cli + num_servers])) {
+ limit -= client_core_limit;
+ match = true;
+ }
+ }
+ if (match) {
+ GPR_ASSERT(limit > 0);
+ server_core_limit = limit;
+ }
+ }
+ if (server_core_limit > 0) {
+ auto& dq = hosts_cores.at(host_str);
+ GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
+ for (int core = 0; core < server_core_limit; core++) {
+ server_config.add_core_list(dq.front());
+ dq.pop_front();
+ }
+ }
+
+ ServerArgs args;
+ *args.mutable_setup() = server_config;
+ servers[i].stream =
+ servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
+ GPR_ASSERT(servers[i].stream->Write(args));
+ ServerStatus init_status;
+ GPR_ASSERT(servers[i].stream->Read(&init_status));
+ gpr_join_host_port(&cli_target, host, init_status.port());
+ client_config.add_server_targets(cli_target);
+ gpr_free(host);
+ gpr_free(driver_port);
+ gpr_free(cli_target);
+ }
+
+ // Targets are all set by now
+ result_client_config = client_config;
+ // Start clients
+ using runsc::ClientData;
+ // clients is array rather than std::vector to avoid gcc-4.4 issues
+ // where class contained in std::vector must have a copy constructor
+ auto* clients = new ClientData[num_clients];
+ for (size_t i = 0; i < num_clients; i++) {
+ const auto& worker = workers[i + num_servers];
+ gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", worker.c_str(),
+ i + num_servers);
+ clients[i].stub = WorkerService::NewStub(
+ CreateChannel(worker, InsecureChannelCredentials()));
+ ClientConfig per_client_config = client_config;
+
+ int server_core_limit = initial_server_config.core_limit();
+ int client_core_limit = initial_client_config.core_limit();
+ if ((server_core_limit > 0) || (client_core_limit > 0)) {
+ auto& dq = hosts_cores.at(get_host(worker));
+ if (client_core_limit == 0) {
+ // limit client cores if it matches a server host
+ bool match = false;
+ int limit = dq.size();
+ for (size_t srv = 0; srv < num_servers; srv++) {
+ if (get_host(worker) == get_host(workers[srv])) {
+ match = true;
+ }
+ }
+ if (match) {
+ GPR_ASSERT(limit > 0);
+ client_core_limit = limit;
+ }
+ }
+ if (client_core_limit > 0) {
+ GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
+ for (int core = 0; core < client_core_limit; core++) {
+ per_client_config.add_core_list(dq.front());
+ dq.pop_front();
+ }
+ }
+ }
+
+ ClientArgs args;
+ *args.mutable_setup() = per_client_config;
+ clients[i].stream =
+ clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
+ GPR_ASSERT(clients[i].stream->Write(args));
+ ClientStatus init_status;
+ GPR_ASSERT(clients[i].stream->Read(&init_status));
+ }
+
+ // Let everything warmup
+ gpr_log(GPR_INFO, "Warming up");
+ gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
+ gpr_sleep_until(
+ gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
+
+ // Start a run
+ gpr_log(GPR_INFO, "Starting");
+ ServerArgs server_mark;
+ server_mark.mutable_mark()->set_reset(true);
+ ClientArgs client_mark;
+ client_mark.mutable_mark()->set_reset(true);
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Write(server_mark));
+ }
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
+ GPR_ASSERT(client->stream->Write(client_mark));
+ }
+ ServerStatus server_status;
+ ClientStatus client_status;
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Read(&server_status));
+ }
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
+ GPR_ASSERT(client->stream->Read(&client_status));
+ }
+
+ // Wait some time
+ gpr_log(GPR_INFO, "Running");
+ // Use gpr_sleep_until rather than this_thread::sleep_until to support
+ // compilers that don't work with this_thread
+ gpr_sleep_until(gpr_time_add(
+ start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN)));
+
+ // Finish a run
+ std::unique_ptr<ScenarioResult> result(new ScenarioResult);
+ result->client_config = result_client_config;
+ result->server_config = result_server_config;
+ gpr_log(GPR_INFO, "Finishing");
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Write(server_mark));
+ }
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
+ GPR_ASSERT(client->stream->Write(client_mark));
+ }
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->Read(&server_status));
+ const auto& stats = server_status.stats();
+ result->server_resources.emplace_back(
+ stats.time_elapsed(), stats.time_user(), stats.time_system(),
+ server_status.cores());
+ }
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
+ GPR_ASSERT(client->stream->Read(&client_status));
+ const auto& stats = client_status.stats();
+ result->latencies.MergeProto(stats.latencies());
+ result->client_resources.emplace_back(
+ stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
+ }
+
+ for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
+ GPR_ASSERT(client->stream->WritesDone());
+ GPR_ASSERT(client->stream->Finish().ok());
+ }
+ for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
+ GPR_ASSERT(server->stream->WritesDone());
+ GPR_ASSERT(server->stream->Finish().ok());
+ }
+ delete[] clients;
+ delete[] servers;
+ return result;
+}
+
+void RunQuit() {
+ // Get client, server lists
+ auto workers = get_workers("QPS_WORKERS");
+ for (size_t i = 0; i < workers.size(); i++) {
+ auto stub = WorkerService::NewStub(
+ CreateChannel(workers[i], InsecureChannelCredentials()));
+ Void dummy;
+ grpc::ClientContext ctx;
+ GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+ }
+}
+
+} // namespace testing
+} // namespace grpc
« no previous file with comments | « third_party/grpc/test/cpp/qps/driver.h ('k') | third_party/grpc/test/cpp/qps/generic_async_streaming_ping_pong_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698