diff options
Diffstat (limited to 'src/core/ext')
3 files changed, 120 insertions, 20 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 20a3488115..eb1a5a95e2 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -44,7 +44,9 @@ typedef struct call_data { grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; + grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; + grpc_linked_mdelem accept_stream_encoding_storage; uint32_t remaining_slice_bytes; /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ @@ -75,6 +77,13 @@ typedef struct channel_data { uint32_t enabled_algorithms_bitset; /** Supported compression algorithms */ uint32_t supported_compression_algorithms; + + /** The default, channel-level, stream compression algorithm */ + grpc_stream_compression_algorithm default_stream_compression_algorithm; + /** Bitset of enabled stream compression algorithms */ + uint32_t enabled_stream_compression_algorithms_bitset; + /** Supported stream compression algorithms */ + uint32_t supported_stream_compression_algorithms; } channel_data; static bool skip_compression(grpc_call_element *elem, uint32_t flags, @@ -106,31 +115,54 @@ static grpc_error *process_send_initial_metadata( call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; *has_compression_algorithm = false; - /* Parse incoming request for compression. If any, it'll be available - * at calld->compression_algorithm */ - if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) { + grpc_stream_compression_algorithm stream_compression_algorithm = + GRPC_STREAM_COMPRESS_NONE; + if (initial_metadata->idx.named.grpc_internal_stream_encoding_request != + NULL) { grpc_mdelem md = - initial_metadata->idx.named.grpc_internal_encoding_request->md; - if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), - &calld->compression_algorithm)) { + initial_metadata->idx.named.grpc_internal_stream_encoding_request->md; + if (!grpc_stream_compression_algorithm_parse( + GRPC_MDVALUE(md), &stream_compression_algorithm)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); gpr_free(val); - calld->compression_algorithm = GRPC_COMPRESS_NONE; + stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; } - if (!GPR_BITGET(channeld->enabled_algorithms_bitset, - calld->compression_algorithm)) { + if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, + stream_compression_algorithm)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s' (previously disabled). " "Ignoring.", val); gpr_free(val); + stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; + } + *has_compression_algorithm = true; + grpc_metadata_batch_remove( + exec_ctx, initial_metadata, + initial_metadata->idx.named.grpc_internal_stream_encoding_request); + /* Disable message-wise compression */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) { + grpc_metadata_batch_remove( + exec_ctx, initial_metadata, + initial_metadata->idx.named.grpc_internal_encoding_request); + } + } else if (initial_metadata->idx.named.grpc_internal_encoding_request != + NULL) { + grpc_mdelem md = + initial_metadata->idx.named.grpc_internal_encoding_request->md; + if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), + &calld->compression_algorithm)) { + char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); + gpr_free(val); calld->compression_algorithm = GRPC_COMPRESS_NONE; } *has_compression_algorithm = true; - grpc_metadata_batch_remove( exec_ctx, initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); @@ -138,13 +170,25 @@ static grpc_error *process_send_initial_metadata( /* If no algorithm was found in the metadata and we aren't * exceptionally skipping compression, fall back to the channel * default */ - calld->compression_algorithm = channeld->default_compression_algorithm; + if (channeld->default_stream_compression_algorithm != + GRPC_STREAM_COMPRESS_NONE) { + stream_compression_algorithm = + channeld->default_stream_compression_algorithm; + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } else { + calld->compression_algorithm = channeld->default_compression_algorithm; + } *has_compression_algorithm = true; } grpc_error *error = GRPC_ERROR_NONE; /* hint compression algorithm */ - if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { + if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { + error = grpc_metadata_batch_add_tail( + exec_ctx, initial_metadata, + &calld->stream_compression_algorithm_storage, + grpc_stream_compression_encoding_mdelem(stream_compression_algorithm)); + } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( exec_ctx, initial_metadata, &calld->compression_algorithm_storage, grpc_compression_encoding_mdelem(calld->compression_algorithm)); @@ -158,6 +202,13 @@ static grpc_error *process_send_initial_metadata( GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( channeld->supported_compression_algorithms)); + if (error != GRPC_ERROR_NONE) return error; + + error = grpc_metadata_batch_add_tail( + exec_ctx, initial_metadata, &calld->accept_stream_encoding_storage, + GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS( + channeld->supported_stream_compression_algorithms)); + return error; } @@ -435,6 +486,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element_args *args) { channel_data *channeld = elem->channel_data; + /* Configuration for message-wise compression */ channeld->enabled_algorithms_bitset = grpc_channel_args_compression_algorithm_get_states(args->channel_args); @@ -449,16 +501,32 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; } - channeld->supported_compression_algorithms = 1; /* always support identity */ - for (grpc_compression_algorithm algo_idx = 1; - algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - /* skip disabled algorithms */ - if (!GPR_BITGET(channeld->enabled_algorithms_bitset, algo_idx)) { - continue; - } - channeld->supported_compression_algorithms |= 1u << algo_idx; + channeld->supported_compression_algorithms = + (((1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1) & + channeld->enabled_algorithms_bitset) | + 1u; + + /* Configuration for stream compression */ + channeld->enabled_stream_compression_algorithms_bitset = + grpc_channel_args_stream_compression_algorithm_get_states( + args->channel_args); + + channeld->default_stream_compression_algorithm = + grpc_channel_args_get_stream_compression_algorithm(args->channel_args); + + if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, + channeld->default_stream_compression_algorithm)) { + gpr_log(GPR_DEBUG, + "stream compression algorithm %d not enabled: switching to none", + channeld->default_stream_compression_algorithm); + channeld->default_stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; } + channeld->supported_stream_compression_algorithms = + (((1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) - 1) & + channeld->enabled_stream_compression_algorithms_bitset) | + 1u; + GPR_ASSERT(!args->is_last); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index dc35f4855c..ede05d57b7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1307,6 +1307,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (op->send_initial_metadata) { GPR_ASSERT(s->send_initial_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; + + /* Identify stream compression */ + if ((s->stream_compression_send_enabled = + (op_payload->send_initial_metadata.send_initial_metadata->idx.named + .content_encoding != NULL)) == true) { + s->compressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer)); + grpc_slice_buffer_init(s->compressed_data_buffer); + } + s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = op_payload->send_initial_metadata.send_initial_metadata; diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 7f37365558..d4800aa701 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1655,6 +1655,23 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); } +static void parse_stream_compression_md(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + grpc_metadata_batch *initial_metadata) { + if (initial_metadata->idx.named.content_encoding != NULL) { + grpc_slice content_encoding = + GRPC_MDVALUE(initial_metadata->idx.named.content_encoding->md); + if (!grpc_slice_eq(content_encoding, GRPC_MDSTR_IDENTITY)) { + if (grpc_slice_eq(content_encoding, GRPC_MDSTR_GZIP)) { + s->stream_compression_recv_enabled = true; + s->decompressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer)); + grpc_slice_buffer_init(s->decompressed_data_buffer); + } + } + } +} + grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, void *hpack_parser, grpc_chttp2_transport *t, @@ -1684,6 +1701,12 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Too many trailer frames"); } + /* Process stream compression md element if it exists */ + if (s->header_frames_received == + 0) { /* Only acts on initial metadata */ + parse_stream_compression_md(exec_ctx, t, s, + &s->metadata_buffer[0].batch); + } s->published_metadata[s->header_frames_received] = GRPC_METADATA_PUBLISHED_FROM_WIRE; maybe_complete_funcs[s->header_frames_received](exec_ctx, t, s); |