aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c22
-rw-r--r--src/core/client_config/lb_policies/round_robin.c14
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c6
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c6
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c4
-rw-r--r--src/core/client_config/subchannel.c41
6 files changed, 52 insertions, 41 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index e6ddb1a11f..459bbebb68 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,7 +76,7 @@ typedef struct {
} pick_first_lb_policy;
#define GET_SELECTED(p) \
- ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+ ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -121,7 +121,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
*pp->target = NULL;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
pp = next;
}
@@ -140,7 +140,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -209,7 +209,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
}
static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
pick_first_lb_policy *p = arg;
size_t i;
size_t num_subchannels = p->num_subchannels;
@@ -230,7 +230,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
}
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
@@ -268,19 +268,19 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
GPR_ASSERT(selected != NULL);
- gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
- grpc_exec_ctx_enqueue(exec_ctx,
- grpc_closure_create(destroy_subchannels, p), 1);
+ gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
+ grpc_exec_ctx_enqueue(
+ exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -327,7 +327,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
@@ -374,7 +374,7 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
}
}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index d487456363..b1171c45b0 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -237,7 +237,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
@@ -263,7 +263,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -336,7 +336,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
}
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
@@ -376,7 +376,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -428,7 +428,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
} else {
@@ -479,7 +479,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel_ping(exec_ctx, target, closure);
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
}
}
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 28ca30b946..376b6b3d76 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -93,7 +93,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -182,7 +182,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 6343259246..68910ad975 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -98,7 +98,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -153,7 +153,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
*r->target_config = cfg;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
}
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 4924ca77d6..166738e768 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -420,7 +420,7 @@ static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2992da8b79..0a996a1e8b 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -145,7 +145,7 @@ struct grpc_subchannel_call {
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
- int iomgr_success);
+ bool iomgr_success);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define REF_REASON reason
@@ -175,7 +175,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
*/
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
@@ -198,7 +198,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
*/
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
@@ -268,7 +268,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- 1);
+ true, NULL);
}
}
@@ -284,9 +284,13 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->connector = connector;
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
- c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
- memcpy((void *)c->filters, args->filters,
- sizeof(grpc_channel_filter *) * c->num_filters);
+ if (c->num_filters > 0) {
+ c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
+ memcpy((void *)c->filters, args->filters,
+ sizeof(grpc_channel_filter *) * c->num_filters);
+ } else {
+ c->filters = NULL;
+ }
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
grpc_pollset_set_init(&c->pollset_set);
@@ -337,7 +341,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
}
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
@@ -409,7 +413,7 @@ void grpc_connected_subchannel_process_transport_op(
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- int iomgr_success) {
+ bool iomgr_success) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -483,7 +487,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
- memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+ if (c->num_filters > 0) {
+ memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+ }
memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
sizeof(*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
@@ -519,7 +525,12 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
}
/* publish */
- GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+ I'd have expected the rel_cas below to be enough, but
+ seemingly it's not.
+ Re-evaluate if we really need this. */
+ gpr_atm_full_barrier();
+ GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel ref
@@ -583,7 +594,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
@@ -600,7 +611,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL) {
@@ -636,7 +647,7 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
*/
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
- int success) {
+ bool success) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));