Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(356)

Side by Side Diff: third_party/grpc/test/core/fling/server.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <grpc/grpc.h>
35 #include <grpc/grpc_security.h>
36
37 #include <signal.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <time.h>
42 #ifndef _WIN32
43 /* This is for _exit() below, which is temporary. */
44 #include <unistd.h>
45 #endif
46
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/cmdline.h>
49 #include <grpc/support/host_port.h>
50 #include <grpc/support/log.h>
51 #include <grpc/support/time.h>
52 #include "src/core/profiling/timers.h"
53 #include "test/core/end2end/data/ssl_test_data.h"
54 #include "test/core/util/grpc_profiler.h"
55 #include "test/core/util/port.h"
56 #include "test/core/util/test_config.h"
57
58 static grpc_completion_queue *cq;
59 static grpc_server *server;
60 static grpc_call *call;
61 static grpc_call_details call_details;
62 static grpc_metadata_array request_metadata_recv;
63 static grpc_metadata_array initial_metadata_send;
64 static grpc_byte_buffer *payload_buffer = NULL;
65 /* Used to drain the terminal read in unary calls. */
66 static grpc_byte_buffer *terminal_buffer = NULL;
67
68 static grpc_op read_op;
69 static grpc_op metadata_send_op;
70 static grpc_op write_op;
71 static grpc_op status_op[2];
72 static int was_cancelled = 2;
73 static grpc_op unary_ops[6];
74 static int got_sigint = 0;
75
76 static void *tag(intptr_t t) { return (void *)t; }
77
78 typedef enum {
79 FLING_SERVER_NEW_REQUEST = 1,
80 FLING_SERVER_READ_FOR_UNARY,
81 FLING_SERVER_BATCH_OPS_FOR_UNARY,
82 FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING,
83 FLING_SERVER_READ_FOR_STREAMING,
84 FLING_SERVER_WRITE_FOR_STREAMING,
85 FLING_SERVER_SEND_STATUS_FOR_STREAMING
86 } fling_server_tags;
87
88 typedef struct {
89 gpr_refcount pending_ops;
90 uint32_t flags;
91 } call_state;
92
93 static void request_call(void) {
94 grpc_metadata_array_init(&request_metadata_recv);
95 grpc_server_request_call(server, &call, &call_details, &request_metadata_recv,
96 cq, cq, tag(FLING_SERVER_NEW_REQUEST));
97 }
98
99 static void handle_unary_method(void) {
100 grpc_op *op;
101 grpc_call_error error;
102
103 grpc_metadata_array_init(&initial_metadata_send);
104
105 op = unary_ops;
106 op->op = GRPC_OP_SEND_INITIAL_METADATA;
107 op->data.send_initial_metadata.count = 0;
108 op++;
109 op->op = GRPC_OP_RECV_MESSAGE;
110 op->data.recv_message = &terminal_buffer;
111 op++;
112 op->op = GRPC_OP_SEND_MESSAGE;
113 if (payload_buffer == NULL) {
114 gpr_log(GPR_INFO, "NULL payload buffer !!!");
115 }
116 op->data.send_message = payload_buffer;
117 op++;
118 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
119 op->data.send_status_from_server.status = GRPC_STATUS_OK;
120 op->data.send_status_from_server.trailing_metadata_count = 0;
121 op->data.send_status_from_server.status_details = "";
122 op++;
123 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
124 op->data.recv_close_on_server.cancelled = &was_cancelled;
125 op++;
126
127 error = grpc_call_start_batch(call, unary_ops, (size_t)(op - unary_ops),
128 tag(FLING_SERVER_BATCH_OPS_FOR_UNARY), NULL);
129 GPR_ASSERT(GRPC_CALL_OK == error);
130 }
131
132 static void send_initial_metadata(void) {
133 grpc_call_error error;
134 void *tagarg = tag(FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING);
135 grpc_metadata_array_init(&initial_metadata_send);
136 metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA;
137 metadata_send_op.data.send_initial_metadata.count = 0;
138 error = grpc_call_start_batch(call, &metadata_send_op, 1, tagarg, NULL);
139
140 GPR_ASSERT(GRPC_CALL_OK == error);
141 }
142
143 static void start_read_op(int t) {
144 grpc_call_error error;
145 /* Starting read at server */
146 read_op.op = GRPC_OP_RECV_MESSAGE;
147 read_op.data.recv_message = &payload_buffer;
148 error = grpc_call_start_batch(call, &read_op, 1, tag(t), NULL);
149 GPR_ASSERT(GRPC_CALL_OK == error);
150 }
151
152 static void start_write_op(void) {
153 grpc_call_error error;
154 void *tagarg = tag(FLING_SERVER_WRITE_FOR_STREAMING);
155 /* Starting write at server */
156 write_op.op = GRPC_OP_SEND_MESSAGE;
157 if (payload_buffer == NULL) {
158 gpr_log(GPR_INFO, "NULL payload buffer !!!");
159 }
160 write_op.data.send_message = payload_buffer;
161 error = grpc_call_start_batch(call, &write_op, 1, tagarg, NULL);
162 GPR_ASSERT(GRPC_CALL_OK == error);
163 }
164
165 static void start_send_status(void) {
166 grpc_call_error error;
167 void *tagarg = tag(FLING_SERVER_SEND_STATUS_FOR_STREAMING);
168 status_op[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
169 status_op[0].data.send_status_from_server.status = GRPC_STATUS_OK;
170 status_op[0].data.send_status_from_server.trailing_metadata_count = 0;
171 status_op[0].data.send_status_from_server.status_details = "";
172 status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
173 status_op[1].data.recv_close_on_server.cancelled = &was_cancelled;
174
175 error = grpc_call_start_batch(call, status_op, 2, tagarg, NULL);
176 GPR_ASSERT(GRPC_CALL_OK == error);
177 }
178
179 /* We have some sort of deadlock, so let's not exit gracefully for now.
180 When that is resolved, please remove the #include <unistd.h> above. */
181 static void sigint_handler(int x) { _exit(0); }
182
183 int main(int argc, char **argv) {
184 grpc_event ev;
185 call_state *s;
186 char *addr_buf = NULL;
187 gpr_cmdline *cl;
188 int shutdown_started = 0;
189 int shutdown_finished = 0;
190
191 int secure = 0;
192 char *addr = NULL;
193
194 char *fake_argv[1];
195
196 gpr_timers_set_log_filename("latency_trace.fling_server.txt");
197
198 GPR_ASSERT(argc >= 1);
199 fake_argv[0] = argv[0];
200 grpc_test_init(1, fake_argv);
201
202 grpc_init();
203 srand((unsigned)clock());
204
205 cl = gpr_cmdline_create("fling server");
206 gpr_cmdline_add_string(cl, "bind", "Bind host:port", &addr);
207 gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure);
208 gpr_cmdline_parse(cl, argc, argv);
209 gpr_cmdline_destroy(cl);
210
211 if (addr == NULL) {
212 gpr_join_host_port(&addr_buf, "::", grpc_pick_unused_port_or_die());
213 addr = addr_buf;
214 }
215 gpr_log(GPR_INFO, "creating server on: %s", addr);
216
217 cq = grpc_completion_queue_create(NULL);
218 if (secure) {
219 grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key,
220 test_server1_cert};
221 grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create(
222 NULL, &pem_key_cert_pair, 1, 0, NULL);
223 server = grpc_server_create(NULL, NULL);
224 GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds));
225 grpc_server_credentials_release(ssl_creds);
226 } else {
227 server = grpc_server_create(NULL, NULL);
228 GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr));
229 }
230 grpc_server_register_completion_queue(server, cq, NULL);
231 grpc_server_start(server);
232
233 gpr_free(addr_buf);
234 addr = addr_buf = NULL;
235
236 grpc_call_details_init(&call_details);
237
238 request_call();
239
240 grpc_profiler_start("server.prof");
241 signal(SIGINT, sigint_handler);
242 while (!shutdown_finished) {
243 if (got_sigint && !shutdown_started) {
244 gpr_log(GPR_INFO, "Shutting down due to SIGINT");
245 grpc_server_shutdown_and_notify(server, cq, tag(1000));
246 GPR_ASSERT(grpc_completion_queue_pluck(
247 cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
248 .type == GRPC_OP_COMPLETE);
249 grpc_completion_queue_shutdown(cq);
250 shutdown_started = 1;
251 }
252 ev = grpc_completion_queue_next(
253 cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
254 gpr_time_from_micros(1000000, GPR_TIMESPAN)),
255 NULL);
256 s = ev.tag;
257 switch (ev.type) {
258 case GRPC_OP_COMPLETE:
259 switch ((intptr_t)s) {
260 case FLING_SERVER_NEW_REQUEST:
261 if (call != NULL) {
262 if (0 ==
263 strcmp(call_details.method, "/Reflector/reflectStream")) {
264 /* Received streaming call. Send metadata here. */
265 start_read_op(FLING_SERVER_READ_FOR_STREAMING);
266 send_initial_metadata();
267 } else {
268 /* Received unary call. Can do all ops in one batch. */
269 start_read_op(FLING_SERVER_READ_FOR_UNARY);
270 }
271 } else {
272 GPR_ASSERT(shutdown_started);
273 }
274 /* request_call();
275 */
276 break;
277 case FLING_SERVER_READ_FOR_STREAMING:
278 if (payload_buffer != NULL) {
279 /* Received payload from client. */
280 start_write_op();
281 } else {
282 /* Received end of stream from client. */
283 start_send_status();
284 }
285 break;
286 case FLING_SERVER_WRITE_FOR_STREAMING:
287 /* Write completed at server */
288 grpc_byte_buffer_destroy(payload_buffer);
289 payload_buffer = NULL;
290 start_read_op(FLING_SERVER_READ_FOR_STREAMING);
291 break;
292 case FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING:
293 /* Metadata send completed at server */
294 break;
295 case FLING_SERVER_SEND_STATUS_FOR_STREAMING:
296 /* Send status and close completed at server */
297 grpc_call_destroy(call);
298 if (!shutdown_started) request_call();
299 break;
300 case FLING_SERVER_READ_FOR_UNARY:
301 /* Finished payload read for unary. Start all reamaining
302 * unary ops in a batch.
303 */
304 handle_unary_method();
305 break;
306 case FLING_SERVER_BATCH_OPS_FOR_UNARY:
307 /* Finished unary call. */
308 grpc_byte_buffer_destroy(payload_buffer);
309 payload_buffer = NULL;
310 grpc_call_destroy(call);
311 if (!shutdown_started) request_call();
312 break;
313 }
314 break;
315 case GRPC_QUEUE_SHUTDOWN:
316 GPR_ASSERT(shutdown_started);
317 shutdown_finished = 1;
318 break;
319 case GRPC_QUEUE_TIMEOUT:
320 break;
321 }
322 }
323 grpc_profiler_stop();
324 grpc_call_details_destroy(&call_details);
325
326 grpc_server_destroy(server);
327 grpc_completion_queue_destroy(cq);
328 grpc_shutdown();
329 return 0;
330 }
OLDNEW
« no previous file with comments | « third_party/grpc/test/core/fling/fling_test.c ('k') | third_party/grpc/test/core/httpcli/format_request_test.c » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698