Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Side by Side Diff: third_party/grpc/src/core/channel/client_uchannel.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include "src/core/channel/client_uchannel.h"
35
36 #include <string.h>
37
38 #include "src/core/census/grpc_filter.h"
39 #include "src/core/channel/channel_args.h"
40 #include "src/core/channel/client_channel.h"
41 #include "src/core/channel/compress_filter.h"
42 #include "src/core/channel/subchannel_call_holder.h"
43 #include "src/core/iomgr/iomgr.h"
44 #include "src/core/support/string.h"
45 #include "src/core/surface/channel.h"
46 #include "src/core/transport/connectivity_state.h"
47
48 #include <grpc/support/alloc.h>
49 #include <grpc/support/log.h>
50 #include <grpc/support/sync.h>
51 #include <grpc/support/useful.h>
52
53 /** Microchannel (uchannel) implementation: a lightweight channel without any
54 * load-balancing mechanisms meant for communication from within the core. */
55
56 typedef struct client_uchannel_channel_data {
57 /** master channel - the grpc_channel instance that ultimately owns
58 this channel_data via its channel stack.
59 We occasionally use this to bump the refcount on the master channel
60 to keep ourselves alive through an asynchronous operation. */
61 grpc_channel_stack *owning_stack;
62
63 /** connectivity state being tracked */
64 grpc_connectivity_state_tracker state_tracker;
65
66 /** the subchannel wrapped by the microchannel */
67 grpc_connected_subchannel *connected_subchannel;
68
69 /** the callback used to stay subscribed to subchannel connectivity
70 * notifications */
71 grpc_closure connectivity_cb;
72
73 /** the current connectivity state of the wrapped subchannel */
74 grpc_connectivity_state subchannel_connectivity;
75
76 gpr_mu mu_state;
77 } channel_data;
78
79 typedef grpc_subchannel_call_holder call_data;
80
81 static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
82 bool iomgr_success) {
83 channel_data *chand = arg;
84 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
85 chand->subchannel_connectivity,
86 "uchannel_monitor_subchannel");
87 grpc_connected_subchannel_notify_on_state_change(
88 exec_ctx, chand->connected_subchannel, NULL,
89 &chand->subchannel_connectivity, &chand->connectivity_cb);
90 }
91
92 static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
93 return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data);
94 }
95
96 static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
97 grpc_call_element *elem,
98 grpc_transport_stream_op *op) {
99 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
100 grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
101 }
102
103 static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
104 grpc_channel_element *elem,
105 grpc_transport_op *op) {
106 channel_data *chand = elem->channel_data;
107
108 grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
109
110 GPR_ASSERT(op->set_accept_stream == false);
111 GPR_ASSERT(op->bind_pollset == NULL);
112
113 if (op->on_connectivity_state_change != NULL) {
114 grpc_connectivity_state_notify_on_state_change(
115 exec_ctx, &chand->state_tracker, op->connectivity_state,
116 op->on_connectivity_state_change);
117 op->on_connectivity_state_change = NULL;
118 op->connectivity_state = NULL;
119 }
120
121 if (op->disconnect) {
122 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
123 GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
124 }
125 }
126
127 static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
128 grpc_metadata_batch *initial_metadata,
129 grpc_connected_subchannel **connected_subchannel,
130 grpc_closure *on_ready) {
131 channel_data *chand = arg;
132 GPR_ASSERT(initial_metadata != NULL);
133 *connected_subchannel = chand->connected_subchannel;
134 return 1;
135 }
136
137 /* Constructor for call_data */
138 static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
139 grpc_call_element_args *args) {
140 grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
141 elem->channel_data, args->call_stack);
142 }
143
144 /* Destructor for call_data */
145 static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
146 grpc_call_element *elem) {
147 grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
148 }
149
150 /* Constructor for channel_data */
151 static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
152 grpc_channel_element *elem,
153 grpc_channel_element_args *args) {
154 channel_data *chand = elem->channel_data;
155 memset(chand, 0, sizeof(*chand));
156 grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
157 GPR_ASSERT(args->is_last);
158 GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
159 chand->owning_stack = args->channel_stack;
160 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
161 "client_uchannel");
162 gpr_mu_init(&chand->mu_state);
163 }
164
165 /* Destructor for channel_data */
166 static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
167 grpc_channel_element *elem) {
168 channel_data *chand = elem->channel_data;
169 /* cancel subscription */
170 grpc_connected_subchannel_notify_on_state_change(
171 exec_ctx, chand->connected_subchannel, NULL, NULL,
172 &chand->connectivity_cb);
173 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
174 gpr_mu_destroy(&chand->mu_state);
175 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel,
176 "uchannel");
177 }
178
179 static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
180 grpc_pollset *pollset) {
181 call_data *calld = elem->call_data;
182 calld->pollset = pollset;
183 }
184
185 const grpc_channel_filter grpc_client_uchannel_filter = {
186 cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data),
187 cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem,
188 sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem,
189 cuc_get_peer, "client-uchannel",
190 };
191
192 grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
193 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
194 channel_data *chand = elem->channel_data;
195 grpc_connectivity_state out;
196 gpr_mu_lock(&chand->mu_state);
197 out = grpc_connectivity_state_check(&chand->state_tracker);
198 gpr_mu_unlock(&chand->mu_state);
199 return out;
200 }
201
202 void grpc_client_uchannel_watch_connectivity_state(
203 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
204 grpc_connectivity_state *state, grpc_closure *on_complete) {
205 channel_data *chand = elem->channel_data;
206 gpr_mu_lock(&chand->mu_state);
207 grpc_connectivity_state_notify_on_state_change(
208 exec_ctx, &chand->state_tracker, state, on_complete);
209 gpr_mu_unlock(&chand->mu_state);
210 }
211
212 grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
213 grpc_channel_args *args) {
214 grpc_channel *channel = NULL;
215 #define MAX_FILTERS 3
216 const grpc_channel_filter *filters[MAX_FILTERS];
217 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
218 size_t n = 0;
219
220 if (grpc_channel_args_is_census_enabled(args)) {
221 filters[n++] = &grpc_client_census_filter;
222 }
223 filters[n++] = &grpc_compress_filter;
224 filters[n++] = &grpc_client_uchannel_filter;
225 GPR_ASSERT(n <= MAX_FILTERS);
226
227 channel =
228 grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1);
229
230 return channel;
231 }
232
233 void grpc_client_uchannel_set_connected_subchannel(
234 grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) {
235 grpc_channel_element *elem =
236 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
237 channel_data *chand = elem->channel_data;
238 GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
239 gpr_mu_lock(&chand->mu_state);
240 chand->connected_subchannel = connected_subchannel;
241 GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel");
242 gpr_mu_unlock(&chand->mu_state);
243 }
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/channel/client_uchannel.h ('k') | third_party/grpc/src/core/channel/compress_filter.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698