diff options
author | Muxi Yan <muxi@users.noreply.github.com> | 2017-07-26 17:00:53 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-26 17:00:53 -0700 |
commit | ddc0d374886f3db33db90c6c1be163214cc5147d (patch) | |
tree | 890dca6b3e5030ee313ab6ca6c6b32afa4f884e0 | |
parent | c44f036ee2bda2fb24b85b23eebc68213926fa2c (diff) | |
parent | c7a94c5052a2409ac68b991ae116d5f9e60eb545 (diff) |
Merge pull request #11780 from muxi/stream_compression_transport
Implement stream compression - transport layer
4 files changed, 206 insertions, 36 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index b1a7698811..dc35f4855c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -730,6 +730,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); + if (s->compressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer); + gpr_free(s->compressed_data_buffer); + } + if (s->decompressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); + gpr_free(s->decompressed_data_buffer); + } grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -780,6 +788,15 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + if (s->stream_compression_ctx != NULL) { + grpc_stream_compression_context_destroy(s->stream_compression_ctx); + s->stream_compression_ctx = NULL; + } + if (s->stream_decompression_ctx != NULL) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, @@ -1367,8 +1384,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "fetching_send_message_finished"); } else { GPR_ASSERT(s->fetching_send_message == NULL); - uint8_t *frame_hdr = - grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); + uint8_t *frame_hdr = grpc_slice_buffer_tiny_add( + &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); uint32_t flags = op_payload->send_message.send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; size_t len = op_payload->send_message.send_message->length; @@ -1459,14 +1476,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0) { - if (s->pending_byte_stream) { - already_received = s->frame_storage.length; - } else { - already_received = s->frame_storage.length + - s->unprocessed_incoming_frames_buffer.length; - } - incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, - already_received); + already_received = s->frame_storage.length; + incoming_byte_stream_update_flow_control( + exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received); } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -1703,10 +1715,43 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, if (s->unprocessed_incoming_frames_buffer.length == 0) { grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, &s->frame_storage); + s->unprocessed_incoming_frames_decompressed = false; + } + if (s->stream_compression_recv_enabled && + !s->unprocessed_incoming_frames_decompressed) { + GPR_ASSERT(s->decompressed_data_buffer->length == 0); + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = + grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer, NULL, + GRPC_HEADER_SIZE_IN_BYTES, + &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Stream decompression error."); + } else { + error = grpc_deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL, + s->recv_message); + if (end_of_context) { + grpc_stream_compression_context_destroy( + s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + } else { + error = grpc_deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, + &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); } - error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, - &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); if (error != GRPC_ERROR_NONE) { s->seen_error = true; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -1744,7 +1789,37 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } bool pending_data = s->pending_byte_stream || s->unprocessed_incoming_frames_buffer.length > 0; + if (s->stream_compression_recv_enabled && s->read_closed && + s->frame_storage.length > 0 && + s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data && + !s->seen_error && s->recv_trailing_metadata_finished != NULL) { + /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and + * maybe decompress the next 5 bytes in the stream. */ + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->frame_storage, + &s->unprocessed_incoming_frames_buffer, NULL, + GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + s->seen_error = true; + } else { + if (s->unprocessed_incoming_frames_buffer.length > 0) { + s->unprocessed_incoming_frames_decompressed = true; + } + if (end_of_context) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + } if (s->read_closed && s->frame_storage.length == 0 && + s->unprocessed_incoming_frames_buffer.length == 0 && (!pending_data || s->seen_error) && s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish( @@ -2612,6 +2687,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, if (s->frame_storage.length > 0) { grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); + s->unprocessed_incoming_frames_decompressed = false; GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, @@ -2673,17 +2749,41 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_stream *s = bs->stream; + grpc_error *error; if (s->unprocessed_incoming_frames_buffer.length > 0) { - grpc_error *error = grpc_deframe_unprocessed_incoming_frames( + if (s->stream_compression_recv_enabled && + !s->unprocessed_incoming_frames_decompressed) { + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer, NULL, MAX_SIZE_T, + &end_of_context)) { + error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); + return error; + } + GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); + grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer); + s->unprocessed_incoming_frames_decompressed = true; + if (end_of_context) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + error = grpc_deframe_unprocessed_incoming_frames( exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, NULL); if (error != GRPC_ERROR_NONE) { return error; } } else { - grpc_error *error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index dead6be77f..222d2177b2 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -293,7 +293,6 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice slice, int is_last) { - /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */ if (!s->pending_byte_stream) { grpc_slice_ref_internal(slice); grpc_slice_buffer_add(&s->frame_storage, slice); @@ -304,6 +303,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_NONE); s->on_next = NULL; + s->unprocessed_incoming_frames_decompressed = false; } else { grpc_slice_ref_internal(slice); grpc_slice_buffer_add(&s->frame_storage, slice); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 4563b78e75..b538d1df17 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -526,6 +526,26 @@ struct grpc_chttp2_stream { grpc_chttp2_write_cb *on_write_finished_cbs; grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; + + /** Whether stream compression send is enabled */ + bool stream_compression_recv_enabled; + /** Whether stream compression recv is enabled */ + bool stream_compression_send_enabled; + /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed + */ + bool unprocessed_incoming_frames_decompressed; + /** Stream compression decompress context */ + grpc_stream_compression_context *stream_decompression_ctx; + /** Stream compression compress context */ + grpc_stream_compression_context *stream_compression_ctx; + + /** Buffer storing data that is compressed but not sent */ + grpc_slice_buffer *compressed_data_buffer; + /** Amount of uncompressed bytes sent out when compressed_data_buffer is + * emptied */ + size_t uncompressed_data_size; + /** Temporary buffer storing decompressed data */ + grpc_slice_buffer *decompressed_data_buffer; }; /** Transport writing call flow: @@ -621,6 +641,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, grpc_closure **pclosure, grpc_error *error, const char *desc); +#define GRPC_HEADER_SIZE_IN_BYTES 5 +#define MAX_SIZE_T (~(size_t)0) + #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 315f2a67a2..c3ede08343 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -303,7 +303,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { uint32_t stream_outgoing_window = (uint32_t)GPR_MAX( 0, s->outgoing_window_delta + @@ -314,21 +316,63 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], GPR_MIN(stream_outgoing_window, t->outgoing_window)); if (max_outgoing > 0) { - uint32_t send_bytes = - (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); - bool is_last_data_frame = - s->fetching_send_message == NULL && - send_bytes == s->flow_controlled_buffer.length; - bool is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, - is_last_frame, &s->stats.outgoing, - &t->outbuf); - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, - send_bytes); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, - send_bytes); + bool is_last_data_frame = false; + bool is_last_frame = false; + if (s->stream_compression_send_enabled) { + while ((s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer->length > 0) && + max_outgoing > 0) { + if (s->compressed_data_buffer->length > 0) { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->compressed_data_buffer->length); + is_last_data_frame = + (send_bytes == s->compressed_data_buffer->length && + s->flow_controlled_buffer.length == 0 && + s->fetching_send_message == NULL); + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM( + "write", t, s, outgoing_window_delta, send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, + send_bytes); + max_outgoing -= send_bytes; + if (s->compressed_data_buffer->length == 0) { + s->sending_bytes += s->uncompressed_data_size; + } + } else { + if (s->stream_compression_ctx == NULL) { + s->stream_compression_ctx = + grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_COMPRESS); + } + s->uncompressed_data_size = s->flow_controlled_buffer.length; + GPR_ASSERT(grpc_stream_compress( + s->stream_compression_ctx, &s->flow_controlled_buffer, + s->compressed_data_buffer, NULL, MAX_SIZE_T, + GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); + } + } + } else { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->flow_controlled_buffer.length); + is_last_data_frame = s->fetching_send_message == NULL && + send_bytes == s->flow_controlled_buffer.length; + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, + send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, + send_bytes); + s->sending_bytes += send_bytes; + } t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { @@ -345,9 +389,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &s->stats.outgoing)); } } - s->sending_bytes += send_bytes; now_writing = true; - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -361,7 +406,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && - s->flow_controlled_buffer.length == 0) { + s->flow_controlled_buffer.length == 0 && + (!s->stream_compression_send_enabled || + s->compressed_data_buffer->length == 0)) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, |