OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include "src/core/client_config/subchannel.h" |
| 35 |
| 36 #include <string.h> |
| 37 |
| 38 #include <grpc/support/alloc.h> |
| 39 #include <grpc/support/avl.h> |
| 40 |
| 41 #include "src/core/channel/channel_args.h" |
| 42 #include "src/core/channel/client_channel.h" |
| 43 #include "src/core/channel/connected_channel.h" |
| 44 #include "src/core/client_config/initial_connect_string.h" |
| 45 #include "src/core/client_config/subchannel_index.h" |
| 46 #include "src/core/iomgr/timer.h" |
| 47 #include "src/core/profiling/timers.h" |
| 48 #include "src/core/surface/channel.h" |
| 49 #include "src/core/transport/connectivity_state.h" |
| 50 |
| 51 #define INTERNAL_REF_BITS 16 |
| 52 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) |
| 53 |
| 54 #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 |
| 55 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 |
| 56 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 |
| 57 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 |
| 58 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 |
| 59 |
| 60 #define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ |
| 61 ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ |
| 62 &(subchannel)->connected_subchannel))) |
| 63 |
| 64 typedef struct { |
| 65 grpc_closure closure; |
| 66 grpc_subchannel *subchannel; |
| 67 grpc_connectivity_state connectivity_state; |
| 68 } state_watcher; |
| 69 |
| 70 typedef struct external_state_watcher { |
| 71 grpc_subchannel *subchannel; |
| 72 grpc_pollset_set *pollset_set; |
| 73 grpc_closure *notify; |
| 74 grpc_closure closure; |
| 75 struct external_state_watcher *next; |
| 76 struct external_state_watcher *prev; |
| 77 } external_state_watcher; |
| 78 |
| 79 struct grpc_subchannel { |
| 80 grpc_connector *connector; |
| 81 |
| 82 /** refcount |
| 83 - lower INTERNAL_REF_BITS bits are for internal references: |
| 84 these do not keep the subchannel open. |
| 85 - upper remaining bits are for public references: these do |
| 86 keep the subchannel open */ |
| 87 gpr_atm ref_pair; |
| 88 |
| 89 /** non-transport related channel filters */ |
| 90 const grpc_channel_filter **filters; |
| 91 size_t num_filters; |
| 92 /** channel arguments */ |
| 93 grpc_channel_args *args; |
| 94 /** address to connect to */ |
| 95 struct sockaddr *addr; |
| 96 size_t addr_len; |
| 97 |
| 98 grpc_subchannel_key *key; |
| 99 |
| 100 /** initial string to send to peer */ |
| 101 gpr_slice initial_connect_string; |
| 102 |
| 103 /** set during connection */ |
| 104 grpc_connect_out_args connecting_result; |
| 105 |
| 106 /** callback for connection finishing */ |
| 107 grpc_closure connected; |
| 108 |
| 109 /** pollset_set tracking who's interested in a connection |
| 110 being setup */ |
| 111 grpc_pollset_set *pollset_set; |
| 112 |
| 113 /** active connection, or null; of type grpc_connected_subchannel */ |
| 114 gpr_atm connected_subchannel; |
| 115 |
| 116 /** mutex protecting remaining elements */ |
| 117 gpr_mu mu; |
| 118 |
| 119 /** have we seen a disconnection? */ |
| 120 int disconnected; |
| 121 /** are we connecting */ |
| 122 int connecting; |
| 123 /** connectivity state tracking */ |
| 124 grpc_connectivity_state_tracker state_tracker; |
| 125 |
| 126 external_state_watcher root_external_state_watcher; |
| 127 |
| 128 /** next connect attempt time */ |
| 129 gpr_timespec next_attempt; |
| 130 /** amount to backoff each failure */ |
| 131 gpr_timespec backoff_delta; |
| 132 /** do we have an active alarm? */ |
| 133 int have_alarm; |
| 134 /** our alarm */ |
| 135 grpc_timer alarm; |
| 136 /** current random value */ |
| 137 uint32_t random; |
| 138 }; |
| 139 |
| 140 struct grpc_subchannel_call { |
| 141 grpc_connected_subchannel *connection; |
| 142 }; |
| 143 |
| 144 #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) |
| 145 #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con)) |
| 146 #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ |
| 147 (((grpc_subchannel_call *)(callstack)) - 1) |
| 148 |
| 149 static gpr_timespec compute_connect_deadline(grpc_subchannel *c); |
| 150 static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, |
| 151 bool iomgr_success); |
| 152 |
| 153 #ifdef GRPC_STREAM_REFCOUNT_DEBUG |
| 154 #define REF_REASON reason |
| 155 #define REF_LOG(name, p) \ |
| 156 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ |
| 157 (name), (p), (p)->refs.count, (p)->refs.count + 1, reason) |
| 158 #define UNREF_LOG(name, p) \ |
| 159 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ |
| 160 (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) |
| 161 #define REF_MUTATE_EXTRA_ARGS \ |
| 162 GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose |
| 163 #define REF_MUTATE_PURPOSE(x) , file, line, reason, x |
| 164 #else |
| 165 #define REF_REASON "" |
| 166 #define REF_LOG(name, p) \ |
| 167 do { \ |
| 168 } while (0) |
| 169 #define UNREF_LOG(name, p) \ |
| 170 do { \ |
| 171 } while (0) |
| 172 #define REF_MUTATE_EXTRA_ARGS |
| 173 #define REF_MUTATE_PURPOSE(x) |
| 174 #endif |
| 175 |
| 176 /* |
| 177 * connection implementation |
| 178 */ |
| 179 |
| 180 static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, |
| 181 bool success) { |
| 182 grpc_connected_subchannel *c = arg; |
| 183 grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); |
| 184 gpr_free(c); |
| 185 } |
| 186 |
| 187 void grpc_connected_subchannel_ref(grpc_connected_subchannel *c |
| 188 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 189 GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); |
| 190 } |
| 191 |
| 192 void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, |
| 193 grpc_connected_subchannel *c |
| 194 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 195 GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), |
| 196 REF_REASON); |
| 197 } |
| 198 |
| 199 /* |
| 200 * grpc_subchannel implementation |
| 201 */ |
| 202 |
| 203 static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, |
| 204 bool success) { |
| 205 grpc_subchannel *c = arg; |
| 206 gpr_free((void *)c->filters); |
| 207 grpc_channel_args_destroy(c->args); |
| 208 gpr_free(c->addr); |
| 209 gpr_slice_unref(c->initial_connect_string); |
| 210 grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); |
| 211 grpc_connector_unref(exec_ctx, c->connector); |
| 212 grpc_pollset_set_destroy(c->pollset_set); |
| 213 grpc_subchannel_key_destroy(exec_ctx, c->key); |
| 214 gpr_free(c); |
| 215 } |
| 216 |
| 217 static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, |
| 218 int barrier REF_MUTATE_EXTRA_ARGS) { |
| 219 gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) |
| 220 : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); |
| 221 #ifdef GRPC_STREAM_REFCOUNT_DEBUG |
| 222 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, |
| 223 "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, |
| 224 old_val + delta, reason); |
| 225 #endif |
| 226 return old_val; |
| 227 } |
| 228 |
| 229 grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c |
| 230 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 231 gpr_atm old_refs; |
| 232 old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), |
| 233 0 REF_MUTATE_PURPOSE("STRONG_REF")); |
| 234 GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); |
| 235 return c; |
| 236 } |
| 237 |
| 238 grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c |
| 239 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 240 gpr_atm old_refs; |
| 241 old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); |
| 242 GPR_ASSERT(old_refs != 0); |
| 243 return c; |
| 244 } |
| 245 |
| 246 grpc_subchannel *grpc_subchannel_ref_from_weak_ref( |
| 247 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 248 if (!c) return NULL; |
| 249 for (;;) { |
| 250 gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); |
| 251 if (old_refs >= (1 << INTERNAL_REF_BITS)) { |
| 252 gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); |
| 253 if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) { |
| 254 return c; |
| 255 } |
| 256 } else { |
| 257 return NULL; |
| 258 } |
| 259 } |
| 260 } |
| 261 |
| 262 static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { |
| 263 grpc_connected_subchannel *con; |
| 264 grpc_subchannel_index_unregister(exec_ctx, c->key, c); |
| 265 gpr_mu_lock(&c->mu); |
| 266 GPR_ASSERT(!c->disconnected); |
| 267 c->disconnected = 1; |
| 268 grpc_connector_shutdown(exec_ctx, c->connector); |
| 269 con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); |
| 270 if (con != NULL) { |
| 271 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); |
| 272 gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef); |
| 273 } |
| 274 gpr_mu_unlock(&c->mu); |
| 275 } |
| 276 |
| 277 void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, |
| 278 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 279 gpr_atm old_refs; |
| 280 old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), |
| 281 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); |
| 282 if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { |
| 283 disconnect(exec_ctx, c); |
| 284 } |
| 285 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref"); |
| 286 } |
| 287 |
| 288 void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, |
| 289 grpc_subchannel *c |
| 290 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 291 gpr_atm old_refs; |
| 292 old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); |
| 293 if (old_refs == 1) { |
| 294 grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), |
| 295 true, NULL); |
| 296 } |
| 297 } |
| 298 |
| 299 static uint32_t random_seed() { |
| 300 return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); |
| 301 } |
| 302 |
| 303 grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, |
| 304 grpc_connector *connector, |
| 305 grpc_subchannel_args *args) { |
| 306 grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args); |
| 307 grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); |
| 308 if (c) { |
| 309 grpc_subchannel_key_destroy(exec_ctx, key); |
| 310 return c; |
| 311 } |
| 312 |
| 313 c = gpr_malloc(sizeof(*c)); |
| 314 memset(c, 0, sizeof(*c)); |
| 315 c->key = key; |
| 316 gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); |
| 317 c->connector = connector; |
| 318 grpc_connector_ref(c->connector); |
| 319 c->num_filters = args->filter_count; |
| 320 if (c->num_filters > 0) { |
| 321 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters); |
| 322 memcpy((void *)c->filters, args->filters, |
| 323 sizeof(grpc_channel_filter *) * c->num_filters); |
| 324 } else { |
| 325 c->filters = NULL; |
| 326 } |
| 327 c->addr = gpr_malloc(args->addr_len); |
| 328 memcpy(c->addr, args->addr, args->addr_len); |
| 329 c->pollset_set = grpc_pollset_set_create(); |
| 330 c->addr_len = args->addr_len; |
| 331 grpc_set_initial_connect_string(&c->addr, &c->addr_len, |
| 332 &c->initial_connect_string); |
| 333 c->args = grpc_channel_args_copy(args->args); |
| 334 c->random = random_seed(); |
| 335 c->root_external_state_watcher.next = c->root_external_state_watcher.prev = |
| 336 &c->root_external_state_watcher; |
| 337 grpc_closure_init(&c->connected, subchannel_connected, c); |
| 338 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, |
| 339 "subchannel"); |
| 340 gpr_mu_init(&c->mu); |
| 341 |
| 342 return grpc_subchannel_index_register(exec_ctx, key, c); |
| 343 } |
| 344 |
| 345 static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { |
| 346 grpc_connect_in_args args; |
| 347 |
| 348 args.interested_parties = c->pollset_set; |
| 349 args.addr = c->addr; |
| 350 args.addr_len = c->addr_len; |
| 351 args.deadline = compute_connect_deadline(c); |
| 352 args.channel_args = c->args; |
| 353 args.initial_connect_string = c->initial_connect_string; |
| 354 |
| 355 grpc_connectivity_state_set(exec_ctx, &c->state_tracker, |
| 356 GRPC_CHANNEL_CONNECTING, "state_change"); |
| 357 grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, |
| 358 &c->connected); |
| 359 } |
| 360 |
| 361 static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { |
| 362 c->backoff_delta = gpr_time_from_seconds( |
| 363 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); |
| 364 c->next_attempt = |
| 365 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); |
| 366 continue_connect(exec_ctx, c); |
| 367 } |
| 368 |
| 369 grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { |
| 370 grpc_connectivity_state state; |
| 371 gpr_mu_lock(&c->mu); |
| 372 state = grpc_connectivity_state_check(&c->state_tracker); |
| 373 gpr_mu_unlock(&c->mu); |
| 374 return state; |
| 375 } |
| 376 |
| 377 static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, |
| 378 bool success) { |
| 379 external_state_watcher *w = arg; |
| 380 grpc_closure *follow_up = w->notify; |
| 381 if (w->pollset_set != NULL) { |
| 382 grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, |
| 383 w->pollset_set); |
| 384 } |
| 385 gpr_mu_lock(&w->subchannel->mu); |
| 386 w->next->prev = w->prev; |
| 387 w->prev->next = w->next; |
| 388 gpr_mu_unlock(&w->subchannel->mu); |
| 389 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); |
| 390 gpr_free(w); |
| 391 follow_up->cb(exec_ctx, follow_up->cb_arg, success); |
| 392 } |
| 393 |
| 394 void grpc_subchannel_notify_on_state_change( |
| 395 grpc_exec_ctx *exec_ctx, grpc_subchannel *c, |
| 396 grpc_pollset_set *interested_parties, grpc_connectivity_state *state, |
| 397 grpc_closure *notify) { |
| 398 external_state_watcher *w; |
| 399 |
| 400 if (state == NULL) { |
| 401 gpr_mu_lock(&c->mu); |
| 402 for (w = c->root_external_state_watcher.next; |
| 403 w != &c->root_external_state_watcher; w = w->next) { |
| 404 if (w->notify == notify) { |
| 405 grpc_connectivity_state_notify_on_state_change( |
| 406 exec_ctx, &c->state_tracker, NULL, &w->closure); |
| 407 } |
| 408 } |
| 409 gpr_mu_unlock(&c->mu); |
| 410 } else { |
| 411 w = gpr_malloc(sizeof(*w)); |
| 412 w->subchannel = c; |
| 413 w->pollset_set = interested_parties; |
| 414 w->notify = notify; |
| 415 grpc_closure_init(&w->closure, on_external_state_watcher_done, w); |
| 416 if (interested_parties != NULL) { |
| 417 grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, |
| 418 interested_parties); |
| 419 } |
| 420 GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); |
| 421 gpr_mu_lock(&c->mu); |
| 422 w->next = &c->root_external_state_watcher; |
| 423 w->prev = w->next->prev; |
| 424 w->next->prev = w->prev->next = w; |
| 425 if (grpc_connectivity_state_notify_on_state_change( |
| 426 exec_ctx, &c->state_tracker, state, &w->closure)) { |
| 427 c->connecting = 1; |
| 428 /* released by connection */ |
| 429 GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); |
| 430 start_connect(exec_ctx, c); |
| 431 } |
| 432 gpr_mu_unlock(&c->mu); |
| 433 } |
| 434 } |
| 435 |
| 436 void grpc_connected_subchannel_process_transport_op( |
| 437 grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, |
| 438 grpc_transport_op *op) { |
| 439 grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); |
| 440 grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); |
| 441 top_elem->filter->start_transport_op(exec_ctx, top_elem, op); |
| 442 } |
| 443 |
| 444 static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, |
| 445 bool iomgr_success) { |
| 446 state_watcher *sw = p; |
| 447 grpc_subchannel *c = sw->subchannel; |
| 448 gpr_mu *mu = &c->mu; |
| 449 |
| 450 gpr_mu_lock(mu); |
| 451 |
| 452 /* if we failed just leave this closure */ |
| 453 if (iomgr_success) { |
| 454 if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| 455 /* any errors on a subchannel ==> we're done, create a new one */ |
| 456 sw->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; |
| 457 } |
| 458 grpc_connectivity_state_set(exec_ctx, &c->state_tracker, |
| 459 sw->connectivity_state, "reflect_child"); |
| 460 if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { |
| 461 grpc_connected_subchannel_notify_on_state_change( |
| 462 exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL, |
| 463 &sw->connectivity_state, &sw->closure); |
| 464 GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); |
| 465 sw = NULL; |
| 466 } |
| 467 } |
| 468 |
| 469 gpr_mu_unlock(mu); |
| 470 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher"); |
| 471 gpr_free(sw); |
| 472 } |
| 473 |
| 474 static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, |
| 475 grpc_connected_subchannel *con, |
| 476 grpc_pollset_set *interested_parties, |
| 477 grpc_connectivity_state *state, |
| 478 grpc_closure *closure) { |
| 479 grpc_transport_op op; |
| 480 grpc_channel_element *elem; |
| 481 memset(&op, 0, sizeof(op)); |
| 482 op.connectivity_state = state; |
| 483 op.on_connectivity_state_change = closure; |
| 484 op.bind_pollset_set = interested_parties; |
| 485 elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); |
| 486 elem->filter->start_transport_op(exec_ctx, elem, &op); |
| 487 } |
| 488 |
| 489 void grpc_connected_subchannel_notify_on_state_change( |
| 490 grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, |
| 491 grpc_pollset_set *interested_parties, grpc_connectivity_state *state, |
| 492 grpc_closure *closure) { |
| 493 connected_subchannel_state_op(exec_ctx, con, interested_parties, state, |
| 494 closure); |
| 495 } |
| 496 |
| 497 void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, |
| 498 grpc_connected_subchannel *con, |
| 499 grpc_closure *closure) { |
| 500 grpc_transport_op op; |
| 501 grpc_channel_element *elem; |
| 502 memset(&op, 0, sizeof(op)); |
| 503 op.send_ping = closure; |
| 504 elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); |
| 505 elem->filter->start_transport_op(exec_ctx, elem, &op); |
| 506 } |
| 507 |
| 508 static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { |
| 509 size_t channel_stack_size; |
| 510 grpc_connected_subchannel *con; |
| 511 grpc_channel_stack *stk; |
| 512 size_t num_filters; |
| 513 const grpc_channel_filter **filters; |
| 514 state_watcher *sw_subchannel; |
| 515 |
| 516 /* build final filter list */ |
| 517 num_filters = c->num_filters + c->connecting_result.num_filters + 1; |
| 518 filters = gpr_malloc(sizeof(*filters) * num_filters); |
| 519 if (c->num_filters > 0) { |
| 520 memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters); |
| 521 } |
| 522 memcpy((void *)(filters + c->num_filters), c->connecting_result.filters, |
| 523 sizeof(*filters) * c->connecting_result.num_filters); |
| 524 filters[num_filters - 1] = &grpc_connected_channel_filter; |
| 525 |
| 526 /* construct channel stack */ |
| 527 channel_stack_size = grpc_channel_stack_size(filters, num_filters); |
| 528 con = gpr_malloc(channel_stack_size); |
| 529 stk = CHANNEL_STACK_FROM_CONNECTION(con); |
| 530 grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, |
| 531 num_filters, c->connecting_result.channel_args, |
| 532 "CONNECTED_SUBCHANNEL", stk); |
| 533 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); |
| 534 gpr_free((void *)c->connecting_result.filters); |
| 535 memset(&c->connecting_result, 0, sizeof(c->connecting_result)); |
| 536 |
| 537 /* initialize state watcher */ |
| 538 sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); |
| 539 sw_subchannel->subchannel = c; |
| 540 sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; |
| 541 grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, |
| 542 sw_subchannel); |
| 543 |
| 544 gpr_mu_lock(&c->mu); |
| 545 |
| 546 if (c->disconnected) { |
| 547 gpr_mu_unlock(&c->mu); |
| 548 gpr_free(sw_subchannel); |
| 549 gpr_free((void *)filters); |
| 550 grpc_channel_stack_destroy(exec_ctx, stk); |
| 551 gpr_free(con); |
| 552 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); |
| 553 return; |
| 554 } |
| 555 |
| 556 /* publish */ |
| 557 /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. |
| 558 I'd have expected the rel_cas below to be enough, but |
| 559 seemingly it's not. |
| 560 Re-evaluate if we really need this. */ |
| 561 gpr_atm_full_barrier(); |
| 562 GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); |
| 563 c->connecting = 0; |
| 564 |
| 565 /* setup subchannel watching connected subchannel for changes; subchannel ref |
| 566 for connecting is donated |
| 567 to the state watcher */ |
| 568 GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); |
| 569 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); |
| 570 grpc_connected_subchannel_notify_on_state_change( |
| 571 exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state, |
| 572 &sw_subchannel->closure); |
| 573 |
| 574 /* signal completion */ |
| 575 grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, |
| 576 "connected"); |
| 577 |
| 578 gpr_mu_unlock(&c->mu); |
| 579 gpr_free((void *)filters); |
| 580 } |
| 581 |
| 582 /* Generate a random number between 0 and 1. */ |
| 583 static double generate_uniform_random_number(grpc_subchannel *c) { |
| 584 c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31); |
| 585 return c->random / (double)((uint32_t)1 << 31); |
| 586 } |
| 587 |
| 588 /* Update backoff_delta and next_attempt in subchannel */ |
| 589 static void update_reconnect_parameters(grpc_subchannel *c) { |
| 590 size_t i; |
| 591 int32_t backoff_delta_millis, jitter; |
| 592 int32_t max_backoff_millis = |
| 593 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; |
| 594 double jitter_range; |
| 595 |
| 596 if (c->args) { |
| 597 for (i = 0; i < c->args->num_args; i++) { |
| 598 if (0 == strcmp(c->args->args[i].key, |
| 599 "grpc.testing.fixed_reconnect_backoff")) { |
| 600 GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); |
| 601 c->next_attempt = gpr_time_add( |
| 602 gpr_now(GPR_CLOCK_MONOTONIC), |
| 603 gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN)); |
| 604 return; |
| 605 } |
| 606 } |
| 607 } |
| 608 |
| 609 backoff_delta_millis = |
| 610 (int32_t)(gpr_time_to_millis(c->backoff_delta) * |
| 611 GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); |
| 612 if (backoff_delta_millis > max_backoff_millis) { |
| 613 backoff_delta_millis = max_backoff_millis; |
| 614 } |
| 615 c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); |
| 616 c->next_attempt = |
| 617 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); |
| 618 |
| 619 jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; |
| 620 jitter = |
| 621 (int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range); |
| 622 c->next_attempt = |
| 623 gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); |
| 624 } |
| 625 |
| 626 static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { |
| 627 grpc_subchannel *c = arg; |
| 628 gpr_mu_lock(&c->mu); |
| 629 c->have_alarm = 0; |
| 630 if (c->disconnected) { |
| 631 iomgr_success = 0; |
| 632 } |
| 633 if (iomgr_success) { |
| 634 update_reconnect_parameters(c); |
| 635 continue_connect(exec_ctx, c); |
| 636 gpr_mu_unlock(&c->mu); |
| 637 } else { |
| 638 gpr_mu_unlock(&c->mu); |
| 639 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); |
| 640 } |
| 641 } |
| 642 |
| 643 static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, |
| 644 bool iomgr_success) { |
| 645 grpc_subchannel *c = arg; |
| 646 |
| 647 if (c->connecting_result.transport != NULL) { |
| 648 publish_transport(exec_ctx, c); |
| 649 } else if (c->disconnected) { |
| 650 GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); |
| 651 } else { |
| 652 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); |
| 653 gpr_mu_lock(&c->mu); |
| 654 GPR_ASSERT(!c->have_alarm); |
| 655 c->have_alarm = 1; |
| 656 grpc_connectivity_state_set(exec_ctx, &c->state_tracker, |
| 657 GRPC_CHANNEL_TRANSIENT_FAILURE, |
| 658 "connect_failed"); |
| 659 grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); |
| 660 gpr_mu_unlock(&c->mu); |
| 661 } |
| 662 } |
| 663 |
| 664 static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { |
| 665 gpr_timespec current_deadline = |
| 666 gpr_time_add(c->next_attempt, c->backoff_delta); |
| 667 gpr_timespec min_deadline = gpr_time_add( |
| 668 gpr_now(GPR_CLOCK_MONOTONIC), |
| 669 gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, |
| 670 GPR_TIMESPAN)); |
| 671 return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline |
| 672 : min_deadline; |
| 673 } |
| 674 |
| 675 /* |
| 676 * grpc_subchannel_call implementation |
| 677 */ |
| 678 |
| 679 static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, |
| 680 bool success) { |
| 681 grpc_subchannel_call *c = call; |
| 682 GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); |
| 683 grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); |
| 684 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call"); |
| 685 gpr_free(c); |
| 686 GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); |
| 687 } |
| 688 |
| 689 void grpc_subchannel_call_ref(grpc_subchannel_call *c |
| 690 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 691 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); |
| 692 } |
| 693 |
| 694 void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, |
| 695 grpc_subchannel_call *c |
| 696 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| 697 GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); |
| 698 } |
| 699 |
| 700 char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, |
| 701 grpc_subchannel_call *call) { |
| 702 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); |
| 703 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); |
| 704 return top_elem->filter->get_peer(exec_ctx, top_elem); |
| 705 } |
| 706 |
| 707 void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, |
| 708 grpc_subchannel_call *call, |
| 709 grpc_transport_stream_op *op) { |
| 710 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); |
| 711 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); |
| 712 top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); |
| 713 } |
| 714 |
| 715 grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( |
| 716 grpc_subchannel *c) { |
| 717 return GET_CONNECTED_SUBCHANNEL(c, acq); |
| 718 } |
| 719 |
| 720 grpc_subchannel_call *grpc_connected_subchannel_create_call( |
| 721 grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, |
| 722 grpc_pollset *pollset) { |
| 723 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); |
| 724 grpc_subchannel_call *call = |
| 725 gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); |
| 726 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); |
| 727 call->connection = con; |
| 728 GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); |
| 729 grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call, |
| 730 NULL, NULL, callstk); |
| 731 grpc_call_stack_set_pollset(exec_ctx, callstk, pollset); |
| 732 return call; |
| 733 } |
| 734 |
| 735 grpc_call_stack *grpc_subchannel_call_get_call_stack( |
| 736 grpc_subchannel_call *subchannel_call) { |
| 737 return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); |
| 738 } |
OLD | NEW |