aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-02-09 12:00:43 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-02-09 12:00:43 -0800
commit613dafa60ce3a22a5d7f1351b8054a3090b9deb1 (patch)
tree54ad04d0c7879ca0c8a50a851c49275f07acbfa7 /src/core
parentbefafe64f9a010920c492956ee8df1257a6d4e77 (diff)
Convert connectivity_state, channel info into a combiner-compatible form
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_channel/client_channel.c100
-rw-r--r--src/core/ext/client_channel/subchannel.c2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c9
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c2
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c2
-rw-r--r--src/core/lib/transport/connectivity_state.c42
-rw-r--r--src/core/lib/transport/connectivity_state.h20
7 files changed, 115 insertions, 62 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 2595acd8c4..58504de87d 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -164,10 +164,7 @@ typedef struct client_channel_channel_data {
/** combiner protecting all variables below in this data structure */
grpc_combiner *combiner;
/** currently active load balancer */
- char *lb_policy_name;
grpc_lb_policy *lb_policy;
- /** service config in JSON form */
- char *service_config_json;
/** maps method names to method_parameters structs */
grpc_slice_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
@@ -184,6 +181,13 @@ typedef struct client_channel_channel_data {
grpc_channel_stack *owning_stack;
/** interested parties (owned) */
grpc_pollset_set *interested_parties;
+
+ /* the following properties are guarded by a mutex since API's require them
+ to be instantaniously available */
+ gpr_mu info_mu;
+ char *info_lb_policy_name;
+ /** service config in JSON form */
+ char *info_service_config_json;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@@ -345,16 +349,18 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
}
+ gpr_mu_lock(&chand->info_mu);
if (lb_policy_name != NULL) {
- gpr_free(chand->lb_policy_name);
- chand->lb_policy_name = lb_policy_name;
+ gpr_free(chand->info_lb_policy_name);
+ chand->info_lb_policy_name = lb_policy_name;
}
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (service_config_json != NULL) {
- gpr_free(chand->service_config_json);
- chand->service_config_json = service_config_json;
+ gpr_free(chand->info_service_config_json);
+ chand->info_service_config_json = service_config_json;
}
+ gpr_mu_unlock(&chand->info_mu);
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
@@ -491,18 +497,19 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
const grpc_channel_info *info) {
channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu);
+ gpr_mu_lock(&chand->info_mu);
if (info->lb_policy_name != NULL) {
- *info->lb_policy_name = chand->lb_policy_name == NULL
+ *info->lb_policy_name = chand->info_lb_policy_name == NULL
? NULL
- : gpr_strdup(chand->lb_policy_name);
+ : gpr_strdup(chand->info_lb_policy_name);
}
if (info->service_config_json != NULL) {
- *info->service_config_json = chand->service_config_json == NULL
- ? NULL
- : gpr_strdup(chand->service_config_json);
+ *info->service_config_json =
+ chand->info_service_config_json == NULL
+ ? NULL
+ : gpr_strdup(chand->info_service_config_json);
}
- gpr_mu_unlock(&chand->mu);
+ gpr_mu_unlock(&chand->info_mu);
}
/* Constructor for channel_data */
@@ -567,8 +574,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
- gpr_free(chand->lb_policy_name);
- gpr_free(chand->service_config_json);
+ gpr_free(chand->info_lb_policy_name);
+ gpr_free(chand->info_service_config_json);
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
@@ -1181,26 +1188,34 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
+static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ channel_data *chand = arg;
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
+ } else {
+ chand->exit_idle_when_lb_policy_arrives = true;
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ chand->started_resolving = true;
+ grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+ }
+ }
+}
+
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
- gpr_mu_lock(&chand->mu);
- out = grpc_connectivity_state_check(&chand->state_tracker, NULL);
+ out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
- if (chand->lb_policy != NULL) {
- grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
- } else {
- chand->exit_idle_when_lb_policy_arrives = true;
- if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- chand->started_resolving = true;
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
- }
- }
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(try_to_connect_locked, chand,
+ grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_ERROR_NONE);
}
- gpr_mu_unlock(&chand->mu);
return out;
}
@@ -1208,6 +1223,7 @@ typedef struct {
channel_data *chand;
grpc_pollset *pollset;
grpc_closure *on_complete;
+ grpc_connectivity_state *state;
grpc_closure my_closure;
} external_connectivity_watcher;
@@ -1220,7 +1236,17 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, error);
+ grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
+}
+
+static void cc_watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error_ignored) {
+ external_connectivity_watcher *w = arg;
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
}
void grpc_client_channel_watch_connectivity_state(
@@ -1231,13 +1257,13 @@ void grpc_client_channel_watch_connectivity_state(
w->chand = chand;
w->pollset = pollset;
w->on_complete = on_complete;
+ w->state = state;
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
- grpc_schedule_on_exec_ctx);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
- gpr_mu_lock(&chand->mu);
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &chand->state_tracker, state, &w->my_closure);
- gpr_mu_unlock(&chand->mu);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(&w->my_closure, cc_watch_connectivity_state_locked, w,
+ grpc_combiner_scheduler(chand->combiner, true)),
+ GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index aa036e883b..c37134fd5e 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -419,7 +419,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
grpc_error **error) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_check(&c->state_tracker, error);
+ state = grpc_connectivity_state_get(&c->state_tracker, error);
gpr_mu_unlock(&c->mu);
return state;
}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index ab62e5ed6a..8a2af48328 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -492,9 +492,8 @@ static grpc_lb_addresses *process_serverlist_locked(
static bool update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
- grpc_error *curr_state_error;
- const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(
- &glb_policy->state_tracker, &curr_state_error);
+ const grpc_connectivity_state curr_glb_state =
+ grpc_connectivity_state_check(&glb_policy->state_tracker);
/* The new connectivity status is a function of the previous one and the new
* input coming from the status of the RR policy.
@@ -1098,8 +1097,8 @@ static grpc_connectivity_state glb_check_connectivity(
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&glb_policy->mu);
- st = grpc_connectivity_state_check(&glb_policy->state_tracker,
- connectivity_error);
+ st = grpc_connectivity_state_get(&glb_policy->state_tracker,
+ connectivity_error);
gpr_mu_unlock(&glb_policy->mu);
return st;
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9f2aa461be..1b965183f6 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -398,7 +398,7 @@ static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker, error);
+ st = grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 3e060d189a..63e3d033ad 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -655,7 +655,7 @@ static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker, error);
+ st = grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 8fc5bf3e9a..e884567240 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -62,7 +62,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state init_state,
const char *name) {
- tracker->current_state = init_state;
+ gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
tracker->current_error = GRPC_ERROR_NONE;
tracker->watchers = NULL;
tracker->name = gpr_strdup(name);
@@ -89,15 +89,29 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
}
grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker) {
+ grpc_connectivity_state cur =
+ (grpc_connectivity_state)gpr_atm_no_barrier_load(
+ &tracker->current_state_atm);
+ if (grpc_connectivity_state_trace) {
+ gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
+ grpc_connectivity_state_name(cur));
+ }
+ return cur;
+}
+
+grpc_connectivity_state grpc_connectivity_state_get(
grpc_connectivity_state_tracker *tracker, grpc_error **error) {
+ grpc_connectivity_state cur =(grpc_connectivity_state)
+ gpr_atm_no_barrier_load(&tracker->current_state_atm);
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
- grpc_connectivity_state_name(tracker->current_state));
+ grpc_connectivity_state_name(cur));
}
if (error != NULL) {
*error = GRPC_ERROR_REF(tracker->current_error);
}
- return tracker->current_state;
+ return cur;
}
bool grpc_connectivity_state_has_watchers(
@@ -108,6 +122,8 @@ bool grpc_connectivity_state_has_watchers(
bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify) {
+ grpc_connectivity_state cur =(grpc_connectivity_state)
+ gpr_atm_no_barrier_load(&tracker->current_state_atm);
if (grpc_connectivity_state_trace) {
if (current == NULL) {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
@@ -115,7 +131,7 @@ bool grpc_connectivity_state_notify_on_state_change(
} else {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
tracker->name, grpc_connectivity_state_name(*current),
- grpc_connectivity_state_name(tracker->current_state), notify);
+ grpc_connectivity_state_name(cur), notify);
}
}
if (current == NULL) {
@@ -138,8 +154,8 @@ bool grpc_connectivity_state_notify_on_state_change(
}
return false;
} else {
- if (tracker->current_state != *current) {
- *current = tracker->current_state;
+ if (cur != *current) {
+ *current = cur;
grpc_closure_sched(exec_ctx, notify,
GRPC_ERROR_REF(tracker->current_error));
} else {
@@ -149,7 +165,7 @@ bool grpc_connectivity_state_notify_on_state_change(
w->next = tracker->watchers;
tracker->watchers = w;
}
- return tracker->current_state == GRPC_CHANNEL_IDLE;
+ return cur == GRPC_CHANNEL_IDLE;
}
}
@@ -157,11 +173,13 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
grpc_error *error, const char *reason) {
+ grpc_connectivity_state cur =(grpc_connectivity_state)
+ gpr_atm_no_barrier_load(&tracker->current_state_atm);
grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker,
- tracker->name, grpc_connectivity_state_name(tracker->current_state),
+ tracker->name, grpc_connectivity_state_name(cur),
grpc_connectivity_state_name(state), reason, error, error_string);
}
switch (state) {
@@ -178,13 +196,13 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
}
GRPC_ERROR_UNREF(tracker->current_error);
tracker->current_error = error;
- if (tracker->current_state == state) {
+ if (cur == state) {
return;
}
- GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_SHUTDOWN);
- tracker->current_state = state;
+ GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN);
+ gpr_atm_no_barrier_store(&tracker->current_state_atm, state);
while ((w = tracker->watchers) != NULL) {
- *w->current = tracker->current_state;
+ *w->current = state;
tracker->watchers = w->next;
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name,
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 769c675b79..c9604c34dd 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -47,8 +47,8 @@ typedef struct grpc_connectivity_state_watcher {
} grpc_connectivity_state_watcher;
typedef struct {
- /** current connectivity state */
- grpc_connectivity_state current_state;
+ /** current grpc_connectivity_state */
+ gpr_atm current_state_atm;
/** error associated with state */
grpc_error *current_error;
/** all our watchers */
@@ -59,6 +59,7 @@ typedef struct {
extern int grpc_connectivity_state_trace;
+/** enum --> string conversion */
const char *grpc_connectivity_state_name(grpc_connectivity_state state);
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
@@ -68,22 +69,31 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker);
/** Set connectivity state; not thread safe; access must be serialized with an
- * external lock */
+ * external lock */
void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
grpc_error *associated_error,
const char *reason);
+/** Return true if this connectivity state has watchers.
+ Access must be serialized with an external lock. */
bool grpc_connectivity_state_has_watchers(
grpc_connectivity_state_tracker *tracker);
+/** Return the last seen connectivity state. No need to synchronize access. */
grpc_connectivity_state grpc_connectivity_state_check(
- grpc_connectivity_state_tracker *tracker, grpc_error **current_error);
+ grpc_connectivity_state_tracker *tracker);
+
+/** Return the last seen connectivity state, and the associated error.
+ Access must be serialized with an external lock. */
+grpc_connectivity_state grpc_connectivity_state_get(
+ grpc_connectivity_state_tracker *tracker, grpc_error **error);
/** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
- case) */
+ case).
+ Access must be serialized with an external lock. */
bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify);