aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters')
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c10
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c5
-rw-r--r--src/core/ext/filters/client_channel/client_channel_factory.c7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c184
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.c9
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c37
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c7
-rw-r--r--src/core/ext/filters/client_channel/retry_throttle.c9
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c2
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.c34
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h7
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.c6
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.c2
18 files changed, 188 insertions, 153 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index 416bdefa08..f79d3bdb56 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -86,7 +86,7 @@ static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
grpc_cq_completion *ignored) {
- int delete = 0;
+ bool should_delete = false;
state_watcher *w = (state_watcher *)pw;
gpr_mu_lock(&w->mu);
switch (w->phase) {
@@ -94,12 +94,12 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
case CALLING_BACK_AND_FINISHED:
- delete = 1;
+ should_delete = true;
break;
}
gpr_mu_unlock(&w->mu);
- if (delete) {
+ if (should_delete) {
delete_state_watcher(exec_ctx, w);
}
}
@@ -161,12 +161,12 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
- partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error));
+ partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error));
}
static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
- partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error));
+ partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error));
}
int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 93ba8684ed..c1daaf28c9 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -85,7 +85,7 @@ static void method_parameters_unref(method_parameters *method_params) {
}
static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
- method_parameters_unref(value);
+ method_parameters_unref((method_parameters *)value);
}
static bool parse_wait_for_ready(grpc_json *field,
@@ -719,7 +719,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"client channel factory arg must be a pointer");
}
- grpc_client_channel_factory_ref(arg->value.pointer.p);
+ grpc_client_channel_factory_ref(
+ (grpc_client_channel_factory *)arg->value.pointer.p);
chand->client_channel_factory =
(grpc_client_channel_factory *)arg->value.pointer.p;
// Get server name to resolve, using proxy mapper if needed.
diff --git a/src/core/ext/filters/client_channel/client_channel_factory.c b/src/core/ext/filters/client_channel/client_channel_factory.c
index 7220a8639e..e8aa4cda29 100644
--- a/src/core/ext/filters/client_channel/client_channel_factory.c
+++ b/src/core/ext/filters/client_channel/client_channel_factory.c
@@ -43,14 +43,13 @@ grpc_channel* grpc_client_channel_factory_create_channel(
}
static void* factory_arg_copy(void* factory) {
- grpc_client_channel_factory_ref(factory);
+ grpc_client_channel_factory_ref((grpc_client_channel_factory*)factory);
return factory;
}
static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) {
- // TODO(roth): Remove local exec_ctx when
- // https://github.com/grpc/grpc/pull/8705 is merged.
- grpc_client_channel_factory_unref(exec_ctx, factory);
+ grpc_client_channel_factory_unref(exec_ctx,
+ (grpc_client_channel_factory*)factory);
}
static int factory_arg_cmp(void* factory1, void* factory2) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index bd290464c8..7ad322902b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -75,7 +75,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->context != NULL);
GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL);
calld->client_stats = grpc_grpclb_client_stats_ref(
- args->context[GRPC_GRPCLB_CLIENT_STATS].value);
+ (grpc_grpclb_client_stats *)args->context[GRPC_GRPCLB_CLIENT_STATS]
+ .value);
// Record call started.
grpc_grpclb_client_stats_add_call_started(calld->client_stats);
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 137642613c..cd2bd02bd2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -101,6 +101,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -137,7 +138,7 @@ static grpc_error *initial_metadata_add_lb_token(
}
static void destroy_client_stats(void *arg) {
- grpc_grpclb_client_stats_unref(arg);
+ grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg);
}
typedef struct wrapped_rr_closure_arg {
@@ -285,7 +286,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
* glb_lb_policy
*/
typedef struct rr_connectivity_data rr_connectivity_data;
-static const grpc_lb_policy_vtable glb_lb_policy_vtable;
+
typedef struct glb_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
@@ -727,7 +728,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
/* Allocate the data for the tracking of the new RR policy's connectivity.
* It'll be deallocated in glb_rr_connectivity_changed() */
rr_connectivity_data *rr_connectivity =
- gpr_zalloc(sizeof(rr_connectivity_data));
+ (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data));
GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -869,7 +870,8 @@ static grpc_channel_args *build_lb_channel_args(
grpc_lb_addresses *lb_addresses =
grpc_lb_addresses_create(num_grpclb_addrs, NULL);
grpc_slice_hash_table_entry *targets_info_entries =
- gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
+ (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) *
+ num_grpclb_addrs);
size_t lb_addresses_idx = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -911,92 +913,6 @@ static grpc_channel_args *build_lb_channel_args(
return result;
}
-static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error);
-static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
- grpc_lb_policy_args *args) {
- /* Count the number of gRPC-LB addresses. There must be at least one.
- * TODO(roth): For now, we ignore non-balancer addresses, but in the
- * future, we may change the behavior such that we fall back to using
- * the non-balancer addresses if we cannot reach any balancers. In the
- * fallback case, we should use the LB policy indicated by
- * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
- * unset, we should default to pick_first). */
- const grpc_arg *arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
- }
- grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
- size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
- }
- if (num_grpclb_addrs == 0) return NULL;
-
- glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
-
- /* Get server name. */
- arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(arg != NULL);
- GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
- GPR_ASSERT(uri->path[0] != '\0');
- glb_policy->server_name =
- gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
- glb_policy->server_name);
- }
- grpc_uri_destroy(uri);
-
- glb_policy->cc_factory = args->client_channel_factory;
- GPR_ASSERT(glb_policy->cc_factory != NULL);
-
- arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
- glb_policy->lb_call_timeout_ms =
- grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
-
- // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
- // since we use this to trigger the client_load_reporting filter.
- grpc_arg new_arg =
- grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
- static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
- glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
- args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
-
- /* Create a client channel over them to communicate with a LB service */
- glb_policy->response_generator =
- grpc_fake_resolver_response_generator_create();
- grpc_channel_args *lb_channel_args = build_lb_channel_args(
- exec_ctx, addresses, glb_policy->response_generator, args->args);
- char *uri_str;
- gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
- glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
-
- /* Propagate initial resolution */
- grpc_fake_resolver_response_generator_set_response(
- exec_ctx, glb_policy->response_generator, lb_channel_args);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
- gpr_free(uri_str);
- if (glb_policy->lb_channel == NULL) {
- gpr_free((void *)glb_policy->server_name);
- grpc_channel_args_destroy(exec_ctx, glb_policy->args);
- gpr_free(glb_policy);
- return NULL;
- }
- GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
- glb_lb_channel_on_connectivity_changed_cb, glb_policy,
- grpc_combiner_scheduler(args->combiner));
- grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
- grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
- "grpclb");
- return &glb_policy->base;
-}
-
static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
GPR_ASSERT(glb_policy->pending_picks == NULL);
@@ -1011,6 +927,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
+ grpc_subchannel_index_unref();
if (glb_policy->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
gpr_free(glb_policy->pending_update_args);
@@ -1302,7 +1219,8 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
grpc_grpclb_dropped_call_counts *drop_entries =
- request->client_stats.calls_finished_with_drop.arg;
+ (grpc_grpclb_dropped_call_counts *)
+ request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
@@ -1859,6 +1777,90 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_notify_on_state_change_locked,
glb_update_locked};
+static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args) {
+ /* Count the number of gRPC-LB addresses. There must be at least one.
+ * TODO(roth): For now, we ignore non-balancer addresses, but in the
+ * future, we may change the behavior such that we fall back to using
+ * the non-balancer addresses if we cannot reach any balancers. In the
+ * fallback case, we should use the LB policy indicated by
+ * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
+ * unset, we should default to pick_first). */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ return NULL;
+ }
+ grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return NULL;
+
+ glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
+
+ /* Get server name. */
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(arg != NULL);
+ GPR_ASSERT(arg->type == GRPC_ARG_STRING);
+ grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ glb_policy->server_name =
+ gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
+ glb_policy->server_name);
+ }
+ grpc_uri_destroy(uri);
+
+ glb_policy->cc_factory = args->client_channel_factory;
+ GPR_ASSERT(glb_policy->cc_factory != NULL);
+
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
+ glb_policy->lb_call_timeout_ms =
+ grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
+
+ // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
+ // since we use this to trigger the client_load_reporting filter.
+ grpc_arg new_arg =
+ grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
+ static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
+ glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
+ args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
+
+ /* Create a client channel over them to communicate with a LB service */
+ glb_policy->response_generator =
+ grpc_fake_resolver_response_generator_create();
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ char *uri_str;
+ gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
+ glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
+ exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
+
+ /* Propagate initial resolution */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ gpr_free(uri_str);
+ if (glb_policy->lb_channel == NULL) {
+ gpr_free((void *)glb_policy->server_name);
+ grpc_channel_args_destroy(exec_ctx, glb_policy->args);
+ gpr_free(glb_policy);
+ return NULL;
+ }
+ grpc_subchannel_index_ref();
+ GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
+ glb_lb_channel_on_connectivity_changed_cb, glb_policy,
+ grpc_combiner_scheduler(args->combiner));
+ grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
+ grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
+ "grpclb");
+ return &glb_policy->base;
+}
+
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index 393f6256bf..9c6fb94bf1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -148,7 +148,8 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
if (request->has_client_stats) {
grpc_grpclb_dropped_call_counts *drop_entries =
- request->client_stats.calls_finished_with_drop.arg;
+ (grpc_grpclb_dropped_call_counts *)
+ request->client_stats.calls_finished_with_drop.arg;
grpc_grpclb_dropped_call_counts_destroy(drop_entries);
}
gpr_free(request);
@@ -170,7 +171,8 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
if (!res.has_initial_response) return NULL;
grpc_grpclb_initial_response *initial_res =
- gpr_malloc(sizeof(grpc_grpclb_initial_response));
+ (grpc_grpclb_initial_response *)gpr_malloc(
+ sizeof(grpc_grpclb_initial_response));
memcpy(initial_res, &res.initial_response,
sizeof(grpc_grpclb_initial_response));
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index fab3073eb9..d20cbb8388 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -89,6 +89,7 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
"picked_first_destroy");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_subchannel_index_unref();
if (p->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
gpr_free(p->pending_update_args);
@@ -330,8 +331,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
(void *)p, (unsigned long)addresses->num_addresses);
}
- grpc_subchannel_args *sc_args =
- gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses);
+ grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc(
+ sizeof(*sc_args) * addresses->num_addresses);
/* We remove the following keys in order for subchannel keys belonging to
* subchannels point to the same address to match. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
@@ -403,7 +404,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
/* Create the subchannels for the new subchannel args/addresses. */
grpc_subchannel **new_subchannels =
- gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
+ (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
size_t num_new_subchannels = 0;
for (size_t i = 0; i < sc_args_count; i++) {
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
@@ -686,6 +687,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
+ grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
grpc_combiner_scheduler(args->combiner));
return &p->base;
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index be91d3d651..8ac1a46abd 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -30,6 +30,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -310,6 +311,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
(void *)pol, (void *)pol);
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_subchannel_index_unref();
gpr_free(p);
}
@@ -890,6 +892,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->client_channel_factory != NULL);
round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
+ grpc_subchannel_index_ref();
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
rr_update_locked(exec_ctx, &p->base, args);
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c
index cdcaf17544..acf5929746 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -126,13 +126,14 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx,
}
static void* lb_addresses_copy(void* addresses) {
- return grpc_lb_addresses_copy(addresses);
+ return grpc_lb_addresses_copy((grpc_lb_addresses*)addresses);
}
static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) {
- grpc_lb_addresses_destroy(exec_ctx, addresses);
+ grpc_lb_addresses_destroy(exec_ctx, (grpc_lb_addresses*)addresses);
}
static int lb_addresses_cmp(void* addresses1, void* addresses2) {
- return grpc_lb_addresses_cmp(addresses1, addresses2);
+ return grpc_lb_addresses_cmp((grpc_lb_addresses*)addresses1,
+ (grpc_lb_addresses*)addresses2);
}
static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = {
lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp};
@@ -149,7 +150,7 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES);
if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER)
return NULL;
- return lb_addresses_arg->value.pointer.p;
+ return (grpc_lb_addresses*)lb_addresses_arg->value.pointer.p;
}
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
index c32b5c5a75..075c275e42 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
@@ -204,7 +204,7 @@ static char *choose_service_config(char *service_config_choice_json) {
int random_pct = rand() % 100;
int percentage;
if (sscanf(field->value, "%d", &percentage) != 1 ||
- random_pct > percentage) {
+ random_pct > percentage || percentage == 0) {
service_config_json = NULL;
break;
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
index 9747d39a16..7f1f57259a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
@@ -38,7 +38,7 @@ typedef struct fd_node {
/** the owner of this fd node */
grpc_ares_ev_driver *ev_driver;
/** the grpc_fd owned by this fd node */
- grpc_fd *grpc_fd;
+ grpc_fd *fd;
/** a closure wrapping on_readable_cb, which should be invoked when the
grpc_fd in this node becomes readable. */
grpc_closure read_closure;
@@ -96,15 +96,15 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) {
}
static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
- gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
gpr_mu_destroy(&fdn->mu);
- grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd);
+ grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->fd);
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
- grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */,
+ grpc_fd_orphan(exec_ctx, fdn->fd, NULL, NULL, true /* already_closed */,
"c-ares query finished");
gpr_free(fdn);
}
@@ -150,9 +150,8 @@ void grpc_ares_ev_driver_shutdown(grpc_exec_ctx *exec_ctx,
ev_driver->shutting_down = true;
fd_node *fn = ev_driver->fds;
while (fn != NULL) {
- grpc_fd_shutdown(
- exec_ctx, fn->grpc_fd,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown"));
+ grpc_fd_shutdown(exec_ctx, fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "grpc_ares_ev_driver_shutdown"));
fn = fn->next;
}
gpr_mu_unlock(&ev_driver->mu);
@@ -165,7 +164,7 @@ static fd_node *pop_fd_node(fd_node **head, int fd) {
dummy_head.next = *head;
fd_node *node = &dummy_head;
while (node->next != NULL) {
- if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) {
+ if (grpc_fd_wrapped_fd(node->next->fd) == fd) {
fd_node *ret = node->next;
node->next = node->next->next;
*head = dummy_head.next;
@@ -184,9 +183,9 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg,
fdn->readable_registered = false;
gpr_mu_unlock(&fdn->mu);
- gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->fd));
if (error == GRPC_ERROR_NONE) {
- ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd),
+ ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->fd),
ARES_SOCKET_BAD);
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
@@ -211,10 +210,10 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg,
fdn->writable_registered = false;
gpr_mu_unlock(&fdn->mu);
- gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->fd));
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD,
- grpc_fd_wrapped_fd(fdn->grpc_fd));
+ grpc_fd_wrapped_fd(fdn->fd));
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
@@ -253,7 +252,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
fdn = (fd_node *)gpr_malloc(sizeof(fd_node));
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
- fdn->grpc_fd = grpc_fd_create(socks[i], fd_name);
+ fdn->fd = grpc_fd_create(socks[i], fd_name);
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
@@ -262,8 +261,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn,
grpc_schedule_on_exec_ctx);
- grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set,
- fdn->grpc_fd);
+ grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd);
gpr_free(fd_name);
}
fdn->next = new_list;
@@ -274,9 +272,8 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
!fdn->readable_registered) {
grpc_ares_ev_driver_ref(ev_driver);
- gpr_log(GPR_DEBUG, "notify read on: %d",
- grpc_fd_wrapped_fd(fdn->grpc_fd));
- grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
+ gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd));
+ grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure);
fdn->readable_registered = true;
}
// Register write_closure if the socket is writable and write_closure
@@ -284,9 +281,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %d",
- grpc_fd_wrapped_fd(fdn->grpc_fd));
+ grpc_fd_wrapped_fd(fdn->fd));
grpc_ares_ev_driver_ref(ev_driver);
- grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
+ grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure);
fdn->writable_registered = true;
}
gpr_mu_unlock(&fdn->mu);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
index 2e2b411ab8..0ffb38518a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -123,8 +123,8 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
static grpc_ares_hostbyname_request *create_hostbyname_request(
grpc_ares_request *parent_request, char *host, uint16_t port,
bool is_balancer) {
- grpc_ares_hostbyname_request *hr =
- gpr_zalloc(sizeof(grpc_ares_hostbyname_request));
+ grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)gpr_zalloc(
+ sizeof(grpc_ares_hostbyname_request));
hr->parent_request = parent_request;
hr->host = gpr_strdup(host);
hr->port = port;
@@ -527,7 +527,8 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
grpc_resolve_address_ares_request *r =
- gpr_zalloc(sizeof(grpc_resolve_address_ares_request));
+ (grpc_resolve_address_ares_request *)gpr_zalloc(
+ sizeof(grpc_resolve_address_ares_request));
r->addrs_out = addrs;
r->on_resolve_address_done = on_done;
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r,
diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c
index 6cd6654b6f..09dcade089 100644
--- a/src/core/ext/filters/client_channel/retry_throttle.c
+++ b/src/core/ext/filters/client_channel/retry_throttle.c
@@ -99,7 +99,7 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
int max_milli_tokens, int milli_token_ratio,
grpc_server_retry_throttle_data* old_throttle_data) {
grpc_server_retry_throttle_data* throttle_data =
- gpr_malloc(sizeof(*throttle_data));
+ (grpc_server_retry_throttle_data*)gpr_malloc(sizeof(*throttle_data));
memset(throttle_data, 0, sizeof(*throttle_data));
gpr_ref_init(&throttle_data->refs, 1);
throttle_data->max_milli_tokens = max_milli_tokens;
@@ -131,11 +131,11 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
//
static void* copy_server_name(void* key, void* unused) {
- return gpr_strdup(key);
+ return gpr_strdup((const char*)key);
}
static long compare_server_name(void* key1, void* key2, void* unused) {
- return strcmp(key1, key2);
+ return strcmp((const char*)key1, (const char*)key2);
}
static void destroy_server_retry_throttle_data(void* value, void* unused) {
@@ -177,7 +177,8 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
const char* server_name, int max_milli_tokens, int milli_token_ratio) {
gpr_mu_lock(&g_mu);
grpc_server_retry_throttle_data* throttle_data =
- gpr_avl_get(g_avl, (char*)server_name, NULL);
+ (grpc_server_retry_throttle_data*)gpr_avl_get(g_avl, (char*)server_name,
+ NULL);
if (throttle_data == NULL) {
// Entry not found. Create a new one.
throttle_data = grpc_server_retry_throttle_data_create(
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 2c466f595a..9bd35f83e8 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -33,6 +33,7 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
@@ -290,6 +291,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
return c;
}
+ GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx);
c = (grpc_subchannel *)gpr_zalloc(sizeof(*c));
c->key = key;
gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index f57b631c41..d7a51f3899 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -34,6 +34,8 @@ static gpr_avl g_subchannel_index;
static gpr_mu g_mu;
+static gpr_refcount g_refcount;
+
struct grpc_subchannel_key {
grpc_subchannel_args args;
};
@@ -88,24 +90,26 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
static void sck_avl_destroy(void *p, void *user_data) {
grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
- grpc_subchannel_key_destroy(exec_ctx, p);
+ grpc_subchannel_key_destroy(exec_ctx, (grpc_subchannel_key *)p);
}
static void *sck_avl_copy(void *p, void *unused) {
- return subchannel_key_copy(p);
+ return subchannel_key_copy((grpc_subchannel_key *)p);
}
static long sck_avl_compare(void *a, void *b, void *unused) {
- return grpc_subchannel_key_compare(a, b);
+ return grpc_subchannel_key_compare((grpc_subchannel_key *)a,
+ (grpc_subchannel_key *)b);
}
static void scv_avl_destroy(void *p, void *user_data) {
grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, (grpc_subchannel *)p,
+ "subchannel_index");
}
static void *scv_avl_copy(void *p, void *unused) {
- GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index");
+ GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel *)p, "subchannel_index");
return p;
}
@@ -119,15 +123,27 @@ static const gpr_avl_vtable subchannel_avl_vtable = {
void grpc_subchannel_index_init(void) {
g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
gpr_mu_init(&g_mu);
+ gpr_ref_init(&g_refcount, 1);
}
void grpc_subchannel_index_shutdown(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_mu_destroy(&g_mu);
- gpr_avl_unref(g_subchannel_index, &exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ // TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
+ // To solve that, we should force polling to flush any pending callbacks, then
+ // shutdown safely.
+ grpc_subchannel_index_unref();
+}
+
+void grpc_subchannel_index_unref(void) {
+ if (gpr_unref(&g_refcount)) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_destroy(&g_mu);
+ gpr_avl_unref(g_subchannel_index, &exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
}
+void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }
+
grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key) {
// Lock, and take a reference to the subchannel index.
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index 98d882a453..92e36d5283 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -59,6 +59,13 @@ void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
+/** Increment the refcount (non-zero) of subchannel index (global). */
+void grpc_subchannel_index_ref(void);
+
+/** Decrement the refcount of subchannel index (global). If the refcount drops
+ to zero, unref the subchannel index and destroy its mutex. */
+void grpc_subchannel_index_unref(void);
+
/** \em TEST ONLY.
* If \a force_creation is true, all key comparisons will be false, resulting in
* new subchannels always being created. Otherwise, the keys will be compared as
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
index 554a7f530d..03958136b4 100644
--- a/src/core/ext/filters/http/server/http_server_filter.c
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -83,12 +83,12 @@ static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx,
}
static void add_error(const char *error_name, grpc_error **cumulative,
- grpc_error *new) {
- if (new == GRPC_ERROR_NONE) return;
+ grpc_error *new_err) {
+ if (new_err == GRPC_ERROR_NONE) return;
if (*cumulative == GRPC_ERROR_NONE) {
*cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name);
}
- *cumulative = grpc_error_add_child(*cumulative, new);
+ *cumulative = grpc_error_add_child(*cumulative, new_err);
}
static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index 1580efa063..b7032f288d 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -393,7 +393,7 @@ static bool maybe_add_max_age_filter(grpc_exec_ctx* exec_ctx,
bool enable =
grpc_channel_arg_get_integer(
grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS),
- MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX &&
+ MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX ||
grpc_channel_arg_get_integer(
grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS),
MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX;