OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include <deque> |
| 35 #include <list> |
| 36 #include <thread> |
| 37 #include <unordered_map> |
| 38 #include <vector> |
| 39 |
| 40 #include <grpc++/channel.h> |
| 41 #include <grpc++/client_context.h> |
| 42 #include <grpc++/create_channel.h> |
| 43 #include <grpc/support/alloc.h> |
| 44 #include <grpc/support/host_port.h> |
| 45 #include <grpc/support/log.h> |
| 46 |
| 47 #include "src/core/support/env.h" |
| 48 #include "src/proto/grpc/testing/services.grpc.pb.h" |
| 49 #include "test/core/util/port.h" |
| 50 #include "test/core/util/test_config.h" |
| 51 #include "test/cpp/qps/driver.h" |
| 52 #include "test/cpp/qps/histogram.h" |
| 53 #include "test/cpp/qps/qps_worker.h" |
| 54 |
| 55 using std::list; |
| 56 using std::thread; |
| 57 using std::unique_ptr; |
| 58 using std::deque; |
| 59 using std::vector; |
| 60 |
| 61 namespace grpc { |
| 62 namespace testing { |
| 63 static std::string get_host(const std::string& worker) { |
| 64 char* host; |
| 65 char* port; |
| 66 |
| 67 gpr_split_host_port(worker.c_str(), &host, &port); |
| 68 const string s(host); |
| 69 |
| 70 gpr_free(host); |
| 71 gpr_free(port); |
| 72 return s; |
| 73 } |
| 74 |
| 75 static std::unordered_map<string, std::deque<int>> get_hosts_and_cores( |
| 76 const deque<string>& workers) { |
| 77 std::unordered_map<string, std::deque<int>> hosts; |
| 78 for (auto it = workers.begin(); it != workers.end(); it++) { |
| 79 const string host = get_host(*it); |
| 80 if (hosts.find(host) == hosts.end()) { |
| 81 auto stub = WorkerService::NewStub( |
| 82 CreateChannel(*it, InsecureChannelCredentials())); |
| 83 grpc::ClientContext ctx; |
| 84 CoreRequest dummy; |
| 85 CoreResponse cores; |
| 86 grpc::Status s = stub->CoreCount(&ctx, dummy, &cores); |
| 87 GPR_ASSERT(s.ok()); |
| 88 std::deque<int> dq; |
| 89 for (int i = 0; i < cores.cores(); i++) { |
| 90 dq.push_back(i); |
| 91 } |
| 92 hosts[host] = dq; |
| 93 } |
| 94 } |
| 95 return hosts; |
| 96 } |
| 97 |
| 98 static deque<string> get_workers(const string& name) { |
| 99 char* env = gpr_getenv(name.c_str()); |
| 100 if (!env) return deque<string>(); |
| 101 |
| 102 deque<string> out; |
| 103 char* p = env; |
| 104 for (;;) { |
| 105 char* comma = strchr(p, ','); |
| 106 if (comma) { |
| 107 out.emplace_back(p, comma); |
| 108 p = comma + 1; |
| 109 } else { |
| 110 out.emplace_back(p); |
| 111 gpr_free(env); |
| 112 return out; |
| 113 } |
| 114 } |
| 115 } |
| 116 |
| 117 // Namespace for classes and functions used only in RunScenario |
| 118 // Using this rather than local definitions to work around gcc-4.4 limitations |
| 119 // regarding the use of templates without linkage |
| 120 namespace runsc { |
| 121 |
| 122 // ClientContext allocator |
| 123 template <class T> |
| 124 static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) { |
| 125 contexts->emplace_back(); |
| 126 auto context = &contexts->back(); |
| 127 context->set_deadline(deadline); |
| 128 return context; |
| 129 } |
| 130 |
| 131 struct ServerData { |
| 132 unique_ptr<WorkerService::Stub> stub; |
| 133 unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; |
| 134 }; |
| 135 |
| 136 struct ClientData { |
| 137 unique_ptr<WorkerService::Stub> stub; |
| 138 unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream; |
| 139 }; |
| 140 } // namespace runsc |
| 141 |
| 142 std::unique_ptr<ScenarioResult> RunScenario( |
| 143 const ClientConfig& initial_client_config, size_t num_clients, |
| 144 const ServerConfig& initial_server_config, size_t num_servers, |
| 145 int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) { |
| 146 // ClientContext allocations (all are destroyed at scope exit) |
| 147 list<ClientContext> contexts; |
| 148 |
| 149 // To be added to the result, containing the final configuration used for |
| 150 // the client and server (including server targets, etc.) |
| 151 ClientConfig result_client_config; |
| 152 const ServerConfig result_server_config = initial_server_config; |
| 153 |
| 154 // Get client, server lists |
| 155 auto workers = get_workers("QPS_WORKERS"); |
| 156 ClientConfig client_config = initial_client_config; |
| 157 |
| 158 // Spawn some local workers if desired |
| 159 vector<unique_ptr<QpsWorker>> local_workers; |
| 160 for (int i = 0; i < abs(spawn_local_worker_count); i++) { |
| 161 // act as if we're a new test -- gets a good rng seed |
| 162 static bool called_init = false; |
| 163 if (!called_init) { |
| 164 // grpc_test_init takes a mutable argv, so use a writable buffer |
| 165 char args_buf[100] = "some-benchmark"; |
| 166 char* args[] = {args_buf}; |
| 167 grpc_test_init(1, args); |
| 168 called_init = true; |
| 169 } |
| 170 |
| 171 int driver_port = grpc_pick_unused_port_or_die(); |
| 172 local_workers.emplace_back(new QpsWorker(driver_port)); |
| 173 char addr[256]; |
| 174 snprintf(addr, sizeof(addr), "localhost:%d", driver_port); |
| 175 if (spawn_local_worker_count < 0) { |
| 176 workers.push_front(addr); |
| 177 } else { |
| 178 workers.push_back(addr); |
| 179 } |
| 180 } |
| 181 |
| 182 // Set up the hosts and core counts |
| 183 auto hosts_cores = get_hosts_and_cores(workers); |
| 184 |
| 185 // If num_clients is 0 (it is a size_t, so it cannot be negative), do |
| 186 // dynamic sizing: all workers that are not servers become clients |
| 187 if (num_clients == 0) { |
| 188 num_clients = workers.size() - num_servers; |
| 189 } |
| 190 |
| 191 // TODO(ctiller): support running multiple configurations, and binpack |
| 192 // client/server pairs to available workers |
| 193 |
| 194 GPR_ASSERT(workers.size() >= num_clients + num_servers); |
| 195 |
| 196 // Trim to just what we need |
| 197 workers.resize(num_clients + num_servers); |
| 198 |
| 199 gpr_timespec deadline = |
| 200 GRPC_TIMEOUT_SECONDS_TO_DEADLINE(warmup_seconds + benchmark_seconds + 20); |
| 201 |
| 202 // Start servers |
| 203 using runsc::ServerData; |
| 204 // servers is an array rather than a std::vector to avoid gcc-4.4 issues |
| 205 // where a class contained in std::vector must have a copy constructor |
| 206 auto* servers = new ServerData[num_servers]; |
| 207 for (size_t i = 0; i < num_servers; i++) { |
| 208 gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", workers[i].c_str(), |
| 209 static_cast<int>(i)); |
| 210 servers[i].stub = WorkerService::NewStub( |
| 211 CreateChannel(workers[i], InsecureChannelCredentials())); |
| 212 |
| 213 ServerConfig server_config = initial_server_config; |
| 214 char* host; |
| 215 char* driver_port; |
| 216 char* cli_target; |
| 217 gpr_split_host_port(workers[i].c_str(), &host, &driver_port); |
| 218 string host_str(host); |
| 219 int server_core_limit = initial_server_config.core_limit(); |
| 220 int client_core_limit = initial_client_config.core_limit(); |
| 221 |
| 222 if (server_core_limit == 0 && client_core_limit > 0) { |
| 223 // In this case, limit the server's cores if it shares a host |
| 224 // with one or more clients |
| 225 const auto& dq = hosts_cores.at(host_str); |
| 226 bool match = false; |
| 227 int limit = static_cast<int>(dq.size()); |
| 228 for (size_t cli = 0; cli < num_clients; cli++) { |
| 229 if (host_str == get_host(workers[cli + num_servers])) { |
| 230 limit -= client_core_limit; |
| 231 match = true; |
| 232 } |
| 233 } |
| 234 if (match) { |
| 235 GPR_ASSERT(limit > 0); |
| 236 server_core_limit = limit; |
| 237 } |
| 238 } |
| 239 if (server_core_limit > 0) { |
| 240 auto& dq = hosts_cores.at(host_str); |
| 241 GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit)); |
| 242 for (int core = 0; core < server_core_limit; core++) { |
| 243 server_config.add_core_list(dq.front()); |
| 244 dq.pop_front(); |
| 245 } |
| 246 } |
| 247 |
| 248 ServerArgs args; |
| 249 *args.mutable_setup() = server_config; |
| 250 servers[i].stream = |
| 251 servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline)); |
| 252 GPR_ASSERT(servers[i].stream->Write(args)); |
| 253 ServerStatus init_status; |
| 254 GPR_ASSERT(servers[i].stream->Read(&init_status)); |
| 255 gpr_join_host_port(&cli_target, host, init_status.port()); |
| 256 client_config.add_server_targets(cli_target); |
| 257 gpr_free(host); |
| 258 gpr_free(driver_port); |
| 259 gpr_free(cli_target); |
| 260 } |
| 261 |
| 262 // Targets are all set by now |
| 263 result_client_config = client_config; |
| 264 // Start clients |
| 265 using runsc::ClientData; |
| 266 // clients is an array rather than a std::vector to avoid gcc-4.4 issues |
| 267 // where a class contained in std::vector must have a copy constructor |
| 268 auto* clients = new ClientData[num_clients]; |
| 269 for (size_t i = 0; i < num_clients; i++) { |
| 270 const auto& worker = workers[i + num_servers]; |
| 271 gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", worker.c_str(), |
| 272 static_cast<int>(i + num_servers)); |
| 273 clients[i].stub = WorkerService::NewStub( |
| 274 CreateChannel(worker, InsecureChannelCredentials())); |
| 275 ClientConfig per_client_config = client_config; |
| 276 |
| 277 int server_core_limit = initial_server_config.core_limit(); |
| 278 int client_core_limit = initial_client_config.core_limit(); |
| 279 if ((server_core_limit > 0) || (client_core_limit > 0)) { |
| 280 auto& dq = hosts_cores.at(get_host(worker)); |
| 281 if (client_core_limit == 0) { |
| 282 // limit client cores if it matches a server host |
| 283 bool match = false; |
| 284 int limit = static_cast<int>(dq.size()); |
| 285 for (size_t srv = 0; srv < num_servers; srv++) { |
| 286 if (get_host(worker) == get_host(workers[srv])) { |
| 287 match = true; |
| 288 } |
| 289 } |
| 290 if (match) { |
| 291 GPR_ASSERT(limit > 0); |
| 292 client_core_limit = limit; |
| 293 } |
| 294 } |
| 295 if (client_core_limit > 0) { |
| 296 GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit)); |
| 297 for (int core = 0; core < client_core_limit; core++) { |
| 298 per_client_config.add_core_list(dq.front()); |
| 299 dq.pop_front(); |
| 300 } |
| 301 } |
| 302 } |
| 303 |
| 304 ClientArgs args; |
| 305 *args.mutable_setup() = per_client_config; |
| 306 clients[i].stream = |
| 307 clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline)); |
| 308 GPR_ASSERT(clients[i].stream->Write(args)); |
| 309 ClientStatus init_status; |
| 310 GPR_ASSERT(clients[i].stream->Read(&init_status)); |
| 311 } |
| 312 |
| 313 // Let everything warm up |
| 314 gpr_log(GPR_INFO, "Warming up"); |
| 315 gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME); |
| 316 gpr_sleep_until( |
| 317 gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN))); |
| 318 |
| 319 // Start a run |
| 320 gpr_log(GPR_INFO, "Starting"); |
| 321 ServerArgs server_mark; |
| 322 server_mark.mutable_mark()->set_reset(true); |
| 323 ClientArgs client_mark; |
| 324 client_mark.mutable_mark()->set_reset(true); |
| 325 for (auto server = &servers[0]; server != &servers[num_servers]; server++) { |
| 326 GPR_ASSERT(server->stream->Write(server_mark)); |
| 327 } |
| 328 for (auto client = &clients[0]; client != &clients[num_clients]; client++) { |
| 329 GPR_ASSERT(client->stream->Write(client_mark)); |
| 330 } |
| 331 ServerStatus server_status; |
| 332 ClientStatus client_status; |
| 333 for (auto server = &servers[0]; server != &servers[num_servers]; server++) { |
| 334 GPR_ASSERT(server->stream->Read(&server_status)); |
| 335 } |
| 336 for (auto client = &clients[0]; client != &clients[num_clients]; client++) { |
| 337 GPR_ASSERT(client->stream->Read(&client_status)); |
| 338 } |
| 339 |
| 340 // Wait some time |
| 341 gpr_log(GPR_INFO, "Running"); |
| 342 // Use gpr_sleep_until rather than this_thread::sleep_until to support |
| 343 // compilers that don't work with this_thread |
| 344 gpr_sleep_until(gpr_time_add( |
| 345 start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN))); |
| 346 |
| 347 // Finish a run |
| 348 std::unique_ptr<ScenarioResult> result(new ScenarioResult); |
| 349 result->client_config = result_client_config; |
| 350 result->server_config = result_server_config; |
| 351 gpr_log(GPR_INFO, "Finishing"); |
| 352 for (auto server = &servers[0]; server != &servers[num_servers]; server++) { |
| 353 GPR_ASSERT(server->stream->Write(server_mark)); |
| 354 } |
| 355 for (auto client = &clients[0]; client != &clients[num_clients]; client++) { |
| 356 GPR_ASSERT(client->stream->Write(client_mark)); |
| 357 } |
| 358 for (auto server = &servers[0]; server != &servers[num_servers]; server++) { |
| 359 GPR_ASSERT(server->stream->Read(&server_status)); |
| 360 const auto& stats = server_status.stats(); |
| 361 result->server_resources.emplace_back( |
| 362 stats.time_elapsed(), stats.time_user(), stats.time_system(), |
| 363 server_status.cores()); |
| 364 } |
| 365 for (auto client = &clients[0]; client != &clients[num_clients]; client++) { |
| 366 GPR_ASSERT(client->stream->Read(&client_status)); |
| 367 const auto& stats = client_status.stats(); |
| 368 result->latencies.MergeProto(stats.latencies()); |
| 369 result->client_resources.emplace_back( |
| 370 stats.time_elapsed(), stats.time_user(), stats.time_system(), -1); |
| 371 } |
| 372 |
| 373 for (auto client = &clients[0]; client != &clients[num_clients]; client++) { |
| 374 GPR_ASSERT(client->stream->WritesDone()); |
| 375 GPR_ASSERT(client->stream->Finish().ok()); |
| 376 } |
| 377 for (auto server = &servers[0]; server != &servers[num_servers]; server++) { |
| 378 GPR_ASSERT(server->stream->WritesDone()); |
| 379 GPR_ASSERT(server->stream->Finish().ok()); |
| 380 } |
| 381 delete[] clients; |
| 382 delete[] servers; |
| 383 return result; |
| 384 } |
| 385 |
| 386 void RunQuit() { |
| 387 // Get client, server lists |
| 388 auto workers = get_workers("QPS_WORKERS"); |
| 389 for (size_t i = 0; i < workers.size(); i++) { |
| 390 auto stub = WorkerService::NewStub( |
| 391 CreateChannel(workers[i], InsecureChannelCredentials())); |
| 392 Void dummy; |
| 393 grpc::ClientContext ctx; |
| 394 GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok()); |
| 395 } |
| 396 } |
| 397 |
| 398 } // namespace testing |
| 399 } // namespace grpc |
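Usage note: the sketch below shows one way the RunScenario/RunQuit entry points in this file might be driven. It is a hypothetical harness for illustration only, not part of the change above: the main() wrapper, the chosen parameter values, and leaving ClientConfig/ServerConfig at their proto defaults are assumptions. Only the function signatures, the QPS_WORKERS environment variable, and the behavior of a negative spawn_local_worker_count (local workers are pushed to the front of the worker list, so they are used as servers first) come from the code above.

    // Hypothetical standalone driver harness (sketch only).
    #include <memory>

    #include <grpc/support/log.h>

    #include "src/proto/grpc/testing/services.grpc.pb.h"
    #include "test/cpp/qps/driver.h"

    int main() {
      // QPS_WORKERS holds a comma-separated list of worker addresses, e.g.
      // "host1:10000,host2:10000". With it unset, the negative
      // spawn_local_worker_count below makes RunScenario spawn two in-process
      // workers, which is just enough for one server and one client.
      grpc::testing::ClientConfig client_config;
      grpc::testing::ServerConfig server_config;
      // Leaving the configs at their proto defaults is purely illustrative;
      // real scenarios populate the fields defined in services.proto.

      const size_t num_clients = 1;
      const size_t num_servers = 1;
      const int warmup_seconds = 5;
      const int benchmark_seconds = 30;
      const int spawn_local_worker_count = -2;  // negative: local workers first

      std::unique_ptr<grpc::testing::ScenarioResult> result =
          grpc::testing::RunScenario(client_config, num_clients, server_config,
                                     num_servers, warmup_seconds,
                                     benchmark_seconds,
                                     spawn_local_worker_count);
      GPR_ASSERT(result != nullptr);

      // Ask any QPS_WORKERS-listed workers to shut down (a no-op here).
      grpc::testing::RunQuit();
      return 0;
    }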