diff options
author | Muxi Yan <mxyan@google.com> | 2017-07-12 11:49:27 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2017-07-12 14:03:12 -0700 |
commit | e89c83a21bf2975e6c686e5dc11fc5d17a22d590 (patch) | |
tree | 4f9bf1e2120d142576898cdbeee09aaa6a56c1b4 /src | |
parent | 68198b57442ac59cd9e18fdcbfe580c52f7faa62 (diff) |
Transport layer send (compression) path
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 6 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 11 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/writing.c | 82 |
3 files changed, 79 insertions, 20 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index aea4ce4e28..00b0738b20 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -668,6 +668,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); + grpc_slice_buffer_init(&s->compressed_data_buffer); grpc_slice_buffer_init(&s->decompressed_data_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, @@ -707,6 +708,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer); grpc_chttp2_list_remove_stalled_by_transport(t, s); @@ -758,6 +760,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + if (s->stream_compression_ctx != NULL) { + grpc_stream_compression_context_destroy(s->stream_compression_ctx); + s->stream_compression_ctx = NULL; + } if (s->stream_decompression_ctx != NULL) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); s->stream_decompression_ctx = NULL; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 64196d8c1c..0f29b10d37 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -522,12 +522,21 @@ struct grpc_chttp2_stream { /** Whether stream compression send is enabled or not */ bool stream_compression_recv_enabled; + /** Whether stream compression recv is enabled or not */ + bool stream_compression_send_enabled; /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ bool unprocessed_incoming_frames_decompressed; /** Stream compression decompress context */ grpc_stream_compression_context *stream_decompression_ctx; - + /** Stream compression compress context */ + grpc_stream_compression_context *stream_compression_ctx; + + /** Buffer storing data that is compressed but not sent */ + grpc_slice_buffer compressed_data_buffer; + /** Amount of uncompressed bytes sent out when compressed_data_buffer is + * emptied */ + size_t uncompressed_data_size; /** Temporary buffer storing decompressed data */ grpc_slice_buffer decompressed_data_buffer; }; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 315f2a67a2..d7221eaf30 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -303,7 +303,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer.length > 0) { uint32_t stream_outgoing_window = (uint32_t)GPR_MAX( 0, s->outgoing_window_delta + @@ -314,21 +315,63 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], GPR_MIN(stream_outgoing_window, t->outgoing_window)); if (max_outgoing > 0) { - uint32_t send_bytes = - (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); - bool is_last_data_frame = - s->fetching_send_message == NULL && - send_bytes == s->flow_controlled_buffer.length; - bool is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, - is_last_frame, &s->stats.outgoing, - &t->outbuf); - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, - send_bytes); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, - send_bytes); + bool is_last_data_frame; + bool is_last_frame; + if (s->stream_compression_send_enabled) { + while ((s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer.length > 0) && + max_outgoing > 0) { + if (s->compressed_data_buffer.length > 0) { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->compressed_data_buffer.length); + is_last_data_frame = + (send_bytes == s->compressed_data_buffer.length && + s->flow_controlled_buffer.length == 0 && + s->fetching_send_message == NULL); + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM( + "write", t, s, outgoing_window_delta, send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, + send_bytes); + max_outgoing -= send_bytes; + if (s->compressed_data_buffer.length == 0) { + s->sending_bytes += s->uncompressed_data_size; + } + } else { + if (s->stream_compression_ctx == NULL) { + s->stream_compression_ctx = + grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_COMPRESS); + } + s->uncompressed_data_size = s->flow_controlled_buffer.length; + GPR_ASSERT(grpc_stream_compress( + s->stream_compression_ctx, &s->flow_controlled_buffer, + &s->compressed_data_buffer, NULL, ~(size_t)0, + GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); + } + } + } else { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->flow_controlled_buffer.length); + is_last_data_frame = s->fetching_send_message == NULL && + send_bytes == s->flow_controlled_buffer.length; + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, + send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, + send_bytes); + s->sending_bytes += send_bytes; + } t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { @@ -345,9 +388,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &s->stats.outgoing)); } } - s->sending_bytes += send_bytes; now_writing = true; - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer.length > 0) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -361,7 +404,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && - s->flow_controlled_buffer.length == 0) { + s->flow_controlled_buffer.length == 0 && + s->compressed_data_buffer.length == 0) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, |