From e6d888d0a086e0d208cad1849def4326f28670f0 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Thu, 14 Sep 2017 09:42:56 -0700 Subject: Change C core surface API --- .../message_compress/message_compress_filter.c | 133 ++++++++------------- 1 file changed, 50 insertions(+), 83 deletions(-) (limited to 'src/core/ext') diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 0145dffab0..a66916fbe1 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -27,6 +27,7 @@ #include "src/core/ext/filters/http/message_compress/message_compress_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/compression/algorithm_metadata.h" +#include "src/core/lib/compression/compression_internal.h" #include "src/core/lib/compression/message_compress.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -52,7 +53,7 @@ typedef struct call_data { grpc_linked_mdelem accept_stream_encoding_storage; /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ - grpc_compression_algorithm compression_algorithm; + grpc_message_compression_algorithm message_compression_algorithm; initial_metadata_state send_initial_metadata_state; grpc_error *cancel_error; grpc_closure start_send_message_batch_in_call_combiner; @@ -67,15 +68,10 @@ typedef struct call_data { typedef struct channel_data { /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; - /** Bitset of enabled algorithms */ + /** Bitset of enabled compression algorithms */ uint32_t enabled_algorithms_bitset; /** Supported compression algorithms */ - uint32_t supported_compression_algorithms; - - /** The default, channel-level, stream compression algorithm */ - grpc_stream_compression_algorithm default_stream_compression_algorithm; - /** Bitset of enabled stream compression algorithms */ - uint32_t enabled_stream_compression_algorithms_bitset; + uint32_t supported_message_compression_algorithms; /** Supported stream compression algorithms */ uint32_t supported_stream_compression_algorithms; } channel_data; @@ -89,7 +85,7 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags, return true; } if (has_compression_algorithm) { - if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { + if (calld->message_compression_algorithm == GRPC_MESSAGE_COMPRESS_NONE) { return true; } return false; /* we have an actual call-specific algorithm */ @@ -109,70 +105,53 @@ static grpc_error *process_send_initial_metadata( call_data *calld = (call_data *)elem->call_data; channel_data *channeld = (channel_data *)elem->channel_data; *has_compression_algorithm = false; + grpc_compression_algorithm compression_algorithm; grpc_stream_compression_algorithm stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; - if (initial_metadata->idx.named.grpc_internal_stream_encoding_request != - NULL) { + if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) { grpc_mdelem md = - initial_metadata->idx.named.grpc_internal_stream_encoding_request->md; - if (!grpc_stream_compression_algorithm_parse( - GRPC_MDVALUE(md), &stream_compression_algorithm)) { + initial_metadata->idx.named.grpc_internal_encoding_request->md; + if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), + &compression_algorithm)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, - "Invalid stream compression algorithm: '%s' (unknown). Ignoring.", - val); - gpr_free(val); - stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; - } - if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, - stream_compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); - gpr_log( - GPR_ERROR, - "Invalid stream compression algorithm: '%s' (previously disabled). " - "Ignoring.", - val); + "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); gpr_free(val); + calld->message_compression_algorithm = GRPC_MESSAGE_COMPRESS_NONE; stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; } - *has_compression_algorithm = true; - grpc_metadata_batch_remove( - exec_ctx, initial_metadata, - initial_metadata->idx.named.grpc_internal_stream_encoding_request); - /* Disable message-wise compression */ - calld->compression_algorithm = GRPC_COMPRESS_NONE; - if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) { - grpc_metadata_batch_remove( - exec_ctx, initial_metadata, - initial_metadata->idx.named.grpc_internal_encoding_request); - } - } else if (initial_metadata->idx.named.grpc_internal_encoding_request != - NULL) { - grpc_mdelem md = - initial_metadata->idx.named.grpc_internal_encoding_request->md; - if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), - &calld->compression_algorithm)) { + if (!GPR_BITGET(channeld->enabled_algorithms_bitset, + compression_algorithm)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, - "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); + "Invalid compression algorithm: '%s' (previously disabled). " + "Ignoring.", + val); gpr_free(val); - calld->compression_algorithm = GRPC_COMPRESS_NONE; + calld->message_compression_algorithm = GRPC_MESSAGE_COMPRESS_NONE; + stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; } *has_compression_algorithm = true; grpc_metadata_batch_remove( exec_ctx, initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); + calld->message_compression_algorithm = + grpc_compression_algorithm_to_message_compression_algorithm( + compression_algorithm); + stream_compression_algorithm = + grpc_compression_algorithm_to_stream_compression_algorithm( + compression_algorithm); } else { /* If no algorithm was found in the metadata and we aren't * exceptionally skipping compression, fall back to the channel * default */ - if (channeld->default_stream_compression_algorithm != - GRPC_STREAM_COMPRESS_NONE) { + if (channeld->default_compression_algorithm != GRPC_COMPRESS_NONE) { + calld->message_compression_algorithm = + grpc_compression_algorithm_to_message_compression_algorithm( + channeld->default_compression_algorithm); stream_compression_algorithm = - channeld->default_stream_compression_algorithm; - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } else { - calld->compression_algorithm = channeld->default_compression_algorithm; + grpc_compression_algorithm_to_stream_compression_algorithm( + channeld->default_compression_algorithm); } *has_compression_algorithm = true; } @@ -184,10 +163,11 @@ static grpc_error *process_send_initial_metadata( exec_ctx, initial_metadata, &calld->stream_compression_algorithm_storage, grpc_stream_compression_encoding_mdelem(stream_compression_algorithm)); - } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { + } else if (calld->message_compression_algorithm != + GRPC_MESSAGE_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( exec_ctx, initial_metadata, &calld->compression_algorithm_storage, - grpc_compression_encoding_mdelem(calld->compression_algorithm)); + grpc_message_compression_encoding_mdelem(calld->message_compression_algorithm)); } if (error != GRPC_ERROR_NONE) return error; @@ -196,11 +176,12 @@ static grpc_error *process_send_initial_metadata( error = grpc_metadata_batch_add_tail( exec_ctx, initial_metadata, &calld->accept_encoding_storage, GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( - channeld->supported_compression_algorithms)); + channeld->supported_message_compression_algorithms)); if (error != GRPC_ERROR_NONE) return error; - /* Do not overwrite accept-encoding header if it already presents. */ + /* Do not overwrite accept-encoding header if it already presents (e.g. added + * by some proxy). */ if (!initial_metadata->idx.named.accept_encoding) { error = grpc_metadata_batch_add_tail( exec_ctx, initial_metadata, &calld->accept_stream_encoding_storage, @@ -240,16 +221,16 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&tmp); uint32_t send_flags = calld->send_message_batch->payload->send_message.send_message->flags; - bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + bool did_compress = grpc_msg_compress( + exec_ctx, calld->message_compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; const float savings_ratio = 1.0f - (float)after_size / (float)before_size; - GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, - &algo_name)); + GPR_ASSERT(grpc_message_compression_algorithm_name( + calld->message_compression_algorithm, &algo_name)); gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); @@ -259,8 +240,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, } else { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; - GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, - &algo_name)); + GPR_ASSERT(grpc_message_compression_algorithm_name( + calld->message_compression_algorithm, &algo_name)); gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress. Input size: " "%" PRIuPTR, @@ -484,12 +465,11 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element_args *args) { channel_data *channeld = (channel_data *)elem->channel_data; - /* Configuration for message compression */ channeld->enabled_algorithms_bitset = grpc_channel_args_compression_algorithm_get_states(args->channel_args); - channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args->channel_args); + /* Make sure the default isn't disabled. */ if (!GPR_BITGET(channeld->enabled_algorithms_bitset, channeld->default_compression_algorithm)) { @@ -499,31 +479,18 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; } - channeld->supported_compression_algorithms = + uint32_t supported_compression_algorithms = (((1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1) & channeld->enabled_algorithms_bitset) | 1u; - /* Configuration for stream compression */ - channeld->enabled_stream_compression_algorithms_bitset = - grpc_channel_args_stream_compression_algorithm_get_states( - args->channel_args); - - channeld->default_stream_compression_algorithm = - grpc_channel_args_get_stream_compression_algorithm(args->channel_args); - - if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, - channeld->default_stream_compression_algorithm)) { - gpr_log(GPR_DEBUG, - "stream compression algorithm %d not enabled: switching to none", - channeld->default_stream_compression_algorithm); - channeld->default_stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; - } + channeld->supported_message_compression_algorithms = + grpc_compression_bitset_to_message_bitset( + supported_compression_algorithms); channeld->supported_stream_compression_algorithms = - (((1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) - 1) & - channeld->enabled_stream_compression_algorithms_bitset) | - 1u; + grpc_compression_bitset_to_stream_bitset( + supported_compression_algorithms); GPR_ASSERT(!args->is_last); return GRPC_ERROR_NONE; -- cgit v1.2.3