Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Side by Side Diff: third_party/grpc/src/core/client_config/subchannel.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include "src/core/client_config/subchannel.h"
35
36 #include <string.h>
37
38 #include <grpc/support/alloc.h>
39 #include <grpc/support/avl.h>
40
41 #include "src/core/channel/channel_args.h"
42 #include "src/core/channel/client_channel.h"
43 #include "src/core/channel/connected_channel.h"
44 #include "src/core/client_config/initial_connect_string.h"
45 #include "src/core/client_config/subchannel_index.h"
46 #include "src/core/iomgr/timer.h"
47 #include "src/core/profiling/timers.h"
48 #include "src/core/surface/channel.h"
49 #include "src/core/transport/connectivity_state.h"
50
51 #define INTERNAL_REF_BITS 16
52 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
53
54 #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
55 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
56 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
57 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
58 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
59
60 #define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
61 ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \
62 &(subchannel)->connected_subchannel)))
63
64 typedef struct {
65 grpc_closure closure;
66 grpc_subchannel *subchannel;
67 grpc_connectivity_state connectivity_state;
68 } state_watcher;
69
70 typedef struct external_state_watcher {
71 grpc_subchannel *subchannel;
72 grpc_pollset_set *pollset_set;
73 grpc_closure *notify;
74 grpc_closure closure;
75 struct external_state_watcher *next;
76 struct external_state_watcher *prev;
77 } external_state_watcher;
78
79 struct grpc_subchannel {
80 grpc_connector *connector;
81
82 /** refcount
83 - lower INTERNAL_REF_BITS bits are for internal references:
84 these do not keep the subchannel open.
85 - upper remaining bits are for public references: these do
86 keep the subchannel open */
87 gpr_atm ref_pair;
88
89 /** non-transport related channel filters */
90 const grpc_channel_filter **filters;
91 size_t num_filters;
92 /** channel arguments */
93 grpc_channel_args *args;
94 /** address to connect to */
95 struct sockaddr *addr;
96 size_t addr_len;
97
98 grpc_subchannel_key *key;
99
100 /** initial string to send to peer */
101 gpr_slice initial_connect_string;
102
103 /** set during connection */
104 grpc_connect_out_args connecting_result;
105
106 /** callback for connection finishing */
107 grpc_closure connected;
108
109 /** pollset_set tracking who's interested in a connection
110 being setup */
111 grpc_pollset_set *pollset_set;
112
113 /** active connection, or null; of type grpc_connected_subchannel */
114 gpr_atm connected_subchannel;
115
116 /** mutex protecting remaining elements */
117 gpr_mu mu;
118
119 /** have we seen a disconnection? */
120 int disconnected;
121 /** are we connecting */
122 int connecting;
123 /** connectivity state tracking */
124 grpc_connectivity_state_tracker state_tracker;
125
126 external_state_watcher root_external_state_watcher;
127
128 /** next connect attempt time */
129 gpr_timespec next_attempt;
130 /** amount to backoff each failure */
131 gpr_timespec backoff_delta;
132 /** do we have an active alarm? */
133 int have_alarm;
134 /** our alarm */
135 grpc_timer alarm;
136 /** current random value */
137 uint32_t random;
138 };
139
140 struct grpc_subchannel_call {
141 grpc_connected_subchannel *connection;
142 };
143
144 #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
145 #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con))
146 #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
147 (((grpc_subchannel_call *)(callstack)) - 1)
148
149 static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
150 static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
151 bool iomgr_success);
152
153 #ifdef GRPC_STREAM_REFCOUNT_DEBUG
154 #define REF_REASON reason
155 #define REF_LOG(name, p) \
156 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
157 (name), (p), (p)->refs.count, (p)->refs.count + 1, reason)
158 #define UNREF_LOG(name, p) \
159 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
160 (name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
161 #define REF_MUTATE_EXTRA_ARGS \
162 GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
163 #define REF_MUTATE_PURPOSE(x) , file, line, reason, x
164 #else
165 #define REF_REASON ""
166 #define REF_LOG(name, p) \
167 do { \
168 } while (0)
169 #define UNREF_LOG(name, p) \
170 do { \
171 } while (0)
172 #define REF_MUTATE_EXTRA_ARGS
173 #define REF_MUTATE_PURPOSE(x)
174 #endif
175
176 /*
177 * connection implementation
178 */
179
180 static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
181 bool success) {
182 grpc_connected_subchannel *c = arg;
183 grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
184 gpr_free(c);
185 }
186
187 void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
188 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
189 GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
190 }
191
192 void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
193 grpc_connected_subchannel *c
194 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
195 GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c),
196 REF_REASON);
197 }
198
199 /*
200 * grpc_subchannel implementation
201 */
202
203 static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
204 bool success) {
205 grpc_subchannel *c = arg;
206 gpr_free((void *)c->filters);
207 grpc_channel_args_destroy(c->args);
208 gpr_free(c->addr);
209 gpr_slice_unref(c->initial_connect_string);
210 grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
211 grpc_connector_unref(exec_ctx, c->connector);
212 grpc_pollset_set_destroy(c->pollset_set);
213 grpc_subchannel_key_destroy(exec_ctx, c->key);
214 gpr_free(c);
215 }
216
217 static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
218 int barrier REF_MUTATE_EXTRA_ARGS) {
219 gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
220 : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
221 #ifdef GRPC_STREAM_REFCOUNT_DEBUG
222 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
223 "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
224 old_val + delta, reason);
225 #endif
226 return old_val;
227 }
228
229 grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
230 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
231 gpr_atm old_refs;
232 old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
233 0 REF_MUTATE_PURPOSE("STRONG_REF"));
234 GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
235 return c;
236 }
237
238 grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c
239 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
240 gpr_atm old_refs;
241 old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
242 GPR_ASSERT(old_refs != 0);
243 return c;
244 }
245
246 grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
247 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
248 if (!c) return NULL;
249 for (;;) {
250 gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
251 if (old_refs >= (1 << INTERNAL_REF_BITS)) {
252 gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
253 if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
254 return c;
255 }
256 } else {
257 return NULL;
258 }
259 }
260 }
261
262 static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
263 grpc_connected_subchannel *con;
264 grpc_subchannel_index_unregister(exec_ctx, c->key, c);
265 gpr_mu_lock(&c->mu);
266 GPR_ASSERT(!c->disconnected);
267 c->disconnected = 1;
268 grpc_connector_shutdown(exec_ctx, c->connector);
269 con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
270 if (con != NULL) {
271 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
272 gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef);
273 }
274 gpr_mu_unlock(&c->mu);
275 }
276
277 void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
278 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
279 gpr_atm old_refs;
280 old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS),
281 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
282 if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
283 disconnect(exec_ctx, c);
284 }
285 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
286 }
287
288 void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
289 grpc_subchannel *c
290 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
291 gpr_atm old_refs;
292 old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
293 if (old_refs == 1) {
294 grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
295 true, NULL);
296 }
297 }
298
299 static uint32_t random_seed() {
300 return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
301 }
302
303 grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
304 grpc_connector *connector,
305 grpc_subchannel_args *args) {
306 grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args);
307 grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
308 if (c) {
309 grpc_subchannel_key_destroy(exec_ctx, key);
310 return c;
311 }
312
313 c = gpr_malloc(sizeof(*c));
314 memset(c, 0, sizeof(*c));
315 c->key = key;
316 gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
317 c->connector = connector;
318 grpc_connector_ref(c->connector);
319 c->num_filters = args->filter_count;
320 if (c->num_filters > 0) {
321 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
322 memcpy((void *)c->filters, args->filters,
323 sizeof(grpc_channel_filter *) * c->num_filters);
324 } else {
325 c->filters = NULL;
326 }
327 c->addr = gpr_malloc(args->addr_len);
328 memcpy(c->addr, args->addr, args->addr_len);
329 c->pollset_set = grpc_pollset_set_create();
330 c->addr_len = args->addr_len;
331 grpc_set_initial_connect_string(&c->addr, &c->addr_len,
332 &c->initial_connect_string);
333 c->args = grpc_channel_args_copy(args->args);
334 c->random = random_seed();
335 c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
336 &c->root_external_state_watcher;
337 grpc_closure_init(&c->connected, subchannel_connected, c);
338 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
339 "subchannel");
340 gpr_mu_init(&c->mu);
341
342 return grpc_subchannel_index_register(exec_ctx, key, c);
343 }
344
345 static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
346 grpc_connect_in_args args;
347
348 args.interested_parties = c->pollset_set;
349 args.addr = c->addr;
350 args.addr_len = c->addr_len;
351 args.deadline = compute_connect_deadline(c);
352 args.channel_args = c->args;
353 args.initial_connect_string = c->initial_connect_string;
354
355 grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
356 GRPC_CHANNEL_CONNECTING, "state_change");
357 grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
358 &c->connected);
359 }
360
361 static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
362 c->backoff_delta = gpr_time_from_seconds(
363 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
364 c->next_attempt =
365 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
366 continue_connect(exec_ctx, c);
367 }
368
369 grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
370 grpc_connectivity_state state;
371 gpr_mu_lock(&c->mu);
372 state = grpc_connectivity_state_check(&c->state_tracker);
373 gpr_mu_unlock(&c->mu);
374 return state;
375 }
376
377 static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
378 bool success) {
379 external_state_watcher *w = arg;
380 grpc_closure *follow_up = w->notify;
381 if (w->pollset_set != NULL) {
382 grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
383 w->pollset_set);
384 }
385 gpr_mu_lock(&w->subchannel->mu);
386 w->next->prev = w->prev;
387 w->prev->next = w->next;
388 gpr_mu_unlock(&w->subchannel->mu);
389 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
390 gpr_free(w);
391 follow_up->cb(exec_ctx, follow_up->cb_arg, success);
392 }
393
394 void grpc_subchannel_notify_on_state_change(
395 grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
396 grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
397 grpc_closure *notify) {
398 external_state_watcher *w;
399
400 if (state == NULL) {
401 gpr_mu_lock(&c->mu);
402 for (w = c->root_external_state_watcher.next;
403 w != &c->root_external_state_watcher; w = w->next) {
404 if (w->notify == notify) {
405 grpc_connectivity_state_notify_on_state_change(
406 exec_ctx, &c->state_tracker, NULL, &w->closure);
407 }
408 }
409 gpr_mu_unlock(&c->mu);
410 } else {
411 w = gpr_malloc(sizeof(*w));
412 w->subchannel = c;
413 w->pollset_set = interested_parties;
414 w->notify = notify;
415 grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
416 if (interested_parties != NULL) {
417 grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
418 interested_parties);
419 }
420 GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
421 gpr_mu_lock(&c->mu);
422 w->next = &c->root_external_state_watcher;
423 w->prev = w->next->prev;
424 w->next->prev = w->prev->next = w;
425 if (grpc_connectivity_state_notify_on_state_change(
426 exec_ctx, &c->state_tracker, state, &w->closure)) {
427 c->connecting = 1;
428 /* released by connection */
429 GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
430 start_connect(exec_ctx, c);
431 }
432 gpr_mu_unlock(&c->mu);
433 }
434 }
435
436 void grpc_connected_subchannel_process_transport_op(
437 grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
438 grpc_transport_op *op) {
439 grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
440 grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
441 top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
442 }
443
444 static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
445 bool iomgr_success) {
446 state_watcher *sw = p;
447 grpc_subchannel *c = sw->subchannel;
448 gpr_mu *mu = &c->mu;
449
450 gpr_mu_lock(mu);
451
452 /* if we failed just leave this closure */
453 if (iomgr_success) {
454 if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
455 /* any errors on a subchannel ==> we're done, create a new one */
456 sw->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
457 }
458 grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
459 sw->connectivity_state, "reflect_child");
460 if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
461 grpc_connected_subchannel_notify_on_state_change(
462 exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
463 &sw->connectivity_state, &sw->closure);
464 GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
465 sw = NULL;
466 }
467 }
468
469 gpr_mu_unlock(mu);
470 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher");
471 gpr_free(sw);
472 }
473
474 static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
475 grpc_connected_subchannel *con,
476 grpc_pollset_set *interested_parties,
477 grpc_connectivity_state *state,
478 grpc_closure *closure) {
479 grpc_transport_op op;
480 grpc_channel_element *elem;
481 memset(&op, 0, sizeof(op));
482 op.connectivity_state = state;
483 op.on_connectivity_state_change = closure;
484 op.bind_pollset_set = interested_parties;
485 elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
486 elem->filter->start_transport_op(exec_ctx, elem, &op);
487 }
488
489 void grpc_connected_subchannel_notify_on_state_change(
490 grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
491 grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
492 grpc_closure *closure) {
493 connected_subchannel_state_op(exec_ctx, con, interested_parties, state,
494 closure);
495 }
496
497 void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
498 grpc_connected_subchannel *con,
499 grpc_closure *closure) {
500 grpc_transport_op op;
501 grpc_channel_element *elem;
502 memset(&op, 0, sizeof(op));
503 op.send_ping = closure;
504 elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
505 elem->filter->start_transport_op(exec_ctx, elem, &op);
506 }
507
508 static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
509 size_t channel_stack_size;
510 grpc_connected_subchannel *con;
511 grpc_channel_stack *stk;
512 size_t num_filters;
513 const grpc_channel_filter **filters;
514 state_watcher *sw_subchannel;
515
516 /* build final filter list */
517 num_filters = c->num_filters + c->connecting_result.num_filters + 1;
518 filters = gpr_malloc(sizeof(*filters) * num_filters);
519 if (c->num_filters > 0) {
520 memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
521 }
522 memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
523 sizeof(*filters) * c->connecting_result.num_filters);
524 filters[num_filters - 1] = &grpc_connected_channel_filter;
525
526 /* construct channel stack */
527 channel_stack_size = grpc_channel_stack_size(filters, num_filters);
528 con = gpr_malloc(channel_stack_size);
529 stk = CHANNEL_STACK_FROM_CONNECTION(con);
530 grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters,
531 num_filters, c->connecting_result.channel_args,
532 "CONNECTED_SUBCHANNEL", stk);
533 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
534 gpr_free((void *)c->connecting_result.filters);
535 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
536
537 /* initialize state watcher */
538 sw_subchannel = gpr_malloc(sizeof(*sw_subchannel));
539 sw_subchannel->subchannel = c;
540 sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
541 grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
542 sw_subchannel);
543
544 gpr_mu_lock(&c->mu);
545
546 if (c->disconnected) {
547 gpr_mu_unlock(&c->mu);
548 gpr_free(sw_subchannel);
549 gpr_free((void *)filters);
550 grpc_channel_stack_destroy(exec_ctx, stk);
551 gpr_free(con);
552 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
553 return;
554 }
555
556 /* publish */
557 /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
558 I'd have expected the rel_cas below to be enough, but
559 seemingly it's not.
560 Re-evaluate if we really need this. */
561 gpr_atm_full_barrier();
562 GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
563 c->connecting = 0;
564
565 /* setup subchannel watching connected subchannel for changes; subchannel ref
566 for connecting is donated
567 to the state watcher */
568 GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
569 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
570 grpc_connected_subchannel_notify_on_state_change(
571 exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state,
572 &sw_subchannel->closure);
573
574 /* signal completion */
575 grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
576 "connected");
577
578 gpr_mu_unlock(&c->mu);
579 gpr_free((void *)filters);
580 }
581
582 /* Generate a random number between 0 and 1. */
583 static double generate_uniform_random_number(grpc_subchannel *c) {
584 c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31);
585 return c->random / (double)((uint32_t)1 << 31);
586 }
587
588 /* Update backoff_delta and next_attempt in subchannel */
589 static void update_reconnect_parameters(grpc_subchannel *c) {
590 size_t i;
591 int32_t backoff_delta_millis, jitter;
592 int32_t max_backoff_millis =
593 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
594 double jitter_range;
595
596 if (c->args) {
597 for (i = 0; i < c->args->num_args; i++) {
598 if (0 == strcmp(c->args->args[i].key,
599 "grpc.testing.fixed_reconnect_backoff")) {
600 GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
601 c->next_attempt = gpr_time_add(
602 gpr_now(GPR_CLOCK_MONOTONIC),
603 gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN));
604 return;
605 }
606 }
607 }
608
609 backoff_delta_millis =
610 (int32_t)(gpr_time_to_millis(c->backoff_delta) *
611 GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
612 if (backoff_delta_millis > max_backoff_millis) {
613 backoff_delta_millis = max_backoff_millis;
614 }
615 c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
616 c->next_attempt =
617 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
618
619 jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
620 jitter =
621 (int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
622 c->next_attempt =
623 gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
624 }
625
626 static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
627 grpc_subchannel *c = arg;
628 gpr_mu_lock(&c->mu);
629 c->have_alarm = 0;
630 if (c->disconnected) {
631 iomgr_success = 0;
632 }
633 if (iomgr_success) {
634 update_reconnect_parameters(c);
635 continue_connect(exec_ctx, c);
636 gpr_mu_unlock(&c->mu);
637 } else {
638 gpr_mu_unlock(&c->mu);
639 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
640 }
641 }
642
643 static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
644 bool iomgr_success) {
645 grpc_subchannel *c = arg;
646
647 if (c->connecting_result.transport != NULL) {
648 publish_transport(exec_ctx, c);
649 } else if (c->disconnected) {
650 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
651 } else {
652 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
653 gpr_mu_lock(&c->mu);
654 GPR_ASSERT(!c->have_alarm);
655 c->have_alarm = 1;
656 grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
657 GRPC_CHANNEL_TRANSIENT_FAILURE,
658 "connect_failed");
659 grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
660 gpr_mu_unlock(&c->mu);
661 }
662 }
663
664 static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
665 gpr_timespec current_deadline =
666 gpr_time_add(c->next_attempt, c->backoff_delta);
667 gpr_timespec min_deadline = gpr_time_add(
668 gpr_now(GPR_CLOCK_MONOTONIC),
669 gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
670 GPR_TIMESPAN));
671 return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
672 : min_deadline;
673 }
674
675 /*
676 * grpc_subchannel_call implementation
677 */
678
679 static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
680 bool success) {
681 grpc_subchannel_call *c = call;
682 GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
683 grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
684 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call");
685 gpr_free(c);
686 GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
687 }
688
689 void grpc_subchannel_call_ref(grpc_subchannel_call *c
690 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
691 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
692 }
693
694 void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
695 grpc_subchannel_call *c
696 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
697 GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
698 }
699
700 char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
701 grpc_subchannel_call *call) {
702 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
703 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
704 return top_elem->filter->get_peer(exec_ctx, top_elem);
705 }
706
707 void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
708 grpc_subchannel_call *call,
709 grpc_transport_stream_op *op) {
710 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
711 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
712 top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
713 }
714
715 grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
716 grpc_subchannel *c) {
717 return GET_CONNECTED_SUBCHANNEL(c, acq);
718 }
719
720 grpc_subchannel_call *grpc_connected_subchannel_create_call(
721 grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
722 grpc_pollset *pollset) {
723 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
724 grpc_subchannel_call *call =
725 gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
726 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
727 call->connection = con;
728 GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
729 grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
730 NULL, NULL, callstk);
731 grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
732 return call;
733 }
734
735 grpc_call_stack *grpc_subchannel_call_get_call_stack(
736 grpc_subchannel_call *subchannel_call) {
737 return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
738 }
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/client_config/subchannel.h ('k') | third_party/grpc/src/core/client_config/subchannel_factory.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698