Index: third_party/grpc/test/core/end2end/fixtures/proxy.c |
diff --git a/third_party/grpc/test/core/end2end/fixtures/proxy.c b/third_party/grpc/test/core/end2end/fixtures/proxy.c |
new file mode 100644 |
index 0000000000000000000000000000000000000000..434e75dd15ed22b9c3a80708ec91746d1f022cad |
--- /dev/null |
+++ b/third_party/grpc/test/core/end2end/fixtures/proxy.c |
@@ -0,0 +1,436 @@ |
+/* |
+ * |
+ * Copyright 2015, Google Inc. |
+ * All rights reserved. |
+ * |
+ * Redistribution and use in source and binary forms, with or without |
+ * modification, are permitted provided that the following conditions are |
+ * met: |
+ * |
+ * * Redistributions of source code must retain the above copyright |
+ * notice, this list of conditions and the following disclaimer. |
+ * * Redistributions in binary form must reproduce the above |
+ * copyright notice, this list of conditions and the following disclaimer |
+ * in the documentation and/or other materials provided with the |
+ * distribution. |
+ * * Neither the name of Google Inc. nor the names of its |
+ * contributors may be used to endorse or promote products derived from |
+ * this software without specific prior written permission. |
+ * |
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ * |
+ */ |
+ |
+#include "test/core/end2end/fixtures/proxy.h" |
+ |
+#include <string.h> |
+ |
+#include <grpc/support/alloc.h> |
+#include <grpc/support/host_port.h> |
+#include <grpc/support/log.h> |
+#include <grpc/support/sync.h> |
+#include <grpc/support/thd.h> |
+#include <grpc/support/useful.h> |
+ |
+#include "test/core/util/port.h" |
+ |
+struct grpc_end2end_proxy { |
+ gpr_thd_id thd; |
+ char *proxy_port; |
+ char *server_port; |
+ grpc_completion_queue *cq; |
+ grpc_server *server; |
+ grpc_channel *client; |
+ |
+ int shutdown; |
+ |
+ /* requested call */ |
+ grpc_call *new_call; |
+ grpc_call_details new_call_details; |
+ grpc_metadata_array new_call_metadata; |
+}; |
+ |
+typedef struct { |
+ void (*func)(void *arg, int success); |
+ void *arg; |
+} closure; |
+ |
+typedef struct { |
+ gpr_refcount refs; |
+ grpc_end2end_proxy *proxy; |
+ |
+ grpc_call *c2p; |
+ grpc_call *p2s; |
+ |
+ grpc_metadata_array c2p_initial_metadata; |
+ grpc_metadata_array p2s_initial_metadata; |
+ |
+ grpc_byte_buffer *c2p_msg; |
+ grpc_byte_buffer *p2s_msg; |
+ |
+ grpc_metadata_array p2s_trailing_metadata; |
+ grpc_status_code p2s_status; |
+ char *p2s_status_details; |
+ size_t p2s_status_details_capacity; |
+ |
+ int c2p_server_cancelled; |
+} proxy_call; |
+ |
+static void thread_main(void *arg); |
+static void request_call(grpc_end2end_proxy *proxy); |
+ |
+grpc_end2end_proxy *grpc_end2end_proxy_create( |
+ const grpc_end2end_proxy_def *def) { |
+ gpr_thd_options opt = gpr_thd_options_default(); |
+ int proxy_port = grpc_pick_unused_port_or_die(); |
+ int server_port = grpc_pick_unused_port_or_die(); |
+ |
+ grpc_end2end_proxy *proxy = gpr_malloc(sizeof(*proxy)); |
+ memset(proxy, 0, sizeof(*proxy)); |
+ |
+ gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port); |
+ gpr_join_host_port(&proxy->server_port, "localhost", server_port); |
+ |
+ gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port, |
+ proxy->server_port); |
+ |
+ proxy->cq = grpc_completion_queue_create(NULL); |
+ proxy->server = def->create_server(proxy->proxy_port); |
+ proxy->client = def->create_client(proxy->server_port); |
+ |
+ grpc_server_register_completion_queue(proxy->server, proxy->cq, NULL); |
+ grpc_server_start(proxy->server); |
+ |
+ gpr_thd_options_set_joinable(&opt); |
+ GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); |
+ |
+ request_call(proxy); |
+ |
+ return proxy; |
+} |
+ |
+static closure *new_closure(void (*func)(void *arg, int success), void *arg) { |
+ closure *cl = gpr_malloc(sizeof(*cl)); |
+ cl->func = func; |
+ cl->arg = arg; |
+ return cl; |
+} |
+ |
+static void shutdown_complete(void *arg, int success) { |
+ grpc_end2end_proxy *proxy = arg; |
+ proxy->shutdown = 1; |
+ grpc_completion_queue_shutdown(proxy->cq); |
+} |
+ |
+void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) { |
+ grpc_server_shutdown_and_notify(proxy->server, proxy->cq, |
+ new_closure(shutdown_complete, proxy)); |
+ gpr_thd_join(proxy->thd); |
+ gpr_free(proxy->proxy_port); |
+ gpr_free(proxy->server_port); |
+ grpc_server_destroy(proxy->server); |
+ grpc_channel_destroy(proxy->client); |
+ grpc_completion_queue_destroy(proxy->cq); |
+ grpc_call_details_destroy(&proxy->new_call_details); |
+ gpr_free(proxy); |
+} |
+ |
+static void unrefpc(proxy_call *pc, const char *reason) { |
+ if (gpr_unref(&pc->refs)) { |
+ grpc_call_destroy(pc->c2p); |
+ grpc_call_destroy(pc->p2s); |
+ grpc_metadata_array_destroy(&pc->c2p_initial_metadata); |
+ grpc_metadata_array_destroy(&pc->p2s_initial_metadata); |
+ grpc_metadata_array_destroy(&pc->p2s_trailing_metadata); |
+ gpr_free(pc->p2s_status_details); |
+ gpr_free(pc); |
+ } |
+} |
+ |
+static void refpc(proxy_call *pc, const char *reason) { gpr_ref(&pc->refs); } |
+ |
+static void on_c2p_sent_initial_metadata(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ unrefpc(pc, "on_c2p_sent_initial_metadata"); |
+} |
+ |
+static void on_p2s_recv_initial_metadata(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ grpc_op op; |
+ grpc_call_error err; |
+ |
+ if (!pc->proxy->shutdown) { |
+ op.op = GRPC_OP_SEND_INITIAL_METADATA; |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count; |
+ op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata; |
+ refpc(pc, "on_c2p_sent_initial_metadata"); |
+ err = grpc_call_start_batch( |
+ pc->c2p, &op, 1, new_closure(on_c2p_sent_initial_metadata, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ } |
+ |
+ unrefpc(pc, "on_p2s_recv_initial_metadata"); |
+} |
+ |
+static void on_p2s_sent_initial_metadata(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ unrefpc(pc, "on_p2s_sent_initial_metadata"); |
+} |
+ |
+static void on_c2p_recv_msg(void *arg, int success); |
+ |
+static void on_p2s_sent_message(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ grpc_op op; |
+ grpc_call_error err; |
+ |
+ grpc_byte_buffer_destroy(pc->c2p_msg); |
+ if (!pc->proxy->shutdown && success) { |
+ op.op = GRPC_OP_RECV_MESSAGE; |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ op.data.recv_message = &pc->c2p_msg; |
+ refpc(pc, "on_c2p_recv_msg"); |
+ err = grpc_call_start_batch(pc->c2p, &op, 1, |
+ new_closure(on_c2p_recv_msg, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ } |
+ |
+ unrefpc(pc, "on_p2s_sent_message"); |
+} |
+ |
+static void on_p2s_sent_close(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ unrefpc(pc, "on_p2s_sent_close"); |
+} |
+ |
+static void on_c2p_recv_msg(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ grpc_op op; |
+ grpc_call_error err; |
+ |
+ if (!pc->proxy->shutdown && success) { |
+ if (pc->c2p_msg != NULL) { |
+ op.op = GRPC_OP_SEND_MESSAGE; |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ op.data.send_message = pc->c2p_msg; |
+ refpc(pc, "on_p2s_sent_message"); |
+ err = grpc_call_start_batch(pc->p2s, &op, 1, |
+ new_closure(on_p2s_sent_message, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ } else { |
+ op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ refpc(pc, "on_p2s_sent_close"); |
+ err = grpc_call_start_batch(pc->p2s, &op, 1, |
+ new_closure(on_p2s_sent_close, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ } |
+ } |
+ |
+ unrefpc(pc, "on_c2p_recv_msg"); |
+} |
+ |
+static void on_p2s_recv_msg(void *arg, int success); |
+ |
+static void on_c2p_sent_message(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ grpc_op op; |
+ grpc_call_error err; |
+ |
+ grpc_byte_buffer_destroy(pc->p2s_msg); |
+ if (!pc->proxy->shutdown && success) { |
+ op.op = GRPC_OP_RECV_MESSAGE; |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ op.data.recv_message = &pc->p2s_msg; |
+ refpc(pc, "on_p2s_recv_msg"); |
+ err = grpc_call_start_batch(pc->p2s, &op, 1, |
+ new_closure(on_p2s_recv_msg, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ } |
+ |
+ unrefpc(pc, "on_c2p_sent_message"); |
+} |
+ |
+static void on_p2s_recv_msg(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ grpc_op op; |
+ grpc_call_error err; |
+ |
+ if (!pc->proxy->shutdown && success && pc->p2s_msg) { |
+ op.op = GRPC_OP_SEND_MESSAGE; |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ op.data.send_message = pc->p2s_msg; |
+ refpc(pc, "on_c2p_sent_message"); |
+ err = grpc_call_start_batch(pc->c2p, &op, 1, |
+ new_closure(on_c2p_sent_message, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ } |
+ unrefpc(pc, "on_p2s_recv_msg"); |
+} |
+ |
+static void on_c2p_sent_status(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ unrefpc(pc, "on_c2p_sent_status"); |
+} |
+ |
+static void on_p2s_status(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ grpc_op op; |
+ grpc_call_error err; |
+ |
+ if (!pc->proxy->shutdown) { |
+ GPR_ASSERT(success); |
+ op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ op.data.send_status_from_server.trailing_metadata_count = |
+ pc->p2s_trailing_metadata.count; |
+ op.data.send_status_from_server.trailing_metadata = |
+ pc->p2s_trailing_metadata.metadata; |
+ op.data.send_status_from_server.status = pc->p2s_status; |
+ op.data.send_status_from_server.status_details = pc->p2s_status_details; |
+ refpc(pc, "on_c2p_sent_status"); |
+ err = grpc_call_start_batch(pc->c2p, &op, 1, |
+ new_closure(on_c2p_sent_status, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ } |
+ |
+ unrefpc(pc, "on_p2s_status"); |
+} |
+ |
+static void on_c2p_closed(void *arg, int success) { |
+ proxy_call *pc = arg; |
+ unrefpc(pc, "on_c2p_closed"); |
+} |
+ |
+static void on_new_call(void *arg, int success) { |
+ grpc_end2end_proxy *proxy = arg; |
+ grpc_call_error err; |
+ |
+ if (success) { |
+ grpc_op op; |
+ proxy_call *pc = gpr_malloc(sizeof(*pc)); |
+ memset(pc, 0, sizeof(*pc)); |
+ pc->proxy = proxy; |
+ GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata, |
+ proxy->new_call_metadata); |
+ pc->c2p = proxy->new_call; |
+ pc->p2s = grpc_channel_create_call( |
+ proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq, |
+ proxy->new_call_details.method, proxy->new_call_details.host, |
+ proxy->new_call_details.deadline, NULL); |
+ gpr_ref_init(&pc->refs, 1); |
+ |
+ op.flags = 0; |
+ op.reserved = NULL; |
+ |
+ op.op = GRPC_OP_RECV_INITIAL_METADATA; |
+ op.data.recv_initial_metadata = &pc->p2s_initial_metadata; |
+ refpc(pc, "on_p2s_recv_initial_metadata"); |
+ err = grpc_call_start_batch( |
+ pc->p2s, &op, 1, new_closure(on_p2s_recv_initial_metadata, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ |
+ op.op = GRPC_OP_SEND_INITIAL_METADATA; |
+ op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count; |
+ op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata; |
+ refpc(pc, "on_p2s_sent_initial_metadata"); |
+ err = grpc_call_start_batch( |
+ pc->p2s, &op, 1, new_closure(on_p2s_sent_initial_metadata, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ |
+ op.op = GRPC_OP_RECV_MESSAGE; |
+ op.data.recv_message = &pc->c2p_msg; |
+ refpc(pc, "on_c2p_recv_msg"); |
+ err = grpc_call_start_batch(pc->c2p, &op, 1, |
+ new_closure(on_c2p_recv_msg, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ |
+ op.op = GRPC_OP_RECV_MESSAGE; |
+ op.data.recv_message = &pc->p2s_msg; |
+ refpc(pc, "on_p2s_recv_msg"); |
+ err = grpc_call_start_batch(pc->p2s, &op, 1, |
+ new_closure(on_p2s_recv_msg, pc), NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ |
+ op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
+ op.data.recv_status_on_client.trailing_metadata = |
+ &pc->p2s_trailing_metadata; |
+ op.data.recv_status_on_client.status = &pc->p2s_status; |
+ op.data.recv_status_on_client.status_details = &pc->p2s_status_details; |
+ op.data.recv_status_on_client.status_details_capacity = |
+ &pc->p2s_status_details_capacity; |
+ refpc(pc, "on_p2s_status"); |
+ err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc), |
+ NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ |
+ op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
+ op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled; |
+ refpc(pc, "on_c2p_closed"); |
+ err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc), |
+ NULL); |
+ GPR_ASSERT(err == GRPC_CALL_OK); |
+ |
+ request_call(proxy); |
+ |
+ unrefpc(pc, "init"); |
+ } else { |
+ GPR_ASSERT(proxy->new_call == NULL); |
+ } |
+} |
+ |
+static void request_call(grpc_end2end_proxy *proxy) { |
+ proxy->new_call = NULL; |
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( |
+ proxy->server, &proxy->new_call, |
+ &proxy->new_call_details, |
+ &proxy->new_call_metadata, proxy->cq, |
+ proxy->cq, new_closure(on_new_call, proxy))); |
+} |
+ |
+static void thread_main(void *arg) { |
+ grpc_end2end_proxy *proxy = arg; |
+ closure *cl; |
+ for (;;) { |
+ grpc_event ev = grpc_completion_queue_next( |
+ proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); |
+ switch (ev.type) { |
+ case GRPC_QUEUE_TIMEOUT: |
+ gpr_log(GPR_ERROR, "Should never reach here"); |
+ abort(); |
+ case GRPC_QUEUE_SHUTDOWN: |
+ return; |
+ case GRPC_OP_COMPLETE: |
+ cl = ev.tag; |
+ cl->func(cl->arg, ev.success); |
+ gpr_free(cl); |
+ break; |
+ } |
+ } |
+} |
+ |
+const char *grpc_end2end_proxy_get_client_target(grpc_end2end_proxy *proxy) { |
+ return proxy->proxy_port; |
+} |
+ |
+const char *grpc_end2end_proxy_get_server_port(grpc_end2end_proxy *proxy) { |
+ return proxy->server_port; |
+} |