aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/http/message_compress/message_compress_filter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/http/message_compress/message_compress_filter.cc')
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.cc135
1 files changed, 52 insertions, 83 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
index d0b9750497..0218ec6e40 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
@@ -27,6 +27,7 @@
#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/compression/algorithm_metadata.h"
+#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/profiling/timers.h"
@@ -53,7 +54,7 @@ struct call_data {
grpc_linked_mdelem accept_stream_encoding_storage;
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
- grpc_compression_algorithm compression_algorithm;
+ grpc_message_compression_algorithm message_compression_algorithm;
initial_metadata_state send_initial_metadata_state;
grpc_error* cancel_error;
grpc_closure start_send_message_batch_in_call_combiner;
@@ -68,15 +69,10 @@ struct call_data {
struct channel_data {
/** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm;
- /** Bitset of enabled algorithms */
+ /** Bitset of enabled compression algorithms */
uint32_t enabled_algorithms_bitset;
/** Supported compression algorithms */
- uint32_t supported_compression_algorithms;
-
- /** The default, channel-level, stream compression algorithm */
- grpc_stream_compression_algorithm default_stream_compression_algorithm;
- /** Bitset of enabled stream compression algorithms */
- uint32_t enabled_stream_compression_algorithms_bitset;
+ uint32_t supported_message_compression_algorithms;
/** Supported stream compression algorithms */
uint32_t supported_stream_compression_algorithms;
};
@@ -91,7 +87,7 @@ static bool skip_compression(grpc_call_element* elem, uint32_t flags,
return true;
}
if (has_compression_algorithm) {
- if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ if (calld->message_compression_algorithm == GRPC_MESSAGE_COMPRESS_NONE) {
return true;
}
return false; /* we have an actual call-specific algorithm */
@@ -110,70 +106,53 @@ static grpc_error* process_send_initial_metadata(
call_data* calld = (call_data*)elem->call_data;
channel_data* channeld = (channel_data*)elem->channel_data;
*has_compression_algorithm = false;
+ grpc_compression_algorithm compression_algorithm;
grpc_stream_compression_algorithm stream_compression_algorithm =
GRPC_STREAM_COMPRESS_NONE;
- if (initial_metadata->idx.named.grpc_internal_stream_encoding_request !=
- nullptr) {
+ if (initial_metadata->idx.named.grpc_internal_encoding_request != nullptr) {
grpc_mdelem md =
- initial_metadata->idx.named.grpc_internal_stream_encoding_request->md;
- if (!grpc_stream_compression_algorithm_parse(
- GRPC_MDVALUE(md), &stream_compression_algorithm)) {
+ initial_metadata->idx.named.grpc_internal_encoding_request->md;
+ if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
+ &compression_algorithm)) {
char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
- "Invalid stream compression algorithm: '%s' (unknown). Ignoring.",
- val);
+ "Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
gpr_free(val);
+ calld->message_compression_algorithm = GRPC_MESSAGE_COMPRESS_NONE;
stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE;
}
- if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset,
- stream_compression_algorithm)) {
- char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
- gpr_log(
- GPR_ERROR,
- "Invalid stream compression algorithm: '%s' (previously disabled). "
- "Ignoring.",
- val);
- gpr_free(val);
- stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE;
- }
- *has_compression_algorithm = true;
- grpc_metadata_batch_remove(
- initial_metadata,
- initial_metadata->idx.named.grpc_internal_stream_encoding_request);
- /* Disable message-wise compression */
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- if (initial_metadata->idx.named.grpc_internal_encoding_request != nullptr) {
- grpc_metadata_batch_remove(
- initial_metadata,
- initial_metadata->idx.named.grpc_internal_encoding_request);
- }
- } else if (initial_metadata->idx.named.grpc_internal_encoding_request !=
- nullptr) {
- grpc_mdelem md =
- initial_metadata->idx.named.grpc_internal_encoding_request->md;
- if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
- &calld->compression_algorithm)) {
+ if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
+ compression_algorithm)) {
char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
- "Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
+ "Invalid compression algorithm: '%s' (previously disabled). "
+ "Ignoring.",
+ val);
gpr_free(val);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ calld->message_compression_algorithm = GRPC_MESSAGE_COMPRESS_NONE;
+ stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE;
}
*has_compression_algorithm = true;
grpc_metadata_batch_remove(
initial_metadata,
initial_metadata->idx.named.grpc_internal_encoding_request);
+ calld->message_compression_algorithm =
+ grpc_compression_algorithm_to_message_compression_algorithm(
+ compression_algorithm);
+ stream_compression_algorithm =
+ grpc_compression_algorithm_to_stream_compression_algorithm(
+ compression_algorithm);
} else {
/* If no algorithm was found in the metadata and we aren't
* exceptionally skipping compression, fall back to the channel
* default */
- if (channeld->default_stream_compression_algorithm !=
- GRPC_STREAM_COMPRESS_NONE) {
+ if (channeld->default_compression_algorithm != GRPC_COMPRESS_NONE) {
+ calld->message_compression_algorithm =
+ grpc_compression_algorithm_to_message_compression_algorithm(
+ channeld->default_compression_algorithm);
stream_compression_algorithm =
- channeld->default_stream_compression_algorithm;
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- } else {
- calld->compression_algorithm = channeld->default_compression_algorithm;
+ grpc_compression_algorithm_to_stream_compression_algorithm(
+ channeld->default_compression_algorithm);
}
*has_compression_algorithm = true;
}
@@ -184,10 +163,12 @@ static grpc_error* process_send_initial_metadata(
error = grpc_metadata_batch_add_tail(
initial_metadata, &calld->stream_compression_algorithm_storage,
grpc_stream_compression_encoding_mdelem(stream_compression_algorithm));
- } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) {
+ } else if (calld->message_compression_algorithm !=
+ GRPC_MESSAGE_COMPRESS_NONE) {
error = grpc_metadata_batch_add_tail(
initial_metadata, &calld->compression_algorithm_storage,
- grpc_compression_encoding_mdelem(calld->compression_algorithm));
+ grpc_message_compression_encoding_mdelem(
+ calld->message_compression_algorithm));
}
if (error != GRPC_ERROR_NONE) return error;
@@ -196,11 +177,12 @@ static grpc_error* process_send_initial_metadata(
error = grpc_metadata_batch_add_tail(
initial_metadata, &calld->accept_encoding_storage,
GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
- channeld->supported_compression_algorithms));
+ channeld->supported_message_compression_algorithms));
if (error != GRPC_ERROR_NONE) return error;
- /* Do not overwrite accept-encoding header if it already presents. */
+ /* Do not overwrite accept-encoding header if it already presents (e.g. added
+ * by some proxy). */
if (!initial_metadata->idx.named.accept_encoding) {
error = grpc_metadata_batch_add_tail(
initial_metadata, &calld->accept_stream_encoding_storage,
@@ -237,16 +219,16 @@ static void finish_send_message(grpc_call_element* elem) {
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
calld->send_message_batch->payload->send_message.send_message->flags;
- bool did_compress =
- grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
+ bool did_compress = grpc_msg_compress(calld->message_compression_algorithm,
+ &calld->slices, &tmp);
if (did_compress) {
if (grpc_compression_trace.enabled()) {
const char* algo_name;
const size_t before_size = calld->slices.length;
const size_t after_size = tmp.length;
const float savings_ratio = 1.0f - (float)after_size / (float)before_size;
- GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
- &algo_name));
+ GPR_ASSERT(grpc_message_compression_algorithm_name(
+ calld->message_compression_algorithm, &algo_name));
gpr_log(GPR_DEBUG,
"Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
" bytes (%.2f%% savings)",
@@ -257,8 +239,8 @@ static void finish_send_message(grpc_call_element* elem) {
} else {
if (grpc_compression_trace.enabled()) {
const char* algo_name;
- GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
- &algo_name));
+ GPR_ASSERT(grpc_message_compression_algorithm_name(
+ calld->message_compression_algorithm, &algo_name));
gpr_log(GPR_DEBUG,
"Algorithm '%s' enabled but decided not to compress. Input size: "
"%" PRIuPTR,
@@ -381,6 +363,7 @@ static void compress_start_transport_stream_op_batch(
GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
} else {
grpc_byte_stream_shutdown(
+
calld->send_message_batch->payload->send_message.send_message,
GRPC_ERROR_REF(calld->cancel_error));
}
@@ -470,12 +453,11 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
channel_data* channeld = (channel_data*)elem->channel_data;
- /* Configuration for message compression */
channeld->enabled_algorithms_bitset =
grpc_channel_args_compression_algorithm_get_states(args->channel_args);
-
channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args->channel_args);
+
/* Make sure the default isn't disabled. */
if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
channeld->default_compression_algorithm)) {
@@ -485,31 +467,18 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
}
- channeld->supported_compression_algorithms =
+ uint32_t supported_compression_algorithms =
(((1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1) &
channeld->enabled_algorithms_bitset) |
1u;
- /* Configuration for stream compression */
- channeld->enabled_stream_compression_algorithms_bitset =
- grpc_channel_args_stream_compression_algorithm_get_states(
- args->channel_args);
-
- channeld->default_stream_compression_algorithm =
- grpc_channel_args_get_stream_compression_algorithm(args->channel_args);
-
- if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset,
- channeld->default_stream_compression_algorithm)) {
- gpr_log(GPR_DEBUG,
- "stream compression algorithm %d not enabled: switching to none",
- channeld->default_stream_compression_algorithm);
- channeld->default_stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE;
- }
+ channeld->supported_message_compression_algorithms =
+ grpc_compression_bitset_to_message_bitset(
+ supported_compression_algorithms);
channeld->supported_stream_compression_algorithms =
- (((1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) - 1) &
- channeld->enabled_stream_compression_algorithms_bitset) |
- 1u;
+ grpc_compression_bitset_to_stream_bitset(
+ supported_compression_algorithms);
GPR_ASSERT(!args->is_last);
return GRPC_ERROR_NONE;