aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/channel/compress_filter.c38
-rw-r--r--src/core/surface/call.c30
-rw-r--r--src/core/surface/call.h6
-rw-r--r--src/core/surface/channel.c10
-rw-r--r--src/core/surface/channel.h2
-rw-r--r--test/core/end2end/tests/request_with_compressed_payload.c9
6 files changed, 74 insertions, 21 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 7d6f1a87a6..215e2dc02c 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -35,16 +35,19 @@
#include <string.h>
#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include "src/core/channel/compress_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/compression/message_compress.h"
+#include "src/core/support/string.h"
typedef struct call_data {
gpr_slice_buffer slices;
grpc_linked_mdelem compression_algorithm_storage;
+ grpc_linked_mdelem accept_encoding_storage;
int remaining_slice_bytes;
int written_initial_metadata;
grpc_compression_algorithm compression_algorithm;
@@ -54,7 +57,9 @@ typedef struct call_data {
typedef struct channel_data {
grpc_mdstr *mdstr_request_compression_algorithm_key;
grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
+ grpc_mdstr *mdstr_compression_capabilities_key;
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
+ grpc_mdelem *mdelem_accept_encoding;
grpc_compression_algorithm default_compression_algorithm;
} channel_data;
@@ -190,10 +195,17 @@ static void process_send_ops(grpc_call_element *elem,
channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
+ /* hint compression algorithm */
grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->compression_algorithm_storage,
grpc_mdelem_ref(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
+
+ /* convey supported compression algorithms */
+ grpc_metadata_batch_add_head(
+ &(sop->data.metadata), &calld->accept_encoding_storage,
+ grpc_mdelem_ref(channeld->mdelem_accept_encoding));
+
calld->written_initial_metadata = 1; /* GPR_TRUE */
}
break;
@@ -267,6 +279,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
+ const char* supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT-1];
+ char *accept_encoding_str;
+ size_t accept_encoding_str_len;
const grpc_compression_level clevel =
grpc_channel_args_get_compression_level(args);
@@ -279,6 +294,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->mdstr_outgoing_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-encoding");
+ channeld->mdstr_compression_capabilities_key =
+ grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
+
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorith_name;
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
@@ -287,8 +305,26 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
mdctx,
grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorith_name));
+ if (algo_idx > 0) {
+ supported_algorithms_names[algo_idx-1] = algorith_name;
+ }
}
+ accept_encoding_str =
+ gpr_strjoin_sep(supported_algorithms_names,
+ GPR_ARRAY_SIZE(supported_algorithms_names),
+ ", ",
+ &accept_encoding_str_len);
+
+ channeld->mdelem_accept_encoding =
+ grpc_mdelem_from_metadata_strings(
+ mdctx,
+ grpc_mdstr_ref(channeld->mdstr_compression_capabilities_key),
+ grpc_mdstr_from_string(mdctx, accept_encoding_str));
+ /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
+ * arrays, as to avoid the heap allocs */
+ gpr_free(accept_encoding_str);
+
GPR_ASSERT(!is_last);
}
@@ -299,10 +335,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
+ grpc_mdstr_unref(channeld->mdstr_compression_capabilities_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
}
+ grpc_mdelem_unref(channeld->mdelem_accept_encoding);
}
const grpc_channel_filter grpc_compress_filter = {
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 58673e783e..f4b8ccd6df 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -39,6 +39,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/census/grpc_context.h"
#include "src/core/channel/channel_stack.h"
@@ -239,8 +240,8 @@ struct grpc_call {
/* Compression algorithm for the call */
grpc_compression_algorithm compression_algorithm;
- /* Supported encodings (compression algorithms) */
- gpr_uint8 accept_encoding[GRPC_COMPRESS_ALGORITHMS_COUNT];
+ /* Supported encodings (compression algorithms), a bitset */
+ gpr_uint32 encodings_accepted_by_peer;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -478,12 +479,7 @@ static void set_compression_algorithm(grpc_call *call,
call->compression_algorithm = algo;
}
-grpc_compression_algorithm grpc_call_get_compression_algorithm(
- const grpc_call *call) {
- return call->compression_algorithm;
-}
-
-static void set_accept_encoding(grpc_call *call,
+static void set_encodings_accepted_by_peer(grpc_call *call,
const gpr_slice accept_encoding_slice) {
size_t i;
grpc_compression_algorithm algorithm;
@@ -492,15 +488,15 @@ static void set_accept_encoding(grpc_call *call,
gpr_slice_buffer_init(&accept_encoding_parts);
gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts);
- /* No need to zero call->accept_encoding: grpc_call_create already zeroes the
- * whole grpc_call */
+ /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
+ * zeroes the whole grpc_call */
/* Always support no compression */
- call->accept_encoding[GRPC_COMPRESS_NONE] = 1; /* GPR_TRUE */
+ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
for (i = 0; i < accept_encoding_parts.count; i++) {
const gpr_slice* slice = &accept_encoding_parts.slices[i];
if (grpc_compression_algorithm_parse(
(const char *)GPR_SLICE_START_PTR(*slice), &algorithm)) {
- call->accept_encoding[algorithm] = 1; /* GPR_TRUE */
+ GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
} else {
/* TODO(dgq): it'd be nice to have a slice-to-cstr function to easily
* print the offending entry */
@@ -510,6 +506,10 @@ static void set_accept_encoding(grpc_call *call,
}
}
+gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call) {
+ return call->encodings_accepted_by_peer;
+}
+
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
@@ -1360,9 +1360,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key ==
grpc_channel_get_compression_algorithm_string(call->channel)) {
set_compression_algorithm(call, decode_compression(md));
- } else if (key ==
- grpc_channel_get_accept_encoding_string(call->channel)) {
- set_accept_encoding(call, md->value->slice);
+ } else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
+ call->channel)) {
+ set_encodings_accepted_by_peer(call, md->value->slice);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 3b6f9c942e..5736e97b59 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -153,4 +153,10 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
gpr_uint8 grpc_call_is_client(grpc_call *call);
+/** Returns a bitset for the encodings (compression algorithms) supported by \a
+ * call's peer.
+ *
+ * To be indexed by grpc_compression_algorithm enum values. */
+gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 2a3e20a557..cd71e03b19 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -64,7 +64,7 @@ struct grpc_channel {
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_compression_algorithm_string;
- grpc_mdstr *grpc_accept_encoding_string;
+ grpc_mdstr *grpc_encodings_accepted_by_peer_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@@ -101,7 +101,7 @@ grpc_channel *grpc_channel_create_from_filters(
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding");
- channel->grpc_accept_encoding_string =
+ channel->grpc_encodings_accepted_by_peer_string =
grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
@@ -213,7 +213,7 @@ static void destroy_channel(void *p, int ok) {
}
GRPC_MDSTR_UNREF(channel->grpc_status_string);
GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string);
- GRPC_MDSTR_UNREF(channel->grpc_accept_encoding_string);
+ GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string);
GRPC_MDSTR_UNREF(channel->grpc_message_string);
GRPC_MDSTR_UNREF(channel->path_string);
GRPC_MDSTR_UNREF(channel->authority_string);
@@ -271,9 +271,9 @@ grpc_mdstr *grpc_channel_get_compression_algorithm_string(
return channel->grpc_compression_algorithm_string;
}
-grpc_mdstr *grpc_channel_get_accept_encoding_string(
+grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
grpc_channel *channel) {
- return channel->grpc_accept_encoding_string;
+ return channel->grpc_encodings_accepted_by_peer_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index db6874b630..1d1550bbe7 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -56,7 +56,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_accept_encoding_string(
+grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c
index 0c1b065bd8..784a6cdef4 100644
--- a/test/core/end2end/tests/request_with_compressed_payload.c
+++ b/test/core/end2end/tests/request_with_compressed_payload.c
@@ -46,6 +46,7 @@
#include "test/core/end2end/cq_verifier.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/compress_filter.h"
+#include "src/core/surface/call.h"
enum { TIMEOUT = 200000 };
@@ -187,6 +188,14 @@ static void request_with_payload_template(
cq_expect_completion(cqv, tag(101), 1);
cq_verify(cqv);
+ GPR_ASSERT(GPR_BITCOUNT(grpc_call_get_encodings_accepted_by_peer(s)) == 3);
+ GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s),
+ GRPC_COMPRESS_NONE) != 0);
+ GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s),
+ GRPC_COMPRESS_DEFLATE) != 0);
+ GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s),
+ GRPC_COMPRESS_GZIP) != 0);
+
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;