aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/census_filter.h2
-rw-r--r--src/core/channel/client_channel.c38
-rw-r--r--src/core/channel/client_channel.h7
-rw-r--r--src/core/channel/compress_filter.c57
-rw-r--r--src/core/channel/compress_filter.h2
-rw-r--r--src/core/channel/http_client_filter.h2
-rw-r--r--src/core/channel/http_server_filter.h2
-rw-r--r--src/core/channel/noop_filter.h2
8 files changed, 78 insertions, 34 deletions
diff --git a/src/core/channel/census_filter.h b/src/core/channel/census_filter.h
index 4f9759f0db..1453c05d28 100644
--- a/src/core/channel/census_filter.h
+++ b/src/core/channel/census_filter.h
@@ -41,4 +41,4 @@
extern const grpc_channel_filter grpc_client_census_filter;
extern const grpc_channel_filter grpc_server_census_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 6c2e6b38a8..2e25033813 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -84,8 +84,10 @@ typedef struct {
grpc_pollset_set pollset_set;
} channel_data;
-/** We create one watcher for each new lb_policy that is returned from a resolver,
- to watch for state changes from the lb_policy. When a state change is seen, we
+/** We create one watcher for each new lb_policy that is returned from a
+ resolver,
+ to watch for state changes from the lb_policy. When a state change is seen,
+ we
update the channel, and create a new watcher */
typedef struct {
channel_data *chand;
@@ -380,7 +382,8 @@ static void perform_transport_stream_op(grpc_call_element *elem,
if (lb_policy) {
grpc_transport_stream_op *op = &calld->waiting_op;
grpc_pollset *bind_pollset = op->bind_pollset;
- grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
+ grpc_metadata_batch *initial_metadata =
+ &op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
@@ -388,13 +391,14 @@ static void perform_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(op->bind_pollset);
GPR_ASSERT(op->send_ops);
GPR_ASSERT(op->send_ops->nops >= 1);
- GPR_ASSERT(
- op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
+ calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
- &calld->picked_channel, &calld->async_setup_task);
+ &calld->picked_channel,
+ &calld->async_setup_task);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@@ -430,7 +434,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
perform_transport_stream_op(elem, op, 0);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state);
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
@@ -450,7 +455,8 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
gpr_free(w);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
@@ -499,13 +505,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
+ grpc_connectivity_state_set(&chand->state_tracker, state,
+ "new_lb+resolver");
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
- grpc_connectivity_state_set(&chand->state_tracker, state,
- "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(chand, lb_policy, state);
}
@@ -663,7 +669,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
+ "client_channel");
}
/* Destructor for channel_data */
@@ -747,19 +754,20 @@ void grpc_client_channel_watch_connectivity_state(
gpr_mu_unlock(&chand->mu_config);
}
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
return &chand->pollset_set;
}
void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
}
void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index cd81294eb3..13681e3956 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -59,11 +59,12 @@ void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete);
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem);
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem);
void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset);
void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 8963c13b0f..762a4edc73 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -35,22 +35,25 @@
#include <string.h>
#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include "src/core/channel/compress_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/compression/message_compress.h"
+#include "src/core/support/string.h"
typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
+ grpc_linked_mdelem accept_encoding_storage;
int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */
int written_initial_metadata; /**< Already processed initial md? */
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
- /** If true, contents of \a compression_algorithm are authoritative */
+ /** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
} call_data;
@@ -59,8 +62,12 @@ typedef struct channel_data {
grpc_mdstr *mdstr_request_compression_algorithm_key;
/** Metadata key for the outgoing (used) compression algorithm */
grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
+ /** Metadata key for the accepted encodings */
+ grpc_mdstr *mdstr_compression_capabilities_key;
/** Precomputed metadata elements for all available compression algorithms */
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
+ /** Precomputed metadata elements for the accepted encodings */
+ grpc_mdelem *mdelem_accept_encoding;
/** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm;
} channel_data;
@@ -71,7 +78,7 @@ typedef struct channel_data {
*
* Returns 1 if the data was actually compress and 0 otherwise. */
static int compress_send_sb(grpc_compression_algorithm algorithm,
- gpr_slice_buffer *slices) {
+ gpr_slice_buffer *slices) {
int did_compress;
gpr_slice_buffer tmp;
gpr_slice_buffer_init(&tmp);
@@ -86,14 +93,14 @@ static int compress_send_sb(grpc_compression_algorithm algorithm,
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
-static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
if (md->key == channeld->mdstr_request_compression_algorithm_key) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- if (!grpc_compression_algorithm_parse(md_c_str,
+ if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
&calld->compression_algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.",
md_c_str);
@@ -108,10 +115,10 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
static int skip_compression(channel_data *channeld, call_data *calld) {
if (calld->has_compression_algorithm) {
- if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return 1;
- }
- return 0; /* we have an actual call-specific algorithm */
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ return 1;
+ }
+ return 0; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@@ -184,7 +191,7 @@ static void process_send_ops(grpc_call_element *elem,
* given by GRPC_OP_BEGIN_MESSAGE) */
calld->remaining_slice_bytes = sop->data.begin_message.length;
if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
break;
@@ -202,10 +209,17 @@ static void process_send_ops(grpc_call_element *elem,
channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
+ /* hint compression algorithm */
grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->compression_algorithm_storage,
GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
+
+ /* convey supported compression algorithms */
+ grpc_metadata_batch_add_tail(
+ &(sop->data.metadata), &calld->accept_encoding_storage,
+ GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
+
calld->written_initial_metadata = 1; /* GPR_TRUE */
}
break;
@@ -279,6 +293,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
+ const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
+ char *accept_encoding_str;
+ size_t accept_encoding_str_len;
channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args);
@@ -289,6 +306,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->mdstr_outgoing_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
+ channeld->mdstr_compression_capabilities_key =
+ grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0);
+
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorithm_name;
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
@@ -297,8 +317,22 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
mdctx,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0));
+ if (algo_idx > 0) {
+ supported_algorithms_names[algo_idx - 1] = algorithm_name;
+ }
}
+ /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
+ * arrays, as to avoid the heap allocs */
+ accept_encoding_str = gpr_strjoin_sep(
+ supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names),
+ ", ", &accept_encoding_str_len);
+
+ channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
+ mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
+ grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
+ gpr_free(accept_encoding_str);
+
GPR_ASSERT(!is_last);
}
@@ -309,10 +343,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
- for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
- ++algo_idx) {
+ GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
}
+ GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
}
const grpc_channel_filter grpc_compress_filter = {
diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h
index 0694e2c1dd..0917e81ca4 100644
--- a/src/core/channel/compress_filter.h
+++ b/src/core/channel/compress_filter.h
@@ -62,4 +62,4 @@
extern const grpc_channel_filter grpc_compress_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h
index 04eb839e00..21c66b9b8e 100644
--- a/src/core/channel/http_client_filter.h
+++ b/src/core/channel/http_client_filter.h
@@ -41,4 +41,4 @@ extern const grpc_channel_filter grpc_http_client_filter;
#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */
diff --git a/src/core/channel/http_server_filter.h b/src/core/channel/http_server_filter.h
index 42f76ed17f..f219d4e66f 100644
--- a/src/core/channel/http_server_filter.h
+++ b/src/core/channel/http_server_filter.h
@@ -39,4 +39,4 @@
/* Processes metadata on the client side for HTTP2 transports */
extern const grpc_channel_filter grpc_http_server_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */
diff --git a/src/core/channel/noop_filter.h b/src/core/channel/noop_filter.h
index 96463e5322..ded9b33117 100644
--- a/src/core/channel/noop_filter.h
+++ b/src/core/channel/noop_filter.h
@@ -41,4 +41,4 @@
customize for their own filters */
extern const grpc_channel_filter grpc_no_op_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */