aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_channel/client_channel.c100
-rw-r--r--src/core/ext/client_channel/subchannel.c2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c9
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c2
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c2
5 files changed, 70 insertions, 45 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 2595acd8c4..58504de87d 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -164,10 +164,7 @@ typedef struct client_channel_channel_data {
/** combiner protecting all variables below in this data structure */
grpc_combiner *combiner;
/** currently active load balancer */
- char *lb_policy_name;
grpc_lb_policy *lb_policy;
- /** service config in JSON form */
- char *service_config_json;
/** maps method names to method_parameters structs */
grpc_slice_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
@@ -184,6 +181,13 @@ typedef struct client_channel_channel_data {
grpc_channel_stack *owning_stack;
/** interested parties (owned) */
grpc_pollset_set *interested_parties;
+
+ /* the following properties are guarded by a mutex since API's require them
+ to be instantaniously available */
+ gpr_mu info_mu;
+ char *info_lb_policy_name;
+ /** service config in JSON form */
+ char *info_service_config_json;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@@ -345,16 +349,18 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
}
+ gpr_mu_lock(&chand->info_mu);
if (lb_policy_name != NULL) {
- gpr_free(chand->lb_policy_name);
- chand->lb_policy_name = lb_policy_name;
+ gpr_free(chand->info_lb_policy_name);
+ chand->info_lb_policy_name = lb_policy_name;
}
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (service_config_json != NULL) {
- gpr_free(chand->service_config_json);
- chand->service_config_json = service_config_json;
+ gpr_free(chand->info_service_config_json);
+ chand->info_service_config_json = service_config_json;
}
+ gpr_mu_unlock(&chand->info_mu);
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
@@ -491,18 +497,19 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
const grpc_channel_info *info) {
channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu);
+ gpr_mu_lock(&chand->info_mu);
if (info->lb_policy_name != NULL) {
- *info->lb_policy_name = chand->lb_policy_name == NULL
+ *info->lb_policy_name = chand->info_lb_policy_name == NULL
? NULL
- : gpr_strdup(chand->lb_policy_name);
+ : gpr_strdup(chand->info_lb_policy_name);
}
if (info->service_config_json != NULL) {
- *info->service_config_json = chand->service_config_json == NULL
- ? NULL
- : gpr_strdup(chand->service_config_json);
+ *info->service_config_json =
+ chand->info_service_config_json == NULL
+ ? NULL
+ : gpr_strdup(chand->info_service_config_json);
}
- gpr_mu_unlock(&chand->mu);
+ gpr_mu_unlock(&chand->info_mu);
}
/* Constructor for channel_data */
@@ -567,8 +574,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
- gpr_free(chand->lb_policy_name);
- gpr_free(chand->service_config_json);
+ gpr_free(chand->info_lb_policy_name);
+ gpr_free(chand->info_service_config_json);
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
@@ -1181,26 +1188,34 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
+static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ channel_data *chand = arg;
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
+ } else {
+ chand->exit_idle_when_lb_policy_arrives = true;
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ chand->started_resolving = true;
+ grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+ }
+ }
+}
+
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
- gpr_mu_lock(&chand->mu);
- out = grpc_connectivity_state_check(&chand->state_tracker, NULL);
+ out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
- if (chand->lb_policy != NULL) {
- grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
- } else {
- chand->exit_idle_when_lb_policy_arrives = true;
- if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- chand->started_resolving = true;
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
- }
- }
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(try_to_connect_locked, chand,
+ grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_ERROR_NONE);
}
- gpr_mu_unlock(&chand->mu);
return out;
}
@@ -1208,6 +1223,7 @@ typedef struct {
channel_data *chand;
grpc_pollset *pollset;
grpc_closure *on_complete;
+ grpc_connectivity_state *state;
grpc_closure my_closure;
} external_connectivity_watcher;
@@ -1220,7 +1236,17 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, error);
+ grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
+}
+
+static void cc_watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error_ignored) {
+ external_connectivity_watcher *w = arg;
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
}
void grpc_client_channel_watch_connectivity_state(
@@ -1231,13 +1257,13 @@ void grpc_client_channel_watch_connectivity_state(
w->chand = chand;
w->pollset = pollset;
w->on_complete = on_complete;
+ w->state = state;
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
- grpc_schedule_on_exec_ctx);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
- gpr_mu_lock(&chand->mu);
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &chand->state_tracker, state, &w->my_closure);
- gpr_mu_unlock(&chand->mu);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(&w->my_closure, cc_watch_connectivity_state_locked, w,
+ grpc_combiner_scheduler(chand->combiner, true)),
+ GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index aa036e883b..c37134fd5e 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -419,7 +419,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
grpc_error **error) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_check(&c->state_tracker, error);
+ state = grpc_connectivity_state_get(&c->state_tracker, error);
gpr_mu_unlock(&c->mu);
return state;
}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index ab62e5ed6a..8a2af48328 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -492,9 +492,8 @@ static grpc_lb_addresses *process_serverlist_locked(
static bool update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
- grpc_error *curr_state_error;
- const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(
- &glb_policy->state_tracker, &curr_state_error);
+ const grpc_connectivity_state curr_glb_state =
+ grpc_connectivity_state_check(&glb_policy->state_tracker);
/* The new connectivity status is a function of the previous one and the new
* input coming from the status of the RR policy.
@@ -1098,8 +1097,8 @@ static grpc_connectivity_state glb_check_connectivity(
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&glb_policy->mu);
- st = grpc_connectivity_state_check(&glb_policy->state_tracker,
- connectivity_error);
+ st = grpc_connectivity_state_get(&glb_policy->state_tracker,
+ connectivity_error);
gpr_mu_unlock(&glb_policy->mu);
return st;
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9f2aa461be..1b965183f6 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -398,7 +398,7 @@ static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker, error);
+ st = grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 3e060d189a..63e3d033ad 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -655,7 +655,7 @@ static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker, error);
+ st = grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}