diff options
-rw-r--r-- | src/core/channel/compress_filter.c | 38 | ||||
-rw-r--r-- | src/core/surface/call.c | 30 | ||||
-rw-r--r-- | src/core/surface/call.h | 6 | ||||
-rw-r--r-- | src/core/surface/channel.c | 10 | ||||
-rw-r--r-- | src/core/surface/channel.h | 2 | ||||
-rw-r--r-- | test/core/end2end/tests/request_with_compressed_payload.c | 9 |
6 files changed, 74 insertions, 21 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 7d6f1a87a6..215e2dc02c 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -35,16 +35,19 @@ #include <string.h> #include <grpc/compression.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> #include "src/core/channel/compress_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/compression/message_compress.h" +#include "src/core/support/string.h" typedef struct call_data { gpr_slice_buffer slices; grpc_linked_mdelem compression_algorithm_storage; + grpc_linked_mdelem accept_encoding_storage; int remaining_slice_bytes; int written_initial_metadata; grpc_compression_algorithm compression_algorithm; @@ -54,7 +57,9 @@ typedef struct call_data { typedef struct channel_data { grpc_mdstr *mdstr_request_compression_algorithm_key; grpc_mdstr *mdstr_outgoing_compression_algorithm_key; + grpc_mdstr *mdstr_compression_capabilities_key; grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT]; + grpc_mdelem *mdelem_accept_encoding; grpc_compression_algorithm default_compression_algorithm; } channel_data; @@ -190,10 +195,17 @@ static void process_send_ops(grpc_call_element *elem, channeld->default_compression_algorithm; calld->has_compression_algorithm = 1; /* GPR_TRUE */ } + /* hint compression algorithm */ grpc_metadata_batch_add_head( &(sop->data.metadata), &calld->compression_algorithm_storage, grpc_mdelem_ref(channeld->mdelem_compression_algorithms [calld->compression_algorithm])); + + /* convey supported compression algorithms */ + grpc_metadata_batch_add_head( + &(sop->data.metadata), &calld->accept_encoding_storage, + grpc_mdelem_ref(channeld->mdelem_accept_encoding)); + calld->written_initial_metadata = 1; /* GPR_TRUE */ } break; @@ -267,6 +279,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, int is_first, int is_last) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; + const char* supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT-1]; + char *accept_encoding_str; + size_t accept_encoding_str_len; const grpc_compression_level clevel = grpc_channel_args_get_compression_level(args); @@ -279,6 +294,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string(mdctx, "grpc-encoding"); + channeld->mdstr_compression_capabilities_key = + grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { char *algorith_name; GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0); @@ -287,8 +305,26 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, mdctx, grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorith_name)); + if (algo_idx > 0) { + supported_algorithms_names[algo_idx-1] = algorith_name; + } } + accept_encoding_str = + gpr_strjoin_sep(supported_algorithms_names, + GPR_ARRAY_SIZE(supported_algorithms_names), + ", ", + &accept_encoding_str_len); + + channeld->mdelem_accept_encoding = + grpc_mdelem_from_metadata_strings( + mdctx, + grpc_mdstr_ref(channeld->mdstr_compression_capabilities_key), + grpc_mdstr_from_string(mdctx, accept_encoding_str)); + /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated + * arrays, as to avoid the heap allocs */ + gpr_free(accept_encoding_str); + GPR_ASSERT(!is_last); } @@ -299,10 +335,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key); grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key); + grpc_mdstr_unref(channeld->mdstr_compression_capabilities_key); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]); } + grpc_mdelem_unref(channeld->mdelem_accept_encoding); } const grpc_channel_filter grpc_compress_filter = { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 58673e783e..f4b8ccd6df 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc/support/useful.h> #include "src/core/census/grpc_context.h" #include "src/core/channel/channel_stack.h" @@ -239,8 +240,8 @@ struct grpc_call { /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; - /* Supported encodings (compression algorithms) */ - gpr_uint8 accept_encoding[GRPC_COMPRESS_ALGORITHMS_COUNT]; + /* Supported encodings (compression algorithms), a bitset */ + gpr_uint32 encodings_accepted_by_peer; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -478,12 +479,7 @@ static void set_compression_algorithm(grpc_call *call, call->compression_algorithm = algo; } -grpc_compression_algorithm grpc_call_get_compression_algorithm( - const grpc_call *call) { - return call->compression_algorithm; -} - -static void set_accept_encoding(grpc_call *call, +static void set_encodings_accepted_by_peer(grpc_call *call, const gpr_slice accept_encoding_slice) { size_t i; grpc_compression_algorithm algorithm; @@ -492,15 +488,15 @@ static void set_accept_encoding(grpc_call *call, gpr_slice_buffer_init(&accept_encoding_parts); gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts); - /* No need to zero call->accept_encoding: grpc_call_create already zeroes the - * whole grpc_call */ + /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already + * zeroes the whole grpc_call */ /* Always support no compression */ - call->accept_encoding[GRPC_COMPRESS_NONE] = 1; /* GPR_TRUE */ + GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); for (i = 0; i < accept_encoding_parts.count; i++) { const gpr_slice* slice = &accept_encoding_parts.slices[i]; if (grpc_compression_algorithm_parse( (const char *)GPR_SLICE_START_PTR(*slice), &algorithm)) { - call->accept_encoding[algorithm] = 1; /* GPR_TRUE */ + GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); } else { /* TODO(dgq): it'd be nice to have a slice-to-cstr function to easily * print the offending entry */ @@ -510,6 +506,10 @@ static void set_accept_encoding(grpc_call *call, } } +gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call) { + return call->encodings_accepted_by_peer; +} + static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { if (call->status[source].details != NULL) { @@ -1360,9 +1360,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_compression_algorithm_string(call->channel)) { set_compression_algorithm(call, decode_compression(md)); - } else if (key == - grpc_channel_get_accept_encoding_string(call->channel)) { - set_accept_encoding(call, md->value->slice); + } else if (key == grpc_channel_get_encodings_accepted_by_peer_string( + call->channel)) { + set_encodings_accepted_by_peer(call, md->value->slice); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 3b6f9c942e..5736e97b59 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -153,4 +153,10 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem); gpr_uint8 grpc_call_is_client(grpc_call *call); +/** Returns a bitset for the encodings (compression algorithms) supported by \a + * call's peer. + * + * To be indexed by grpc_compression_algorithm enum values. */ +gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call); + #endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 2a3e20a557..cd71e03b19 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -64,7 +64,7 @@ struct grpc_channel { /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_compression_algorithm_string; - grpc_mdstr *grpc_accept_encoding_string; + grpc_mdstr *grpc_encodings_accepted_by_peer_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; @@ -101,7 +101,7 @@ grpc_channel *grpc_channel_create_from_filters( channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_compression_algorithm_string = grpc_mdstr_from_string(mdctx, "grpc-encoding"); - channel->grpc_accept_encoding_string = + channel->grpc_encodings_accepted_by_peer_string = grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { @@ -213,7 +213,7 @@ static void destroy_channel(void *p, int ok) { } GRPC_MDSTR_UNREF(channel->grpc_status_string); GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string); - GRPC_MDSTR_UNREF(channel->grpc_accept_encoding_string); + GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string); GRPC_MDSTR_UNREF(channel->grpc_message_string); GRPC_MDSTR_UNREF(channel->path_string); GRPC_MDSTR_UNREF(channel->authority_string); @@ -271,9 +271,9 @@ grpc_mdstr *grpc_channel_get_compression_algorithm_string( return channel->grpc_compression_algorithm_string; } -grpc_mdstr *grpc_channel_get_accept_encoding_string( +grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( grpc_channel *channel) { - return channel->grpc_accept_encoding_string; + return channel->grpc_encodings_accepted_by_peer_string; } grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index db6874b630..1d1550bbe7 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -56,7 +56,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_compression_algorithm_string( grpc_channel *channel); -grpc_mdstr *grpc_channel_get_accept_encoding_string( +grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index 0c1b065bd8..784a6cdef4 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -46,6 +46,7 @@ #include "test/core/end2end/cq_verifier.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/compress_filter.h" +#include "src/core/surface/call.h" enum { TIMEOUT = 200000 }; @@ -187,6 +188,14 @@ static void request_with_payload_template( cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + GPR_ASSERT(GPR_BITCOUNT(grpc_call_get_encodings_accepted_by_peer(s)) == 3); + GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s), + GRPC_COMPRESS_NONE) != 0); + GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s), + GRPC_COMPRESS_DEFLATE) != 0); + GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s), + GRPC_COMPRESS_GZIP) != 0); + op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; |