OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 #include <assert.h> |
| 34 #include <limits.h> |
| 35 #include <stdio.h> |
| 36 #include <stdlib.h> |
| 37 #include <string.h> |
| 38 |
| 39 #include <grpc/compression.h> |
| 40 #include <grpc/grpc.h> |
| 41 #include <grpc/support/alloc.h> |
| 42 #include <grpc/support/log.h> |
| 43 #include <grpc/support/string_util.h> |
| 44 #include <grpc/support/useful.h> |
| 45 |
| 46 #include "src/core/channel/channel_stack.h" |
| 47 #include "src/core/compression/algorithm_metadata.h" |
| 48 #include "src/core/iomgr/timer.h" |
| 49 #include "src/core/profiling/timers.h" |
| 50 #include "src/core/support/string.h" |
| 51 #include "src/core/surface/api_trace.h" |
| 52 #include "src/core/surface/call.h" |
| 53 #include "src/core/surface/channel.h" |
| 54 #include "src/core/surface/completion_queue.h" |
| 55 #include "src/core/transport/static_metadata.h" |
| 56 |
| 57 /** The maximum number of concurrent batches possible. |
| 58 Based upon the maximum number of individually queueable ops in the batch |
| 59 api: |
| 60 - initial metadata send |
| 61 - message send |
| 62 - status/close send (depending on client/server) |
| 63 - initial metadata recv |
| 64 - message recv |
| 65 - status/close recv (depending on client/server) */ |
| 66 #define MAX_CONCURRENT_BATCHES 6 |
| 67 |
| 68 typedef struct { |
| 69 grpc_ioreq_completion_func on_complete; |
| 70 void *user_data; |
| 71 int success; |
| 72 } completed_request; |
| 73 |
| 74 #define MAX_SEND_EXTRA_METADATA_COUNT 3 |
| 75 |
| 76 /* Status data for a request can come from several sources; this |
| 77 enumerates them all, and acts as a priority sorting for which |
| 78 status to return to the application - earlier entries override |
| 79 later ones */ |
| 80 typedef enum { |
| 81 /* Status came from the application layer overriding whatever |
| 82 the wire says */ |
| 83 STATUS_FROM_API_OVERRIDE = 0, |
| 84 /* Status was created by some internal channel stack operation */ |
| 85 STATUS_FROM_CORE, |
| 86 /* Status came from 'the wire' - or somewhere below the surface |
| 87 layer */ |
| 88 STATUS_FROM_WIRE, |
| 89 /* Status came from the server sending status */ |
| 90 STATUS_FROM_SERVER_STATUS, |
| 91 STATUS_SOURCE_COUNT |
| 92 } status_source; |
| 93 |
| 94 typedef struct { |
| 95 uint8_t is_set; |
| 96 grpc_status_code code; |
| 97 grpc_mdstr *details; |
| 98 } received_status; |
| 99 |
| 100 /* How far through the GRPC stream have we read? */ |
| 101 typedef enum { |
| 102 /* We are still waiting for initial metadata to complete */ |
| 103 READ_STATE_INITIAL = 0, |
| 104 /* We have gotten initial metadata, and are reading either |
| 105 messages or trailing metadata */ |
| 106 READ_STATE_GOT_INITIAL_METADATA, |
| 107 /* The stream is closed for reading */ |
| 108 READ_STATE_READ_CLOSED, |
| 109 /* The stream is closed for reading & writing */ |
| 110 READ_STATE_STREAM_CLOSED |
| 111 } read_state; |
| 112 |
| 113 typedef enum { |
| 114 WRITE_STATE_INITIAL = 0, |
| 115 WRITE_STATE_STARTED, |
| 116 WRITE_STATE_WRITE_CLOSED |
| 117 } write_state; |
| 118 |
| 119 typedef struct batch_control { |
| 120 grpc_call *call; |
| 121 grpc_cq_completion cq_completion; |
| 122 grpc_closure finish_batch; |
| 123 void *notify_tag; |
| 124 gpr_refcount steps_to_complete; |
| 125 |
| 126 uint8_t send_initial_metadata; |
| 127 uint8_t send_message; |
| 128 uint8_t send_final_op; |
| 129 uint8_t recv_initial_metadata; |
| 130 uint8_t recv_message; |
| 131 uint8_t recv_final_op; |
| 132 uint8_t is_notify_tag_closure; |
| 133 uint8_t success; |
| 134 } batch_control; |
| 135 |
| 136 struct grpc_call { |
| 137 grpc_completion_queue *cq; |
| 138 grpc_channel *channel; |
| 139 grpc_call *parent; |
| 140 grpc_call *first_child; |
| 141 /* TODO(ctiller): share with cq if possible? */ |
| 142 gpr_mu mu; |
| 143 |
| 144 /* client or server call */ |
| 145 uint8_t is_client; |
| 146 /* is the alarm set */ |
| 147 uint8_t have_alarm; |
| 148 /** has grpc_call_destroy been called */ |
| 149 uint8_t destroy_called; |
| 150 /** flag indicating that cancellation is inherited */ |
| 151 uint8_t cancellation_is_inherited; |
| 152 /** bitmask of live batches */ |
| 153 uint8_t used_batches; |
| 154 /** which ops are in-flight */ |
| 155 uint8_t sent_initial_metadata; |
| 156 uint8_t sending_message; |
| 157 uint8_t sent_final_op; |
| 158 uint8_t received_initial_metadata; |
| 159 uint8_t receiving_message; |
| 160 uint8_t received_final_op; |
| 161 |
| 162 /* have we received initial metadata */ |
| 163 bool has_initial_md_been_received; |
| 164 |
| 165 batch_control active_batches[MAX_CONCURRENT_BATCHES]; |
| 166 |
| 167 /* first idx: is_receiving, second idx: is_trailing */ |
| 168 grpc_metadata_batch metadata_batch[2][2]; |
| 169 |
| 170 /* Buffered read metadata waiting to be returned to the application. |
| 171 Element 0 is initial metadata, element 1 is trailing metadata. */ |
| 172 grpc_metadata_array *buffered_metadata[2]; |
| 173 |
| 174 /* Received call statuses from various sources */ |
| 175 received_status status[STATUS_SOURCE_COUNT]; |
| 176 |
| 177 /* Compression algorithm for the call */ |
| 178 grpc_compression_algorithm compression_algorithm; |
| 179 /* Supported encodings (compression algorithms), a bitset */ |
| 180 uint32_t encodings_accepted_by_peer; |
| 181 |
| 182 /* Contexts for various subsystems (security, tracing, ...). */ |
| 183 grpc_call_context_element context[GRPC_CONTEXT_COUNT]; |
| 184 |
| 185 /* Deadline alarm - if have_alarm is non-zero */ |
| 186 grpc_timer alarm; |
| 187 |
| 188 /* for the client, extra metadata is initial metadata; for the |
| 189 server, it's trailing metadata */ |
| 190 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT]; |
| 191 int send_extra_metadata_count; |
| 192 gpr_timespec send_deadline; |
| 193 |
| 194 /** siblings: children of the same parent form a list, and this list is |
| 195 protected under |
| 196 parent->mu */ |
| 197 grpc_call *sibling_next; |
| 198 grpc_call *sibling_prev; |
| 199 |
| 200 grpc_slice_buffer_stream sending_stream; |
| 201 grpc_byte_stream *receiving_stream; |
| 202 grpc_byte_buffer **receiving_buffer; |
| 203 gpr_slice receiving_slice; |
| 204 grpc_closure receiving_slice_ready; |
| 205 grpc_closure receiving_stream_ready; |
| 206 grpc_closure receiving_initial_metadata_ready; |
| 207 uint32_t test_only_last_message_flags; |
| 208 |
| 209 union { |
| 210 struct { |
| 211 grpc_status_code *status; |
| 212 char **status_details; |
| 213 size_t *status_details_capacity; |
| 214 } client; |
| 215 struct { |
| 216 int *cancelled; |
| 217 } server; |
| 218 } final_op; |
| 219 |
| 220 struct { |
| 221 void *bctlp; |
| 222 bool success; |
| 223 } saved_receiving_stream_ready_ctx; |
| 224 }; |
| 225 |
| 226 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) |
| 227 #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) |
| 228 #define CALL_ELEM_FROM_CALL(call, idx) \ |
| 229 grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) |
| 230 #define CALL_FROM_TOP_ELEM(top_elem) \ |
| 231 CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) |
| 232 |
| 233 static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, |
| 234 gpr_timespec deadline); |
| 235 static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, |
| 236 grpc_transport_stream_op *op); |
| 237 static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, |
| 238 grpc_status_code status, |
| 239 const char *description); |
| 240 static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, |
| 241 bool success); |
| 242 static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, |
| 243 bool success); |
| 244 |
| 245 grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, |
| 246 uint32_t propagation_mask, |
| 247 grpc_completion_queue *cq, |
| 248 const void *server_transport_data, |
| 249 grpc_mdelem **add_initial_metadata, |
| 250 size_t add_initial_metadata_count, |
| 251 gpr_timespec send_deadline) { |
| 252 size_t i, j; |
| 253 grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); |
| 254 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| 255 grpc_call *call; |
| 256 GPR_TIMER_BEGIN("grpc_call_create", 0); |
| 257 call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); |
| 258 memset(call, 0, sizeof(grpc_call)); |
| 259 gpr_mu_init(&call->mu); |
| 260 call->channel = channel; |
| 261 call->cq = cq; |
| 262 call->parent = parent_call; |
| 263 call->is_client = server_transport_data == NULL; |
| 264 if (call->is_client) { |
| 265 GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); |
| 266 for (i = 0; i < add_initial_metadata_count; i++) { |
| 267 call->send_extra_metadata[i].md = add_initial_metadata[i]; |
| 268 } |
| 269 call->send_extra_metadata_count = (int)add_initial_metadata_count; |
| 270 } else { |
| 271 GPR_ASSERT(add_initial_metadata_count == 0); |
| 272 call->send_extra_metadata_count = 0; |
| 273 } |
| 274 for (i = 0; i < 2; i++) { |
| 275 for (j = 0; j < 2; j++) { |
| 276 call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); |
| 277 } |
| 278 } |
| 279 call->send_deadline = send_deadline; |
| 280 GRPC_CHANNEL_INTERNAL_REF(channel, "call"); |
| 281 /* initial refcount dropped by grpc_call_destroy */ |
| 282 grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, |
| 283 call->context, server_transport_data, |
| 284 CALL_STACK_FROM_CALL(call)); |
| 285 if (cq != NULL) { |
| 286 GRPC_CQ_INTERNAL_REF(cq, "bind"); |
| 287 grpc_call_stack_set_pollset(&exec_ctx, CALL_STACK_FROM_CALL(call), |
| 288 grpc_cq_pollset(cq)); |
| 289 } |
| 290 if (parent_call != NULL) { |
| 291 GRPC_CALL_INTERNAL_REF(parent_call, "child"); |
| 292 GPR_ASSERT(call->is_client); |
| 293 GPR_ASSERT(!parent_call->is_client); |
| 294 |
| 295 gpr_mu_lock(&parent_call->mu); |
| 296 |
| 297 if (propagation_mask & GRPC_PROPAGATE_DEADLINE) { |
| 298 send_deadline = gpr_time_min( |
| 299 gpr_convert_clock_type(send_deadline, |
| 300 parent_call->send_deadline.clock_type), |
| 301 parent_call->send_deadline); |
| 302 } |
| 303 /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with |
| 304 * GRPC_PROPAGATE_STATS_CONTEXT */ |
| 305 /* TODO(ctiller): This should change to use the appropriate census start_op |
| 306 * call. */ |
| 307 if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { |
| 308 GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); |
| 309 grpc_call_context_set(call, GRPC_CONTEXT_TRACING, |
| 310 parent_call->context[GRPC_CONTEXT_TRACING].value, |
| 311 NULL); |
| 312 } else { |
| 313 GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); |
| 314 } |
| 315 if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { |
| 316 call->cancellation_is_inherited = 1; |
| 317 } |
| 318 |
| 319 if (parent_call->first_child == NULL) { |
| 320 parent_call->first_child = call; |
| 321 call->sibling_next = call->sibling_prev = call; |
| 322 } else { |
| 323 call->sibling_next = parent_call->first_child; |
| 324 call->sibling_prev = parent_call->first_child->sibling_prev; |
| 325 call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = |
| 326 call; |
| 327 } |
| 328 |
| 329 gpr_mu_unlock(&parent_call->mu); |
| 330 } |
| 331 if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != |
| 332 0) { |
| 333 set_deadline_alarm(&exec_ctx, call, send_deadline); |
| 334 } |
| 335 grpc_exec_ctx_finish(&exec_ctx); |
| 336 GPR_TIMER_END("grpc_call_create", 0); |
| 337 return call; |
| 338 } |
| 339 |
| 340 void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, |
| 341 grpc_completion_queue *cq) { |
| 342 GPR_ASSERT(cq); |
| 343 call->cq = cq; |
| 344 GRPC_CQ_INTERNAL_REF(cq, "bind"); |
| 345 grpc_call_stack_set_pollset(exec_ctx, CALL_STACK_FROM_CALL(call), |
| 346 grpc_cq_pollset(cq)); |
| 347 } |
| 348 |
| 349 #ifdef GRPC_STREAM_REFCOUNT_DEBUG |
| 350 #define REF_REASON reason |
| 351 #define REF_ARG , const char *reason |
| 352 #else |
| 353 #define REF_REASON "" |
| 354 #define REF_ARG |
| 355 #endif |
| 356 void grpc_call_internal_ref(grpc_call *c REF_ARG) { |
| 357 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON); |
| 358 } |
| 359 void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { |
| 360 GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); |
| 361 } |
| 362 |
| 363 static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { |
| 364 size_t i; |
| 365 int ii; |
| 366 grpc_call *c = call; |
| 367 GPR_TIMER_BEGIN("destroy_call", 0); |
| 368 for (i = 0; i < 2; i++) { |
| 369 grpc_metadata_batch_destroy( |
| 370 &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); |
| 371 } |
| 372 if (c->receiving_stream != NULL) { |
| 373 grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); |
| 374 } |
| 375 grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); |
| 376 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); |
| 377 gpr_mu_destroy(&c->mu); |
| 378 for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
| 379 if (c->status[i].details) { |
| 380 GRPC_MDSTR_UNREF(c->status[i].details); |
| 381 } |
| 382 } |
| 383 for (ii = 0; ii < c->send_extra_metadata_count; ii++) { |
| 384 GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md); |
| 385 } |
| 386 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) { |
| 387 if (c->context[i].destroy) { |
| 388 c->context[i].destroy(c->context[i].value); |
| 389 } |
| 390 } |
| 391 if (c->cq) { |
| 392 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); |
| 393 } |
| 394 gpr_free(c); |
| 395 GPR_TIMER_END("destroy_call", 0); |
| 396 } |
| 397 |
| 398 static void set_status_code(grpc_call *call, status_source source, |
| 399 uint32_t status) { |
| 400 if (call->status[source].is_set) return; |
| 401 |
| 402 call->status[source].is_set = 1; |
| 403 call->status[source].code = (grpc_status_code)status; |
| 404 |
| 405 /* TODO(ctiller): what to do about the flush that was previously here */ |
| 406 } |
| 407 |
| 408 static void set_compression_algorithm(grpc_call *call, |
| 409 grpc_compression_algorithm algo) { |
| 410 call->compression_algorithm = algo; |
| 411 } |
| 412 |
| 413 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( |
| 414 grpc_call *call) { |
| 415 grpc_compression_algorithm algorithm; |
| 416 gpr_mu_lock(&call->mu); |
| 417 algorithm = call->compression_algorithm; |
| 418 gpr_mu_unlock(&call->mu); |
| 419 return algorithm; |
| 420 } |
| 421 |
| 422 uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { |
| 423 uint32_t flags; |
| 424 gpr_mu_lock(&call->mu); |
| 425 flags = call->test_only_last_message_flags; |
| 426 gpr_mu_unlock(&call->mu); |
| 427 return flags; |
| 428 } |
| 429 |
| 430 static void destroy_encodings_accepted_by_peer(void *p) { return; } |
| 431 |
| 432 static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { |
| 433 size_t i; |
| 434 grpc_compression_algorithm algorithm; |
| 435 gpr_slice_buffer accept_encoding_parts; |
| 436 gpr_slice accept_encoding_slice; |
| 437 void *accepted_user_data; |
| 438 |
| 439 accepted_user_data = |
| 440 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer); |
| 441 if (accepted_user_data != NULL) { |
| 442 call->encodings_accepted_by_peer = |
| 443 (uint32_t)(((uintptr_t)accepted_user_data) - 1); |
| 444 return; |
| 445 } |
| 446 |
| 447 accept_encoding_slice = mdel->value->slice; |
| 448 gpr_slice_buffer_init(&accept_encoding_parts); |
| 449 gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); |
| 450 |
| 451 /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already |
| 452 * zeroes the whole grpc_call */ |
| 453 /* Always support no compression */ |
| 454 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); |
| 455 for (i = 0; i < accept_encoding_parts.count; i++) { |
| 456 const gpr_slice *accept_encoding_entry_slice = |
| 457 &accept_encoding_parts.slices[i]; |
| 458 if (grpc_compression_algorithm_parse( |
| 459 (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice), |
| 460 GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) { |
| 461 GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); |
| 462 } else { |
| 463 char *accept_encoding_entry_str = |
| 464 gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII); |
| 465 gpr_log(GPR_ERROR, |
| 466 "Invalid entry in accept encoding metadata: '%s'. Ignoring.", |
| 467 accept_encoding_entry_str); |
| 468 gpr_free(accept_encoding_entry_str); |
| 469 } |
| 470 } |
| 471 |
| 472 gpr_slice_buffer_destroy(&accept_encoding_parts); |
| 473 |
| 474 grpc_mdelem_set_user_data( |
| 475 mdel, destroy_encodings_accepted_by_peer, |
| 476 (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1)); |
| 477 } |
| 478 |
| 479 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { |
| 480 uint32_t encodings_accepted_by_peer; |
| 481 gpr_mu_lock(&call->mu); |
| 482 encodings_accepted_by_peer = call->encodings_accepted_by_peer; |
| 483 gpr_mu_unlock(&call->mu); |
| 484 return encodings_accepted_by_peer; |
| 485 } |
| 486 |
| 487 static void set_status_details(grpc_call *call, status_source source, |
| 488 grpc_mdstr *status) { |
| 489 if (call->status[source].details != NULL) { |
| 490 GRPC_MDSTR_UNREF(call->status[source].details); |
| 491 } |
| 492 call->status[source].details = status; |
| 493 } |
| 494 |
| 495 static void get_final_status(grpc_call *call, |
| 496 void (*set_value)(grpc_status_code code, |
| 497 void *user_data), |
| 498 void *set_value_user_data) { |
| 499 int i; |
| 500 for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
| 501 if (call->status[i].is_set) { |
| 502 set_value(call->status[i].code, set_value_user_data); |
| 503 return; |
| 504 } |
| 505 } |
| 506 if (call->is_client) { |
| 507 set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); |
| 508 } else { |
| 509 set_value(GRPC_STATUS_OK, set_value_user_data); |
| 510 } |
| 511 } |
| 512 |
| 513 static void get_final_details(grpc_call *call, char **out_details, |
| 514 size_t *out_details_capacity) { |
| 515 int i; |
| 516 for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
| 517 if (call->status[i].is_set) { |
| 518 if (call->status[i].details) { |
| 519 gpr_slice details = call->status[i].details->slice; |
| 520 size_t len = GPR_SLICE_LENGTH(details); |
| 521 if (len + 1 > *out_details_capacity) { |
| 522 *out_details_capacity = |
| 523 GPR_MAX(len + 1, *out_details_capacity * 3 / 2); |
| 524 *out_details = gpr_realloc(*out_details, *out_details_capacity); |
| 525 } |
| 526 memcpy(*out_details, GPR_SLICE_START_PTR(details), len); |
| 527 (*out_details)[len] = 0; |
| 528 } else { |
| 529 goto no_details; |
| 530 } |
| 531 return; |
| 532 } |
| 533 } |
| 534 |
| 535 no_details: |
| 536 if (0 == *out_details_capacity) { |
| 537 *out_details_capacity = 8; |
| 538 *out_details = gpr_malloc(*out_details_capacity); |
| 539 } |
| 540 **out_details = 0; |
| 541 } |
| 542 |
| 543 static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { |
| 544 return (grpc_linked_mdelem *)&md->internal_data; |
| 545 } |
| 546 |
| 547 static int prepare_application_metadata(grpc_call *call, int count, |
| 548 grpc_metadata *metadata, |
| 549 int is_trailing, |
| 550 int prepend_extra_metadata) { |
| 551 int i; |
| 552 grpc_metadata_batch *batch = |
| 553 &call->metadata_batch[0 /* is_receiving */][is_trailing]; |
| 554 if (prepend_extra_metadata) { |
| 555 if (call->send_extra_metadata_count == 0) { |
| 556 prepend_extra_metadata = 0; |
| 557 } else { |
| 558 for (i = 0; i < call->send_extra_metadata_count; i++) { |
| 559 GRPC_MDELEM_REF(call->send_extra_metadata[i].md); |
| 560 } |
| 561 for (i = 1; i < call->send_extra_metadata_count; i++) { |
| 562 call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; |
| 563 } |
| 564 for (i = 0; i < call->send_extra_metadata_count - 1; i++) { |
| 565 call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; |
| 566 } |
| 567 } |
| 568 } |
| 569 for (i = 0; i < count; i++) { |
| 570 grpc_metadata *md = &metadata[i]; |
| 571 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; |
| 572 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); |
| 573 l->md = grpc_mdelem_from_string_and_buffer( |
| 574 md->key, (const uint8_t *)md->value, md->value_length); |
| 575 if (!grpc_header_key_is_legal(grpc_mdstr_as_c_string(l->md->key), |
| 576 GRPC_MDSTR_LENGTH(l->md->key))) { |
| 577 gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", |
| 578 grpc_mdstr_as_c_string(l->md->key)); |
| 579 return 0; |
| 580 } else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key), |
| 581 GRPC_MDSTR_LENGTH(l->md->key)) && |
| 582 !grpc_header_nonbin_value_is_legal( |
| 583 grpc_mdstr_as_c_string(l->md->value), |
| 584 GRPC_MDSTR_LENGTH(l->md->value))) { |
| 585 gpr_log(GPR_ERROR, "attempt to send invalid metadata value"); |
| 586 return 0; |
| 587 } |
| 588 } |
| 589 for (i = 1; i < count; i++) { |
| 590 linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]); |
| 591 } |
| 592 for (i = 0; i < count - 1; i++) { |
| 593 linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]); |
| 594 } |
| 595 switch (prepend_extra_metadata * 2 + (count != 0)) { |
| 596 case 0: |
| 597 /* no prepend, no metadata => nothing to do */ |
| 598 batch->list.head = batch->list.tail = NULL; |
| 599 break; |
| 600 case 1: |
| 601 /* metadata, but no prepend */ |
| 602 batch->list.head = linked_from_md(&metadata[0]); |
| 603 batch->list.tail = linked_from_md(&metadata[count - 1]); |
| 604 batch->list.head->prev = NULL; |
| 605 batch->list.tail->next = NULL; |
| 606 break; |
| 607 case 2: |
| 608 /* prepend, but no md */ |
| 609 batch->list.head = &call->send_extra_metadata[0]; |
| 610 batch->list.tail = |
| 611 &call->send_extra_metadata[call->send_extra_metadata_count - 1]; |
| 612 batch->list.head->prev = NULL; |
| 613 batch->list.tail->next = NULL; |
| 614 break; |
| 615 case 3: |
| 616 /* prepend AND md */ |
| 617 batch->list.head = &call->send_extra_metadata[0]; |
| 618 call->send_extra_metadata[call->send_extra_metadata_count - 1].next = |
| 619 linked_from_md(&metadata[0]); |
| 620 linked_from_md(&metadata[0])->prev = |
| 621 &call->send_extra_metadata[call->send_extra_metadata_count - 1]; |
| 622 batch->list.tail = linked_from_md(&metadata[count - 1]); |
| 623 batch->list.head->prev = NULL; |
| 624 batch->list.tail->next = NULL; |
| 625 break; |
| 626 default: |
| 627 GPR_UNREACHABLE_CODE(return 0); |
| 628 } |
| 629 |
| 630 return 1; |
| 631 } |
| 632 |
| 633 void grpc_call_destroy(grpc_call *c) { |
| 634 int cancel; |
| 635 grpc_call *parent = c->parent; |
| 636 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| 637 |
| 638 GPR_TIMER_BEGIN("grpc_call_destroy", 0); |
| 639 GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); |
| 640 |
| 641 if (parent) { |
| 642 gpr_mu_lock(&parent->mu); |
| 643 if (c == parent->first_child) { |
| 644 parent->first_child = c->sibling_next; |
| 645 if (c == parent->first_child) { |
| 646 parent->first_child = NULL; |
| 647 } |
| 648 c->sibling_prev->sibling_next = c->sibling_next; |
| 649 c->sibling_next->sibling_prev = c->sibling_prev; |
| 650 } |
| 651 gpr_mu_unlock(&parent->mu); |
| 652 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); |
| 653 } |
| 654 |
| 655 gpr_mu_lock(&c->mu); |
| 656 GPR_ASSERT(!c->destroy_called); |
| 657 c->destroy_called = 1; |
| 658 if (c->have_alarm) { |
| 659 grpc_timer_cancel(&exec_ctx, &c->alarm); |
| 660 } |
| 661 cancel = !c->received_final_op; |
| 662 gpr_mu_unlock(&c->mu); |
| 663 if (cancel) grpc_call_cancel(c, NULL); |
| 664 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); |
| 665 grpc_exec_ctx_finish(&exec_ctx); |
| 666 GPR_TIMER_END("grpc_call_destroy", 0); |
| 667 } |
| 668 |
| 669 grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { |
| 670 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); |
| 671 GPR_ASSERT(!reserved); |
| 672 return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", |
| 673 NULL); |
| 674 } |
| 675 |
| 676 grpc_call_error grpc_call_cancel_with_status(grpc_call *c, |
| 677 grpc_status_code status, |
| 678 const char *description, |
| 679 void *reserved) { |
| 680 grpc_call_error r; |
| 681 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| 682 GRPC_API_TRACE( |
| 683 "grpc_call_cancel_with_status(" |
| 684 "c=%p, status=%d, description=%s, reserved=%p)", |
| 685 4, (c, (int)status, description, reserved)); |
| 686 GPR_ASSERT(reserved == NULL); |
| 687 gpr_mu_lock(&c->mu); |
| 688 r = cancel_with_status(&exec_ctx, c, status, description); |
| 689 gpr_mu_unlock(&c->mu); |
| 690 grpc_exec_ctx_finish(&exec_ctx); |
| 691 return r; |
| 692 } |
| 693 |
| 694 typedef struct cancel_closure { |
| 695 grpc_closure closure; |
| 696 grpc_call *call; |
| 697 grpc_status_code status; |
| 698 } cancel_closure; |
| 699 |
| 700 static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { |
| 701 cancel_closure *cc = ccp; |
| 702 GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); |
| 703 gpr_free(cc); |
| 704 } |
| 705 |
| 706 static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { |
| 707 grpc_transport_stream_op op; |
| 708 cancel_closure *cc = ccp; |
| 709 memset(&op, 0, sizeof(op)); |
| 710 op.cancel_with_status = cc->status; |
| 711 /* reuse closure to catch completion */ |
| 712 grpc_closure_init(&cc->closure, done_cancel, cc); |
| 713 op.on_complete = &cc->closure; |
| 714 execute_op(exec_ctx, cc->call, &op); |
| 715 } |
| 716 |
| 717 static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, |
| 718 grpc_status_code status, |
| 719 const char *description) { |
| 720 grpc_mdstr *details = |
| 721 description ? grpc_mdstr_from_string(description) : NULL; |
| 722 cancel_closure *cc = gpr_malloc(sizeof(*cc)); |
| 723 |
| 724 GPR_ASSERT(status != GRPC_STATUS_OK); |
| 725 |
| 726 set_status_code(c, STATUS_FROM_API_OVERRIDE, (uint32_t)status); |
| 727 set_status_details(c, STATUS_FROM_API_OVERRIDE, details); |
| 728 |
| 729 grpc_closure_init(&cc->closure, send_cancel, cc); |
| 730 cc->call = c; |
| 731 cc->status = status; |
| 732 GRPC_CALL_INTERNAL_REF(c, "cancel"); |
| 733 grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL); |
| 734 |
| 735 return GRPC_CALL_OK; |
| 736 } |
| 737 |
| 738 static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, |
| 739 grpc_transport_stream_op *op) { |
| 740 grpc_call_element *elem; |
| 741 |
| 742 GPR_TIMER_BEGIN("execute_op", 0); |
| 743 elem = CALL_ELEM_FROM_CALL(call, 0); |
| 744 op->context = call->context; |
| 745 elem->filter->start_transport_stream_op(exec_ctx, elem, op); |
| 746 GPR_TIMER_END("execute_op", 0); |
| 747 } |
| 748 |
| 749 char *grpc_call_get_peer(grpc_call *call) { |
| 750 grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); |
| 751 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| 752 char *result; |
| 753 GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); |
| 754 result = elem->filter->get_peer(&exec_ctx, elem); |
| 755 if (result == NULL) { |
| 756 result = grpc_channel_get_target(call->channel); |
| 757 } |
| 758 if (result == NULL) { |
| 759 result = gpr_strdup("unknown"); |
| 760 } |
| 761 grpc_exec_ctx_finish(&exec_ctx); |
| 762 return result; |
| 763 } |
| 764 |
| 765 grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { |
| 766 return CALL_FROM_TOP_ELEM(elem); |
| 767 } |
| 768 |
| 769 static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) { |
| 770 grpc_call *call = arg; |
| 771 gpr_mu_lock(&call->mu); |
| 772 call->have_alarm = 0; |
| 773 if (success) { |
| 774 cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED, |
| 775 "Deadline Exceeded"); |
| 776 } |
| 777 gpr_mu_unlock(&call->mu); |
| 778 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm"); |
| 779 } |
| 780 |
| 781 static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, |
| 782 gpr_timespec deadline) { |
| 783 if (call->have_alarm) { |
| 784 gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); |
| 785 assert(0); |
| 786 return; |
| 787 } |
| 788 GRPC_CALL_INTERNAL_REF(call, "alarm"); |
| 789 call->have_alarm = 1; |
| 790 call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); |
| 791 grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call, |
| 792 gpr_now(GPR_CLOCK_MONOTONIC)); |
| 793 } |
| 794 |
| 795 /* we offset status by a small amount when storing it into transport metadata |
| 796 as metadata cannot store a 0 value (which is used as OK for grpc_status_codes |
| 797 */ |
| 798 #define STATUS_OFFSET 1 |
| 799 static void destroy_status(void *ignored) {} |
| 800 |
| 801 static uint32_t decode_status(grpc_mdelem *md) { |
| 802 uint32_t status; |
| 803 void *user_data; |
| 804 if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0; |
| 805 if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1; |
| 806 if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2; |
| 807 user_data = grpc_mdelem_get_user_data(md, destroy_status); |
| 808 if (user_data != NULL) { |
| 809 status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET; |
| 810 } else { |
| 811 if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), |
| 812 GPR_SLICE_LENGTH(md->value->slice), |
| 813 &status)) { |
| 814 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */ |
| 815 } |
| 816 grpc_mdelem_set_user_data(md, destroy_status, |
| 817 (void *)(intptr_t)(status + STATUS_OFFSET)); |
| 818 } |
| 819 return status; |
| 820 } |
| 821 |
| 822 static uint32_t decode_compression(grpc_mdelem *md) { |
| 823 grpc_compression_algorithm algorithm = |
| 824 grpc_compression_algorithm_from_mdstr(md->value); |
| 825 if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { |
| 826 const char *md_c_str = grpc_mdstr_as_c_string(md->value); |
| 827 gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); |
| 828 } |
| 829 return algorithm; |
| 830 } |
| 831 |
| 832 static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) { |
| 833 if (elem->key == GRPC_MDSTR_GRPC_STATUS) { |
| 834 GPR_TIMER_BEGIN("status", 0); |
| 835 set_status_code(call, STATUS_FROM_WIRE, decode_status(elem)); |
| 836 GPR_TIMER_END("status", 0); |
| 837 return NULL; |
| 838 } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) { |
| 839 GPR_TIMER_BEGIN("status-details", 0); |
| 840 set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value)); |
| 841 GPR_TIMER_END("status-details", 0); |
| 842 return NULL; |
| 843 } |
| 844 return elem; |
| 845 } |
| 846 |
| 847 static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem, |
| 848 int is_trailing) { |
| 849 grpc_metadata_array *dest; |
| 850 grpc_metadata *mdusr; |
| 851 GPR_TIMER_BEGIN("publish_app_metadata", 0); |
| 852 dest = call->buffered_metadata[is_trailing]; |
| 853 if (dest->count == dest->capacity) { |
| 854 dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); |
| 855 dest->metadata = |
| 856 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); |
| 857 } |
| 858 mdusr = &dest->metadata[dest->count++]; |
| 859 mdusr->key = grpc_mdstr_as_c_string(elem->key); |
| 860 mdusr->value = grpc_mdstr_as_c_string(elem->value); |
| 861 mdusr->value_length = GPR_SLICE_LENGTH(elem->value->slice); |
| 862 GPR_TIMER_END("publish_app_metadata", 0); |
| 863 return elem; |
| 864 } |
| 865 |
| 866 static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { |
| 867 grpc_call *call = callp; |
| 868 elem = recv_common_filter(call, elem); |
| 869 if (elem == NULL) { |
| 870 return NULL; |
| 871 } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { |
| 872 GPR_TIMER_BEGIN("compression_algorithm", 0); |
| 873 set_compression_algorithm(call, decode_compression(elem)); |
| 874 GPR_TIMER_END("compression_algorithm", 0); |
| 875 return NULL; |
| 876 } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { |
| 877 GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); |
| 878 set_encodings_accepted_by_peer(call, elem); |
| 879 GPR_TIMER_END("encodings_accepted_by_peer", 0); |
| 880 return NULL; |
| 881 } else { |
| 882 return publish_app_metadata(call, elem, 0); |
| 883 } |
| 884 } |
| 885 |
| 886 static grpc_mdelem *recv_trailing_filter(void *callp, grpc_mdelem *elem) { |
| 887 grpc_call *call = callp; |
| 888 elem = recv_common_filter(call, elem); |
| 889 if (elem == NULL) { |
| 890 return NULL; |
| 891 } else { |
| 892 return publish_app_metadata(call, elem, 1); |
| 893 } |
| 894 } |
| 895 |
| 896 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { |
| 897 return CALL_STACK_FROM_CALL(call); |
| 898 } |
| 899 |
| 900 /* |
| 901 * BATCH API IMPLEMENTATION |
| 902 */ |
| 903 |
| 904 static void set_status_value_directly(grpc_status_code status, void *dest) { |
| 905 *(grpc_status_code *)dest = status; |
| 906 } |
| 907 |
| 908 static void set_cancelled_value(grpc_status_code status, void *dest) { |
| 909 *(int *)dest = (status != GRPC_STATUS_OK); |
| 910 } |
| 911 |
| 912 static int are_write_flags_valid(uint32_t flags) { |
| 913 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ |
| 914 const uint32_t allowed_write_positions = |
| 915 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); |
| 916 const uint32_t invalid_positions = ~allowed_write_positions; |
| 917 return !(flags & invalid_positions); |
| 918 } |
| 919 |
| 920 static batch_control *allocate_batch_control(grpc_call *call) { |
| 921 size_t i; |
| 922 for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { |
| 923 if ((call->used_batches & (1 << i)) == 0) { |
| 924 call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i)); |
| 925 return &call->active_batches[i]; |
| 926 } |
| 927 } |
| 928 return NULL; |
| 929 } |
| 930 |
| 931 static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, |
| 932 grpc_cq_completion *storage) { |
| 933 batch_control *bctl = user_data; |
| 934 grpc_call *call = bctl->call; |
| 935 gpr_mu_lock(&call->mu); |
| 936 call->used_batches = (uint8_t)( |
| 937 call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches))); |
| 938 gpr_mu_unlock(&call->mu); |
| 939 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); |
| 940 } |
| 941 |
| 942 static void post_batch_completion(grpc_exec_ctx *exec_ctx, |
| 943 batch_control *bctl) { |
| 944 grpc_call *call = bctl->call; |
| 945 if (bctl->is_notify_tag_closure) { |
| 946 grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL); |
| 947 gpr_mu_lock(&call->mu); |
| 948 bctl->call->used_batches = |
| 949 (uint8_t)(bctl->call->used_batches & |
| 950 ~(uint8_t)(1 << (bctl - bctl->call->active_batches))); |
| 951 gpr_mu_unlock(&call->mu); |
| 952 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); |
| 953 } else { |
| 954 grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success, |
| 955 finish_batch_completion, bctl, &bctl->cq_completion); |
| 956 } |
| 957 } |
| 958 |
| 959 static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, |
| 960 batch_control *bctl) { |
| 961 grpc_call *call = bctl->call; |
| 962 for (;;) { |
| 963 size_t remaining = call->receiving_stream->length - |
| 964 (*call->receiving_buffer)->data.raw.slice_buffer.length; |
| 965 if (remaining == 0) { |
| 966 call->receiving_message = 0; |
| 967 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); |
| 968 call->receiving_stream = NULL; |
| 969 if (gpr_unref(&bctl->steps_to_complete)) { |
| 970 post_batch_completion(exec_ctx, bctl); |
| 971 } |
| 972 return; |
| 973 } |
| 974 if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, |
| 975 &call->receiving_slice, remaining, |
| 976 &call->receiving_slice_ready)) { |
| 977 gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, |
| 978 call->receiving_slice); |
| 979 } else { |
| 980 return; |
| 981 } |
| 982 } |
| 983 } |
| 984 |
| 985 static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, |
| 986 bool success) { |
| 987 batch_control *bctl = bctlp; |
| 988 grpc_call *call = bctl->call; |
| 989 |
| 990 if (success) { |
| 991 gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, |
| 992 call->receiving_slice); |
| 993 continue_receiving_slices(exec_ctx, bctl); |
| 994 } else { |
| 995 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); |
| 996 call->receiving_stream = NULL; |
| 997 grpc_byte_buffer_destroy(*call->receiving_buffer); |
| 998 *call->receiving_buffer = NULL; |
| 999 if (gpr_unref(&bctl->steps_to_complete)) { |
| 1000 post_batch_completion(exec_ctx, bctl); |
| 1001 } |
| 1002 } |
| 1003 } |
| 1004 |
| 1005 static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, |
| 1006 bool success) { |
| 1007 grpc_call *call = bctl->call; |
| 1008 if (call->receiving_stream == NULL) { |
| 1009 *call->receiving_buffer = NULL; |
| 1010 call->receiving_message = 0; |
| 1011 if (gpr_unref(&bctl->steps_to_complete)) { |
| 1012 post_batch_completion(exec_ctx, bctl); |
| 1013 } |
| 1014 } else if (call->receiving_stream->length > |
| 1015 grpc_channel_get_max_message_length(call->channel)) { |
| 1016 cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, |
| 1017 "Max message size exceeded"); |
| 1018 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); |
| 1019 call->receiving_stream = NULL; |
| 1020 *call->receiving_buffer = NULL; |
| 1021 call->receiving_message = 0; |
| 1022 if (gpr_unref(&bctl->steps_to_complete)) { |
| 1023 post_batch_completion(exec_ctx, bctl); |
| 1024 } |
| 1025 } else { |
| 1026 call->test_only_last_message_flags = call->receiving_stream->flags; |
| 1027 if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && |
| 1028 (call->compression_algorithm > GRPC_COMPRESS_NONE)) { |
| 1029 *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( |
| 1030 NULL, 0, call->compression_algorithm); |
| 1031 } else { |
| 1032 *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); |
| 1033 } |
| 1034 grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, |
| 1035 bctl); |
| 1036 continue_receiving_slices(exec_ctx, bctl); |
| 1037 /* early out */ |
| 1038 return; |
| 1039 } |
| 1040 } |
| 1041 |
| 1042 static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, |
| 1043 bool success) { |
| 1044 batch_control *bctl = bctlp; |
| 1045 grpc_call *call = bctl->call; |
| 1046 |
| 1047 gpr_mu_lock(&bctl->call->mu); |
| 1048 if (bctl->call->has_initial_md_been_received) { |
| 1049 gpr_mu_unlock(&bctl->call->mu); |
| 1050 process_data_after_md(exec_ctx, bctlp, success); |
| 1051 } else { |
| 1052 call->saved_receiving_stream_ready_ctx.bctlp = bctlp; |
| 1053 call->saved_receiving_stream_ready_ctx.success = success; |
| 1054 gpr_mu_unlock(&bctl->call->mu); |
| 1055 } |
| 1056 } |
| 1057 |
| 1058 static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, |
| 1059 void *bctlp, bool success) { |
| 1060 batch_control *bctl = bctlp; |
| 1061 grpc_call *call = bctl->call; |
| 1062 |
| 1063 gpr_mu_lock(&call->mu); |
| 1064 |
| 1065 grpc_metadata_batch *md = |
| 1066 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; |
| 1067 grpc_metadata_batch_filter(md, recv_initial_filter, call); |
| 1068 call->has_initial_md_been_received = true; |
| 1069 |
| 1070 if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != |
| 1071 0 && |
| 1072 !call->is_client) { |
| 1073 GPR_TIMER_BEGIN("set_deadline_alarm", 0); |
| 1074 set_deadline_alarm(exec_ctx, call, md->deadline); |
| 1075 GPR_TIMER_END("set_deadline_alarm", 0); |
| 1076 } |
| 1077 |
| 1078 if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { |
| 1079 grpc_closure *saved_rsr_closure = grpc_closure_create( |
| 1080 receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); |
| 1081 grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, |
| 1082 call->saved_receiving_stream_ready_ctx.success, NULL); |
| 1083 call->saved_receiving_stream_ready_ctx.bctlp = NULL; |
| 1084 } |
| 1085 |
| 1086 gpr_mu_unlock(&call->mu); |
| 1087 |
| 1088 if (gpr_unref(&bctl->steps_to_complete)) { |
| 1089 post_batch_completion(exec_ctx, bctl); |
| 1090 } |
| 1091 } |
| 1092 |
| 1093 static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { |
| 1094 batch_control *bctl = bctlp; |
| 1095 grpc_call *call = bctl->call; |
| 1096 grpc_call *child_call; |
| 1097 grpc_call *next_child_call; |
| 1098 |
| 1099 gpr_mu_lock(&call->mu); |
| 1100 if (bctl->send_initial_metadata) { |
| 1101 grpc_metadata_batch_destroy( |
| 1102 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); |
| 1103 } |
| 1104 if (bctl->send_message) { |
| 1105 call->sending_message = 0; |
| 1106 } |
| 1107 if (bctl->send_final_op) { |
| 1108 grpc_metadata_batch_destroy( |
| 1109 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); |
| 1110 } |
| 1111 if (bctl->recv_final_op) { |
| 1112 grpc_metadata_batch *md = |
| 1113 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; |
| 1114 grpc_metadata_batch_filter(md, recv_trailing_filter, call); |
| 1115 |
| 1116 if (call->have_alarm) { |
| 1117 grpc_timer_cancel(exec_ctx, &call->alarm); |
| 1118 } |
| 1119 /* propagate cancellation to any interested children */ |
| 1120 child_call = call->first_child; |
| 1121 if (child_call != NULL) { |
| 1122 do { |
| 1123 next_child_call = child_call->sibling_next; |
| 1124 if (child_call->cancellation_is_inherited) { |
| 1125 GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); |
| 1126 grpc_call_cancel(child_call, NULL); |
| 1127 GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); |
| 1128 } |
| 1129 child_call = next_child_call; |
| 1130 } while (child_call != call->first_child); |
| 1131 } |
| 1132 |
| 1133 if (call->is_client) { |
| 1134 get_final_status(call, set_status_value_directly, |
| 1135 call->final_op.client.status); |
| 1136 get_final_details(call, call->final_op.client.status_details, |
| 1137 call->final_op.client.status_details_capacity); |
| 1138 } else { |
| 1139 get_final_status(call, set_cancelled_value, |
| 1140 call->final_op.server.cancelled); |
| 1141 } |
| 1142 |
| 1143 success = 1; |
| 1144 } |
| 1145 bctl->success = success != 0; |
| 1146 gpr_mu_unlock(&call->mu); |
| 1147 if (gpr_unref(&bctl->steps_to_complete)) { |
| 1148 post_batch_completion(exec_ctx, bctl); |
| 1149 } |
| 1150 } |
| 1151 |
| 1152 static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, |
| 1153 grpc_call *call, const grpc_op *ops, |
| 1154 size_t nops, void *notify_tag, |
| 1155 int is_notify_tag_closure) { |
| 1156 grpc_transport_stream_op stream_op; |
| 1157 size_t i; |
| 1158 const grpc_op *op; |
| 1159 batch_control *bctl; |
| 1160 int num_completion_callbacks_needed = 1; |
| 1161 grpc_call_error error = GRPC_CALL_OK; |
| 1162 |
| 1163 GPR_TIMER_BEGIN("grpc_call_start_batch", 0); |
| 1164 |
| 1165 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); |
| 1166 |
| 1167 memset(&stream_op, 0, sizeof(stream_op)); |
| 1168 |
| 1169 /* TODO(ctiller): this feels like it could be made lock-free */ |
| 1170 gpr_mu_lock(&call->mu); |
| 1171 bctl = allocate_batch_control(call); |
| 1172 memset(bctl, 0, sizeof(*bctl)); |
| 1173 bctl->call = call; |
| 1174 bctl->notify_tag = notify_tag; |
| 1175 bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); |
| 1176 |
| 1177 if (nops == 0) { |
| 1178 GRPC_CALL_INTERNAL_REF(call, "completion"); |
| 1179 bctl->success = 1; |
| 1180 if (!is_notify_tag_closure) { |
| 1181 grpc_cq_begin_op(call->cq, notify_tag); |
| 1182 } |
| 1183 gpr_mu_unlock(&call->mu); |
| 1184 post_batch_completion(exec_ctx, bctl); |
| 1185 error = GRPC_CALL_OK; |
| 1186 goto done; |
| 1187 } |
| 1188 |
| 1189 /* rewrite batch ops into a transport op */ |
| 1190 for (i = 0; i < nops; i++) { |
| 1191 op = &ops[i]; |
| 1192 if (op->reserved != NULL) { |
| 1193 error = GRPC_CALL_ERROR; |
| 1194 goto done_with_error; |
| 1195 } |
| 1196 switch (op->op) { |
| 1197 case GRPC_OP_SEND_INITIAL_METADATA: |
| 1198 /* Flag validation: currently allow no flags */ |
| 1199 if (op->flags != 0) { |
| 1200 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1201 goto done_with_error; |
| 1202 } |
| 1203 if (call->sent_initial_metadata) { |
| 1204 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1205 goto done_with_error; |
| 1206 } |
| 1207 if (op->data.send_initial_metadata.count > INT_MAX) { |
| 1208 error = GRPC_CALL_ERROR_INVALID_METADATA; |
| 1209 goto done_with_error; |
| 1210 } |
| 1211 bctl->send_initial_metadata = 1; |
| 1212 call->sent_initial_metadata = 1; |
| 1213 if (!prepare_application_metadata( |
| 1214 call, (int)op->data.send_initial_metadata.count, |
| 1215 op->data.send_initial_metadata.metadata, 0, call->is_client)) { |
| 1216 error = GRPC_CALL_ERROR_INVALID_METADATA; |
| 1217 goto done_with_error; |
| 1218 } |
| 1219 /* TODO(ctiller): just make these the same variable? */ |
| 1220 call->metadata_batch[0][0].deadline = call->send_deadline; |
| 1221 stream_op.send_initial_metadata = |
| 1222 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; |
| 1223 break; |
| 1224 case GRPC_OP_SEND_MESSAGE: |
| 1225 if (!are_write_flags_valid(op->flags)) { |
| 1226 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1227 goto done_with_error; |
| 1228 } |
| 1229 if (op->data.send_message == NULL) { |
| 1230 error = GRPC_CALL_ERROR_INVALID_MESSAGE; |
| 1231 goto done_with_error; |
| 1232 } |
| 1233 if (call->sending_message) { |
| 1234 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1235 goto done_with_error; |
| 1236 } |
| 1237 bctl->send_message = 1; |
| 1238 call->sending_message = 1; |
| 1239 grpc_slice_buffer_stream_init( |
| 1240 &call->sending_stream, |
| 1241 &op->data.send_message->data.raw.slice_buffer, op->flags); |
| 1242 stream_op.send_message = &call->sending_stream.base; |
| 1243 break; |
| 1244 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
| 1245 /* Flag validation: currently allow no flags */ |
| 1246 if (op->flags != 0) { |
| 1247 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1248 goto done_with_error; |
| 1249 } |
| 1250 if (!call->is_client) { |
| 1251 error = GRPC_CALL_ERROR_NOT_ON_SERVER; |
| 1252 goto done_with_error; |
| 1253 } |
| 1254 if (call->sent_final_op) { |
| 1255 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1256 goto done_with_error; |
| 1257 } |
| 1258 bctl->send_final_op = 1; |
| 1259 call->sent_final_op = 1; |
| 1260 stream_op.send_trailing_metadata = |
| 1261 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; |
| 1262 break; |
| 1263 case GRPC_OP_SEND_STATUS_FROM_SERVER: |
| 1264 /* Flag validation: currently allow no flags */ |
| 1265 if (op->flags != 0) { |
| 1266 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1267 goto done_with_error; |
| 1268 } |
| 1269 if (call->is_client) { |
| 1270 error = GRPC_CALL_ERROR_NOT_ON_CLIENT; |
| 1271 goto done_with_error; |
| 1272 } |
| 1273 if (call->sent_final_op) { |
| 1274 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1275 goto done_with_error; |
| 1276 } |
| 1277 if (op->data.send_status_from_server.trailing_metadata_count > |
| 1278 INT_MAX) { |
| 1279 error = GRPC_CALL_ERROR_INVALID_METADATA; |
| 1280 goto done_with_error; |
| 1281 } |
| 1282 bctl->send_final_op = 1; |
| 1283 call->sent_final_op = 1; |
| 1284 call->send_extra_metadata_count = 1; |
| 1285 call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( |
| 1286 call->channel, op->data.send_status_from_server.status); |
| 1287 if (op->data.send_status_from_server.status_details != NULL) { |
| 1288 call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings( |
| 1289 GRPC_MDSTR_GRPC_MESSAGE, |
| 1290 grpc_mdstr_from_string( |
| 1291 op->data.send_status_from_server.status_details)); |
| 1292 call->send_extra_metadata_count++; |
| 1293 set_status_details( |
| 1294 call, STATUS_FROM_API_OVERRIDE, |
| 1295 GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value)); |
| 1296 } |
| 1297 set_status_code(call, STATUS_FROM_API_OVERRIDE, |
| 1298 (uint32_t)op->data.send_status_from_server.status); |
| 1299 if (!prepare_application_metadata( |
| 1300 call, |
| 1301 (int)op->data.send_status_from_server.trailing_metadata_count, |
| 1302 op->data.send_status_from_server.trailing_metadata, 1, 1)) { |
| 1303 error = GRPC_CALL_ERROR_INVALID_METADATA; |
| 1304 goto done_with_error; |
| 1305 } |
| 1306 stream_op.send_trailing_metadata = |
| 1307 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; |
| 1308 break; |
| 1309 case GRPC_OP_RECV_INITIAL_METADATA: |
| 1310 /* Flag validation: currently allow no flags */ |
| 1311 if (op->flags != 0) { |
| 1312 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1313 goto done_with_error; |
| 1314 } |
| 1315 if (call->received_initial_metadata) { |
| 1316 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1317 goto done_with_error; |
| 1318 } |
| 1319 call->received_initial_metadata = 1; |
| 1320 call->buffered_metadata[0] = op->data.recv_initial_metadata; |
| 1321 grpc_closure_init(&call->receiving_initial_metadata_ready, |
| 1322 receiving_initial_metadata_ready, bctl); |
| 1323 bctl->recv_initial_metadata = 1; |
| 1324 stream_op.recv_initial_metadata = |
| 1325 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; |
| 1326 stream_op.recv_initial_metadata_ready = |
| 1327 &call->receiving_initial_metadata_ready; |
| 1328 num_completion_callbacks_needed++; |
| 1329 break; |
| 1330 case GRPC_OP_RECV_MESSAGE: |
| 1331 /* Flag validation: currently allow no flags */ |
| 1332 if (op->flags != 0) { |
| 1333 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1334 goto done_with_error; |
| 1335 } |
| 1336 if (call->receiving_message) { |
| 1337 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1338 goto done_with_error; |
| 1339 } |
| 1340 call->receiving_message = 1; |
| 1341 bctl->recv_message = 1; |
| 1342 call->receiving_buffer = op->data.recv_message; |
| 1343 stream_op.recv_message = &call->receiving_stream; |
| 1344 grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, |
| 1345 bctl); |
| 1346 stream_op.recv_message_ready = &call->receiving_stream_ready; |
| 1347 num_completion_callbacks_needed++; |
| 1348 break; |
| 1349 case GRPC_OP_RECV_STATUS_ON_CLIENT: |
| 1350 /* Flag validation: currently allow no flags */ |
| 1351 if (op->flags != 0) { |
| 1352 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1353 goto done_with_error; |
| 1354 } |
| 1355 if (!call->is_client) { |
| 1356 error = GRPC_CALL_ERROR_NOT_ON_SERVER; |
| 1357 goto done_with_error; |
| 1358 } |
| 1359 if (call->received_final_op) { |
| 1360 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1361 goto done_with_error; |
| 1362 } |
| 1363 call->received_final_op = 1; |
| 1364 call->buffered_metadata[1] = |
| 1365 op->data.recv_status_on_client.trailing_metadata; |
| 1366 call->final_op.client.status = op->data.recv_status_on_client.status; |
| 1367 call->final_op.client.status_details = |
| 1368 op->data.recv_status_on_client.status_details; |
| 1369 call->final_op.client.status_details_capacity = |
| 1370 op->data.recv_status_on_client.status_details_capacity; |
| 1371 bctl->recv_final_op = 1; |
| 1372 stream_op.recv_trailing_metadata = |
| 1373 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; |
| 1374 break; |
| 1375 case GRPC_OP_RECV_CLOSE_ON_SERVER: |
| 1376 /* Flag validation: currently allow no flags */ |
| 1377 if (op->flags != 0) { |
| 1378 error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| 1379 goto done_with_error; |
| 1380 } |
| 1381 if (call->is_client) { |
| 1382 error = GRPC_CALL_ERROR_NOT_ON_CLIENT; |
| 1383 goto done_with_error; |
| 1384 } |
| 1385 if (call->received_final_op) { |
| 1386 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| 1387 goto done_with_error; |
| 1388 } |
| 1389 call->received_final_op = 1; |
| 1390 call->final_op.server.cancelled = |
| 1391 op->data.recv_close_on_server.cancelled; |
| 1392 bctl->recv_final_op = 1; |
| 1393 stream_op.recv_trailing_metadata = |
| 1394 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; |
| 1395 break; |
| 1396 } |
| 1397 } |
| 1398 |
| 1399 GRPC_CALL_INTERNAL_REF(call, "completion"); |
| 1400 if (!is_notify_tag_closure) { |
| 1401 grpc_cq_begin_op(call->cq, notify_tag); |
| 1402 } |
| 1403 gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); |
| 1404 |
| 1405 stream_op.context = call->context; |
| 1406 grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); |
| 1407 stream_op.on_complete = &bctl->finish_batch; |
| 1408 gpr_mu_unlock(&call->mu); |
| 1409 |
| 1410 execute_op(exec_ctx, call, &stream_op); |
| 1411 |
| 1412 done: |
| 1413 GPR_TIMER_END("grpc_call_start_batch", 0); |
| 1414 return error; |
| 1415 |
| 1416 done_with_error: |
| 1417 /* reverse any mutations that occured */ |
| 1418 if (bctl->send_initial_metadata) { |
| 1419 call->sent_initial_metadata = 0; |
| 1420 grpc_metadata_batch_clear(&call->metadata_batch[0][0]); |
| 1421 } |
| 1422 if (bctl->send_message) { |
| 1423 call->sending_message = 0; |
| 1424 grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); |
| 1425 } |
| 1426 if (bctl->send_final_op) { |
| 1427 call->sent_final_op = 0; |
| 1428 grpc_metadata_batch_clear(&call->metadata_batch[0][1]); |
| 1429 } |
| 1430 if (bctl->recv_initial_metadata) { |
| 1431 call->received_initial_metadata = 0; |
| 1432 } |
| 1433 if (bctl->recv_message) { |
| 1434 call->receiving_message = 0; |
| 1435 } |
| 1436 if (bctl->recv_final_op) { |
| 1437 call->received_final_op = 0; |
| 1438 } |
| 1439 gpr_mu_unlock(&call->mu); |
| 1440 goto done; |
| 1441 } |
| 1442 |
| 1443 grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, |
| 1444 size_t nops, void *tag, void *reserved) { |
| 1445 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| 1446 grpc_call_error err; |
| 1447 |
| 1448 GRPC_API_TRACE( |
| 1449 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", |
| 1450 5, (call, ops, (unsigned long)nops, tag, reserved)); |
| 1451 |
| 1452 if (reserved != NULL) { |
| 1453 err = GRPC_CALL_ERROR; |
| 1454 } else { |
| 1455 err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0); |
| 1456 } |
| 1457 |
| 1458 grpc_exec_ctx_finish(&exec_ctx); |
| 1459 return err; |
| 1460 } |
| 1461 |
| 1462 grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx, |
| 1463 grpc_call *call, |
| 1464 const grpc_op *ops, |
| 1465 size_t nops, |
| 1466 grpc_closure *closure) { |
| 1467 return call_start_batch(exec_ctx, call, ops, nops, closure, 1); |
| 1468 } |
| 1469 |
| 1470 void grpc_call_context_set(grpc_call *call, grpc_context_index elem, |
| 1471 void *value, void (*destroy)(void *value)) { |
| 1472 if (call->context[elem].destroy) { |
| 1473 call->context[elem].destroy(call->context[elem].value); |
| 1474 } |
| 1475 call->context[elem].value = value; |
| 1476 call->context[elem].destroy = destroy; |
| 1477 } |
| 1478 |
| 1479 void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) { |
| 1480 return call->context[elem].value; |
| 1481 } |
| 1482 |
| 1483 uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; } |
OLD | NEW |