aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c108
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c9
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c23
3 files changed, 120 insertions, 20 deletions
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 20a3488115..eb1a5a95e2 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -44,7 +44,9 @@
typedef struct call_data {
grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
+ grpc_linked_mdelem stream_compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
+ grpc_linked_mdelem accept_stream_encoding_storage;
uint32_t remaining_slice_bytes;
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
@@ -75,6 +77,13 @@ typedef struct channel_data {
uint32_t enabled_algorithms_bitset;
/** Supported compression algorithms */
uint32_t supported_compression_algorithms;
+
+ /** The default, channel-level, stream compression algorithm */
+ grpc_stream_compression_algorithm default_stream_compression_algorithm;
+ /** Bitset of enabled stream compression algorithms */
+ uint32_t enabled_stream_compression_algorithms_bitset;
+ /** Supported stream compression algorithms */
+ uint32_t supported_stream_compression_algorithms;
} channel_data;
static bool skip_compression(grpc_call_element *elem, uint32_t flags,
@@ -106,31 +115,54 @@ static grpc_error *process_send_initial_metadata(
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
*has_compression_algorithm = false;
- /* Parse incoming request for compression. If any, it'll be available
- * at calld->compression_algorithm */
- if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) {
+ grpc_stream_compression_algorithm stream_compression_algorithm =
+ GRPC_STREAM_COMPRESS_NONE;
+ if (initial_metadata->idx.named.grpc_internal_stream_encoding_request !=
+ NULL) {
grpc_mdelem md =
- initial_metadata->idx.named.grpc_internal_encoding_request->md;
- if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
- &calld->compression_algorithm)) {
+ initial_metadata->idx.named.grpc_internal_stream_encoding_request->md;
+ if (!grpc_stream_compression_algorithm_parse(
+ GRPC_MDVALUE(md), &stream_compression_algorithm)) {
char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
"Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
gpr_free(val);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE;
}
- if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
- calld->compression_algorithm)) {
+ if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset,
+ stream_compression_algorithm)) {
char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
"Invalid compression algorithm: '%s' (previously disabled). "
"Ignoring.",
val);
gpr_free(val);
+ stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE;
+ }
+ *has_compression_algorithm = true;
+ grpc_metadata_batch_remove(
+ exec_ctx, initial_metadata,
+ initial_metadata->idx.named.grpc_internal_stream_encoding_request);
+ /* Disable message-wise compression */
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) {
+ grpc_metadata_batch_remove(
+ exec_ctx, initial_metadata,
+ initial_metadata->idx.named.grpc_internal_encoding_request);
+ }
+ } else if (initial_metadata->idx.named.grpc_internal_encoding_request !=
+ NULL) {
+ grpc_mdelem md =
+ initial_metadata->idx.named.grpc_internal_encoding_request->md;
+ if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
+ &calld->compression_algorithm)) {
+ char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
+ gpr_free(val);
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
*has_compression_algorithm = true;
-
grpc_metadata_batch_remove(
exec_ctx, initial_metadata,
initial_metadata->idx.named.grpc_internal_encoding_request);
@@ -138,13 +170,25 @@ static grpc_error *process_send_initial_metadata(
/* If no algorithm was found in the metadata and we aren't
* exceptionally skipping compression, fall back to the channel
* default */
- calld->compression_algorithm = channeld->default_compression_algorithm;
+ if (channeld->default_stream_compression_algorithm !=
+ GRPC_STREAM_COMPRESS_NONE) {
+ stream_compression_algorithm =
+ channeld->default_stream_compression_algorithm;
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ } else {
+ calld->compression_algorithm = channeld->default_compression_algorithm;
+ }
*has_compression_algorithm = true;
}
grpc_error *error = GRPC_ERROR_NONE;
/* hint compression algorithm */
- if (calld->compression_algorithm != GRPC_COMPRESS_NONE) {
+ if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) {
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, initial_metadata,
+ &calld->stream_compression_algorithm_storage,
+ grpc_stream_compression_encoding_mdelem(stream_compression_algorithm));
+ } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) {
error = grpc_metadata_batch_add_tail(
exec_ctx, initial_metadata, &calld->compression_algorithm_storage,
grpc_compression_encoding_mdelem(calld->compression_algorithm));
@@ -158,6 +202,13 @@ static grpc_error *process_send_initial_metadata(
GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
channeld->supported_compression_algorithms));
+ if (error != GRPC_ERROR_NONE) return error;
+
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, initial_metadata, &calld->accept_stream_encoding_storage,
+ GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS(
+ channeld->supported_stream_compression_algorithms));
+
return error;
}
@@ -435,6 +486,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element_args *args) {
channel_data *channeld = elem->channel_data;
+ /* Configuration for message-wise compression */
channeld->enabled_algorithms_bitset =
grpc_channel_args_compression_algorithm_get_states(args->channel_args);
@@ -449,16 +501,32 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
}
- channeld->supported_compression_algorithms = 1; /* always support identity */
- for (grpc_compression_algorithm algo_idx = 1;
- algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
- /* skip disabled algorithms */
- if (!GPR_BITGET(channeld->enabled_algorithms_bitset, algo_idx)) {
- continue;
- }
- channeld->supported_compression_algorithms |= 1u << algo_idx;
+ channeld->supported_compression_algorithms =
+ (((1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1) &
+ channeld->enabled_algorithms_bitset) |
+ 1u;
+
+ /* Configuration for stream compression */
+ channeld->enabled_stream_compression_algorithms_bitset =
+ grpc_channel_args_stream_compression_algorithm_get_states(
+ args->channel_args);
+
+ channeld->default_stream_compression_algorithm =
+ grpc_channel_args_get_stream_compression_algorithm(args->channel_args);
+
+ if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset,
+ channeld->default_stream_compression_algorithm)) {
+ gpr_log(GPR_DEBUG,
+ "stream compression algorithm %d not enabled: switching to none",
+ channeld->default_stream_compression_algorithm);
+ channeld->default_stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE;
}
+ channeld->supported_stream_compression_algorithms =
+ (((1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) - 1) &
+ channeld->enabled_stream_compression_algorithms_bitset) |
+ 1u;
+
GPR_ASSERT(!args->is_last);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index dc35f4855c..ede05d57b7 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1307,6 +1307,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_initial_metadata) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
+
+ /* Identify stream compression */
+ if ((s->stream_compression_send_enabled =
+ (op_payload->send_initial_metadata.send_initial_metadata->idx.named
+ .content_encoding != NULL)) == true) {
+ s->compressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer));
+ grpc_slice_buffer_init(s->compressed_data_buffer);
+ }
+
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata =
op_payload->send_initial_metadata.send_initial_metadata;
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 7f37365558..d4800aa701 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1655,6 +1655,23 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst");
}
+static void parse_stream_compression_md(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ grpc_metadata_batch *initial_metadata) {
+ if (initial_metadata->idx.named.content_encoding != NULL) {
+ grpc_slice content_encoding =
+ GRPC_MDVALUE(initial_metadata->idx.named.content_encoding->md);
+ if (!grpc_slice_eq(content_encoding, GRPC_MDSTR_IDENTITY)) {
+ if (grpc_slice_eq(content_encoding, GRPC_MDSTR_GZIP)) {
+ s->stream_compression_recv_enabled = true;
+ s->decompressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer));
+ grpc_slice_buffer_init(s->decompressed_data_buffer);
+ }
+ }
+ }
+}
+
grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
void *hpack_parser,
grpc_chttp2_transport *t,
@@ -1684,6 +1701,12 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Too many trailer frames");
}
+ /* Process stream compression md element if it exists */
+ if (s->header_frames_received ==
+ 0) { /* Only acts on initial metadata */
+ parse_stream_compression_md(exec_ctx, t, s,
+ &s->metadata_buffer[0].batch);
+ }
s->published_metadata[s->header_frames_received] =
GRPC_METADATA_PUBLISHED_FROM_WIRE;
maybe_complete_funcs[s->header_frames_received](exec_ctx, t, s);