From a4dc077d3c1eef677102f68496732b7dd2374875 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 12 Jul 2017 12:19:58 -0700 Subject: Stream compression configuration --- src/core/lib/surface/call.c | 95 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 20 deletions(-) (limited to 'src/core/lib/surface/call.c') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c769866ceb..63496ac8f5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -769,8 +769,9 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { static void destroy_encodings_accepted_by_peer(void *p) { return; } -static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, - grpc_call *call, grpc_mdelem mdel) { +static void parse_encodings_accepted_by_peer( + grpc_exec_ctx *exec_ctx, grpc_mdelem mdel, + uint32_t *encodings_accepted_by_peer, bool stream_compression) { size_t i; grpc_compression_algorithm algorithm; grpc_slice_buffer accept_encoding_parts; @@ -780,24 +781,46 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, accepted_user_data = grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer); if (accepted_user_data != NULL) { - call->encodings_accepted_by_peer = - (uint32_t)(((uintptr_t)accepted_user_data) - 1); + uint32_t flag = (uint32_t)(((uintptr_t)accepted_user_data) - 1); + if (stream_compression) { + *encodings_accepted_by_peer |= flag & 1u; + *encodings_accepted_by_peer |= (flag & ~(uint32_t)1) + << GRPC_STREAM_COMPRESS_FLAG_OFFSET; + } else { + *encodings_accepted_by_peer |= flag; + } return; } + uint32_t user_data = 0; + accept_encoding_slice = GRPC_MDVALUE(mdel); grpc_slice_buffer_init(&accept_encoding_parts); grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); - /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already - * zeroes the whole grpc_call */ /* Always support no compression */ - GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); + GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE); for (i = 0; i < accept_encoding_parts.count; i++) { grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i]; - if (grpc_compression_algorithm_parse(accept_encoding_entry_slice, - &algorithm)) { - GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); + int result; + if (stream_compression) { + result = grpc_stream_compression_algorithm_parse( + accept_encoding_entry_slice, &algorithm); + if (result) { + user_data |= + (1u << (algorithm == GRPC_COMPRESS_NONE + ? 0 + : algorithm - (GRPC_STREAM_COMPRESS_FLAG_OFFSET))); + } + } else { + result = grpc_compression_algorithm_parse(accept_encoding_entry_slice, + &algorithm); + if (result) { + user_data |= (1u << algorithm); + } + } + if (result) { + GPR_BITSET(encodings_accepted_by_peer, algorithm); } else { char *accept_encoding_entry_str = grpc_slice_to_c_string(accept_encoding_entry_slice); @@ -812,7 +835,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, grpc_mdelem_set_user_data( mdel, destroy_encodings_accepted_by_peer, - (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1)); + (void *)(((uintptr_t)encodings_accepted_by_peer) + 1)); } uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { @@ -914,9 +937,14 @@ static uint32_t decode_status(grpc_mdelem md) { return status; } -static grpc_compression_algorithm decode_compression(grpc_mdelem md) { - grpc_compression_algorithm algorithm = - grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md)); +static grpc_compression_algorithm decode_compression(grpc_mdelem md, + bool stream_compression) { + grpc_compression_algorithm algorithm; + if (stream_compression) { + algorithm = grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md)); + } else { + algorithm = grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md)); + } if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, @@ -953,20 +981,43 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { - if (b->idx.named.grpc_encoding != NULL) { + if (b->idx.named.content_encoding != NULL) { + GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); + set_incoming_compression_algorithm( + call, decode_compression(b->idx.named.content_encoding->md, true)); + GPR_TIMER_END("incoming_compression_algorithm", 0); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding); + /* Stream compression overrides message compression */ + if (b->idx.named.grpc_encoding != NULL) { + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); + } + } else if (b->idx.named.grpc_encoding != NULL) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); set_incoming_compression_algorithm( - call, decode_compression(b->idx.named.grpc_encoding->md)); + call, decode_compression(b->idx.named.grpc_encoding->md, false)); GPR_TIMER_END("incoming_compression_algorithm", 0); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); } + + uint32_t encodings_accepted_by_peer = 0; + GPR_BITSET(&encodings_accepted_by_peer, GRPC_COMPRESS_NONE); if (b->idx.named.grpc_accept_encoding != NULL) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); - set_encodings_accepted_by_peer(exec_ctx, call, - b->idx.named.grpc_accept_encoding->md); + parse_encodings_accepted_by_peer(exec_ctx, + b->idx.named.grpc_accept_encoding->md, + &encodings_accepted_by_peer, false); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); GPR_TIMER_END("encodings_accepted_by_peer", 0); } + if (b->idx.named.accept_encoding != NULL) { + GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); + parse_encodings_accepted_by_peer(exec_ctx, b->idx.named.accept_encoding->md, + &encodings_accepted_by_peer, true); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding); + GPR_TIMER_END("encodings_accepted_by_peer", 0); + } + + call->encodings_accepted_by_peer = encodings_accepted_by_peer; publish_app_metadata(call, b, false); } @@ -1258,7 +1309,8 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, } else { call->test_only_last_message_flags = call->receiving_stream->flags; if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) { + (GRPC_IS_MESSAGE_COMPRESSION_ALGORITHM( + call->incoming_compression_algorithm))) { *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( NULL, 0, call->incoming_compression_algorithm); } else { @@ -1484,7 +1536,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call, effective_compression_level); // the following will be picked up by the compress filter and used as // the call's compression algorithm. - compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; + compression_md.key = + GRPC_IS_STREAM_COMPRESSION_ALGORITHM(calgo) + ? GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST + : GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; compression_md.value = grpc_compression_algorithm_slice(calgo); additional_metadata_count++; } -- cgit v1.2.3