OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include "test/core/end2end/fixtures/proxy.h" |
| 35 |
| 36 #include <string.h> |
| 37 |
| 38 #include <grpc/support/alloc.h> |
| 39 #include <grpc/support/host_port.h> |
| 40 #include <grpc/support/log.h> |
| 41 #include <grpc/support/sync.h> |
| 42 #include <grpc/support/thd.h> |
| 43 #include <grpc/support/useful.h> |
| 44 |
| 45 #include "test/core/util/port.h" |
| 46 |
| 47 struct grpc_end2end_proxy { |
| 48 gpr_thd_id thd; |
| 49 char *proxy_port; |
| 50 char *server_port; |
| 51 grpc_completion_queue *cq; |
| 52 grpc_server *server; |
| 53 grpc_channel *client; |
| 54 |
| 55 int shutdown; |
| 56 |
| 57 /* requested call */ |
| 58 grpc_call *new_call; |
| 59 grpc_call_details new_call_details; |
| 60 grpc_metadata_array new_call_metadata; |
| 61 }; |
| 62 |
| 63 typedef struct { |
| 64 void (*func)(void *arg, int success); |
| 65 void *arg; |
| 66 } closure; |
| 67 |
| 68 typedef struct { |
| 69 gpr_refcount refs; |
| 70 grpc_end2end_proxy *proxy; |
| 71 |
| 72 grpc_call *c2p; |
| 73 grpc_call *p2s; |
| 74 |
| 75 grpc_metadata_array c2p_initial_metadata; |
| 76 grpc_metadata_array p2s_initial_metadata; |
| 77 |
| 78 grpc_byte_buffer *c2p_msg; |
| 79 grpc_byte_buffer *p2s_msg; |
| 80 |
| 81 grpc_metadata_array p2s_trailing_metadata; |
| 82 grpc_status_code p2s_status; |
| 83 char *p2s_status_details; |
| 84 size_t p2s_status_details_capacity; |
| 85 |
| 86 int c2p_server_cancelled; |
| 87 } proxy_call; |
| 88 |
| 89 static void thread_main(void *arg); |
| 90 static void request_call(grpc_end2end_proxy *proxy); |
| 91 |
| 92 grpc_end2end_proxy *grpc_end2end_proxy_create( |
| 93 const grpc_end2end_proxy_def *def) { |
| 94 gpr_thd_options opt = gpr_thd_options_default(); |
| 95 int proxy_port = grpc_pick_unused_port_or_die(); |
| 96 int server_port = grpc_pick_unused_port_or_die(); |
| 97 |
| 98 grpc_end2end_proxy *proxy = gpr_malloc(sizeof(*proxy)); |
| 99 memset(proxy, 0, sizeof(*proxy)); |
| 100 |
| 101 gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port); |
| 102 gpr_join_host_port(&proxy->server_port, "localhost", server_port); |
| 103 |
| 104 gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port, |
| 105 proxy->server_port); |
| 106 |
| 107 proxy->cq = grpc_completion_queue_create(NULL); |
| 108 proxy->server = def->create_server(proxy->proxy_port); |
| 109 proxy->client = def->create_client(proxy->server_port); |
| 110 |
| 111 grpc_server_register_completion_queue(proxy->server, proxy->cq, NULL); |
| 112 grpc_server_start(proxy->server); |
| 113 |
| 114 gpr_thd_options_set_joinable(&opt); |
| 115 GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); |
| 116 |
| 117 request_call(proxy); |
| 118 |
| 119 return proxy; |
| 120 } |
| 121 |
| 122 static closure *new_closure(void (*func)(void *arg, int success), void *arg) { |
| 123 closure *cl = gpr_malloc(sizeof(*cl)); |
| 124 cl->func = func; |
| 125 cl->arg = arg; |
| 126 return cl; |
| 127 } |
| 128 |
| 129 static void shutdown_complete(void *arg, int success) { |
| 130 grpc_end2end_proxy *proxy = arg; |
| 131 proxy->shutdown = 1; |
| 132 grpc_completion_queue_shutdown(proxy->cq); |
| 133 } |
| 134 |
| 135 void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) { |
| 136 grpc_server_shutdown_and_notify(proxy->server, proxy->cq, |
| 137 new_closure(shutdown_complete, proxy)); |
| 138 gpr_thd_join(proxy->thd); |
| 139 gpr_free(proxy->proxy_port); |
| 140 gpr_free(proxy->server_port); |
| 141 grpc_server_destroy(proxy->server); |
| 142 grpc_channel_destroy(proxy->client); |
| 143 grpc_completion_queue_destroy(proxy->cq); |
| 144 grpc_call_details_destroy(&proxy->new_call_details); |
| 145 gpr_free(proxy); |
| 146 } |
| 147 |
| 148 static void unrefpc(proxy_call *pc, const char *reason) { |
| 149 if (gpr_unref(&pc->refs)) { |
| 150 grpc_call_destroy(pc->c2p); |
| 151 grpc_call_destroy(pc->p2s); |
| 152 grpc_metadata_array_destroy(&pc->c2p_initial_metadata); |
| 153 grpc_metadata_array_destroy(&pc->p2s_initial_metadata); |
| 154 grpc_metadata_array_destroy(&pc->p2s_trailing_metadata); |
| 155 gpr_free(pc->p2s_status_details); |
| 156 gpr_free(pc); |
| 157 } |
| 158 } |
| 159 |
| 160 static void refpc(proxy_call *pc, const char *reason) { gpr_ref(&pc->refs); } |
| 161 |
| 162 static void on_c2p_sent_initial_metadata(void *arg, int success) { |
| 163 proxy_call *pc = arg; |
| 164 unrefpc(pc, "on_c2p_sent_initial_metadata"); |
| 165 } |
| 166 |
| 167 static void on_p2s_recv_initial_metadata(void *arg, int success) { |
| 168 proxy_call *pc = arg; |
| 169 grpc_op op; |
| 170 grpc_call_error err; |
| 171 |
| 172 if (!pc->proxy->shutdown) { |
| 173 op.op = GRPC_OP_SEND_INITIAL_METADATA; |
| 174 op.flags = 0; |
| 175 op.reserved = NULL; |
| 176 op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count; |
| 177 op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata; |
| 178 refpc(pc, "on_c2p_sent_initial_metadata"); |
| 179 err = grpc_call_start_batch( |
| 180 pc->c2p, &op, 1, new_closure(on_c2p_sent_initial_metadata, pc), NULL); |
| 181 GPR_ASSERT(err == GRPC_CALL_OK); |
| 182 } |
| 183 |
| 184 unrefpc(pc, "on_p2s_recv_initial_metadata"); |
| 185 } |
| 186 |
| 187 static void on_p2s_sent_initial_metadata(void *arg, int success) { |
| 188 proxy_call *pc = arg; |
| 189 unrefpc(pc, "on_p2s_sent_initial_metadata"); |
| 190 } |
| 191 |
| 192 static void on_c2p_recv_msg(void *arg, int success); |
| 193 |
| 194 static void on_p2s_sent_message(void *arg, int success) { |
| 195 proxy_call *pc = arg; |
| 196 grpc_op op; |
| 197 grpc_call_error err; |
| 198 |
| 199 grpc_byte_buffer_destroy(pc->c2p_msg); |
| 200 if (!pc->proxy->shutdown && success) { |
| 201 op.op = GRPC_OP_RECV_MESSAGE; |
| 202 op.flags = 0; |
| 203 op.reserved = NULL; |
| 204 op.data.recv_message = &pc->c2p_msg; |
| 205 refpc(pc, "on_c2p_recv_msg"); |
| 206 err = grpc_call_start_batch(pc->c2p, &op, 1, |
| 207 new_closure(on_c2p_recv_msg, pc), NULL); |
| 208 GPR_ASSERT(err == GRPC_CALL_OK); |
| 209 } |
| 210 |
| 211 unrefpc(pc, "on_p2s_sent_message"); |
| 212 } |
| 213 |
| 214 static void on_p2s_sent_close(void *arg, int success) { |
| 215 proxy_call *pc = arg; |
| 216 unrefpc(pc, "on_p2s_sent_close"); |
| 217 } |
| 218 |
| 219 static void on_c2p_recv_msg(void *arg, int success) { |
| 220 proxy_call *pc = arg; |
| 221 grpc_op op; |
| 222 grpc_call_error err; |
| 223 |
| 224 if (!pc->proxy->shutdown && success) { |
| 225 if (pc->c2p_msg != NULL) { |
| 226 op.op = GRPC_OP_SEND_MESSAGE; |
| 227 op.flags = 0; |
| 228 op.reserved = NULL; |
| 229 op.data.send_message = pc->c2p_msg; |
| 230 refpc(pc, "on_p2s_sent_message"); |
| 231 err = grpc_call_start_batch(pc->p2s, &op, 1, |
| 232 new_closure(on_p2s_sent_message, pc), NULL); |
| 233 GPR_ASSERT(err == GRPC_CALL_OK); |
| 234 } else { |
| 235 op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
| 236 op.flags = 0; |
| 237 op.reserved = NULL; |
| 238 refpc(pc, "on_p2s_sent_close"); |
| 239 err = grpc_call_start_batch(pc->p2s, &op, 1, |
| 240 new_closure(on_p2s_sent_close, pc), NULL); |
| 241 GPR_ASSERT(err == GRPC_CALL_OK); |
| 242 } |
| 243 } |
| 244 |
| 245 unrefpc(pc, "on_c2p_recv_msg"); |
| 246 } |
| 247 |
| 248 static void on_p2s_recv_msg(void *arg, int success); |
| 249 |
| 250 static void on_c2p_sent_message(void *arg, int success) { |
| 251 proxy_call *pc = arg; |
| 252 grpc_op op; |
| 253 grpc_call_error err; |
| 254 |
| 255 grpc_byte_buffer_destroy(pc->p2s_msg); |
| 256 if (!pc->proxy->shutdown && success) { |
| 257 op.op = GRPC_OP_RECV_MESSAGE; |
| 258 op.flags = 0; |
| 259 op.reserved = NULL; |
| 260 op.data.recv_message = &pc->p2s_msg; |
| 261 refpc(pc, "on_p2s_recv_msg"); |
| 262 err = grpc_call_start_batch(pc->p2s, &op, 1, |
| 263 new_closure(on_p2s_recv_msg, pc), NULL); |
| 264 GPR_ASSERT(err == GRPC_CALL_OK); |
| 265 } |
| 266 |
| 267 unrefpc(pc, "on_c2p_sent_message"); |
| 268 } |
| 269 |
| 270 static void on_p2s_recv_msg(void *arg, int success) { |
| 271 proxy_call *pc = arg; |
| 272 grpc_op op; |
| 273 grpc_call_error err; |
| 274 |
| 275 if (!pc->proxy->shutdown && success && pc->p2s_msg) { |
| 276 op.op = GRPC_OP_SEND_MESSAGE; |
| 277 op.flags = 0; |
| 278 op.reserved = NULL; |
| 279 op.data.send_message = pc->p2s_msg; |
| 280 refpc(pc, "on_c2p_sent_message"); |
| 281 err = grpc_call_start_batch(pc->c2p, &op, 1, |
| 282 new_closure(on_c2p_sent_message, pc), NULL); |
| 283 GPR_ASSERT(err == GRPC_CALL_OK); |
| 284 } |
| 285 unrefpc(pc, "on_p2s_recv_msg"); |
| 286 } |
| 287 |
| 288 static void on_c2p_sent_status(void *arg, int success) { |
| 289 proxy_call *pc = arg; |
| 290 unrefpc(pc, "on_c2p_sent_status"); |
| 291 } |
| 292 |
| 293 static void on_p2s_status(void *arg, int success) { |
| 294 proxy_call *pc = arg; |
| 295 grpc_op op; |
| 296 grpc_call_error err; |
| 297 |
| 298 if (!pc->proxy->shutdown) { |
| 299 GPR_ASSERT(success); |
| 300 op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
| 301 op.flags = 0; |
| 302 op.reserved = NULL; |
| 303 op.data.send_status_from_server.trailing_metadata_count = |
| 304 pc->p2s_trailing_metadata.count; |
| 305 op.data.send_status_from_server.trailing_metadata = |
| 306 pc->p2s_trailing_metadata.metadata; |
| 307 op.data.send_status_from_server.status = pc->p2s_status; |
| 308 op.data.send_status_from_server.status_details = pc->p2s_status_details; |
| 309 refpc(pc, "on_c2p_sent_status"); |
| 310 err = grpc_call_start_batch(pc->c2p, &op, 1, |
| 311 new_closure(on_c2p_sent_status, pc), NULL); |
| 312 GPR_ASSERT(err == GRPC_CALL_OK); |
| 313 } |
| 314 |
| 315 unrefpc(pc, "on_p2s_status"); |
| 316 } |
| 317 |
| 318 static void on_c2p_closed(void *arg, int success) { |
| 319 proxy_call *pc = arg; |
| 320 unrefpc(pc, "on_c2p_closed"); |
| 321 } |
| 322 |
| 323 static void on_new_call(void *arg, int success) { |
| 324 grpc_end2end_proxy *proxy = arg; |
| 325 grpc_call_error err; |
| 326 |
| 327 if (success) { |
| 328 grpc_op op; |
| 329 proxy_call *pc = gpr_malloc(sizeof(*pc)); |
| 330 memset(pc, 0, sizeof(*pc)); |
| 331 pc->proxy = proxy; |
| 332 GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata, |
| 333 proxy->new_call_metadata); |
| 334 pc->c2p = proxy->new_call; |
| 335 pc->p2s = grpc_channel_create_call( |
| 336 proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq, |
| 337 proxy->new_call_details.method, proxy->new_call_details.host, |
| 338 proxy->new_call_details.deadline, NULL); |
| 339 gpr_ref_init(&pc->refs, 1); |
| 340 |
| 341 op.flags = 0; |
| 342 op.reserved = NULL; |
| 343 |
| 344 op.op = GRPC_OP_RECV_INITIAL_METADATA; |
| 345 op.data.recv_initial_metadata = &pc->p2s_initial_metadata; |
| 346 refpc(pc, "on_p2s_recv_initial_metadata"); |
| 347 err = grpc_call_start_batch( |
| 348 pc->p2s, &op, 1, new_closure(on_p2s_recv_initial_metadata, pc), NULL); |
| 349 GPR_ASSERT(err == GRPC_CALL_OK); |
| 350 |
| 351 op.op = GRPC_OP_SEND_INITIAL_METADATA; |
| 352 op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count; |
| 353 op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata; |
| 354 refpc(pc, "on_p2s_sent_initial_metadata"); |
| 355 err = grpc_call_start_batch( |
| 356 pc->p2s, &op, 1, new_closure(on_p2s_sent_initial_metadata, pc), NULL); |
| 357 GPR_ASSERT(err == GRPC_CALL_OK); |
| 358 |
| 359 op.op = GRPC_OP_RECV_MESSAGE; |
| 360 op.data.recv_message = &pc->c2p_msg; |
| 361 refpc(pc, "on_c2p_recv_msg"); |
| 362 err = grpc_call_start_batch(pc->c2p, &op, 1, |
| 363 new_closure(on_c2p_recv_msg, pc), NULL); |
| 364 GPR_ASSERT(err == GRPC_CALL_OK); |
| 365 |
| 366 op.op = GRPC_OP_RECV_MESSAGE; |
| 367 op.data.recv_message = &pc->p2s_msg; |
| 368 refpc(pc, "on_p2s_recv_msg"); |
| 369 err = grpc_call_start_batch(pc->p2s, &op, 1, |
| 370 new_closure(on_p2s_recv_msg, pc), NULL); |
| 371 GPR_ASSERT(err == GRPC_CALL_OK); |
| 372 |
| 373 op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
| 374 op.data.recv_status_on_client.trailing_metadata = |
| 375 &pc->p2s_trailing_metadata; |
| 376 op.data.recv_status_on_client.status = &pc->p2s_status; |
| 377 op.data.recv_status_on_client.status_details = &pc->p2s_status_details; |
| 378 op.data.recv_status_on_client.status_details_capacity = |
| 379 &pc->p2s_status_details_capacity; |
| 380 refpc(pc, "on_p2s_status"); |
| 381 err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc), |
| 382 NULL); |
| 383 GPR_ASSERT(err == GRPC_CALL_OK); |
| 384 |
| 385 op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
| 386 op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled; |
| 387 refpc(pc, "on_c2p_closed"); |
| 388 err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc), |
| 389 NULL); |
| 390 GPR_ASSERT(err == GRPC_CALL_OK); |
| 391 |
| 392 request_call(proxy); |
| 393 |
| 394 unrefpc(pc, "init"); |
| 395 } else { |
| 396 GPR_ASSERT(proxy->new_call == NULL); |
| 397 } |
| 398 } |
| 399 |
| 400 static void request_call(grpc_end2end_proxy *proxy) { |
| 401 proxy->new_call = NULL; |
| 402 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( |
| 403 proxy->server, &proxy->new_call, |
| 404 &proxy->new_call_details, |
| 405 &proxy->new_call_metadata, proxy->cq, |
| 406 proxy->cq, new_closure(on_new_call, proxy))); |
| 407 } |
| 408 |
| 409 static void thread_main(void *arg) { |
| 410 grpc_end2end_proxy *proxy = arg; |
| 411 closure *cl; |
| 412 for (;;) { |
| 413 grpc_event ev = grpc_completion_queue_next( |
| 414 proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); |
| 415 switch (ev.type) { |
| 416 case GRPC_QUEUE_TIMEOUT: |
| 417 gpr_log(GPR_ERROR, "Should never reach here"); |
| 418 abort(); |
| 419 case GRPC_QUEUE_SHUTDOWN: |
| 420 return; |
| 421 case GRPC_OP_COMPLETE: |
| 422 cl = ev.tag; |
| 423 cl->func(cl->arg, ev.success); |
| 424 gpr_free(cl); |
| 425 break; |
| 426 } |
| 427 } |
| 428 } |
| 429 |
| 430 const char *grpc_end2end_proxy_get_client_target(grpc_end2end_proxy *proxy) { |
| 431 return proxy->proxy_port; |
| 432 } |
| 433 |
| 434 const char *grpc_end2end_proxy_get_server_port(grpc_end2end_proxy *proxy) { |
| 435 return proxy->server_port; |
| 436 } |
OLD | NEW |