Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Side by Side Diff: third_party/grpc/src/core/surface/call.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33 #include <assert.h>
34 #include <limits.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38
39 #include <grpc/compression.h>
40 #include <grpc/grpc.h>
41 #include <grpc/support/alloc.h>
42 #include <grpc/support/log.h>
43 #include <grpc/support/string_util.h>
44 #include <grpc/support/useful.h>
45
46 #include "src/core/channel/channel_stack.h"
47 #include "src/core/compression/algorithm_metadata.h"
48 #include "src/core/iomgr/timer.h"
49 #include "src/core/profiling/timers.h"
50 #include "src/core/support/string.h"
51 #include "src/core/surface/api_trace.h"
52 #include "src/core/surface/call.h"
53 #include "src/core/surface/channel.h"
54 #include "src/core/surface/completion_queue.h"
55 #include "src/core/transport/static_metadata.h"
56
57 /** The maximum number of concurrent batches possible.
58 Based upon the maximum number of individually queueable ops in the batch
59 api:
60 - initial metadata send
61 - message send
62 - status/close send (depending on client/server)
63 - initial metadata recv
64 - message recv
65 - status/close recv (depending on client/server) */
66 #define MAX_CONCURRENT_BATCHES 6
67
68 typedef struct {
69 grpc_ioreq_completion_func on_complete;
70 void *user_data;
71 int success;
72 } completed_request;
73
74 #define MAX_SEND_EXTRA_METADATA_COUNT 3
75
76 /* Status data for a request can come from several sources; this
77 enumerates them all, and acts as a priority sorting for which
78 status to return to the application - earlier entries override
79 later ones */
80 typedef enum {
81 /* Status came from the application layer overriding whatever
82 the wire says */
83 STATUS_FROM_API_OVERRIDE = 0,
84 /* Status was created by some internal channel stack operation */
85 STATUS_FROM_CORE,
86 /* Status came from 'the wire' - or somewhere below the surface
87 layer */
88 STATUS_FROM_WIRE,
89 /* Status came from the server sending status */
90 STATUS_FROM_SERVER_STATUS,
91 STATUS_SOURCE_COUNT
92 } status_source;
93
94 typedef struct {
95 uint8_t is_set;
96 grpc_status_code code;
97 grpc_mdstr *details;
98 } received_status;
99
100 /* How far through the GRPC stream have we read? */
101 typedef enum {
102 /* We are still waiting for initial metadata to complete */
103 READ_STATE_INITIAL = 0,
104 /* We have gotten initial metadata, and are reading either
105 messages or trailing metadata */
106 READ_STATE_GOT_INITIAL_METADATA,
107 /* The stream is closed for reading */
108 READ_STATE_READ_CLOSED,
109 /* The stream is closed for reading & writing */
110 READ_STATE_STREAM_CLOSED
111 } read_state;
112
113 typedef enum {
114 WRITE_STATE_INITIAL = 0,
115 WRITE_STATE_STARTED,
116 WRITE_STATE_WRITE_CLOSED
117 } write_state;
118
119 typedef struct batch_control {
120 grpc_call *call;
121 grpc_cq_completion cq_completion;
122 grpc_closure finish_batch;
123 void *notify_tag;
124 gpr_refcount steps_to_complete;
125
126 uint8_t send_initial_metadata;
127 uint8_t send_message;
128 uint8_t send_final_op;
129 uint8_t recv_initial_metadata;
130 uint8_t recv_message;
131 uint8_t recv_final_op;
132 uint8_t is_notify_tag_closure;
133 uint8_t success;
134 } batch_control;
135
136 struct grpc_call {
137 grpc_completion_queue *cq;
138 grpc_channel *channel;
139 grpc_call *parent;
140 grpc_call *first_child;
141 /* TODO(ctiller): share with cq if possible? */
142 gpr_mu mu;
143
144 /* client or server call */
145 uint8_t is_client;
146 /* is the alarm set */
147 uint8_t have_alarm;
148 /** has grpc_call_destroy been called */
149 uint8_t destroy_called;
150 /** flag indicating that cancellation is inherited */
151 uint8_t cancellation_is_inherited;
152 /** bitmask of live batches */
153 uint8_t used_batches;
154 /** which ops are in-flight */
155 uint8_t sent_initial_metadata;
156 uint8_t sending_message;
157 uint8_t sent_final_op;
158 uint8_t received_initial_metadata;
159 uint8_t receiving_message;
160 uint8_t received_final_op;
161
162 /* have we received initial metadata */
163 bool has_initial_md_been_received;
164
165 batch_control active_batches[MAX_CONCURRENT_BATCHES];
166
167 /* first idx: is_receiving, second idx: is_trailing */
168 grpc_metadata_batch metadata_batch[2][2];
169
170 /* Buffered read metadata waiting to be returned to the application.
171 Element 0 is initial metadata, element 1 is trailing metadata. */
172 grpc_metadata_array *buffered_metadata[2];
173
174 /* Received call statuses from various sources */
175 received_status status[STATUS_SOURCE_COUNT];
176
177 /* Compression algorithm for the call */
178 grpc_compression_algorithm compression_algorithm;
179 /* Supported encodings (compression algorithms), a bitset */
180 uint32_t encodings_accepted_by_peer;
181
182 /* Contexts for various subsystems (security, tracing, ...). */
183 grpc_call_context_element context[GRPC_CONTEXT_COUNT];
184
185 /* Deadline alarm - if have_alarm is non-zero */
186 grpc_timer alarm;
187
188 /* for the client, extra metadata is initial metadata; for the
189 server, it's trailing metadata */
190 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
191 int send_extra_metadata_count;
192 gpr_timespec send_deadline;
193
194 /** siblings: children of the same parent form a list, and this list is
195 protected under
196 parent->mu */
197 grpc_call *sibling_next;
198 grpc_call *sibling_prev;
199
200 grpc_slice_buffer_stream sending_stream;
201 grpc_byte_stream *receiving_stream;
202 grpc_byte_buffer **receiving_buffer;
203 gpr_slice receiving_slice;
204 grpc_closure receiving_slice_ready;
205 grpc_closure receiving_stream_ready;
206 grpc_closure receiving_initial_metadata_ready;
207 uint32_t test_only_last_message_flags;
208
209 union {
210 struct {
211 grpc_status_code *status;
212 char **status_details;
213 size_t *status_details_capacity;
214 } client;
215 struct {
216 int *cancelled;
217 } server;
218 } final_op;
219
220 struct {
221 void *bctlp;
222 bool success;
223 } saved_receiving_stream_ready_ctx;
224 };
225
226 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
227 #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
228 #define CALL_ELEM_FROM_CALL(call, idx) \
229 grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
230 #define CALL_FROM_TOP_ELEM(top_elem) \
231 CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
232
233 static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
234 gpr_timespec deadline);
235 static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
236 grpc_transport_stream_op *op);
237 static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
238 grpc_status_code status,
239 const char *description);
240 static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
241 bool success);
242 static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
243 bool success);
244
245 grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
246 uint32_t propagation_mask,
247 grpc_completion_queue *cq,
248 const void *server_transport_data,
249 grpc_mdelem **add_initial_metadata,
250 size_t add_initial_metadata_count,
251 gpr_timespec send_deadline) {
252 size_t i, j;
253 grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
254 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
255 grpc_call *call;
256 GPR_TIMER_BEGIN("grpc_call_create", 0);
257 call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
258 memset(call, 0, sizeof(grpc_call));
259 gpr_mu_init(&call->mu);
260 call->channel = channel;
261 call->cq = cq;
262 call->parent = parent_call;
263 call->is_client = server_transport_data == NULL;
264 if (call->is_client) {
265 GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
266 for (i = 0; i < add_initial_metadata_count; i++) {
267 call->send_extra_metadata[i].md = add_initial_metadata[i];
268 }
269 call->send_extra_metadata_count = (int)add_initial_metadata_count;
270 } else {
271 GPR_ASSERT(add_initial_metadata_count == 0);
272 call->send_extra_metadata_count = 0;
273 }
274 for (i = 0; i < 2; i++) {
275 for (j = 0; j < 2; j++) {
276 call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
277 }
278 }
279 call->send_deadline = send_deadline;
280 GRPC_CHANNEL_INTERNAL_REF(channel, "call");
281 /* initial refcount dropped by grpc_call_destroy */
282 grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
283 call->context, server_transport_data,
284 CALL_STACK_FROM_CALL(call));
285 if (cq != NULL) {
286 GRPC_CQ_INTERNAL_REF(cq, "bind");
287 grpc_call_stack_set_pollset(&exec_ctx, CALL_STACK_FROM_CALL(call),
288 grpc_cq_pollset(cq));
289 }
290 if (parent_call != NULL) {
291 GRPC_CALL_INTERNAL_REF(parent_call, "child");
292 GPR_ASSERT(call->is_client);
293 GPR_ASSERT(!parent_call->is_client);
294
295 gpr_mu_lock(&parent_call->mu);
296
297 if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
298 send_deadline = gpr_time_min(
299 gpr_convert_clock_type(send_deadline,
300 parent_call->send_deadline.clock_type),
301 parent_call->send_deadline);
302 }
303 /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
304 * GRPC_PROPAGATE_STATS_CONTEXT */
305 /* TODO(ctiller): This should change to use the appropriate census start_op
306 * call. */
307 if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
308 GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
309 grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
310 parent_call->context[GRPC_CONTEXT_TRACING].value,
311 NULL);
312 } else {
313 GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
314 }
315 if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
316 call->cancellation_is_inherited = 1;
317 }
318
319 if (parent_call->first_child == NULL) {
320 parent_call->first_child = call;
321 call->sibling_next = call->sibling_prev = call;
322 } else {
323 call->sibling_next = parent_call->first_child;
324 call->sibling_prev = parent_call->first_child->sibling_prev;
325 call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
326 call;
327 }
328
329 gpr_mu_unlock(&parent_call->mu);
330 }
331 if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
332 0) {
333 set_deadline_alarm(&exec_ctx, call, send_deadline);
334 }
335 grpc_exec_ctx_finish(&exec_ctx);
336 GPR_TIMER_END("grpc_call_create", 0);
337 return call;
338 }
339
340 void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
341 grpc_completion_queue *cq) {
342 GPR_ASSERT(cq);
343 call->cq = cq;
344 GRPC_CQ_INTERNAL_REF(cq, "bind");
345 grpc_call_stack_set_pollset(exec_ctx, CALL_STACK_FROM_CALL(call),
346 grpc_cq_pollset(cq));
347 }
348
349 #ifdef GRPC_STREAM_REFCOUNT_DEBUG
350 #define REF_REASON reason
351 #define REF_ARG , const char *reason
352 #else
353 #define REF_REASON ""
354 #define REF_ARG
355 #endif
356 void grpc_call_internal_ref(grpc_call *c REF_ARG) {
357 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
358 }
359 void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
360 GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
361 }
362
363 static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
364 size_t i;
365 int ii;
366 grpc_call *c = call;
367 GPR_TIMER_BEGIN("destroy_call", 0);
368 for (i = 0; i < 2; i++) {
369 grpc_metadata_batch_destroy(
370 &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
371 }
372 if (c->receiving_stream != NULL) {
373 grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
374 }
375 grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
376 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
377 gpr_mu_destroy(&c->mu);
378 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
379 if (c->status[i].details) {
380 GRPC_MDSTR_UNREF(c->status[i].details);
381 }
382 }
383 for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
384 GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
385 }
386 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
387 if (c->context[i].destroy) {
388 c->context[i].destroy(c->context[i].value);
389 }
390 }
391 if (c->cq) {
392 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
393 }
394 gpr_free(c);
395 GPR_TIMER_END("destroy_call", 0);
396 }
397
398 static void set_status_code(grpc_call *call, status_source source,
399 uint32_t status) {
400 if (call->status[source].is_set) return;
401
402 call->status[source].is_set = 1;
403 call->status[source].code = (grpc_status_code)status;
404
405 /* TODO(ctiller): what to do about the flush that was previously here */
406 }
407
408 static void set_compression_algorithm(grpc_call *call,
409 grpc_compression_algorithm algo) {
410 call->compression_algorithm = algo;
411 }
412
413 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
414 grpc_call *call) {
415 grpc_compression_algorithm algorithm;
416 gpr_mu_lock(&call->mu);
417 algorithm = call->compression_algorithm;
418 gpr_mu_unlock(&call->mu);
419 return algorithm;
420 }
421
422 uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
423 uint32_t flags;
424 gpr_mu_lock(&call->mu);
425 flags = call->test_only_last_message_flags;
426 gpr_mu_unlock(&call->mu);
427 return flags;
428 }
429
430 static void destroy_encodings_accepted_by_peer(void *p) { return; }
431
432 static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) {
433 size_t i;
434 grpc_compression_algorithm algorithm;
435 gpr_slice_buffer accept_encoding_parts;
436 gpr_slice accept_encoding_slice;
437 void *accepted_user_data;
438
439 accepted_user_data =
440 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
441 if (accepted_user_data != NULL) {
442 call->encodings_accepted_by_peer =
443 (uint32_t)(((uintptr_t)accepted_user_data) - 1);
444 return;
445 }
446
447 accept_encoding_slice = mdel->value->slice;
448 gpr_slice_buffer_init(&accept_encoding_parts);
449 gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
450
451 /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
452 * zeroes the whole grpc_call */
453 /* Always support no compression */
454 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
455 for (i = 0; i < accept_encoding_parts.count; i++) {
456 const gpr_slice *accept_encoding_entry_slice =
457 &accept_encoding_parts.slices[i];
458 if (grpc_compression_algorithm_parse(
459 (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice),
460 GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
461 GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
462 } else {
463 char *accept_encoding_entry_str =
464 gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
465 gpr_log(GPR_ERROR,
466 "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
467 accept_encoding_entry_str);
468 gpr_free(accept_encoding_entry_str);
469 }
470 }
471
472 gpr_slice_buffer_destroy(&accept_encoding_parts);
473
474 grpc_mdelem_set_user_data(
475 mdel, destroy_encodings_accepted_by_peer,
476 (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
477 }
478
479 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
480 uint32_t encodings_accepted_by_peer;
481 gpr_mu_lock(&call->mu);
482 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
483 gpr_mu_unlock(&call->mu);
484 return encodings_accepted_by_peer;
485 }
486
487 static void set_status_details(grpc_call *call, status_source source,
488 grpc_mdstr *status) {
489 if (call->status[source].details != NULL) {
490 GRPC_MDSTR_UNREF(call->status[source].details);
491 }
492 call->status[source].details = status;
493 }
494
495 static void get_final_status(grpc_call *call,
496 void (*set_value)(grpc_status_code code,
497 void *user_data),
498 void *set_value_user_data) {
499 int i;
500 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
501 if (call->status[i].is_set) {
502 set_value(call->status[i].code, set_value_user_data);
503 return;
504 }
505 }
506 if (call->is_client) {
507 set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
508 } else {
509 set_value(GRPC_STATUS_OK, set_value_user_data);
510 }
511 }
512
513 static void get_final_details(grpc_call *call, char **out_details,
514 size_t *out_details_capacity) {
515 int i;
516 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
517 if (call->status[i].is_set) {
518 if (call->status[i].details) {
519 gpr_slice details = call->status[i].details->slice;
520 size_t len = GPR_SLICE_LENGTH(details);
521 if (len + 1 > *out_details_capacity) {
522 *out_details_capacity =
523 GPR_MAX(len + 1, *out_details_capacity * 3 / 2);
524 *out_details = gpr_realloc(*out_details, *out_details_capacity);
525 }
526 memcpy(*out_details, GPR_SLICE_START_PTR(details), len);
527 (*out_details)[len] = 0;
528 } else {
529 goto no_details;
530 }
531 return;
532 }
533 }
534
535 no_details:
536 if (0 == *out_details_capacity) {
537 *out_details_capacity = 8;
538 *out_details = gpr_malloc(*out_details_capacity);
539 }
540 **out_details = 0;
541 }
542
543 static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
544 return (grpc_linked_mdelem *)&md->internal_data;
545 }
546
547 static int prepare_application_metadata(grpc_call *call, int count,
548 grpc_metadata *metadata,
549 int is_trailing,
550 int prepend_extra_metadata) {
551 int i;
552 grpc_metadata_batch *batch =
553 &call->metadata_batch[0 /* is_receiving */][is_trailing];
554 if (prepend_extra_metadata) {
555 if (call->send_extra_metadata_count == 0) {
556 prepend_extra_metadata = 0;
557 } else {
558 for (i = 0; i < call->send_extra_metadata_count; i++) {
559 GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
560 }
561 for (i = 1; i < call->send_extra_metadata_count; i++) {
562 call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
563 }
564 for (i = 0; i < call->send_extra_metadata_count - 1; i++) {
565 call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1];
566 }
567 }
568 }
569 for (i = 0; i < count; i++) {
570 grpc_metadata *md = &metadata[i];
571 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
572 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
573 l->md = grpc_mdelem_from_string_and_buffer(
574 md->key, (const uint8_t *)md->value, md->value_length);
575 if (!grpc_header_key_is_legal(grpc_mdstr_as_c_string(l->md->key),
576 GRPC_MDSTR_LENGTH(l->md->key))) {
577 gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
578 grpc_mdstr_as_c_string(l->md->key));
579 return 0;
580 } else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key),
581 GRPC_MDSTR_LENGTH(l->md->key)) &&
582 !grpc_header_nonbin_value_is_legal(
583 grpc_mdstr_as_c_string(l->md->value),
584 GRPC_MDSTR_LENGTH(l->md->value))) {
585 gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
586 return 0;
587 }
588 }
589 for (i = 1; i < count; i++) {
590 linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]);
591 }
592 for (i = 0; i < count - 1; i++) {
593 linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]);
594 }
595 switch (prepend_extra_metadata * 2 + (count != 0)) {
596 case 0:
597 /* no prepend, no metadata => nothing to do */
598 batch->list.head = batch->list.tail = NULL;
599 break;
600 case 1:
601 /* metadata, but no prepend */
602 batch->list.head = linked_from_md(&metadata[0]);
603 batch->list.tail = linked_from_md(&metadata[count - 1]);
604 batch->list.head->prev = NULL;
605 batch->list.tail->next = NULL;
606 break;
607 case 2:
608 /* prepend, but no md */
609 batch->list.head = &call->send_extra_metadata[0];
610 batch->list.tail =
611 &call->send_extra_metadata[call->send_extra_metadata_count - 1];
612 batch->list.head->prev = NULL;
613 batch->list.tail->next = NULL;
614 break;
615 case 3:
616 /* prepend AND md */
617 batch->list.head = &call->send_extra_metadata[0];
618 call->send_extra_metadata[call->send_extra_metadata_count - 1].next =
619 linked_from_md(&metadata[0]);
620 linked_from_md(&metadata[0])->prev =
621 &call->send_extra_metadata[call->send_extra_metadata_count - 1];
622 batch->list.tail = linked_from_md(&metadata[count - 1]);
623 batch->list.head->prev = NULL;
624 batch->list.tail->next = NULL;
625 break;
626 default:
627 GPR_UNREACHABLE_CODE(return 0);
628 }
629
630 return 1;
631 }
632
633 void grpc_call_destroy(grpc_call *c) {
634 int cancel;
635 grpc_call *parent = c->parent;
636 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
637
638 GPR_TIMER_BEGIN("grpc_call_destroy", 0);
639 GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
640
641 if (parent) {
642 gpr_mu_lock(&parent->mu);
643 if (c == parent->first_child) {
644 parent->first_child = c->sibling_next;
645 if (c == parent->first_child) {
646 parent->first_child = NULL;
647 }
648 c->sibling_prev->sibling_next = c->sibling_next;
649 c->sibling_next->sibling_prev = c->sibling_prev;
650 }
651 gpr_mu_unlock(&parent->mu);
652 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
653 }
654
655 gpr_mu_lock(&c->mu);
656 GPR_ASSERT(!c->destroy_called);
657 c->destroy_called = 1;
658 if (c->have_alarm) {
659 grpc_timer_cancel(&exec_ctx, &c->alarm);
660 }
661 cancel = !c->received_final_op;
662 gpr_mu_unlock(&c->mu);
663 if (cancel) grpc_call_cancel(c, NULL);
664 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
665 grpc_exec_ctx_finish(&exec_ctx);
666 GPR_TIMER_END("grpc_call_destroy", 0);
667 }
668
669 grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
670 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
671 GPR_ASSERT(!reserved);
672 return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
673 NULL);
674 }
675
676 grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
677 grpc_status_code status,
678 const char *description,
679 void *reserved) {
680 grpc_call_error r;
681 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
682 GRPC_API_TRACE(
683 "grpc_call_cancel_with_status("
684 "c=%p, status=%d, description=%s, reserved=%p)",
685 4, (c, (int)status, description, reserved));
686 GPR_ASSERT(reserved == NULL);
687 gpr_mu_lock(&c->mu);
688 r = cancel_with_status(&exec_ctx, c, status, description);
689 gpr_mu_unlock(&c->mu);
690 grpc_exec_ctx_finish(&exec_ctx);
691 return r;
692 }
693
694 typedef struct cancel_closure {
695 grpc_closure closure;
696 grpc_call *call;
697 grpc_status_code status;
698 } cancel_closure;
699
700 static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
701 cancel_closure *cc = ccp;
702 GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
703 gpr_free(cc);
704 }
705
706 static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
707 grpc_transport_stream_op op;
708 cancel_closure *cc = ccp;
709 memset(&op, 0, sizeof(op));
710 op.cancel_with_status = cc->status;
711 /* reuse closure to catch completion */
712 grpc_closure_init(&cc->closure, done_cancel, cc);
713 op.on_complete = &cc->closure;
714 execute_op(exec_ctx, cc->call, &op);
715 }
716
717 static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
718 grpc_status_code status,
719 const char *description) {
720 grpc_mdstr *details =
721 description ? grpc_mdstr_from_string(description) : NULL;
722 cancel_closure *cc = gpr_malloc(sizeof(*cc));
723
724 GPR_ASSERT(status != GRPC_STATUS_OK);
725
726 set_status_code(c, STATUS_FROM_API_OVERRIDE, (uint32_t)status);
727 set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
728
729 grpc_closure_init(&cc->closure, send_cancel, cc);
730 cc->call = c;
731 cc->status = status;
732 GRPC_CALL_INTERNAL_REF(c, "cancel");
733 grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL);
734
735 return GRPC_CALL_OK;
736 }
737
738 static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
739 grpc_transport_stream_op *op) {
740 grpc_call_element *elem;
741
742 GPR_TIMER_BEGIN("execute_op", 0);
743 elem = CALL_ELEM_FROM_CALL(call, 0);
744 op->context = call->context;
745 elem->filter->start_transport_stream_op(exec_ctx, elem, op);
746 GPR_TIMER_END("execute_op", 0);
747 }
748
749 char *grpc_call_get_peer(grpc_call *call) {
750 grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
751 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
752 char *result;
753 GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
754 result = elem->filter->get_peer(&exec_ctx, elem);
755 if (result == NULL) {
756 result = grpc_channel_get_target(call->channel);
757 }
758 if (result == NULL) {
759 result = gpr_strdup("unknown");
760 }
761 grpc_exec_ctx_finish(&exec_ctx);
762 return result;
763 }
764
765 grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
766 return CALL_FROM_TOP_ELEM(elem);
767 }
768
769 static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
770 grpc_call *call = arg;
771 gpr_mu_lock(&call->mu);
772 call->have_alarm = 0;
773 if (success) {
774 cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
775 "Deadline Exceeded");
776 }
777 gpr_mu_unlock(&call->mu);
778 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
779 }
780
781 static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
782 gpr_timespec deadline) {
783 if (call->have_alarm) {
784 gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
785 assert(0);
786 return;
787 }
788 GRPC_CALL_INTERNAL_REF(call, "alarm");
789 call->have_alarm = 1;
790 call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
791 grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
792 gpr_now(GPR_CLOCK_MONOTONIC));
793 }
794
795 /* we offset status by a small amount when storing it into transport metadata
796 as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
797 */
798 #define STATUS_OFFSET 1
799 static void destroy_status(void *ignored) {}
800
801 static uint32_t decode_status(grpc_mdelem *md) {
802 uint32_t status;
803 void *user_data;
804 if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0;
805 if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1;
806 if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2;
807 user_data = grpc_mdelem_get_user_data(md, destroy_status);
808 if (user_data != NULL) {
809 status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET;
810 } else {
811 if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
812 GPR_SLICE_LENGTH(md->value->slice),
813 &status)) {
814 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
815 }
816 grpc_mdelem_set_user_data(md, destroy_status,
817 (void *)(intptr_t)(status + STATUS_OFFSET));
818 }
819 return status;
820 }
821
822 static uint32_t decode_compression(grpc_mdelem *md) {
823 grpc_compression_algorithm algorithm =
824 grpc_compression_algorithm_from_mdstr(md->value);
825 if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
826 const char *md_c_str = grpc_mdstr_as_c_string(md->value);
827 gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
828 }
829 return algorithm;
830 }
831
832 static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) {
833 if (elem->key == GRPC_MDSTR_GRPC_STATUS) {
834 GPR_TIMER_BEGIN("status", 0);
835 set_status_code(call, STATUS_FROM_WIRE, decode_status(elem));
836 GPR_TIMER_END("status", 0);
837 return NULL;
838 } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) {
839 GPR_TIMER_BEGIN("status-details", 0);
840 set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value));
841 GPR_TIMER_END("status-details", 0);
842 return NULL;
843 }
844 return elem;
845 }
846
847 static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem,
848 int is_trailing) {
849 grpc_metadata_array *dest;
850 grpc_metadata *mdusr;
851 GPR_TIMER_BEGIN("publish_app_metadata", 0);
852 dest = call->buffered_metadata[is_trailing];
853 if (dest->count == dest->capacity) {
854 dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
855 dest->metadata =
856 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
857 }
858 mdusr = &dest->metadata[dest->count++];
859 mdusr->key = grpc_mdstr_as_c_string(elem->key);
860 mdusr->value = grpc_mdstr_as_c_string(elem->value);
861 mdusr->value_length = GPR_SLICE_LENGTH(elem->value->slice);
862 GPR_TIMER_END("publish_app_metadata", 0);
863 return elem;
864 }
865
866 static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) {
867 grpc_call *call = callp;
868 elem = recv_common_filter(call, elem);
869 if (elem == NULL) {
870 return NULL;
871 } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
872 GPR_TIMER_BEGIN("compression_algorithm", 0);
873 set_compression_algorithm(call, decode_compression(elem));
874 GPR_TIMER_END("compression_algorithm", 0);
875 return NULL;
876 } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
877 GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
878 set_encodings_accepted_by_peer(call, elem);
879 GPR_TIMER_END("encodings_accepted_by_peer", 0);
880 return NULL;
881 } else {
882 return publish_app_metadata(call, elem, 0);
883 }
884 }
885
886 static grpc_mdelem *recv_trailing_filter(void *callp, grpc_mdelem *elem) {
887 grpc_call *call = callp;
888 elem = recv_common_filter(call, elem);
889 if (elem == NULL) {
890 return NULL;
891 } else {
892 return publish_app_metadata(call, elem, 1);
893 }
894 }
895
896 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
897 return CALL_STACK_FROM_CALL(call);
898 }
899
900 /*
901 * BATCH API IMPLEMENTATION
902 */
903
904 static void set_status_value_directly(grpc_status_code status, void *dest) {
905 *(grpc_status_code *)dest = status;
906 }
907
908 static void set_cancelled_value(grpc_status_code status, void *dest) {
909 *(int *)dest = (status != GRPC_STATUS_OK);
910 }
911
912 static int are_write_flags_valid(uint32_t flags) {
913 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
914 const uint32_t allowed_write_positions =
915 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
916 const uint32_t invalid_positions = ~allowed_write_positions;
917 return !(flags & invalid_positions);
918 }
919
920 static batch_control *allocate_batch_control(grpc_call *call) {
921 size_t i;
922 for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
923 if ((call->used_batches & (1 << i)) == 0) {
924 call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i));
925 return &call->active_batches[i];
926 }
927 }
928 return NULL;
929 }
930
931 static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
932 grpc_cq_completion *storage) {
933 batch_control *bctl = user_data;
934 grpc_call *call = bctl->call;
935 gpr_mu_lock(&call->mu);
936 call->used_batches = (uint8_t)(
937 call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches)));
938 gpr_mu_unlock(&call->mu);
939 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
940 }
941
942 static void post_batch_completion(grpc_exec_ctx *exec_ctx,
943 batch_control *bctl) {
944 grpc_call *call = bctl->call;
945 if (bctl->is_notify_tag_closure) {
946 grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL);
947 gpr_mu_lock(&call->mu);
948 bctl->call->used_batches =
949 (uint8_t)(bctl->call->used_batches &
950 ~(uint8_t)(1 << (bctl - bctl->call->active_batches)));
951 gpr_mu_unlock(&call->mu);
952 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
953 } else {
954 grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,
955 finish_batch_completion, bctl, &bctl->cq_completion);
956 }
957 }
958
959 static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
960 batch_control *bctl) {
961 grpc_call *call = bctl->call;
962 for (;;) {
963 size_t remaining = call->receiving_stream->length -
964 (*call->receiving_buffer)->data.raw.slice_buffer.length;
965 if (remaining == 0) {
966 call->receiving_message = 0;
967 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
968 call->receiving_stream = NULL;
969 if (gpr_unref(&bctl->steps_to_complete)) {
970 post_batch_completion(exec_ctx, bctl);
971 }
972 return;
973 }
974 if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
975 &call->receiving_slice, remaining,
976 &call->receiving_slice_ready)) {
977 gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
978 call->receiving_slice);
979 } else {
980 return;
981 }
982 }
983 }
984
985 static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
986 bool success) {
987 batch_control *bctl = bctlp;
988 grpc_call *call = bctl->call;
989
990 if (success) {
991 gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
992 call->receiving_slice);
993 continue_receiving_slices(exec_ctx, bctl);
994 } else {
995 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
996 call->receiving_stream = NULL;
997 grpc_byte_buffer_destroy(*call->receiving_buffer);
998 *call->receiving_buffer = NULL;
999 if (gpr_unref(&bctl->steps_to_complete)) {
1000 post_batch_completion(exec_ctx, bctl);
1001 }
1002 }
1003 }
1004
1005 static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
1006 bool success) {
1007 grpc_call *call = bctl->call;
1008 if (call->receiving_stream == NULL) {
1009 *call->receiving_buffer = NULL;
1010 call->receiving_message = 0;
1011 if (gpr_unref(&bctl->steps_to_complete)) {
1012 post_batch_completion(exec_ctx, bctl);
1013 }
1014 } else if (call->receiving_stream->length >
1015 grpc_channel_get_max_message_length(call->channel)) {
1016 cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
1017 "Max message size exceeded");
1018 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
1019 call->receiving_stream = NULL;
1020 *call->receiving_buffer = NULL;
1021 call->receiving_message = 0;
1022 if (gpr_unref(&bctl->steps_to_complete)) {
1023 post_batch_completion(exec_ctx, bctl);
1024 }
1025 } else {
1026 call->test_only_last_message_flags = call->receiving_stream->flags;
1027 if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
1028 (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
1029 *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
1030 NULL, 0, call->compression_algorithm);
1031 } else {
1032 *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
1033 }
1034 grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
1035 bctl);
1036 continue_receiving_slices(exec_ctx, bctl);
1037 /* early out */
1038 return;
1039 }
1040 }
1041
1042 static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
1043 bool success) {
1044 batch_control *bctl = bctlp;
1045 grpc_call *call = bctl->call;
1046
1047 gpr_mu_lock(&bctl->call->mu);
1048 if (bctl->call->has_initial_md_been_received) {
1049 gpr_mu_unlock(&bctl->call->mu);
1050 process_data_after_md(exec_ctx, bctlp, success);
1051 } else {
1052 call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
1053 call->saved_receiving_stream_ready_ctx.success = success;
1054 gpr_mu_unlock(&bctl->call->mu);
1055 }
1056 }
1057
1058 static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
1059 void *bctlp, bool success) {
1060 batch_control *bctl = bctlp;
1061 grpc_call *call = bctl->call;
1062
1063 gpr_mu_lock(&call->mu);
1064
1065 grpc_metadata_batch *md =
1066 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1067 grpc_metadata_batch_filter(md, recv_initial_filter, call);
1068 call->has_initial_md_been_received = true;
1069
1070 if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
1071 0 &&
1072 !call->is_client) {
1073 GPR_TIMER_BEGIN("set_deadline_alarm", 0);
1074 set_deadline_alarm(exec_ctx, call, md->deadline);
1075 GPR_TIMER_END("set_deadline_alarm", 0);
1076 }
1077
1078 if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
1079 grpc_closure *saved_rsr_closure = grpc_closure_create(
1080 receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
1081 grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
1082 call->saved_receiving_stream_ready_ctx.success, NULL);
1083 call->saved_receiving_stream_ready_ctx.bctlp = NULL;
1084 }
1085
1086 gpr_mu_unlock(&call->mu);
1087
1088 if (gpr_unref(&bctl->steps_to_complete)) {
1089 post_batch_completion(exec_ctx, bctl);
1090 }
1091 }
1092
1093 static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
1094 batch_control *bctl = bctlp;
1095 grpc_call *call = bctl->call;
1096 grpc_call *child_call;
1097 grpc_call *next_child_call;
1098
1099 gpr_mu_lock(&call->mu);
1100 if (bctl->send_initial_metadata) {
1101 grpc_metadata_batch_destroy(
1102 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1103 }
1104 if (bctl->send_message) {
1105 call->sending_message = 0;
1106 }
1107 if (bctl->send_final_op) {
1108 grpc_metadata_batch_destroy(
1109 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1110 }
1111 if (bctl->recv_final_op) {
1112 grpc_metadata_batch *md =
1113 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1114 grpc_metadata_batch_filter(md, recv_trailing_filter, call);
1115
1116 if (call->have_alarm) {
1117 grpc_timer_cancel(exec_ctx, &call->alarm);
1118 }
1119 /* propagate cancellation to any interested children */
1120 child_call = call->first_child;
1121 if (child_call != NULL) {
1122 do {
1123 next_child_call = child_call->sibling_next;
1124 if (child_call->cancellation_is_inherited) {
1125 GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
1126 grpc_call_cancel(child_call, NULL);
1127 GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
1128 }
1129 child_call = next_child_call;
1130 } while (child_call != call->first_child);
1131 }
1132
1133 if (call->is_client) {
1134 get_final_status(call, set_status_value_directly,
1135 call->final_op.client.status);
1136 get_final_details(call, call->final_op.client.status_details,
1137 call->final_op.client.status_details_capacity);
1138 } else {
1139 get_final_status(call, set_cancelled_value,
1140 call->final_op.server.cancelled);
1141 }
1142
1143 success = 1;
1144 }
1145 bctl->success = success != 0;
1146 gpr_mu_unlock(&call->mu);
1147 if (gpr_unref(&bctl->steps_to_complete)) {
1148 post_batch_completion(exec_ctx, bctl);
1149 }
1150 }
1151
1152 static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
1153 grpc_call *call, const grpc_op *ops,
1154 size_t nops, void *notify_tag,
1155 int is_notify_tag_closure) {
1156 grpc_transport_stream_op stream_op;
1157 size_t i;
1158 const grpc_op *op;
1159 batch_control *bctl;
1160 int num_completion_callbacks_needed = 1;
1161 grpc_call_error error = GRPC_CALL_OK;
1162
1163 GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
1164
1165 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
1166
1167 memset(&stream_op, 0, sizeof(stream_op));
1168
1169 /* TODO(ctiller): this feels like it could be made lock-free */
1170 gpr_mu_lock(&call->mu);
1171 bctl = allocate_batch_control(call);
1172 memset(bctl, 0, sizeof(*bctl));
1173 bctl->call = call;
1174 bctl->notify_tag = notify_tag;
1175 bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
1176
1177 if (nops == 0) {
1178 GRPC_CALL_INTERNAL_REF(call, "completion");
1179 bctl->success = 1;
1180 if (!is_notify_tag_closure) {
1181 grpc_cq_begin_op(call->cq, notify_tag);
1182 }
1183 gpr_mu_unlock(&call->mu);
1184 post_batch_completion(exec_ctx, bctl);
1185 error = GRPC_CALL_OK;
1186 goto done;
1187 }
1188
1189 /* rewrite batch ops into a transport op */
1190 for (i = 0; i < nops; i++) {
1191 op = &ops[i];
1192 if (op->reserved != NULL) {
1193 error = GRPC_CALL_ERROR;
1194 goto done_with_error;
1195 }
1196 switch (op->op) {
1197 case GRPC_OP_SEND_INITIAL_METADATA:
1198 /* Flag validation: currently allow no flags */
1199 if (op->flags != 0) {
1200 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1201 goto done_with_error;
1202 }
1203 if (call->sent_initial_metadata) {
1204 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1205 goto done_with_error;
1206 }
1207 if (op->data.send_initial_metadata.count > INT_MAX) {
1208 error = GRPC_CALL_ERROR_INVALID_METADATA;
1209 goto done_with_error;
1210 }
1211 bctl->send_initial_metadata = 1;
1212 call->sent_initial_metadata = 1;
1213 if (!prepare_application_metadata(
1214 call, (int)op->data.send_initial_metadata.count,
1215 op->data.send_initial_metadata.metadata, 0, call->is_client)) {
1216 error = GRPC_CALL_ERROR_INVALID_METADATA;
1217 goto done_with_error;
1218 }
1219 /* TODO(ctiller): just make these the same variable? */
1220 call->metadata_batch[0][0].deadline = call->send_deadline;
1221 stream_op.send_initial_metadata =
1222 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1223 break;
1224 case GRPC_OP_SEND_MESSAGE:
1225 if (!are_write_flags_valid(op->flags)) {
1226 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1227 goto done_with_error;
1228 }
1229 if (op->data.send_message == NULL) {
1230 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1231 goto done_with_error;
1232 }
1233 if (call->sending_message) {
1234 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1235 goto done_with_error;
1236 }
1237 bctl->send_message = 1;
1238 call->sending_message = 1;
1239 grpc_slice_buffer_stream_init(
1240 &call->sending_stream,
1241 &op->data.send_message->data.raw.slice_buffer, op->flags);
1242 stream_op.send_message = &call->sending_stream.base;
1243 break;
1244 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1245 /* Flag validation: currently allow no flags */
1246 if (op->flags != 0) {
1247 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1248 goto done_with_error;
1249 }
1250 if (!call->is_client) {
1251 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1252 goto done_with_error;
1253 }
1254 if (call->sent_final_op) {
1255 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1256 goto done_with_error;
1257 }
1258 bctl->send_final_op = 1;
1259 call->sent_final_op = 1;
1260 stream_op.send_trailing_metadata =
1261 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1262 break;
1263 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1264 /* Flag validation: currently allow no flags */
1265 if (op->flags != 0) {
1266 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1267 goto done_with_error;
1268 }
1269 if (call->is_client) {
1270 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1271 goto done_with_error;
1272 }
1273 if (call->sent_final_op) {
1274 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1275 goto done_with_error;
1276 }
1277 if (op->data.send_status_from_server.trailing_metadata_count >
1278 INT_MAX) {
1279 error = GRPC_CALL_ERROR_INVALID_METADATA;
1280 goto done_with_error;
1281 }
1282 bctl->send_final_op = 1;
1283 call->sent_final_op = 1;
1284 call->send_extra_metadata_count = 1;
1285 call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
1286 call->channel, op->data.send_status_from_server.status);
1287 if (op->data.send_status_from_server.status_details != NULL) {
1288 call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings(
1289 GRPC_MDSTR_GRPC_MESSAGE,
1290 grpc_mdstr_from_string(
1291 op->data.send_status_from_server.status_details));
1292 call->send_extra_metadata_count++;
1293 set_status_details(
1294 call, STATUS_FROM_API_OVERRIDE,
1295 GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value));
1296 }
1297 set_status_code(call, STATUS_FROM_API_OVERRIDE,
1298 (uint32_t)op->data.send_status_from_server.status);
1299 if (!prepare_application_metadata(
1300 call,
1301 (int)op->data.send_status_from_server.trailing_metadata_count,
1302 op->data.send_status_from_server.trailing_metadata, 1, 1)) {
1303 error = GRPC_CALL_ERROR_INVALID_METADATA;
1304 goto done_with_error;
1305 }
1306 stream_op.send_trailing_metadata =
1307 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1308 break;
1309 case GRPC_OP_RECV_INITIAL_METADATA:
1310 /* Flag validation: currently allow no flags */
1311 if (op->flags != 0) {
1312 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1313 goto done_with_error;
1314 }
1315 if (call->received_initial_metadata) {
1316 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1317 goto done_with_error;
1318 }
1319 call->received_initial_metadata = 1;
1320 call->buffered_metadata[0] = op->data.recv_initial_metadata;
1321 grpc_closure_init(&call->receiving_initial_metadata_ready,
1322 receiving_initial_metadata_ready, bctl);
1323 bctl->recv_initial_metadata = 1;
1324 stream_op.recv_initial_metadata =
1325 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1326 stream_op.recv_initial_metadata_ready =
1327 &call->receiving_initial_metadata_ready;
1328 num_completion_callbacks_needed++;
1329 break;
1330 case GRPC_OP_RECV_MESSAGE:
1331 /* Flag validation: currently allow no flags */
1332 if (op->flags != 0) {
1333 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1334 goto done_with_error;
1335 }
1336 if (call->receiving_message) {
1337 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1338 goto done_with_error;
1339 }
1340 call->receiving_message = 1;
1341 bctl->recv_message = 1;
1342 call->receiving_buffer = op->data.recv_message;
1343 stream_op.recv_message = &call->receiving_stream;
1344 grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
1345 bctl);
1346 stream_op.recv_message_ready = &call->receiving_stream_ready;
1347 num_completion_callbacks_needed++;
1348 break;
1349 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1350 /* Flag validation: currently allow no flags */
1351 if (op->flags != 0) {
1352 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1353 goto done_with_error;
1354 }
1355 if (!call->is_client) {
1356 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1357 goto done_with_error;
1358 }
1359 if (call->received_final_op) {
1360 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1361 goto done_with_error;
1362 }
1363 call->received_final_op = 1;
1364 call->buffered_metadata[1] =
1365 op->data.recv_status_on_client.trailing_metadata;
1366 call->final_op.client.status = op->data.recv_status_on_client.status;
1367 call->final_op.client.status_details =
1368 op->data.recv_status_on_client.status_details;
1369 call->final_op.client.status_details_capacity =
1370 op->data.recv_status_on_client.status_details_capacity;
1371 bctl->recv_final_op = 1;
1372 stream_op.recv_trailing_metadata =
1373 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1374 break;
1375 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1376 /* Flag validation: currently allow no flags */
1377 if (op->flags != 0) {
1378 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1379 goto done_with_error;
1380 }
1381 if (call->is_client) {
1382 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1383 goto done_with_error;
1384 }
1385 if (call->received_final_op) {
1386 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1387 goto done_with_error;
1388 }
1389 call->received_final_op = 1;
1390 call->final_op.server.cancelled =
1391 op->data.recv_close_on_server.cancelled;
1392 bctl->recv_final_op = 1;
1393 stream_op.recv_trailing_metadata =
1394 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1395 break;
1396 }
1397 }
1398
1399 GRPC_CALL_INTERNAL_REF(call, "completion");
1400 if (!is_notify_tag_closure) {
1401 grpc_cq_begin_op(call->cq, notify_tag);
1402 }
1403 gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
1404
1405 stream_op.context = call->context;
1406 grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
1407 stream_op.on_complete = &bctl->finish_batch;
1408 gpr_mu_unlock(&call->mu);
1409
1410 execute_op(exec_ctx, call, &stream_op);
1411
1412 done:
1413 GPR_TIMER_END("grpc_call_start_batch", 0);
1414 return error;
1415
1416 done_with_error:
1417 /* reverse any mutations that occured */
1418 if (bctl->send_initial_metadata) {
1419 call->sent_initial_metadata = 0;
1420 grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1421 }
1422 if (bctl->send_message) {
1423 call->sending_message = 0;
1424 grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
1425 }
1426 if (bctl->send_final_op) {
1427 call->sent_final_op = 0;
1428 grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1429 }
1430 if (bctl->recv_initial_metadata) {
1431 call->received_initial_metadata = 0;
1432 }
1433 if (bctl->recv_message) {
1434 call->receiving_message = 0;
1435 }
1436 if (bctl->recv_final_op) {
1437 call->received_final_op = 0;
1438 }
1439 gpr_mu_unlock(&call->mu);
1440 goto done;
1441 }
1442
1443 grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
1444 size_t nops, void *tag, void *reserved) {
1445 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1446 grpc_call_error err;
1447
1448 GRPC_API_TRACE(
1449 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
1450 5, (call, ops, (unsigned long)nops, tag, reserved));
1451
1452 if (reserved != NULL) {
1453 err = GRPC_CALL_ERROR;
1454 } else {
1455 err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
1456 }
1457
1458 grpc_exec_ctx_finish(&exec_ctx);
1459 return err;
1460 }
1461
1462 grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
1463 grpc_call *call,
1464 const grpc_op *ops,
1465 size_t nops,
1466 grpc_closure *closure) {
1467 return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
1468 }
1469
1470 void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
1471 void *value, void (*destroy)(void *value)) {
1472 if (call->context[elem].destroy) {
1473 call->context[elem].destroy(call->context[elem].value);
1474 }
1475 call->context[elem].value = value;
1476 call->context[elem].destroy = destroy;
1477 }
1478
1479 void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
1480 return call->context[elem].value;
1481 }
1482
1483 uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/surface/call.h ('k') | third_party/grpc/src/core/surface/call_details.c » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698