aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-01-11 10:35:29 -0800
committerGravatar Vijay Pai <vpai@google.com>2016-01-11 10:35:29 -0800
commit0878d522e6ce372748641f77b2fe04c7d01963a0 (patch)
treeca5edd0e4e812bfa07d38cb0a14ce32567be39ac
parentb3a6787bd52a9fe4a68114cbe5cd45926287da17 (diff)
parent093193edb785aee0ae90a604b4fae876b8a626b0 (diff)
Merge pull request #4238 from ctiller/poffy
Make pick_first fast path lock free
-rw-r--r--src/core/channel/client_channel.c9
-rw-r--r--src/core/client_config/lb_policies/pick_first.c60
-rw-r--r--templates/test/core/end2end/end2end_defs.include2
-rw-r--r--test/core/end2end/end2end_nosec_tests.c2
-rw-r--r--test/core/end2end/end2end_tests.c2
5 files changed, 47 insertions, 28 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 385ae3be9b..a92a6ecaf2 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -353,10 +353,13 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
return 1;
}
if (chand->lb_policy != NULL) {
- int r =
- grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
- initial_metadata, connected_subchannel, on_ready);
+ grpc_lb_policy *lb_policy = chand->lb_policy;
+ int r;
+ GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
+ initial_metadata, connected_subchannel, on_ready);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
if (chand->resolver != NULL && !chand->started_resolving) {
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 37de3e9f68..4a90b07002 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -55,12 +55,11 @@ typedef struct {
grpc_closure connectivity_changed;
+ /** the selected channel (a grpc_connected_subchannel) */
+ gpr_atm selected;
+
/** mutex protecting remaining members */
gpr_mu mu;
- /** the selected channel
- TODO(ctiller): this should be atomically set so we don't
- need to take a mutex in the common case */
- grpc_connected_subchannel *selected;
/** have we started picking? */
int started_picking;
/** are we shut down? */
@@ -76,15 +75,18 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
+#define GET_SELECTED(p) ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connected_subchannel *selected = GET_SELECTED(p);
size_t i;
GPR_ASSERT(p->pending_picks == NULL);
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
- if (p->selected) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ if (selected != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
@@ -95,16 +97,18 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
+ grpc_connected_subchannel *selected;
gpr_mu_lock(&p->mu);
+ selected = GET_SELECTED(p);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
/* cancel subscription */
- if (p->selected != NULL) {
+ if (selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+ exec_ctx, selected, NULL, NULL, &p->connectivity_changed);
} else {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
@@ -171,10 +175,20 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
+
+ /* Check atomically for a selected channel */
+ grpc_connected_subchannel *selected = GET_SELECTED(p);
+ if (selected != NULL) {
+ *target = selected;
+ return 1;
+ }
+
+ /* No subchannel selected yet, so acquire lock and then attempt again */
gpr_mu_lock(&p->mu);
- if (p->selected) {
+ selected = GET_SELECTED(p);
+ if (selected) {
gpr_mu_unlock(&p->mu);
- *target = p->selected;
+ *target = selected;
return 1;
} else {
if (!p->started_picking) {
@@ -219,14 +233,17 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
+ grpc_connected_subchannel *selected;
gpr_mu_lock(&p->mu);
+ selected = GET_SELECTED(p);
+
if (p->shutdown) {
gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
return;
- } else if (p->selected != NULL) {
+ } else if (selected != NULL) {
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* if the selected channel goes bad, we're done */
p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE;
@@ -235,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, &p->base.interested_parties,
+ exec_ctx, selected, &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -247,10 +264,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
- p->selected =
- grpc_subchannel_get_connected_subchannel(selected_subchannel);
- GPR_ASSERT(p->selected);
- GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
+ selected = grpc_subchannel_get_connected_subchannel(selected_subchannel);
+ GPR_ASSERT(selected != NULL);
+ gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
+ GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
grpc_exec_ctx_enqueue(exec_ctx,
@@ -258,14 +275,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = p->selected;
+ *pp->target = selected;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, &p->base.interested_parties,
+ exec_ctx, selected, &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
@@ -351,13 +368,12 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- gpr_mu_lock(&p->mu);
- if (p->selected) {
- grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
+ grpc_connected_subchannel *selected = GET_SELECTED(p);
+ if (selected) {
+ grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
}
- gpr_mu_unlock(&p->mu);
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
diff --git a/templates/test/core/end2end/end2end_defs.include b/templates/test/core/end2end/end2end_defs.include
index 1b13bba94c..929827e145 100644
--- a/templates/test/core/end2end/end2end_defs.include
+++ b/templates/test/core/end2end/end2end_defs.include
@@ -61,7 +61,7 @@ void grpc_end2end_tests(int argc, char **argv,
continue;
}
% endfor
- gpr_log(GPR_DEBUG, "not a test: '%%s'", argv[i]);
+ gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]);
abort();
}
}</%def> \ No newline at end of file
diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c
index d9df5fd7f0..9ff46d62e4 100644
--- a/test/core/end2end/end2end_nosec_tests.c
+++ b/test/core/end2end/end2end_nosec_tests.c
@@ -258,7 +258,7 @@ void grpc_end2end_tests(int argc, char **argv,
trailing_metadata(config);
continue;
}
- gpr_log(GPR_DEBUG, "not a test: '%%s'", argv[i]);
+ gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]);
abort();
}
}
diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c
index 7b1471eb89..397ff446a9 100644
--- a/test/core/end2end/end2end_tests.c
+++ b/test/core/end2end/end2end_tests.c
@@ -264,7 +264,7 @@ void grpc_end2end_tests(int argc, char **argv,
trailing_metadata(config);
continue;
}
- gpr_log(GPR_DEBUG, "not a test: '%%s'", argv[i]);
+ gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]);
abort();
}
}