aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-07-12 12:19:58 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-07-12 14:04:30 -0700
commita4dc077d3c1eef677102f68496732b7dd2374875 (patch)
tree5ed3896ce6db5112aa7e443aa8eef97d1bc0e864 /src/core/lib/surface/call.c
parent96e49785518fa6e8723a68433d783632acd43108 (diff)
Stream compression configuration
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c95
1 files changed, 75 insertions, 20 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c769866ceb..63496ac8f5 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -769,8 +769,9 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
static void destroy_encodings_accepted_by_peer(void *p) { return; }
-static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
- grpc_call *call, grpc_mdelem mdel) {
+static void parse_encodings_accepted_by_peer(
+ grpc_exec_ctx *exec_ctx, grpc_mdelem mdel,
+ uint32_t *encodings_accepted_by_peer, bool stream_compression) {
size_t i;
grpc_compression_algorithm algorithm;
grpc_slice_buffer accept_encoding_parts;
@@ -780,24 +781,46 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
accepted_user_data =
grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
if (accepted_user_data != NULL) {
- call->encodings_accepted_by_peer =
- (uint32_t)(((uintptr_t)accepted_user_data) - 1);
+ uint32_t flag = (uint32_t)(((uintptr_t)accepted_user_data) - 1);
+ if (stream_compression) {
+ *encodings_accepted_by_peer |= flag & 1u;
+ *encodings_accepted_by_peer |= (flag & ~(uint32_t)1)
+ << GRPC_STREAM_COMPRESS_FLAG_OFFSET;
+ } else {
+ *encodings_accepted_by_peer |= flag;
+ }
return;
}
+ uint32_t user_data = 0;
+
accept_encoding_slice = GRPC_MDVALUE(mdel);
grpc_slice_buffer_init(&accept_encoding_parts);
grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
- /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
- * zeroes the whole grpc_call */
/* Always support no compression */
- GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
+ GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
for (i = 0; i < accept_encoding_parts.count; i++) {
grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
- if (grpc_compression_algorithm_parse(accept_encoding_entry_slice,
- &algorithm)) {
- GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
+ int result;
+ if (stream_compression) {
+ result = grpc_stream_compression_algorithm_parse(
+ accept_encoding_entry_slice, &algorithm);
+ if (result) {
+ user_data |=
+ (1u << (algorithm == GRPC_COMPRESS_NONE
+ ? 0
+ : algorithm - (GRPC_STREAM_COMPRESS_FLAG_OFFSET)));
+ }
+ } else {
+ result = grpc_compression_algorithm_parse(accept_encoding_entry_slice,
+ &algorithm);
+ if (result) {
+ user_data |= (1u << algorithm);
+ }
+ }
+ if (result) {
+ GPR_BITSET(encodings_accepted_by_peer, algorithm);
} else {
char *accept_encoding_entry_str =
grpc_slice_to_c_string(accept_encoding_entry_slice);
@@ -812,7 +835,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
- (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
+ (void *)(((uintptr_t)encodings_accepted_by_peer) + 1));
}
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
@@ -914,9 +937,14 @@ static uint32_t decode_status(grpc_mdelem md) {
return status;
}
-static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
- grpc_compression_algorithm algorithm =
- grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+static grpc_compression_algorithm decode_compression(grpc_mdelem md,
+ bool stream_compression) {
+ grpc_compression_algorithm algorithm;
+ if (stream_compression) {
+ algorithm = grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+ } else {
+ algorithm = grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+ }
if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
@@ -953,20 +981,43 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch *b) {
- if (b->idx.named.grpc_encoding != NULL) {
+ if (b->idx.named.content_encoding != NULL) {
+ GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
+ set_incoming_compression_algorithm(
+ call, decode_compression(b->idx.named.content_encoding->md, true));
+ GPR_TIMER_END("incoming_compression_algorithm", 0);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding);
+ /* Stream compression overrides message compression */
+ if (b->idx.named.grpc_encoding != NULL) {
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
+ }
+ } else if (b->idx.named.grpc_encoding != NULL) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_incoming_compression_algorithm(
- call, decode_compression(b->idx.named.grpc_encoding->md));
+ call, decode_compression(b->idx.named.grpc_encoding->md, false));
GPR_TIMER_END("incoming_compression_algorithm", 0);
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
}
+
+ uint32_t encodings_accepted_by_peer = 0;
+ GPR_BITSET(&encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
if (b->idx.named.grpc_accept_encoding != NULL) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
- set_encodings_accepted_by_peer(exec_ctx, call,
- b->idx.named.grpc_accept_encoding->md);
+ parse_encodings_accepted_by_peer(exec_ctx,
+ b->idx.named.grpc_accept_encoding->md,
+ &encodings_accepted_by_peer, false);
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
}
+ if (b->idx.named.accept_encoding != NULL) {
+ GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
+ parse_encodings_accepted_by_peer(exec_ctx, b->idx.named.accept_encoding->md,
+ &encodings_accepted_by_peer, true);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding);
+ GPR_TIMER_END("encodings_accepted_by_peer", 0);
+ }
+
+ call->encodings_accepted_by_peer = encodings_accepted_by_peer;
publish_app_metadata(call, b, false);
}
@@ -1258,7 +1309,8 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx,
} else {
call->test_only_last_message_flags = call->receiving_stream->flags;
if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) {
+ (GRPC_IS_MESSAGE_COMPRESSION_ALGORITHM(
+ call->incoming_compression_algorithm))) {
*call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
NULL, 0, call->incoming_compression_algorithm);
} else {
@@ -1484,7 +1536,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, effective_compression_level);
// the following will be picked up by the compress filter and used as
// the call's compression algorithm.
- compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ compression_md.key =
+ GRPC_IS_STREAM_COMPRESSION_ALGORITHM(calgo)
+ ? GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST
+ : GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
compression_md.value = grpc_compression_algorithm_slice(calgo);
additional_metadata_count++;
}