aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-08-18 16:11:09 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-08-18 16:11:09 -0700
commit7fc37a23856604259a0b048e0df00ef3065a58d8 (patch)
tree9e3b7622f474d15a43b753912582234b81588e32 /src/core/channel
parentbeac88ca56f4710e86668f2cbbd80e02e0607f9c (diff)
parent7ff63f6abad6c49a7b88b036c54075a03433457c (diff)
Merge branch 'master' of github.com:grpc/grpc into compression-accept-encoding
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/census_filter.h2
-rw-r--r--src/core/channel/client_channel.c35
-rw-r--r--src/core/channel/client_channel.h7
-rw-r--r--src/core/channel/compress_filter.c41
-rw-r--r--src/core/channel/compress_filter.h2
-rw-r--r--src/core/channel/http_client_filter.h2
-rw-r--r--src/core/channel/http_server_filter.h2
-rw-r--r--src/core/channel/noop_filter.h2
8 files changed, 49 insertions, 44 deletions
diff --git a/src/core/channel/census_filter.h b/src/core/channel/census_filter.h
index 4f9759f0db..1453c05d28 100644
--- a/src/core/channel/census_filter.h
+++ b/src/core/channel/census_filter.h
@@ -41,4 +41,4 @@
extern const grpc_channel_filter grpc_client_census_filter;
extern const grpc_channel_filter grpc_server_census_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a293c93ec6..a73458821e 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -84,8 +84,10 @@ typedef struct {
grpc_pollset_set pollset_set;
} channel_data;
-/** We create one watcher for each new lb_policy that is returned from a resolver,
- to watch for state changes from the lb_policy. When a state change is seen, we
+/** We create one watcher for each new lb_policy that is returned from a
+ resolver,
+ to watch for state changes from the lb_policy. When a state change is seen,
+ we
update the channel, and create a new watcher */
typedef struct {
channel_data *chand;
@@ -380,7 +382,8 @@ static void perform_transport_stream_op(grpc_call_element *elem,
if (lb_policy) {
grpc_transport_stream_op *op = &calld->waiting_op;
grpc_pollset *bind_pollset = op->bind_pollset;
- grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
+ grpc_metadata_batch *initial_metadata =
+ &op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
@@ -388,13 +391,14 @@ static void perform_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(op->bind_pollset);
GPR_ASSERT(op->send_ops);
GPR_ASSERT(op->send_ops->nops >= 1);
- GPR_ASSERT(
- op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
+ calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
- &calld->picked_channel, &calld->async_setup_task);
+ &calld->picked_channel,
+ &calld->async_setup_task);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@@ -430,7 +434,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
perform_transport_stream_op(elem, op, 0);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state);
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
@@ -450,7 +455,8 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
gpr_free(w);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
@@ -527,6 +533,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (old_lb_policy != NULL) {
+ grpc_lb_policy_shutdown(old_lb_policy);
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
@@ -662,7 +669,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
+ "client_channel");
}
/* Destructor for channel_data */
@@ -746,19 +754,20 @@ void grpc_client_channel_watch_connectivity_state(
gpr_mu_unlock(&chand->mu_config);
}
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
return &chand->pollset_set;
}
void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
}
void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index cd81294eb3..13681e3956 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -59,11 +59,12 @@ void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete);
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem);
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem);
void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset);
void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 065fe258dc..68060f9da3 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -53,7 +53,7 @@ typedef struct call_data {
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
- /** If true, contents of \a compression_algorithm are authoritative */
+ /** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
} call_data;
@@ -80,7 +80,7 @@ typedef struct channel_data {
*
* Returns 1 if the data was actually compress and 0 otherwise. */
static int compress_send_sb(grpc_compression_algorithm algorithm,
- gpr_slice_buffer *slices) {
+ gpr_slice_buffer *slices) {
int did_compress;
gpr_slice_buffer tmp;
gpr_slice_buffer_init(&tmp);
@@ -95,7 +95,7 @@ static int compress_send_sb(grpc_compression_algorithm algorithm,
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
-static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -127,10 +127,10 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
static int skip_compression(channel_data *channeld, call_data *calld) {
if (calld->has_compression_algorithm) {
- if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return 1;
- }
- return 0; /* we have an actual call-specific algorithm */
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ return 1;
+ }
+ return 0; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@@ -203,7 +203,7 @@ static void process_send_ops(grpc_call_element *elem,
* given by GRPC_OP_BEGIN_MESSAGE) */
calld->remaining_slice_bytes = sop->data.begin_message.length;
if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
break;
@@ -305,7 +305,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
- const char* supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT-1];
+ const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
char *accept_encoding_str;
size_t accept_encoding_str_len;
@@ -344,23 +344,19 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0));
if (algo_idx > 0) {
- supported_algorithms_names[algo_idx-1] = algorithm_name;
+ supported_algorithms_names[algo_idx - 1] = algorithm_name;
}
}
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
* arrays, as to avoid the heap allocs */
- accept_encoding_str =
- gpr_strjoin_sep(supported_algorithms_names,
- GPR_ARRAY_SIZE(supported_algorithms_names),
- ", ",
- &accept_encoding_str_len);
-
- channeld->mdelem_accept_encoding =
- grpc_mdelem_from_metadata_strings(
- mdctx,
- GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
- grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
+ accept_encoding_str = gpr_strjoin_sep(
+ supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names),
+ ", ", &accept_encoding_str_len);
+
+ channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
+ mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
+ grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
gpr_free(accept_encoding_str);
GPR_ASSERT(!is_last);
@@ -374,8 +370,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
- for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
- ++algo_idx) {
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
}
GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h
index 0694e2c1dd..0917e81ca4 100644
--- a/src/core/channel/compress_filter.h
+++ b/src/core/channel/compress_filter.h
@@ -62,4 +62,4 @@
extern const grpc_channel_filter grpc_compress_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h
index 04eb839e00..21c66b9b8e 100644
--- a/src/core/channel/http_client_filter.h
+++ b/src/core/channel/http_client_filter.h
@@ -41,4 +41,4 @@ extern const grpc_channel_filter grpc_http_client_filter;
#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */
diff --git a/src/core/channel/http_server_filter.h b/src/core/channel/http_server_filter.h
index 42f76ed17f..f219d4e66f 100644
--- a/src/core/channel/http_server_filter.h
+++ b/src/core/channel/http_server_filter.h
@@ -39,4 +39,4 @@
/* Processes metadata on the client side for HTTP2 transports */
extern const grpc_channel_filter grpc_http_server_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */
diff --git a/src/core/channel/noop_filter.h b/src/core/channel/noop_filter.h
index 96463e5322..ded9b33117 100644
--- a/src/core/channel/noop_filter.h
+++ b/src/core/channel/noop_filter.h
@@ -41,4 +41,4 @@
customize for their own filters */
extern const grpc_channel_filter grpc_no_op_filter;
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */