Index: third_party/grpc/src/core/transport/chttp2/writing.c |
diff --git a/third_party/grpc/src/core/transport/chttp2/writing.c b/third_party/grpc/src/core/transport/chttp2/writing.c |
new file mode 100644 |
index 0000000000000000000000000000000000000000..107725cbc797555d8e8d46d05f808faa84a1cc34 |
--- /dev/null |
+++ b/third_party/grpc/src/core/transport/chttp2/writing.c |
@@ -0,0 +1,350 @@ |
+/* |
+ * |
+ * Copyright 2015-2016, Google Inc. |
+ * All rights reserved. |
+ * |
+ * Redistribution and use in source and binary forms, with or without |
+ * modification, are permitted provided that the following conditions are |
+ * met: |
+ * |
+ * * Redistributions of source code must retain the above copyright |
+ * notice, this list of conditions and the following disclaimer. |
+ * * Redistributions in binary form must reproduce the above |
+ * copyright notice, this list of conditions and the following disclaimer |
+ * in the documentation and/or other materials provided with the |
+ * distribution. |
+ * * Neither the name of Google Inc. nor the names of its |
+ * contributors may be used to endorse or promote products derived from |
+ * this software without specific prior written permission. |
+ * |
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ * |
+ */ |
+ |
+#include "src/core/transport/chttp2/internal.h" |
+ |
+#include <limits.h> |
+ |
+#include <grpc/support/log.h> |
+ |
+#include "src/core/profiling/timers.h" |
+#include "src/core/transport/chttp2/http2_errors.h" |
+ |
+static void finalize_outbuf(grpc_exec_ctx *exec_ctx, |
+ grpc_chttp2_transport_writing *transport_writing); |
+ |
+int grpc_chttp2_unlocking_check_writes( |
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, |
+ grpc_chttp2_transport_writing *transport_writing, int is_parsing) { |
+ grpc_chttp2_stream_global *stream_global; |
+ grpc_chttp2_stream_writing *stream_writing; |
+ |
+ GPR_TIMER_BEGIN("grpc_chttp2_unlocking_check_writes", 0); |
+ |
+ /* simple writes are queued to qbuf, and flushed here */ |
+ gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); |
+ GPR_ASSERT(transport_global->qbuf.count == 0); |
+ |
+ grpc_chttp2_hpack_compressor_set_max_table_size( |
+ &transport_writing->hpack_compressor, |
+ transport_global->settings[GRPC_PEER_SETTINGS] |
+ [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); |
+ |
+ if (transport_global->dirtied_local_settings && |
+ !transport_global->sent_local_settings && !is_parsing) { |
+ gpr_slice_buffer_add( |
+ &transport_writing->outbuf, |
+ grpc_chttp2_settings_create( |
+ transport_global->settings[GRPC_SENT_SETTINGS], |
+ transport_global->settings[GRPC_LOCAL_SETTINGS], |
+ transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); |
+ transport_global->force_send_settings = 0; |
+ transport_global->dirtied_local_settings = 0; |
+ transport_global->sent_local_settings = 1; |
+ } |
+ |
+ GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, |
+ transport_global, outgoing_window); |
+ bool is_window_available = transport_writing->outgoing_window > 0; |
+ grpc_chttp2_list_flush_writing_stalled_by_transport( |
+ exec_ctx, transport_writing, is_window_available); |
+ |
+ /* for each grpc_chttp2_stream that's become writable, frame it's data |
+ (according to available window sizes) and add to the output buffer */ |
+ while (grpc_chttp2_list_pop_writable_stream( |
+ transport_global, transport_writing, &stream_global, &stream_writing)) { |
+ bool sent_initial_metadata = stream_writing->sent_initial_metadata; |
+ bool become_writable = false; |
+ |
+ stream_writing->id = stream_global->id; |
+ stream_writing->read_closed = stream_global->read_closed; |
+ |
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_writing, stream_writing, |
+ outgoing_window, stream_global, |
+ outgoing_window); |
+ |
+ if (!sent_initial_metadata && stream_global->send_initial_metadata) { |
+ stream_writing->send_initial_metadata = |
+ stream_global->send_initial_metadata; |
+ stream_global->send_initial_metadata = NULL; |
+ become_writable = true; |
+ sent_initial_metadata = true; |
+ } |
+ if (sent_initial_metadata) { |
+ if (stream_global->send_message != NULL) { |
+ gpr_slice hdr = gpr_slice_malloc(5); |
+ uint8_t *p = GPR_SLICE_START_PTR(hdr); |
+ uint32_t len = stream_global->send_message->length; |
+ GPR_ASSERT(stream_writing->send_message == NULL); |
+ p[0] = (stream_global->send_message->flags & |
+ GRPC_WRITE_INTERNAL_COMPRESS) != 0; |
+ p[1] = (uint8_t)(len >> 24); |
+ p[2] = (uint8_t)(len >> 16); |
+ p[3] = (uint8_t)(len >> 8); |
+ p[4] = (uint8_t)(len); |
+ gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, hdr); |
+ if (stream_global->send_message->length > 0) { |
+ stream_writing->send_message = stream_global->send_message; |
+ } else { |
+ stream_writing->send_message = NULL; |
+ } |
+ stream_writing->stream_fetched = 0; |
+ stream_global->send_message = NULL; |
+ } |
+ if ((stream_writing->send_message != NULL || |
+ stream_writing->flow_controlled_buffer.length > 0) && |
+ stream_writing->outgoing_window > 0) { |
+ if (transport_writing->outgoing_window > 0) { |
+ become_writable = true; |
+ } else { |
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing, |
+ stream_writing); |
+ } |
+ } |
+ if (stream_global->send_trailing_metadata) { |
+ stream_writing->send_trailing_metadata = |
+ stream_global->send_trailing_metadata; |
+ stream_global->send_trailing_metadata = NULL; |
+ become_writable = true; |
+ } |
+ } |
+ |
+ if (!stream_global->read_closed && |
+ stream_global->unannounced_incoming_window_for_writing > 1024) { |
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, |
+ announce_window, stream_global, |
+ unannounced_incoming_window_for_writing); |
+ become_writable = true; |
+ } |
+ |
+ if (become_writable) { |
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); |
+ } else { |
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); |
+ } |
+ } |
+ |
+ /* if the grpc_chttp2_transport is ready to send a window update, do so here |
+ also; 3/4 is a magic number that will likely get tuned soon */ |
+ if (transport_global->announce_incoming_window > 0) { |
+ uint32_t announced = (uint32_t)GPR_MIN( |
+ transport_global->announce_incoming_window, UINT32_MAX); |
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global, |
+ announce_incoming_window, announced); |
+ gpr_slice_buffer_add(&transport_writing->outbuf, |
+ grpc_chttp2_window_update_create(0, announced)); |
+ } |
+ |
+ GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0); |
+ |
+ return transport_writing->outbuf.count > 0 || |
+ grpc_chttp2_list_have_writing_streams(transport_writing); |
+} |
+ |
+void grpc_chttp2_perform_writes( |
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, |
+ grpc_endpoint *endpoint) { |
+ GPR_ASSERT(transport_writing->outbuf.count > 0 || |
+ grpc_chttp2_list_have_writing_streams(transport_writing)); |
+ |
+ finalize_outbuf(exec_ctx, transport_writing); |
+ |
+ GPR_ASSERT(endpoint); |
+ |
+ if (transport_writing->outbuf.count > 0) { |
+ grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, |
+ &transport_writing->done_cb); |
+ } else { |
+ grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, true, NULL); |
+ } |
+} |
+ |
+static void finalize_outbuf(grpc_exec_ctx *exec_ctx, |
+ grpc_chttp2_transport_writing *transport_writing) { |
+ grpc_chttp2_stream_writing *stream_writing; |
+ |
+ GPR_TIMER_BEGIN("finalize_outbuf", 0); |
+ |
+ while ( |
+ grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { |
+ uint32_t max_outgoing = |
+ (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH, |
+ GPR_MIN(stream_writing->outgoing_window, |
+ transport_writing->outgoing_window)); |
+ /* send initial metadata if it's available */ |
+ if (stream_writing->send_initial_metadata != NULL) { |
+ grpc_chttp2_encode_header( |
+ &transport_writing->hpack_compressor, stream_writing->id, |
+ stream_writing->send_initial_metadata, 0, &transport_writing->outbuf); |
+ stream_writing->send_initial_metadata = NULL; |
+ stream_writing->sent_initial_metadata = 1; |
+ } |
+ /* send any window updates */ |
+ if (stream_writing->announce_window > 0 && |
+ stream_writing->send_initial_metadata == NULL) { |
+ uint32_t announce = stream_writing->announce_window; |
+ gpr_slice_buffer_add( |
+ &transport_writing->outbuf, |
+ grpc_chttp2_window_update_create(stream_writing->id, |
+ stream_writing->announce_window)); |
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing, |
+ announce_window, announce); |
+ stream_writing->announce_window = 0; |
+ } |
+ /* fetch any body bytes */ |
+ while (!stream_writing->fetching && stream_writing->send_message && |
+ stream_writing->flow_controlled_buffer.length < max_outgoing && |
+ stream_writing->stream_fetched < |
+ stream_writing->send_message->length) { |
+ if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message, |
+ &stream_writing->fetching_slice, max_outgoing, |
+ &stream_writing->finished_fetch)) { |
+ stream_writing->stream_fetched += |
+ GPR_SLICE_LENGTH(stream_writing->fetching_slice); |
+ if (stream_writing->stream_fetched == |
+ stream_writing->send_message->length) { |
+ stream_writing->send_message = NULL; |
+ } |
+ gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, |
+ stream_writing->fetching_slice); |
+ } else { |
+ stream_writing->fetching = 1; |
+ } |
+ } |
+ /* send any body bytes */ |
+ if (stream_writing->flow_controlled_buffer.length > 0) { |
+ if (max_outgoing > 0) { |
+ uint32_t send_bytes = (uint32_t)GPR_MIN( |
+ max_outgoing, stream_writing->flow_controlled_buffer.length); |
+ int is_last_data_frame = |
+ stream_writing->send_message == NULL && |
+ send_bytes == stream_writing->flow_controlled_buffer.length; |
+ int is_last_frame = is_last_data_frame && |
+ stream_writing->send_trailing_metadata != NULL && |
+ grpc_metadata_batch_is_empty( |
+ stream_writing->send_trailing_metadata); |
+ grpc_chttp2_encode_data( |
+ stream_writing->id, &stream_writing->flow_controlled_buffer, |
+ send_bytes, is_last_frame, &transport_writing->outbuf); |
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, |
+ stream_writing, outgoing_window, |
+ send_bytes); |
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing, |
+ outgoing_window, send_bytes); |
+ if (is_last_frame) { |
+ stream_writing->send_trailing_metadata = NULL; |
+ stream_writing->sent_trailing_metadata = 1; |
+ } |
+ if (is_last_data_frame) { |
+ GPR_ASSERT(stream_writing->send_message == NULL); |
+ stream_writing->sent_message = 1; |
+ } |
+ } else if (transport_writing->outgoing_window == 0) { |
+ grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, |
+ stream_writing); |
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); |
+ } |
+ } |
+ /* send trailing metadata if it's available and we're ready for it */ |
+ if (stream_writing->send_message == NULL && |
+ stream_writing->flow_controlled_buffer.length == 0 && |
+ stream_writing->send_trailing_metadata != NULL) { |
+ if (grpc_metadata_batch_is_empty( |
+ stream_writing->send_trailing_metadata)) { |
+ grpc_chttp2_encode_data(stream_writing->id, |
+ &stream_writing->flow_controlled_buffer, 0, 1, |
+ &transport_writing->outbuf); |
+ } else { |
+ grpc_chttp2_encode_header(&transport_writing->hpack_compressor, |
+ stream_writing->id, |
+ stream_writing->send_trailing_metadata, 1, |
+ &transport_writing->outbuf); |
+ } |
+ if (!transport_writing->is_client && !stream_writing->read_closed) { |
+ gpr_slice_buffer_add(&transport_writing->outbuf, |
+ grpc_chttp2_rst_stream_create( |
+ stream_writing->id, GRPC_CHTTP2_NO_ERROR)); |
+ } |
+ stream_writing->send_trailing_metadata = NULL; |
+ stream_writing->sent_trailing_metadata = 1; |
+ } |
+ /* if there's more to write, then loop, otherwise prepare to finish the |
+ * write */ |
+ if ((stream_writing->flow_controlled_buffer.length > 0 || |
+ (stream_writing->send_message && !stream_writing->fetching)) && |
+ stream_writing->outgoing_window > 0) { |
+ if (transport_writing->outgoing_window > 0) { |
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); |
+ } else { |
+ grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, |
+ stream_writing); |
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); |
+ } |
+ } else { |
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); |
+ } |
+ } |
+ |
+ GPR_TIMER_END("finalize_outbuf", 0); |
+} |
+ |
+void grpc_chttp2_cleanup_writing( |
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, |
+ grpc_chttp2_transport_writing *transport_writing) { |
+ grpc_chttp2_stream_writing *stream_writing; |
+ grpc_chttp2_stream_global *stream_global; |
+ |
+ while (grpc_chttp2_list_pop_written_stream( |
+ transport_global, transport_writing, &stream_global, &stream_writing)) { |
+ if (stream_writing->sent_initial_metadata) { |
+ grpc_chttp2_complete_closure_step( |
+ exec_ctx, &stream_global->send_initial_metadata_finished, 1); |
+ } |
+ if (stream_writing->sent_message) { |
+ GPR_ASSERT(stream_writing->send_message == NULL); |
+ grpc_chttp2_complete_closure_step( |
+ exec_ctx, &stream_global->send_message_finished, 1); |
+ stream_writing->sent_message = 0; |
+ } |
+ if (stream_writing->sent_trailing_metadata) { |
+ grpc_chttp2_complete_closure_step( |
+ exec_ctx, &stream_global->send_trailing_metadata_finished, 1); |
+ } |
+ if (stream_writing->sent_trailing_metadata) { |
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, |
+ !transport_global->is_client, 1); |
+ } |
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); |
+ } |
+ gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); |
+} |