aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel/compress_filter.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel/compress_filter.c')
-rw-r--r--src/core/lib/channel/compress_filter.c94
1 files changed, 50 insertions, 44 deletions
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 337c194b79..c860d60d88 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -45,6 +45,7 @@
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -80,39 +81,6 @@ typedef struct channel_data {
uint32_t supported_compression_algorithms;
} channel_data;
-/** For each \a md element from the incoming metadata, filter out the entry for
- * "grpc-encoding", using its value to populate the call data's
- * compression_algorithm field. */
-static grpc_mdelem *compression_md_filter(grpc_exec_ctx *exec_ctx,
- void *user_data, grpc_mdelem *md) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- if (md->key == GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST) {
- const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
- &calld->compression_algorithm)) {
- gpr_log(GPR_ERROR,
- "Invalid compression algorithm: '%s' (unknown). Ignoring.",
- md_c_str);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
- calld->compression_algorithm)) {
- gpr_log(GPR_ERROR,
- "Invalid compression algorithm: '%s' (previously disabled). "
- "Ignoring.",
- md_c_str);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- calld->has_compression_algorithm = 1;
- return NULL;
- }
-
- return md;
-}
-
static int skip_compression(grpc_call_element *elem, uint32_t flags) {
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -131,32 +99,65 @@ static int skip_compression(grpc_call_element *elem, uint32_t flags) {
}
/** Filter initial metadata */
-static void process_send_initial_metadata(
+static grpc_error *process_send_initial_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_metadata_batch *initial_metadata) GRPC_MUST_USE_RESULT;
+static grpc_error *process_send_initial_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata) {
+ grpc_error *error;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* Parse incoming request for compression. If any, it'll be available
* at calld->compression_algorithm */
- grpc_metadata_batch_filter(exec_ctx, initial_metadata, compression_md_filter,
- elem);
- if (!calld->has_compression_algorithm) {
+ if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) {
+ grpc_mdelem md =
+ initial_metadata->idx.named.grpc_internal_encoding_request->md;
+ if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
+ &calld->compression_algorithm)) {
+ char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
+ gpr_free(val);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
+ calld->compression_algorithm)) {
+ char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (previously disabled). "
+ "Ignoring.",
+ val);
+ gpr_free(val);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ calld->has_compression_algorithm = 1;
+
+ grpc_metadata_batch_remove(
+ exec_ctx, initial_metadata,
+ initial_metadata->idx.named.grpc_internal_encoding_request);
+ } else {
/* If no algorithm was found in the metadata and we aren't
* exceptionally skipping compression, fall back to the channel
* default */
calld->compression_algorithm = channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
+
/* hint compression algorithm */
- grpc_metadata_batch_add_tail(
- initial_metadata, &calld->compression_algorithm_storage,
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, initial_metadata, &calld->compression_algorithm_storage,
grpc_compression_encoding_mdelem(calld->compression_algorithm));
+ if (error != GRPC_ERROR_NONE) return error;
+
/* convey supported compression algorithms */
- grpc_metadata_batch_add_tail(initial_metadata,
- &calld->accept_encoding_storage,
- GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
- channeld->supported_compression_algorithms));
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, initial_metadata, &calld->accept_encoding_storage,
+ GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
+ channeld->supported_compression_algorithms));
+
+ return error;
}
static void continue_send_message(grpc_exec_ctx *exec_ctx,
@@ -247,7 +248,12 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);
if (op->send_initial_metadata) {
- process_send_initial_metadata(exec_ctx, elem, op->send_initial_metadata);
+ grpc_error *error = process_send_initial_metadata(
+ exec_ctx, elem, op->send_initial_metadata);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
}
if (op->send_message != NULL &&
!skip_compression(elem, op->send_message->flags)) {