OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include <grpc/grpc.h> |
| 35 #include <grpc/grpc_security.h> |
| 36 |
| 37 #include <signal.h> |
| 38 #include <stdio.h> |
| 39 #include <stdlib.h> |
| 40 #include <string.h> |
| 41 #include <time.h> |
| 42 #ifndef _WIN32 |
| 43 /* This is for _exit() below, which is temporary. */ |
| 44 #include <unistd.h> |
| 45 #endif |
| 46 |
| 47 #include <grpc/support/alloc.h> |
| 48 #include <grpc/support/cmdline.h> |
| 49 #include <grpc/support/host_port.h> |
| 50 #include <grpc/support/log.h> |
| 51 #include <grpc/support/time.h> |
| 52 #include "src/core/profiling/timers.h" |
| 53 #include "test/core/end2end/data/ssl_test_data.h" |
| 54 #include "test/core/util/grpc_profiler.h" |
| 55 #include "test/core/util/port.h" |
| 56 #include "test/core/util/test_config.h" |
| 57 |
| 58 static grpc_completion_queue *cq; |
| 59 static grpc_server *server; |
| 60 static grpc_call *call; |
| 61 static grpc_call_details call_details; |
| 62 static grpc_metadata_array request_metadata_recv; |
| 63 static grpc_metadata_array initial_metadata_send; |
| 64 static grpc_byte_buffer *payload_buffer = NULL; |
| 65 /* Used to drain the terminal read in unary calls. */ |
| 66 static grpc_byte_buffer *terminal_buffer = NULL; |
| 67 |
| 68 static grpc_op read_op; |
| 69 static grpc_op metadata_send_op; |
| 70 static grpc_op write_op; |
| 71 static grpc_op status_op[2]; |
| 72 static int was_cancelled = 2; |
| 73 static grpc_op unary_ops[6]; |
| 74 static int got_sigint = 0; |
| 75 |
| 76 static void *tag(intptr_t t) { return (void *)t; } |
| 77 |
| 78 typedef enum { |
| 79 FLING_SERVER_NEW_REQUEST = 1, |
| 80 FLING_SERVER_READ_FOR_UNARY, |
| 81 FLING_SERVER_BATCH_OPS_FOR_UNARY, |
| 82 FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING, |
| 83 FLING_SERVER_READ_FOR_STREAMING, |
| 84 FLING_SERVER_WRITE_FOR_STREAMING, |
| 85 FLING_SERVER_SEND_STATUS_FOR_STREAMING |
| 86 } fling_server_tags; |
| 87 |
| 88 typedef struct { |
| 89 gpr_refcount pending_ops; |
| 90 uint32_t flags; |
| 91 } call_state; |
| 92 |
| 93 static void request_call(void) { |
| 94 grpc_metadata_array_init(&request_metadata_recv); |
| 95 grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, |
| 96 cq, cq, tag(FLING_SERVER_NEW_REQUEST)); |
| 97 } |
| 98 |
| 99 static void handle_unary_method(void) { |
| 100 grpc_op *op; |
| 101 grpc_call_error error; |
| 102 |
| 103 grpc_metadata_array_init(&initial_metadata_send); |
| 104 |
| 105 op = unary_ops; |
| 106 op->op = GRPC_OP_SEND_INITIAL_METADATA; |
| 107 op->data.send_initial_metadata.count = 0; |
| 108 op++; |
| 109 op->op = GRPC_OP_RECV_MESSAGE; |
| 110 op->data.recv_message = &terminal_buffer; |
| 111 op++; |
| 112 op->op = GRPC_OP_SEND_MESSAGE; |
| 113 if (payload_buffer == NULL) { |
| 114 gpr_log(GPR_INFO, "NULL payload buffer !!!"); |
| 115 } |
| 116 op->data.send_message = payload_buffer; |
| 117 op++; |
| 118 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
| 119 op->data.send_status_from_server.status = GRPC_STATUS_OK; |
| 120 op->data.send_status_from_server.trailing_metadata_count = 0; |
| 121 op->data.send_status_from_server.status_details = ""; |
| 122 op++; |
| 123 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
| 124 op->data.recv_close_on_server.cancelled = &was_cancelled; |
| 125 op++; |
| 126 |
| 127 error = grpc_call_start_batch(call, unary_ops, (size_t)(op - unary_ops), |
| 128 tag(FLING_SERVER_BATCH_OPS_FOR_UNARY), NULL); |
| 129 GPR_ASSERT(GRPC_CALL_OK == error); |
| 130 } |
| 131 |
| 132 static void send_initial_metadata(void) { |
| 133 grpc_call_error error; |
| 134 void *tagarg = tag(FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING); |
| 135 grpc_metadata_array_init(&initial_metadata_send); |
| 136 metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA; |
| 137 metadata_send_op.data.send_initial_metadata.count = 0; |
| 138 error = grpc_call_start_batch(call, &metadata_send_op, 1, tagarg, NULL); |
| 139 |
| 140 GPR_ASSERT(GRPC_CALL_OK == error); |
| 141 } |
| 142 |
| 143 static void start_read_op(int t) { |
| 144 grpc_call_error error; |
| 145 /* Starting read at server */ |
| 146 read_op.op = GRPC_OP_RECV_MESSAGE; |
| 147 read_op.data.recv_message = &payload_buffer; |
| 148 error = grpc_call_start_batch(call, &read_op, 1, tag(t), NULL); |
| 149 GPR_ASSERT(GRPC_CALL_OK == error); |
| 150 } |
| 151 |
| 152 static void start_write_op(void) { |
| 153 grpc_call_error error; |
| 154 void *tagarg = tag(FLING_SERVER_WRITE_FOR_STREAMING); |
| 155 /* Starting write at server */ |
| 156 write_op.op = GRPC_OP_SEND_MESSAGE; |
| 157 if (payload_buffer == NULL) { |
| 158 gpr_log(GPR_INFO, "NULL payload buffer !!!"); |
| 159 } |
| 160 write_op.data.send_message = payload_buffer; |
| 161 error = grpc_call_start_batch(call, &write_op, 1, tagarg, NULL); |
| 162 GPR_ASSERT(GRPC_CALL_OK == error); |
| 163 } |
| 164 |
| 165 static void start_send_status(void) { |
| 166 grpc_call_error error; |
| 167 void *tagarg = tag(FLING_SERVER_SEND_STATUS_FOR_STREAMING); |
| 168 status_op[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
| 169 status_op[0].data.send_status_from_server.status = GRPC_STATUS_OK; |
| 170 status_op[0].data.send_status_from_server.trailing_metadata_count = 0; |
| 171 status_op[0].data.send_status_from_server.status_details = ""; |
| 172 status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
| 173 status_op[1].data.recv_close_on_server.cancelled = &was_cancelled; |
| 174 |
| 175 error = grpc_call_start_batch(call, status_op, 2, tagarg, NULL); |
| 176 GPR_ASSERT(GRPC_CALL_OK == error); |
| 177 } |
| 178 |
| 179 /* We have some sort of deadlock, so let's not exit gracefully for now. |
| 180 When that is resolved, please remove the #include <unistd.h> above. */ |
| 181 static void sigint_handler(int x) { _exit(0); } |
| 182 |
| 183 int main(int argc, char **argv) { |
| 184 grpc_event ev; |
| 185 call_state *s; |
| 186 char *addr_buf = NULL; |
| 187 gpr_cmdline *cl; |
| 188 int shutdown_started = 0; |
| 189 int shutdown_finished = 0; |
| 190 |
| 191 int secure = 0; |
| 192 char *addr = NULL; |
| 193 |
| 194 char *fake_argv[1]; |
| 195 |
| 196 gpr_timers_set_log_filename("latency_trace.fling_server.txt"); |
| 197 |
| 198 GPR_ASSERT(argc >= 1); |
| 199 fake_argv[0] = argv[0]; |
| 200 grpc_test_init(1, fake_argv); |
| 201 |
| 202 grpc_init(); |
| 203 srand((unsigned)clock()); |
| 204 |
| 205 cl = gpr_cmdline_create("fling server"); |
| 206 gpr_cmdline_add_string(cl, "bind", "Bind host:port", &addr); |
| 207 gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure); |
| 208 gpr_cmdline_parse(cl, argc, argv); |
| 209 gpr_cmdline_destroy(cl); |
| 210 |
| 211 if (addr == NULL) { |
| 212 gpr_join_host_port(&addr_buf, "::", grpc_pick_unused_port_or_die()); |
| 213 addr = addr_buf; |
| 214 } |
| 215 gpr_log(GPR_INFO, "creating server on: %s", addr); |
| 216 |
| 217 cq = grpc_completion_queue_create(NULL); |
| 218 if (secure) { |
| 219 grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key, |
| 220 test_server1_cert}; |
| 221 grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create( |
| 222 NULL, &pem_key_cert_pair, 1, 0, NULL); |
| 223 server = grpc_server_create(NULL, NULL); |
| 224 GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds)); |
| 225 grpc_server_credentials_release(ssl_creds); |
| 226 } else { |
| 227 server = grpc_server_create(NULL, NULL); |
| 228 GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr)); |
| 229 } |
| 230 grpc_server_register_completion_queue(server, cq, NULL); |
| 231 grpc_server_start(server); |
| 232 |
| 233 gpr_free(addr_buf); |
| 234 addr = addr_buf = NULL; |
| 235 |
| 236 grpc_call_details_init(&call_details); |
| 237 |
| 238 request_call(); |
| 239 |
| 240 grpc_profiler_start("server.prof"); |
| 241 signal(SIGINT, sigint_handler); |
| 242 while (!shutdown_finished) { |
| 243 if (got_sigint && !shutdown_started) { |
| 244 gpr_log(GPR_INFO, "Shutting down due to SIGINT"); |
| 245 grpc_server_shutdown_and_notify(server, cq, tag(1000)); |
| 246 GPR_ASSERT(grpc_completion_queue_pluck( |
| 247 cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) |
| 248 .type == GRPC_OP_COMPLETE); |
| 249 grpc_completion_queue_shutdown(cq); |
| 250 shutdown_started = 1; |
| 251 } |
| 252 ev = grpc_completion_queue_next( |
| 253 cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), |
| 254 gpr_time_from_micros(1000000, GPR_TIMESPAN)), |
| 255 NULL); |
| 256 s = ev.tag; |
| 257 switch (ev.type) { |
| 258 case GRPC_OP_COMPLETE: |
| 259 switch ((intptr_t)s) { |
| 260 case FLING_SERVER_NEW_REQUEST: |
| 261 if (call != NULL) { |
| 262 if (0 == |
| 263 strcmp(call_details.method, "/Reflector/reflectStream")) { |
| 264 /* Received streaming call. Send metadata here. */ |
| 265 start_read_op(FLING_SERVER_READ_FOR_STREAMING); |
| 266 send_initial_metadata(); |
| 267 } else { |
| 268 /* Received unary call. Can do all ops in one batch. */ |
| 269 start_read_op(FLING_SERVER_READ_FOR_UNARY); |
| 270 } |
| 271 } else { |
| 272 GPR_ASSERT(shutdown_started); |
| 273 } |
| 274 /* request_call(); |
| 275 */ |
| 276 break; |
| 277 case FLING_SERVER_READ_FOR_STREAMING: |
| 278 if (payload_buffer != NULL) { |
| 279 /* Received payload from client. */ |
| 280 start_write_op(); |
| 281 } else { |
| 282 /* Received end of stream from client. */ |
| 283 start_send_status(); |
| 284 } |
| 285 break; |
| 286 case FLING_SERVER_WRITE_FOR_STREAMING: |
| 287 /* Write completed at server */ |
| 288 grpc_byte_buffer_destroy(payload_buffer); |
| 289 payload_buffer = NULL; |
| 290 start_read_op(FLING_SERVER_READ_FOR_STREAMING); |
| 291 break; |
| 292 case FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING: |
| 293 /* Metadata send completed at server */ |
| 294 break; |
| 295 case FLING_SERVER_SEND_STATUS_FOR_STREAMING: |
| 296 /* Send status and close completed at server */ |
| 297 grpc_call_destroy(call); |
| 298 if (!shutdown_started) request_call(); |
| 299 break; |
| 300 case FLING_SERVER_READ_FOR_UNARY: |
| 301 /* Finished payload read for unary. Start all reamaining |
| 302 * unary ops in a batch. |
| 303 */ |
| 304 handle_unary_method(); |
| 305 break; |
| 306 case FLING_SERVER_BATCH_OPS_FOR_UNARY: |
| 307 /* Finished unary call. */ |
| 308 grpc_byte_buffer_destroy(payload_buffer); |
| 309 payload_buffer = NULL; |
| 310 grpc_call_destroy(call); |
| 311 if (!shutdown_started) request_call(); |
| 312 break; |
| 313 } |
| 314 break; |
| 315 case GRPC_QUEUE_SHUTDOWN: |
| 316 GPR_ASSERT(shutdown_started); |
| 317 shutdown_finished = 1; |
| 318 break; |
| 319 case GRPC_QUEUE_TIMEOUT: |
| 320 break; |
| 321 } |
| 322 } |
| 323 grpc_profiler_stop(); |
| 324 grpc_call_details_destroy(&call_details); |
| 325 |
| 326 grpc_server_destroy(server); |
| 327 grpc_completion_queue_destroy(cq); |
| 328 grpc_shutdown(); |
| 329 return 0; |
| 330 } |
OLD | NEW |