Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Side by Side Diff: third_party/grpc/test/cpp/qps/qps_worker.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/grpc/test/cpp/qps/qps_worker.h ('k') | third_party/grpc/test/cpp/qps/report.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include "test/cpp/qps/qps_worker.h"
35
36 #include <cassert>
37 #include <memory>
38 #include <mutex>
39 #include <sstream>
40 #include <string>
41 #include <thread>
42 #include <vector>
43
44 #include <grpc++/client_context.h>
45 #include <grpc++/security/server_credentials.h>
46 #include <grpc++/server.h>
47 #include <grpc++/server_builder.h>
48 #include <grpc/grpc.h>
49 #include <grpc/support/alloc.h>
50 #include <grpc/support/cpu.h>
51 #include <grpc/support/histogram.h>
52 #include <grpc/support/host_port.h>
53 #include <grpc/support/log.h>
54
55 #include "src/proto/grpc/testing/services.pb.h"
56 #include "test/core/util/grpc_profiler.h"
57 #include "test/cpp/qps/client.h"
58 #include "test/cpp/qps/server.h"
59 #include "test/cpp/util/create_test_channel.h"
60
61 namespace grpc {
62 namespace testing {
63
64 static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
65 gpr_log(GPR_INFO, "Starting client of type %s %s %d",
66 ClientType_Name(config.client_type()).c_str(),
67 RpcType_Name(config.rpc_type()).c_str(),
68 config.payload_config().has_bytebuf_params());
69
70 switch (config.client_type()) {
71 case ClientType::SYNC_CLIENT:
72 return (config.rpc_type() == RpcType::UNARY)
73 ? CreateSynchronousUnaryClient(config)
74 : CreateSynchronousStreamingClient(config);
75 case ClientType::ASYNC_CLIENT:
76 return (config.rpc_type() == RpcType::UNARY)
77 ? CreateAsyncUnaryClient(config)
78 : (config.payload_config().has_bytebuf_params()
79 ? CreateGenericAsyncStreamingClient(config)
80 : CreateAsyncStreamingClient(config));
81 default:
82 abort();
83 }
84 abort();
85 }
86
87 static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
88 gpr_log(GPR_INFO, "Starting server of type %s",
89 ServerType_Name(config.server_type()).c_str());
90
91 switch (config.server_type()) {
92 case ServerType::SYNC_SERVER:
93 return CreateSynchronousServer(config);
94 case ServerType::ASYNC_SERVER:
95 return CreateAsyncServer(config);
96 case ServerType::ASYNC_GENERIC_SERVER:
97 return CreateAsyncGenericServer(config);
98 default:
99 abort();
100 }
101 abort();
102 }
103
104 class ScopedProfile GRPC_FINAL {
105 public:
106 ScopedProfile(const char* filename, bool enable) : enable_(enable) {
107 if (enable_) grpc_profiler_start(filename);
108 }
109 ~ScopedProfile() {
110 if (enable_) grpc_profiler_stop();
111 }
112
113 private:
114 const bool enable_;
115 };
116
117 class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
118 public:
119 WorkerServiceImpl(int server_port, QpsWorker* worker)
120 : acquired_(false), server_port_(server_port), worker_(worker) {}
121
122 Status RunClient(ServerContext* ctx,
123 ServerReaderWriter<ClientStatus, ClientArgs>* stream)
124 GRPC_OVERRIDE {
125 InstanceGuard g(this);
126 if (!g.Acquired()) {
127 return Status(StatusCode::RESOURCE_EXHAUSTED, "");
128 }
129
130 ScopedProfile profile("qps_client.prof", false);
131 Status ret = RunClientBody(ctx, stream);
132 return ret;
133 }
134
135 Status RunServer(ServerContext* ctx,
136 ServerReaderWriter<ServerStatus, ServerArgs>* stream)
137 GRPC_OVERRIDE {
138 InstanceGuard g(this);
139 if (!g.Acquired()) {
140 return Status(StatusCode::RESOURCE_EXHAUSTED, "");
141 }
142
143 ScopedProfile profile("qps_server.prof", false);
144 Status ret = RunServerBody(ctx, stream);
145 return ret;
146 }
147
148 Status CoreCount(ServerContext* ctx, const CoreRequest*,
149 CoreResponse* resp) GRPC_OVERRIDE {
150 resp->set_cores(gpr_cpu_num_cores());
151 return Status::OK;
152 }
153
154 Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
155 InstanceGuard g(this);
156 if (!g.Acquired()) {
157 return Status(StatusCode::RESOURCE_EXHAUSTED, "");
158 }
159
160 worker_->MarkDone();
161 return Status::OK;
162 }
163
164 private:
165 // Protect against multiple clients using this worker at once.
166 class InstanceGuard {
167 public:
168 InstanceGuard(WorkerServiceImpl* impl)
169 : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
170 ~InstanceGuard() {
171 if (acquired_) {
172 impl_->ReleaseInstance();
173 }
174 }
175
176 bool Acquired() const { return acquired_; }
177
178 private:
179 WorkerServiceImpl* const impl_;
180 const bool acquired_;
181 };
182
183 bool TryAcquireInstance() {
184 std::lock_guard<std::mutex> g(mu_);
185 if (acquired_) return false;
186 acquired_ = true;
187 return true;
188 }
189
190 void ReleaseInstance() {
191 std::lock_guard<std::mutex> g(mu_);
192 GPR_ASSERT(acquired_);
193 acquired_ = false;
194 }
195
196 Status RunClientBody(ServerContext* ctx,
197 ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
198 ClientArgs args;
199 if (!stream->Read(&args)) {
200 return Status(StatusCode::INVALID_ARGUMENT, "");
201 }
202 if (!args.has_setup()) {
203 return Status(StatusCode::INVALID_ARGUMENT, "");
204 }
205 gpr_log(GPR_INFO, "RunClientBody: about to create client");
206 auto client = CreateClient(args.setup());
207 if (!client) {
208 return Status(StatusCode::INVALID_ARGUMENT, "");
209 }
210 gpr_log(GPR_INFO, "RunClientBody: client created");
211 ClientStatus status;
212 if (!stream->Write(status)) {
213 return Status(StatusCode::UNKNOWN, "");
214 }
215 gpr_log(GPR_INFO, "RunClientBody: creation status reported");
216 while (stream->Read(&args)) {
217 gpr_log(GPR_INFO, "RunClientBody: Message read");
218 if (!args.has_mark()) {
219 gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
220 return Status(StatusCode::INVALID_ARGUMENT, "");
221 }
222 *status.mutable_stats() = client->Mark(args.mark().reset());
223 stream->Write(status);
224 gpr_log(GPR_INFO, "RunClientBody: Mark response given");
225 }
226
227 gpr_log(GPR_INFO, "RunClientBody: Returning");
228 return Status::OK;
229 }
230
231 Status RunServerBody(ServerContext* ctx,
232 ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
233 ServerArgs args;
234 if (!stream->Read(&args)) {
235 return Status(StatusCode::INVALID_ARGUMENT, "");
236 }
237 if (!args.has_setup()) {
238 return Status(StatusCode::INVALID_ARGUMENT, "");
239 }
240 if (server_port_ != 0) {
241 args.mutable_setup()->set_port(server_port_);
242 }
243 gpr_log(GPR_INFO, "RunServerBody: about to create server");
244 auto server = CreateServer(args.setup());
245 if (!server) {
246 return Status(StatusCode::INVALID_ARGUMENT, "");
247 }
248 gpr_log(GPR_INFO, "RunServerBody: server created");
249 ServerStatus status;
250 status.set_port(server->port());
251 status.set_cores(server->cores());
252 if (!stream->Write(status)) {
253 return Status(StatusCode::UNKNOWN, "");
254 }
255 gpr_log(GPR_INFO, "RunServerBody: creation status reported");
256 while (stream->Read(&args)) {
257 gpr_log(GPR_INFO, "RunServerBody: Message read");
258 if (!args.has_mark()) {
259 gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
260 return Status(StatusCode::INVALID_ARGUMENT, "");
261 }
262 *status.mutable_stats() = server->Mark(args.mark().reset());
263 stream->Write(status);
264 gpr_log(GPR_INFO, "RunServerBody: Mark response given");
265 }
266
267 gpr_log(GPR_INFO, "RunServerBody: Returning");
268 return Status::OK;
269 }
270
271 std::mutex mu_;
272 bool acquired_;
273 int server_port_;
274 QpsWorker* worker_;
275 };
276
277 QpsWorker::QpsWorker(int driver_port, int server_port) {
278 impl_.reset(new WorkerServiceImpl(server_port, this));
279 gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
280
281 char* server_address = NULL;
282 gpr_join_host_port(&server_address, "::", driver_port);
283
284 ServerBuilder builder;
285 builder.AddListeningPort(server_address, InsecureServerCredentials());
286 builder.RegisterService(impl_.get());
287
288 gpr_free(server_address);
289
290 server_ = builder.BuildAndStart();
291 }
292
293 QpsWorker::~QpsWorker() {}
294
295 bool QpsWorker::Done() const {
296 return (gpr_atm_acq_load(&done_) != static_cast<gpr_atm>(0));
297 }
298 void QpsWorker::MarkDone() {
299 gpr_atm_rel_store(&done_, static_cast<gpr_atm>(1));
300 }
301 } // namespace testing
302 } // namespace grpc
OLDNEW
« no previous file with comments | « third_party/grpc/test/cpp/qps/qps_worker.h ('k') | third_party/grpc/test/cpp/qps/report.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698