Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: third_party/grpc/test/cpp/qps/driver.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <deque>
35 #include <list>
36 #include <thread>
37 #include <unordered_map>
38 #include <vector>
39
40 #include <grpc++/channel.h>
41 #include <grpc++/client_context.h>
42 #include <grpc++/create_channel.h>
43 #include <grpc/support/alloc.h>
44 #include <grpc/support/host_port.h>
45 #include <grpc/support/log.h>
46
47 #include "src/core/support/env.h"
48 #include "src/proto/grpc/testing/services.grpc.pb.h"
49 #include "test/core/util/port.h"
50 #include "test/core/util/test_config.h"
51 #include "test/cpp/qps/driver.h"
52 #include "test/cpp/qps/histogram.h"
53 #include "test/cpp/qps/qps_worker.h"
54
55 using std::list;
56 using std::thread;
57 using std::unique_ptr;
58 using std::deque;
59 using std::vector;
60
61 namespace grpc {
62 namespace testing {
63 static std::string get_host(const std::string& worker) {
64 char* host;
65 char* port;
66
67 gpr_split_host_port(worker.c_str(), &host, &port);
68 const string s(host);
69
70 gpr_free(host);
71 gpr_free(port);
72 return s;
73 }
74
75 static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
76 const deque<string>& workers) {
77 std::unordered_map<string, std::deque<int>> hosts;
78 for (auto it = workers.begin(); it != workers.end(); it++) {
79 const string host = get_host(*it);
80 if (hosts.find(host) == hosts.end()) {
81 auto stub = WorkerService::NewStub(
82 CreateChannel(*it, InsecureChannelCredentials()));
83 grpc::ClientContext ctx;
84 CoreRequest dummy;
85 CoreResponse cores;
86 grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
87 assert(s.ok());
88 std::deque<int> dq;
89 for (int i = 0; i < cores.cores(); i++) {
90 dq.push_back(i);
91 }
92 hosts[host] = dq;
93 }
94 }
95 return hosts;
96 }
97
98 static deque<string> get_workers(const string& name) {
99 char* env = gpr_getenv(name.c_str());
100 if (!env) return deque<string>();
101
102 deque<string> out;
103 char* p = env;
104 for (;;) {
105 char* comma = strchr(p, ',');
106 if (comma) {
107 out.emplace_back(p, comma);
108 p = comma + 1;
109 } else {
110 out.emplace_back(p);
111 gpr_free(env);
112 return out;
113 }
114 }
115 }
116
117 // Namespace for classes and functions used only in RunScenario
118 // Using this rather than local definitions to workaround gcc-4.4 limitations
119 // regarding using templates without linkage
120 namespace runsc {
121
122 // ClientContext allocator
123 template <class T>
124 static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {
125 contexts->emplace_back();
126 auto context = &contexts->back();
127 context->set_deadline(deadline);
128 return context;
129 }
130
131 struct ServerData {
132 unique_ptr<WorkerService::Stub> stub;
133 unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
134 };
135
136 struct ClientData {
137 unique_ptr<WorkerService::Stub> stub;
138 unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
139 };
140 } // namespace runsc
141
142 std::unique_ptr<ScenarioResult> RunScenario(
143 const ClientConfig& initial_client_config, size_t num_clients,
144 const ServerConfig& initial_server_config, size_t num_servers,
145 int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) {
146 // ClientContext allocations (all are destroyed at scope exit)
147 list<ClientContext> contexts;
148
149 // To be added to the result, containing the final configuration used for
150 // client and config (including host, etc.)
151 ClientConfig result_client_config;
152 const ServerConfig result_server_config = initial_server_config;
153
154 // Get client, server lists
155 auto workers = get_workers("QPS_WORKERS");
156 ClientConfig client_config = initial_client_config;
157
158 // Spawn some local workers if desired
159 vector<unique_ptr<QpsWorker>> local_workers;
160 for (int i = 0; i < abs(spawn_local_worker_count); i++) {
161 // act as if we're a new test -- gets a good rng seed
162 static bool called_init = false;
163 if (!called_init) {
164 char args_buf[100];
165 strcpy(args_buf, "some-benchmark");
166 char* args[] = {args_buf};
167 grpc_test_init(1, args);
168 called_init = true;
169 }
170
171 int driver_port = grpc_pick_unused_port_or_die();
172 local_workers.emplace_back(new QpsWorker(driver_port));
173 char addr[256];
174 sprintf(addr, "localhost:%d", driver_port);
175 if (spawn_local_worker_count < 0) {
176 workers.push_front(addr);
177 } else {
178 workers.push_back(addr);
179 }
180 }
181
182 // Setup the hosts and core counts
183 auto hosts_cores = get_hosts_and_cores(workers);
184
185 // if num_clients is set to <=0, do dynamic sizing: all workers
186 // except for servers are clients
187 if (num_clients <= 0) {
188 num_clients = workers.size() - num_servers;
189 }
190
191 // TODO(ctiller): support running multiple configurations, and binpack
192 // client/server pairs
193 // to available workers
194 GPR_ASSERT(workers.size() >= num_clients + num_servers);
195
196 // Trim to just what we need
197 workers.resize(num_clients + num_servers);
198
199 gpr_timespec deadline =
200 GRPC_TIMEOUT_SECONDS_TO_DEADLINE(warmup_seconds + benchmark_seconds + 20);
201
202 // Start servers
203 using runsc::ServerData;
204 // servers is array rather than std::vector to avoid gcc-4.4 issues
205 // where class contained in std::vector must have a copy constructor
206 auto* servers = new ServerData[num_servers];
207 for (size_t i = 0; i < num_servers; i++) {
208 gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", workers[i].c_str(),
209 i);
210 servers[i].stub = WorkerService::NewStub(
211 CreateChannel(workers[i], InsecureChannelCredentials()));
212
213 ServerConfig server_config = initial_server_config;
214 char* host;
215 char* driver_port;
216 char* cli_target;
217 gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
218 string host_str(host);
219 int server_core_limit = initial_server_config.core_limit();
220 int client_core_limit = initial_client_config.core_limit();
221
222 if (server_core_limit == 0 && client_core_limit > 0) {
223 // In this case, limit the server cores if it matches the
224 // same host as one or more clients
225 const auto& dq = hosts_cores.at(host_str);
226 bool match = false;
227 int limit = dq.size();
228 for (size_t cli = 0; cli < num_clients; cli++) {
229 if (host_str == get_host(workers[cli + num_servers])) {
230 limit -= client_core_limit;
231 match = true;
232 }
233 }
234 if (match) {
235 GPR_ASSERT(limit > 0);
236 server_core_limit = limit;
237 }
238 }
239 if (server_core_limit > 0) {
240 auto& dq = hosts_cores.at(host_str);
241 GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
242 for (int core = 0; core < server_core_limit; core++) {
243 server_config.add_core_list(dq.front());
244 dq.pop_front();
245 }
246 }
247
248 ServerArgs args;
249 *args.mutable_setup() = server_config;
250 servers[i].stream =
251 servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
252 GPR_ASSERT(servers[i].stream->Write(args));
253 ServerStatus init_status;
254 GPR_ASSERT(servers[i].stream->Read(&init_status));
255 gpr_join_host_port(&cli_target, host, init_status.port());
256 client_config.add_server_targets(cli_target);
257 gpr_free(host);
258 gpr_free(driver_port);
259 gpr_free(cli_target);
260 }
261
262 // Targets are all set by now
263 result_client_config = client_config;
264 // Start clients
265 using runsc::ClientData;
266 // clients is array rather than std::vector to avoid gcc-4.4 issues
267 // where class contained in std::vector must have a copy constructor
268 auto* clients = new ClientData[num_clients];
269 for (size_t i = 0; i < num_clients; i++) {
270 const auto& worker = workers[i + num_servers];
271 gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", worker.c_str(),
272 i + num_servers);
273 clients[i].stub = WorkerService::NewStub(
274 CreateChannel(worker, InsecureChannelCredentials()));
275 ClientConfig per_client_config = client_config;
276
277 int server_core_limit = initial_server_config.core_limit();
278 int client_core_limit = initial_client_config.core_limit();
279 if ((server_core_limit > 0) || (client_core_limit > 0)) {
280 auto& dq = hosts_cores.at(get_host(worker));
281 if (client_core_limit == 0) {
282 // limit client cores if it matches a server host
283 bool match = false;
284 int limit = dq.size();
285 for (size_t srv = 0; srv < num_servers; srv++) {
286 if (get_host(worker) == get_host(workers[srv])) {
287 match = true;
288 }
289 }
290 if (match) {
291 GPR_ASSERT(limit > 0);
292 client_core_limit = limit;
293 }
294 }
295 if (client_core_limit > 0) {
296 GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
297 for (int core = 0; core < client_core_limit; core++) {
298 per_client_config.add_core_list(dq.front());
299 dq.pop_front();
300 }
301 }
302 }
303
304 ClientArgs args;
305 *args.mutable_setup() = per_client_config;
306 clients[i].stream =
307 clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
308 GPR_ASSERT(clients[i].stream->Write(args));
309 ClientStatus init_status;
310 GPR_ASSERT(clients[i].stream->Read(&init_status));
311 }
312
313 // Let everything warmup
314 gpr_log(GPR_INFO, "Warming up");
315 gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
316 gpr_sleep_until(
317 gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
318
319 // Start a run
320 gpr_log(GPR_INFO, "Starting");
321 ServerArgs server_mark;
322 server_mark.mutable_mark()->set_reset(true);
323 ClientArgs client_mark;
324 client_mark.mutable_mark()->set_reset(true);
325 for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
326 GPR_ASSERT(server->stream->Write(server_mark));
327 }
328 for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
329 GPR_ASSERT(client->stream->Write(client_mark));
330 }
331 ServerStatus server_status;
332 ClientStatus client_status;
333 for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
334 GPR_ASSERT(server->stream->Read(&server_status));
335 }
336 for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
337 GPR_ASSERT(client->stream->Read(&client_status));
338 }
339
340 // Wait some time
341 gpr_log(GPR_INFO, "Running");
342 // Use gpr_sleep_until rather than this_thread::sleep_until to support
343 // compilers that don't work with this_thread
344 gpr_sleep_until(gpr_time_add(
345 start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN)));
346
347 // Finish a run
348 std::unique_ptr<ScenarioResult> result(new ScenarioResult);
349 result->client_config = result_client_config;
350 result->server_config = result_server_config;
351 gpr_log(GPR_INFO, "Finishing");
352 for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
353 GPR_ASSERT(server->stream->Write(server_mark));
354 }
355 for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
356 GPR_ASSERT(client->stream->Write(client_mark));
357 }
358 for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
359 GPR_ASSERT(server->stream->Read(&server_status));
360 const auto& stats = server_status.stats();
361 result->server_resources.emplace_back(
362 stats.time_elapsed(), stats.time_user(), stats.time_system(),
363 server_status.cores());
364 }
365 for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
366 GPR_ASSERT(client->stream->Read(&client_status));
367 const auto& stats = client_status.stats();
368 result->latencies.MergeProto(stats.latencies());
369 result->client_resources.emplace_back(
370 stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
371 }
372
373 for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
374 GPR_ASSERT(client->stream->WritesDone());
375 GPR_ASSERT(client->stream->Finish().ok());
376 }
377 for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
378 GPR_ASSERT(server->stream->WritesDone());
379 GPR_ASSERT(server->stream->Finish().ok());
380 }
381 delete[] clients;
382 delete[] servers;
383 return result;
384 }
385
386 void RunQuit() {
387 // Get client, server lists
388 auto workers = get_workers("QPS_WORKERS");
389 for (size_t i = 0; i < workers.size(); i++) {
390 auto stub = WorkerService::NewStub(
391 CreateChannel(workers[i], InsecureChannelCredentials()));
392 Void dummy;
393 grpc::ClientContext ctx;
394 GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
395 }
396 }
397
398 } // namespace testing
399 } // namespace grpc
OLDNEW
« no previous file with comments | « third_party/grpc/test/cpp/qps/driver.h ('k') | third_party/grpc/test/cpp/qps/generic_async_streaming_ping_pong_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698