From 86c0f8adb8238a923b56bdecf7cf52395d4f31f1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 1 Dec 2015 20:05:40 -0800 Subject: Make pick_first fast path lock free, take channel lock for less time --- src/core/channel/client_channel.c | 9 +++-- src/core/client_config/lb_policies/pick_first.c | 48 +++++++++++++++---------- 2 files changed, 36 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index da0fdba643..bd8fa70034 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -338,10 +338,13 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, return 1; } if (chand->lb_policy != NULL) { - int r = - grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, - initial_metadata, connected_subchannel, on_ready); + grpc_lb_policy *lb_policy = chand->lb_policy; + int r; + GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel"); gpr_mu_unlock(&chand->mu_config); + r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset, + initial_metadata, connected_subchannel, on_ready); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel"); return r; } if (chand->resolver != NULL && !chand->started_resolving) { diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index b91f0609d2..beacffcf23 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -57,10 +57,8 @@ typedef struct { /** mutex protecting remaining members */ gpr_mu mu; - /** the selected channel - TODO(ctiller): this should be atomically set so we don't - need to take a mutex in the common case */ - grpc_connected_subchannel *selected; + /** the selected channel (a grpc_connected_subchannel) */ + gpr_atm selected; /** have we started picking? */ int started_picking; /** are we shut down? */ @@ -76,15 +74,18 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; +#define GET_SELECTED(p) ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected)) + void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connected_subchannel *selected = GET_SELECTED(p); size_t i; GPR_ASSERT(p->pending_picks == NULL); for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } - if (p->selected) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + if (selected != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); @@ -95,16 +96,18 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; + grpc_connected_subchannel *selected; gpr_mu_lock(&p->mu); + selected = GET_SELECTED(p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); /* cancel subscription */ - if (p->selected != NULL) { + if (selected != NULL) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); + exec_ctx, selected, NULL, NULL, &p->connectivity_changed); } else { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, @@ -171,10 +174,16 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, grpc_connected_subchannel **target, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; + grpc_connected_subchannel *selected = GET_SELECTED(p); + if (selected != NULL) { + *target = selected; + return 1; + } gpr_mu_lock(&p->mu); - if (p->selected) { + selected = GET_SELECTED(p); + if (selected) { gpr_mu_unlock(&p->mu); - *target = p->selected; + *target = selected; return 1; } else { if (!p->started_picking) { @@ -219,14 +228,17 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; + grpc_connected_subchannel *selected; gpr_mu_lock(&p->mu); + selected = GET_SELECTED(p); + if (p->shutdown) { gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); return; - } else if (p->selected != NULL) { + } else if (selected != NULL) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { /* if the selected channel goes bad, we're done */ p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE; @@ -235,7 +247,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, &p->base.interested_parties, + exec_ctx, selected, &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -247,10 +259,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); selected_subchannel = p->subchannels[p->checking_subchannel]; - p->selected = - grpc_subchannel_get_connected_subchannel(selected_subchannel); - GPR_ASSERT(p->selected); - GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); + selected = grpc_subchannel_get_connected_subchannel(selected_subchannel); + GPR_ASSERT(selected != NULL); + gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected); + GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); grpc_exec_ctx_enqueue(exec_ctx, @@ -258,14 +270,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = p->selected; + *pp->target = selected; grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, &p->base.interested_parties, + exec_ctx, selected, &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: -- cgit v1.2.3 From 320bee0e623f4d7cf7568db60bdcdd93a7556bed Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 Jan 2016 17:33:45 -0800 Subject: Review feedback --- src/core/client_config/lb_policies/pick_first.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 620b83d195..9beaeba2b4 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -55,10 +55,11 @@ typedef struct { grpc_closure connectivity_changed; - /** mutex protecting remaining members */ - gpr_mu mu; /** the selected channel (a grpc_connected_subchannel) */ gpr_atm selected; + + /** mutex protecting remaining members */ + gpr_mu mu; /** have we started picking? */ int started_picking; /** are we shut down? */ @@ -174,11 +175,15 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, grpc_connected_subchannel **target, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; + + /* Check atomically for a selected channel */ grpc_connected_subchannel *selected = GET_SELECTED(p); if (selected != NULL) { *target = selected; return 1; } + + /* No subchannel selected yet, so acquire lock and then attempt again */ gpr_mu_lock(&p->mu); selected = GET_SELECTED(p); if (selected) { -- cgit v1.2.3 From 093193edb785aee0ae90a604b4fae876b8a626b0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 7 Jan 2016 07:14:44 -0800 Subject: Fix pings --- src/core/client_config/lb_policies/pick_first.c | 7 +++---- templates/test/core/end2end/end2end_defs.include | 2 +- test/core/end2end/end2end_nosec_tests.c | 2 +- test/core/end2end/end2end_tests.c | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 9beaeba2b4..4a90b07002 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -368,13 +368,12 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); - if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + grpc_connected_subchannel *selected = GET_SELECTED(p); + if (selected) { + grpc_connected_subchannel_ping(exec_ctx, selected, closure); } else { grpc_exec_ctx_enqueue(exec_ctx, closure, 0); } - gpr_mu_unlock(&p->mu); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { diff --git a/templates/test/core/end2end/end2end_defs.include b/templates/test/core/end2end/end2end_defs.include index e95bd4a8b6..18a33b7d32 100644 --- a/templates/test/core/end2end/end2end_defs.include +++ b/templates/test/core/end2end/end2end_defs.include @@ -61,7 +61,7 @@ void grpc_end2end_tests(int argc, char **argv, continue; } % endfor - gpr_log(GPR_DEBUG, "not a test: '%%s'", argv[i]); + gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]); abort(); } } \ No newline at end of file diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c index c0bea7bb36..e1974a7d65 100644 --- a/test/core/end2end/end2end_nosec_tests.c +++ b/test/core/end2end/end2end_nosec_tests.c @@ -259,7 +259,7 @@ void grpc_end2end_tests(int argc, char **argv, trailing_metadata(config); continue; } - gpr_log(GPR_DEBUG, "not a test: '%%s'", argv[i]); + gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]); abort(); } } diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c index 4c3a018ad2..271b81efeb 100644 --- a/test/core/end2end/end2end_tests.c +++ b/test/core/end2end/end2end_tests.c @@ -265,7 +265,7 @@ void grpc_end2end_tests(int argc, char **argv, trailing_metadata(config); continue; } - gpr_log(GPR_DEBUG, "not a test: '%%s'", argv[i]); + gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]); abort(); } } -- cgit v1.2.3