Index: third_party/grpc/src/core/channel/client_uchannel.c |
diff --git a/third_party/grpc/src/core/channel/client_uchannel.c b/third_party/grpc/src/core/channel/client_uchannel.c |
new file mode 100644 |
index 0000000000000000000000000000000000000000..83fcc3a87f52f644f81b6a801edf9eb3b7522fc9 |
--- /dev/null |
+++ b/third_party/grpc/src/core/channel/client_uchannel.c |
@@ -0,0 +1,243 @@ |
+/* |
+ * |
+ * Copyright 2015-2016, Google Inc. |
+ * All rights reserved. |
+ * |
+ * Redistribution and use in source and binary forms, with or without |
+ * modification, are permitted provided that the following conditions are |
+ * met: |
+ * |
+ * * Redistributions of source code must retain the above copyright |
+ * notice, this list of conditions and the following disclaimer. |
+ * * Redistributions in binary form must reproduce the above |
+ * copyright notice, this list of conditions and the following disclaimer |
+ * in the documentation and/or other materials provided with the |
+ * distribution. |
+ * * Neither the name of Google Inc. nor the names of its |
+ * contributors may be used to endorse or promote products derived from |
+ * this software without specific prior written permission. |
+ * |
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ * |
+ */ |
+ |
+#include "src/core/channel/client_uchannel.h" |
+ |
+#include <string.h> |
+ |
+#include "src/core/census/grpc_filter.h" |
+#include "src/core/channel/channel_args.h" |
+#include "src/core/channel/client_channel.h" |
+#include "src/core/channel/compress_filter.h" |
+#include "src/core/channel/subchannel_call_holder.h" |
+#include "src/core/iomgr/iomgr.h" |
+#include "src/core/support/string.h" |
+#include "src/core/surface/channel.h" |
+#include "src/core/transport/connectivity_state.h" |
+ |
+#include <grpc/support/alloc.h> |
+#include <grpc/support/log.h> |
+#include <grpc/support/sync.h> |
+#include <grpc/support/useful.h> |
+ |
+/** Microchannel (uchannel) implementation: a lightweight channel without any |
+ * load-balancing mechanisms meant for communication from within the core. */ |
+ |
+typedef struct client_uchannel_channel_data { |
+ /** master channel - the grpc_channel instance that ultimately owns |
+ this channel_data via its channel stack. |
+ We occasionally use this to bump the refcount on the master channel |
+ to keep ourselves alive through an asynchronous operation. */ |
+ grpc_channel_stack *owning_stack; |
+ |
+ /** connectivity state being tracked */ |
+ grpc_connectivity_state_tracker state_tracker; |
+ |
+ /** the subchannel wrapped by the microchannel */ |
+ grpc_connected_subchannel *connected_subchannel; |
+ |
+ /** the callback used to stay subscribed to subchannel connectivity |
+ * notifications */ |
+ grpc_closure connectivity_cb; |
+ |
+ /** the current connectivity state of the wrapped subchannel */ |
+ grpc_connectivity_state subchannel_connectivity; |
+ |
+ gpr_mu mu_state; |
+} channel_data; |
+ |
+typedef grpc_subchannel_call_holder call_data; |
+ |
+static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, |
+ bool iomgr_success) { |
+ channel_data *chand = arg; |
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, |
+ chand->subchannel_connectivity, |
+ "uchannel_monitor_subchannel"); |
+ grpc_connected_subchannel_notify_on_state_change( |
+ exec_ctx, chand->connected_subchannel, NULL, |
+ &chand->subchannel_connectivity, &chand->connectivity_cb); |
+} |
+ |
+static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { |
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); |
+} |
+ |
+static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, |
+ grpc_call_element *elem, |
+ grpc_transport_stream_op *op) { |
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op); |
+ grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op); |
+} |
+ |
+static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, |
+ grpc_channel_element *elem, |
+ grpc_transport_op *op) { |
+ channel_data *chand = elem->channel_data; |
+ |
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); |
+ |
+ GPR_ASSERT(op->set_accept_stream == false); |
+ GPR_ASSERT(op->bind_pollset == NULL); |
+ |
+ if (op->on_connectivity_state_change != NULL) { |
+ grpc_connectivity_state_notify_on_state_change( |
+ exec_ctx, &chand->state_tracker, op->connectivity_state, |
+ op->on_connectivity_state_change); |
+ op->on_connectivity_state_change = NULL; |
+ op->connectivity_state = NULL; |
+ } |
+ |
+ if (op->disconnect) { |
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, |
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); |
+ } |
+} |
+ |
+static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, |
+ grpc_metadata_batch *initial_metadata, |
+ grpc_connected_subchannel **connected_subchannel, |
+ grpc_closure *on_ready) { |
+ channel_data *chand = arg; |
+ GPR_ASSERT(initial_metadata != NULL); |
+ *connected_subchannel = chand->connected_subchannel; |
+ return 1; |
+} |
+ |
+/* Constructor for call_data */ |
+static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
+ grpc_call_element_args *args) { |
+ grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, |
+ elem->channel_data, args->call_stack); |
+} |
+ |
+/* Destructor for call_data */ |
+static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx, |
+ grpc_call_element *elem) { |
+ grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); |
+} |
+ |
+/* Constructor for channel_data */ |
+static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, |
+ grpc_channel_element *elem, |
+ grpc_channel_element_args *args) { |
+ channel_data *chand = elem->channel_data; |
+ memset(chand, 0, sizeof(*chand)); |
+ grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); |
+ GPR_ASSERT(args->is_last); |
+ GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); |
+ chand->owning_stack = args->channel_stack; |
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, |
+ "client_uchannel"); |
+ gpr_mu_init(&chand->mu_state); |
+} |
+ |
+/* Destructor for channel_data */ |
+static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, |
+ grpc_channel_element *elem) { |
+ channel_data *chand = elem->channel_data; |
+ /* cancel subscription */ |
+ grpc_connected_subchannel_notify_on_state_change( |
+ exec_ctx, chand->connected_subchannel, NULL, NULL, |
+ &chand->connectivity_cb); |
+ grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); |
+ gpr_mu_destroy(&chand->mu_state); |
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel, |
+ "uchannel"); |
+} |
+ |
+static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
+ grpc_pollset *pollset) { |
+ call_data *calld = elem->call_data; |
+ calld->pollset = pollset; |
+} |
+ |
+const grpc_channel_filter grpc_client_uchannel_filter = { |
+ cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data), |
+ cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem, |
+ sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem, |
+ cuc_get_peer, "client-uchannel", |
+}; |
+ |
+grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( |
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { |
+ channel_data *chand = elem->channel_data; |
+ grpc_connectivity_state out; |
+ gpr_mu_lock(&chand->mu_state); |
+ out = grpc_connectivity_state_check(&chand->state_tracker); |
+ gpr_mu_unlock(&chand->mu_state); |
+ return out; |
+} |
+ |
+void grpc_client_uchannel_watch_connectivity_state( |
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, |
+ grpc_connectivity_state *state, grpc_closure *on_complete) { |
+ channel_data *chand = elem->channel_data; |
+ gpr_mu_lock(&chand->mu_state); |
+ grpc_connectivity_state_notify_on_state_change( |
+ exec_ctx, &chand->state_tracker, state, on_complete); |
+ gpr_mu_unlock(&chand->mu_state); |
+} |
+ |
+grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, |
+ grpc_channel_args *args) { |
+ grpc_channel *channel = NULL; |
+#define MAX_FILTERS 3 |
+ const grpc_channel_filter *filters[MAX_FILTERS]; |
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
+ size_t n = 0; |
+ |
+ if (grpc_channel_args_is_census_enabled(args)) { |
+ filters[n++] = &grpc_client_census_filter; |
+ } |
+ filters[n++] = &grpc_compress_filter; |
+ filters[n++] = &grpc_client_uchannel_filter; |
+ GPR_ASSERT(n <= MAX_FILTERS); |
+ |
+ channel = |
+ grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1); |
+ |
+ return channel; |
+} |
+ |
+void grpc_client_uchannel_set_connected_subchannel( |
+ grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) { |
+ grpc_channel_element *elem = |
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); |
+ channel_data *chand = elem->channel_data; |
+ GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); |
+ gpr_mu_lock(&chand->mu_state); |
+ chand->connected_subchannel = connected_subchannel; |
+ GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel"); |
+ gpr_mu_unlock(&chand->mu_state); |
+} |