| Index: third_party/grpc/test/core/end2end/fixtures/proxy.c
|
| diff --git a/third_party/grpc/test/core/end2end/fixtures/proxy.c b/third_party/grpc/test/core/end2end/fixtures/proxy.c
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..434e75dd15ed22b9c3a80708ec91746d1f022cad
|
| --- /dev/null
|
| +++ b/third_party/grpc/test/core/end2end/fixtures/proxy.c
|
| @@ -0,0 +1,436 @@
|
| +/*
|
| + *
|
| + * Copyright 2015, Google Inc.
|
| + * All rights reserved.
|
| + *
|
| + * Redistribution and use in source and binary forms, with or without
|
| + * modification, are permitted provided that the following conditions are
|
| + * met:
|
| + *
|
| + * * Redistributions of source code must retain the above copyright
|
| + * notice, this list of conditions and the following disclaimer.
|
| + * * Redistributions in binary form must reproduce the above
|
| + * copyright notice, this list of conditions and the following disclaimer
|
| + * in the documentation and/or other materials provided with the
|
| + * distribution.
|
| + * * Neither the name of Google Inc. nor the names of its
|
| + * contributors may be used to endorse or promote products derived from
|
| + * this software without specific prior written permission.
|
| + *
|
| + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| + *
|
| + */
|
| +
|
| +#include "test/core/end2end/fixtures/proxy.h"
|
| +
|
| +#include <string.h>
|
| +
|
| +#include <grpc/support/alloc.h>
|
| +#include <grpc/support/host_port.h>
|
| +#include <grpc/support/log.h>
|
| +#include <grpc/support/sync.h>
|
| +#include <grpc/support/thd.h>
|
| +#include <grpc/support/useful.h>
|
| +
|
| +#include "test/core/util/port.h"
|
| +
|
| +struct grpc_end2end_proxy {
|
| + gpr_thd_id thd;
|
| + char *proxy_port;
|
| + char *server_port;
|
| + grpc_completion_queue *cq;
|
| + grpc_server *server;
|
| + grpc_channel *client;
|
| +
|
| + int shutdown;
|
| +
|
| + /* requested call */
|
| + grpc_call *new_call;
|
| + grpc_call_details new_call_details;
|
| + grpc_metadata_array new_call_metadata;
|
| +};
|
| +
|
| +typedef struct {
|
| + void (*func)(void *arg, int success);
|
| + void *arg;
|
| +} closure;
|
| +
|
| +typedef struct {
|
| + gpr_refcount refs;
|
| + grpc_end2end_proxy *proxy;
|
| +
|
| + grpc_call *c2p;
|
| + grpc_call *p2s;
|
| +
|
| + grpc_metadata_array c2p_initial_metadata;
|
| + grpc_metadata_array p2s_initial_metadata;
|
| +
|
| + grpc_byte_buffer *c2p_msg;
|
| + grpc_byte_buffer *p2s_msg;
|
| +
|
| + grpc_metadata_array p2s_trailing_metadata;
|
| + grpc_status_code p2s_status;
|
| + char *p2s_status_details;
|
| + size_t p2s_status_details_capacity;
|
| +
|
| + int c2p_server_cancelled;
|
| +} proxy_call;
|
| +
|
| +static void thread_main(void *arg);
|
| +static void request_call(grpc_end2end_proxy *proxy);
|
| +
|
| +grpc_end2end_proxy *grpc_end2end_proxy_create(
|
| + const grpc_end2end_proxy_def *def) {
|
| + gpr_thd_options opt = gpr_thd_options_default();
|
| + int proxy_port = grpc_pick_unused_port_or_die();
|
| + int server_port = grpc_pick_unused_port_or_die();
|
| +
|
| + grpc_end2end_proxy *proxy = gpr_malloc(sizeof(*proxy));
|
| + memset(proxy, 0, sizeof(*proxy));
|
| +
|
| + gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port);
|
| + gpr_join_host_port(&proxy->server_port, "localhost", server_port);
|
| +
|
| + gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port,
|
| + proxy->server_port);
|
| +
|
| + proxy->cq = grpc_completion_queue_create(NULL);
|
| + proxy->server = def->create_server(proxy->proxy_port);
|
| + proxy->client = def->create_client(proxy->server_port);
|
| +
|
| + grpc_server_register_completion_queue(proxy->server, proxy->cq, NULL);
|
| + grpc_server_start(proxy->server);
|
| +
|
| + gpr_thd_options_set_joinable(&opt);
|
| + GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt));
|
| +
|
| + request_call(proxy);
|
| +
|
| + return proxy;
|
| +}
|
| +
|
| +static closure *new_closure(void (*func)(void *arg, int success), void *arg) {
|
| + closure *cl = gpr_malloc(sizeof(*cl));
|
| + cl->func = func;
|
| + cl->arg = arg;
|
| + return cl;
|
| +}
|
| +
|
| +static void shutdown_complete(void *arg, int success) {
|
| + grpc_end2end_proxy *proxy = arg;
|
| + proxy->shutdown = 1;
|
| + grpc_completion_queue_shutdown(proxy->cq);
|
| +}
|
| +
|
| +void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) {
|
| + grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
|
| + new_closure(shutdown_complete, proxy));
|
| + gpr_thd_join(proxy->thd);
|
| + gpr_free(proxy->proxy_port);
|
| + gpr_free(proxy->server_port);
|
| + grpc_server_destroy(proxy->server);
|
| + grpc_channel_destroy(proxy->client);
|
| + grpc_completion_queue_destroy(proxy->cq);
|
| + grpc_call_details_destroy(&proxy->new_call_details);
|
| + gpr_free(proxy);
|
| +}
|
| +
|
| +static void unrefpc(proxy_call *pc, const char *reason) {
|
| + if (gpr_unref(&pc->refs)) {
|
| + grpc_call_destroy(pc->c2p);
|
| + grpc_call_destroy(pc->p2s);
|
| + grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
|
| + grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
|
| + grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
|
| + gpr_free(pc->p2s_status_details);
|
| + gpr_free(pc);
|
| + }
|
| +}
|
| +
|
| +static void refpc(proxy_call *pc, const char *reason) { gpr_ref(&pc->refs); }
|
| +
|
| +static void on_c2p_sent_initial_metadata(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + unrefpc(pc, "on_c2p_sent_initial_metadata");
|
| +}
|
| +
|
| +static void on_p2s_recv_initial_metadata(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + grpc_op op;
|
| + grpc_call_error err;
|
| +
|
| + if (!pc->proxy->shutdown) {
|
| + op.op = GRPC_OP_SEND_INITIAL_METADATA;
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| + op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
|
| + op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
|
| + refpc(pc, "on_c2p_sent_initial_metadata");
|
| + err = grpc_call_start_batch(
|
| + pc->c2p, &op, 1, new_closure(on_c2p_sent_initial_metadata, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| + }
|
| +
|
| + unrefpc(pc, "on_p2s_recv_initial_metadata");
|
| +}
|
| +
|
| +static void on_p2s_sent_initial_metadata(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + unrefpc(pc, "on_p2s_sent_initial_metadata");
|
| +}
|
| +
|
| +static void on_c2p_recv_msg(void *arg, int success);
|
| +
|
| +static void on_p2s_sent_message(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + grpc_op op;
|
| + grpc_call_error err;
|
| +
|
| + grpc_byte_buffer_destroy(pc->c2p_msg);
|
| + if (!pc->proxy->shutdown && success) {
|
| + op.op = GRPC_OP_RECV_MESSAGE;
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| + op.data.recv_message = &pc->c2p_msg;
|
| + refpc(pc, "on_c2p_recv_msg");
|
| + err = grpc_call_start_batch(pc->c2p, &op, 1,
|
| + new_closure(on_c2p_recv_msg, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| + }
|
| +
|
| + unrefpc(pc, "on_p2s_sent_message");
|
| +}
|
| +
|
| +static void on_p2s_sent_close(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + unrefpc(pc, "on_p2s_sent_close");
|
| +}
|
| +
|
| +static void on_c2p_recv_msg(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + grpc_op op;
|
| + grpc_call_error err;
|
| +
|
| + if (!pc->proxy->shutdown && success) {
|
| + if (pc->c2p_msg != NULL) {
|
| + op.op = GRPC_OP_SEND_MESSAGE;
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| + op.data.send_message = pc->c2p_msg;
|
| + refpc(pc, "on_p2s_sent_message");
|
| + err = grpc_call_start_batch(pc->p2s, &op, 1,
|
| + new_closure(on_p2s_sent_message, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| + } else {
|
| + op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| + refpc(pc, "on_p2s_sent_close");
|
| + err = grpc_call_start_batch(pc->p2s, &op, 1,
|
| + new_closure(on_p2s_sent_close, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| + }
|
| + }
|
| +
|
| + unrefpc(pc, "on_c2p_recv_msg");
|
| +}
|
| +
|
| +static void on_p2s_recv_msg(void *arg, int success);
|
| +
|
| +static void on_c2p_sent_message(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + grpc_op op;
|
| + grpc_call_error err;
|
| +
|
| + grpc_byte_buffer_destroy(pc->p2s_msg);
|
| + if (!pc->proxy->shutdown && success) {
|
| + op.op = GRPC_OP_RECV_MESSAGE;
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| + op.data.recv_message = &pc->p2s_msg;
|
| + refpc(pc, "on_p2s_recv_msg");
|
| + err = grpc_call_start_batch(pc->p2s, &op, 1,
|
| + new_closure(on_p2s_recv_msg, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| + }
|
| +
|
| + unrefpc(pc, "on_c2p_sent_message");
|
| +}
|
| +
|
| +static void on_p2s_recv_msg(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + grpc_op op;
|
| + grpc_call_error err;
|
| +
|
| + if (!pc->proxy->shutdown && success && pc->p2s_msg) {
|
| + op.op = GRPC_OP_SEND_MESSAGE;
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| + op.data.send_message = pc->p2s_msg;
|
| + refpc(pc, "on_c2p_sent_message");
|
| + err = grpc_call_start_batch(pc->c2p, &op, 1,
|
| + new_closure(on_c2p_sent_message, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| + }
|
| + unrefpc(pc, "on_p2s_recv_msg");
|
| +}
|
| +
|
| +static void on_c2p_sent_status(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + unrefpc(pc, "on_c2p_sent_status");
|
| +}
|
| +
|
| +static void on_p2s_status(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + grpc_op op;
|
| + grpc_call_error err;
|
| +
|
| + if (!pc->proxy->shutdown) {
|
| + GPR_ASSERT(success);
|
| + op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| + op.data.send_status_from_server.trailing_metadata_count =
|
| + pc->p2s_trailing_metadata.count;
|
| + op.data.send_status_from_server.trailing_metadata =
|
| + pc->p2s_trailing_metadata.metadata;
|
| + op.data.send_status_from_server.status = pc->p2s_status;
|
| + op.data.send_status_from_server.status_details = pc->p2s_status_details;
|
| + refpc(pc, "on_c2p_sent_status");
|
| + err = grpc_call_start_batch(pc->c2p, &op, 1,
|
| + new_closure(on_c2p_sent_status, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| + }
|
| +
|
| + unrefpc(pc, "on_p2s_status");
|
| +}
|
| +
|
| +static void on_c2p_closed(void *arg, int success) {
|
| + proxy_call *pc = arg;
|
| + unrefpc(pc, "on_c2p_closed");
|
| +}
|
| +
|
| +static void on_new_call(void *arg, int success) {
|
| + grpc_end2end_proxy *proxy = arg;
|
| + grpc_call_error err;
|
| +
|
| + if (success) {
|
| + grpc_op op;
|
| + proxy_call *pc = gpr_malloc(sizeof(*pc));
|
| + memset(pc, 0, sizeof(*pc));
|
| + pc->proxy = proxy;
|
| + GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
|
| + proxy->new_call_metadata);
|
| + pc->c2p = proxy->new_call;
|
| + pc->p2s = grpc_channel_create_call(
|
| + proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
|
| + proxy->new_call_details.method, proxy->new_call_details.host,
|
| + proxy->new_call_details.deadline, NULL);
|
| + gpr_ref_init(&pc->refs, 1);
|
| +
|
| + op.flags = 0;
|
| + op.reserved = NULL;
|
| +
|
| + op.op = GRPC_OP_RECV_INITIAL_METADATA;
|
| + op.data.recv_initial_metadata = &pc->p2s_initial_metadata;
|
| + refpc(pc, "on_p2s_recv_initial_metadata");
|
| + err = grpc_call_start_batch(
|
| + pc->p2s, &op, 1, new_closure(on_p2s_recv_initial_metadata, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| +
|
| + op.op = GRPC_OP_SEND_INITIAL_METADATA;
|
| + op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
|
| + op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
|
| + refpc(pc, "on_p2s_sent_initial_metadata");
|
| + err = grpc_call_start_batch(
|
| + pc->p2s, &op, 1, new_closure(on_p2s_sent_initial_metadata, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| +
|
| + op.op = GRPC_OP_RECV_MESSAGE;
|
| + op.data.recv_message = &pc->c2p_msg;
|
| + refpc(pc, "on_c2p_recv_msg");
|
| + err = grpc_call_start_batch(pc->c2p, &op, 1,
|
| + new_closure(on_c2p_recv_msg, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| +
|
| + op.op = GRPC_OP_RECV_MESSAGE;
|
| + op.data.recv_message = &pc->p2s_msg;
|
| + refpc(pc, "on_p2s_recv_msg");
|
| + err = grpc_call_start_batch(pc->p2s, &op, 1,
|
| + new_closure(on_p2s_recv_msg, pc), NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| +
|
| + op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
|
| + op.data.recv_status_on_client.trailing_metadata =
|
| + &pc->p2s_trailing_metadata;
|
| + op.data.recv_status_on_client.status = &pc->p2s_status;
|
| + op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
|
| + op.data.recv_status_on_client.status_details_capacity =
|
| + &pc->p2s_status_details_capacity;
|
| + refpc(pc, "on_p2s_status");
|
| + err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
|
| + NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| +
|
| + op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
|
| + op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
|
| + refpc(pc, "on_c2p_closed");
|
| + err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
|
| + NULL);
|
| + GPR_ASSERT(err == GRPC_CALL_OK);
|
| +
|
| + request_call(proxy);
|
| +
|
| + unrefpc(pc, "init");
|
| + } else {
|
| + GPR_ASSERT(proxy->new_call == NULL);
|
| + }
|
| +}
|
| +
|
| +static void request_call(grpc_end2end_proxy *proxy) {
|
| + proxy->new_call = NULL;
|
| + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
|
| + proxy->server, &proxy->new_call,
|
| + &proxy->new_call_details,
|
| + &proxy->new_call_metadata, proxy->cq,
|
| + proxy->cq, new_closure(on_new_call, proxy)));
|
| +}
|
| +
|
| +static void thread_main(void *arg) {
|
| + grpc_end2end_proxy *proxy = arg;
|
| + closure *cl;
|
| + for (;;) {
|
| + grpc_event ev = grpc_completion_queue_next(
|
| + proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
|
| + switch (ev.type) {
|
| + case GRPC_QUEUE_TIMEOUT:
|
| + gpr_log(GPR_ERROR, "Should never reach here");
|
| + abort();
|
| + case GRPC_QUEUE_SHUTDOWN:
|
| + return;
|
| + case GRPC_OP_COMPLETE:
|
| + cl = ev.tag;
|
| + cl->func(cl->arg, ev.success);
|
| + gpr_free(cl);
|
| + break;
|
| + }
|
| + }
|
| +}
|
| +
|
| +const char *grpc_end2end_proxy_get_client_target(grpc_end2end_proxy *proxy) {
|
| + return proxy->proxy_port;
|
| +}
|
| +
|
| +const char *grpc_end2end_proxy_get_server_port(grpc_end2end_proxy *proxy) {
|
| + return proxy->server_port;
|
| +}
|
|
|