Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(697)

Side by Side Diff: third_party/grpc/test/cpp/qps/client.h

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #ifndef TEST_QPS_CLIENT_H
35 #define TEST_QPS_CLIENT_H
36
37 #include <condition_variable>
38 #include <mutex>
39 #include <vector>
40
41 #include <grpc++/support/byte_buffer.h>
42 #include <grpc++/support/slice.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/time.h>
45
46 #include "src/proto/grpc/testing/payloads.grpc.pb.h"
47 #include "src/proto/grpc/testing/services.grpc.pb.h"
48
49 #include "test/cpp/qps/histogram.h"
50 #include "test/cpp/qps/interarrival.h"
51 #include "test/cpp/qps/limit_cores.h"
52 #include "test/cpp/qps/usage_timer.h"
53 #include "test/cpp/util/create_test_channel.h"
54
55 namespace grpc {
56 namespace testing {
57
58 template <class RequestType>
59 class ClientRequestCreator {
60 public:
61 ClientRequestCreator(RequestType* req, const PayloadConfig&) {
62 // this template must be specialized
63 // fail with an assertion rather than a compile-time
64 // check since these only happen at the beginning anyway
65 GPR_ASSERT(false);
66 }
67 };
68
69 template <>
70 class ClientRequestCreator<SimpleRequest> {
71 public:
72 ClientRequestCreator(SimpleRequest* req,
73 const PayloadConfig& payload_config) {
74 if (payload_config.has_bytebuf_params()) {
75 GPR_ASSERT(false); // not appropriate for this specialization
76 } else if (payload_config.has_simple_params()) {
77 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
78 req->set_response_size(payload_config.simple_params().resp_size());
79 req->mutable_payload()->set_type(
80 grpc::testing::PayloadType::COMPRESSABLE);
81 int size = payload_config.simple_params().req_size();
82 std::unique_ptr<char[]> body(new char[size]);
83 req->mutable_payload()->set_body(body.get(), size);
84 } else if (payload_config.has_complex_params()) {
85 GPR_ASSERT(false); // not appropriate for this specialization
86 } else {
87 // default should be simple proto without payloads
88 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
89 req->set_response_size(0);
90 req->mutable_payload()->set_type(
91 grpc::testing::PayloadType::COMPRESSABLE);
92 }
93 }
94 };
95
96 template <>
97 class ClientRequestCreator<ByteBuffer> {
98 public:
99 ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
100 if (payload_config.has_bytebuf_params()) {
101 std::unique_ptr<char[]> buf(
102 new char[payload_config.bytebuf_params().req_size()]);
103 gpr_slice s = gpr_slice_from_copied_buffer(
104 buf.get(), payload_config.bytebuf_params().req_size());
105 Slice slice(s, Slice::STEAL_REF);
106 *req = ByteBuffer(&slice, 1);
107 } else {
108 GPR_ASSERT(false); // not appropriate for this specialization
109 }
110 }
111 };
112
113 class Client {
114 public:
115 Client() : timer_(new UsageTimer), interarrival_timer_() {}
116 virtual ~Client() {}
117
118 ClientStats Mark(bool reset) {
119 Histogram latencies;
120 UsageTimer::Result timer_result;
121
122 // avoid std::vector for old compilers that expect a copy constructor
123 if (reset) {
124 Histogram* to_merge = new Histogram[threads_.size()];
125 for (size_t i = 0; i < threads_.size(); i++) {
126 threads_[i]->BeginSwap(&to_merge[i]);
127 }
128 std::unique_ptr<UsageTimer> timer(new UsageTimer);
129 timer_.swap(timer);
130 for (size_t i = 0; i < threads_.size(); i++) {
131 threads_[i]->EndSwap();
132 latencies.Merge(to_merge[i]);
133 }
134 delete[] to_merge;
135 timer_result = timer->Mark();
136 } else {
137 // merge snapshots of each thread histogram
138 for (size_t i = 0; i < threads_.size(); i++) {
139 threads_[i]->MergeStatsInto(&latencies);
140 }
141 timer_result = timer_->Mark();
142 }
143
144 ClientStats stats;
145 latencies.FillProto(stats.mutable_latencies());
146 stats.set_time_elapsed(timer_result.wall);
147 stats.set_time_system(timer_result.system);
148 stats.set_time_user(timer_result.user);
149 return stats;
150 }
151
152 protected:
153 bool closed_loop_;
154
155 void StartThreads(size_t num_threads) {
156 for (size_t i = 0; i < num_threads; i++) {
157 threads_.emplace_back(new Thread(this, i));
158 }
159 }
160
161 void EndThreads() { threads_.clear(); }
162
163 virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
164
165 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
166 // Set up the load distribution based on the number of threads
167 const auto& load = config.load_params();
168
169 std::unique_ptr<RandomDistInterface> random_dist;
170 switch (load.load_case()) {
171 case LoadParams::kClosedLoop:
172 // Closed-loop doesn't use random dist at all
173 break;
174 case LoadParams::kPoisson:
175 random_dist.reset(
176 new ExpDist(load.poisson().offered_load() / num_threads));
177 break;
178 case LoadParams::kUniform:
179 random_dist.reset(
180 new UniformDist(load.uniform().interarrival_lo() * num_threads,
181 load.uniform().interarrival_hi() * num_threads));
182 break;
183 case LoadParams::kDeterm:
184 random_dist.reset(
185 new DetDist(num_threads / load.determ().offered_load()));
186 break;
187 case LoadParams::kPareto:
188 random_dist.reset(
189 new ParetoDist(load.pareto().interarrival_base() * num_threads,
190 load.pareto().alpha()));
191 break;
192 default:
193 GPR_ASSERT(false);
194 }
195
196 // Set closed_loop_ based on whether or not random_dist is set
197 if (!random_dist) {
198 closed_loop_ = true;
199 } else {
200 closed_loop_ = false;
201 // set up interarrival timer according to random dist
202 interarrival_timer_.init(*random_dist, num_threads);
203 const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
204 for (size_t i = 0; i < num_threads; i++) {
205 next_time_.push_back(gpr_time_add(
206 now,
207 gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
208 }
209 }
210 }
211
212 gpr_timespec NextIssueTime(int thread_idx) {
213 const gpr_timespec result = next_time_[thread_idx];
214 next_time_[thread_idx] =
215 gpr_time_add(next_time_[thread_idx],
216 gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
217 GPR_TIMESPAN));
218 return result;
219 }
220 std::function<gpr_timespec()> NextIssuer(int thread_idx) {
221 return closed_loop_ ? std::function<gpr_timespec()>()
222 : std::bind(&Client::NextIssueTime, this, thread_idx);
223 }
224
225 private:
226 class Thread {
227 public:
228 Thread(Client* client, size_t idx)
229 : done_(false),
230 new_stats_(nullptr),
231 client_(client),
232 idx_(idx),
233 impl_(&Thread::ThreadFunc, this) {}
234
235 ~Thread() {
236 {
237 std::lock_guard<std::mutex> g(mu_);
238 done_ = true;
239 }
240 impl_.join();
241 }
242
243 void BeginSwap(Histogram* n) {
244 std::lock_guard<std::mutex> g(mu_);
245 new_stats_ = n;
246 }
247
248 void EndSwap() {
249 std::unique_lock<std::mutex> g(mu_);
250 while (new_stats_ != nullptr) {
251 cv_.wait(g);
252 };
253 }
254
255 void MergeStatsInto(Histogram* hist) {
256 std::unique_lock<std::mutex> g(mu_);
257 hist->Merge(histogram_);
258 }
259
260 private:
261 Thread(const Thread&);
262 Thread& operator=(const Thread&);
263
264 void ThreadFunc() {
265 for (;;) {
266 // run the loop body
267 const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
268 // lock, see if we're done
269 std::lock_guard<std::mutex> g(mu_);
270 if (!thread_still_ok) {
271 gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
272 done_ = true;
273 }
274 if (done_) {
275 return;
276 }
277 // check if we're resetting stats, swap out the histogram if so
278 if (new_stats_) {
279 new_stats_->Swap(&histogram_);
280 new_stats_ = nullptr;
281 cv_.notify_one();
282 }
283 }
284 }
285
286 std::mutex mu_;
287 std::condition_variable cv_;
288 bool done_;
289 Histogram* new_stats_;
290 Histogram histogram_;
291 Client* client_;
292 const size_t idx_;
293 std::thread impl_;
294 };
295
296 std::vector<std::unique_ptr<Thread>> threads_;
297 std::unique_ptr<UsageTimer> timer_;
298
299 InterarrivalTimer interarrival_timer_;
300 std::vector<gpr_timespec> next_time_;
301 };
302
303 template <class StubType, class RequestType>
304 class ClientImpl : public Client {
305 public:
306 ClientImpl(const ClientConfig& config,
307 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
308 create_stub)
309 : cores_(LimitCores(config.core_list().data(), config.core_list_size())),
310 channels_(config.client_channels()),
311 create_stub_(create_stub) {
312 for (int i = 0; i < config.client_channels(); i++) {
313 channels_[i].init(config.server_targets(i % config.server_targets_size()),
314 config, create_stub_);
315 }
316
317 ClientRequestCreator<RequestType> create_req(&request_,
318 config.payload_config());
319 }
320 virtual ~ClientImpl() {}
321
322 protected:
323 const int cores_;
324 RequestType request_;
325
326 class ClientChannelInfo {
327 public:
328 ClientChannelInfo() {}
329 ClientChannelInfo(const ClientChannelInfo& i) {
330 // The copy constructor is to satisfy old compilers
331 // that need it for using std::vector . It is only ever
332 // used for empty entries
333 GPR_ASSERT(!i.channel_ && !i.stub_);
334 }
335 void init(const grpc::string& target, const ClientConfig& config,
336 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
337 create_stub) {
338 // We have to use a 2-phase init like this with a default
339 // constructor followed by an initializer function to make
340 // old compilers happy with using this in std::vector
341 channel_ = CreateTestChannel(
342 target, config.security_params().server_host_override(),
343 config.has_security_params(),
344 !config.security_params().use_test_ca());
345 stub_ = create_stub(channel_);
346 }
347 Channel* get_channel() { return channel_.get(); }
348 StubType* get_stub() { return stub_.get(); }
349
350 private:
351 std::shared_ptr<Channel> channel_;
352 std::unique_ptr<StubType> stub_;
353 };
354 std::vector<ClientChannelInfo> channels_;
355 std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
356 create_stub_;
357 };
358
359 std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
360 std::unique_ptr<Client> CreateSynchronousStreamingClient(
361 const ClientConfig& args);
362 std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
363 std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
364 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
365 const ClientConfig& args);
366
367 } // namespace testing
368 } // namespace grpc
369
370 #endif
OLDNEW
« no previous file with comments | « third_party/grpc/test/cpp/qps/async_unary_ping_pong_test.cc ('k') | third_party/grpc/test/cpp/qps/client_async.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698