diff options
Diffstat (limited to 'src/core/client_config/lb_policies')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 26 |
1 files changed, 19 insertions, 7 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 18c8c0412f..4b3aaab09c 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -101,6 +101,9 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } + if (p->selected) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); @@ -183,20 +186,18 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&p->mu); subchannels = p->subchannels; - exclude_subchannel = p->selected; p->num_subchannels = 0; p->subchannels = NULL; + exclude_subchannel = p->selected; gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (subchannels[i] == exclude_subchannel) { - exclude_subchannel = NULL; - continue; + if (subchannels[i] != exclude_subchannel) { + memset(&op, 0, sizeof(op)); + op.disconnect = 1; + grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); } - memset(&op, 0, sizeof(op)); - op.disconnect = 1; - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); } @@ -231,6 +232,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); p->selected = p->subchannels[p->checking_subchannel]; + GRPC_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1); @@ -314,10 +316,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, size_t i; size_t n; grpc_subchannel **subchannels; + grpc_subchannel *selected; gpr_mu_lock(&p->mu); n = p->num_subchannels; subchannels = gpr_malloc(n * sizeof(*subchannels)); + selected = p->selected; + if (selected) { + GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); + } for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); @@ -325,9 +332,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { + if (selected == subchannels[i]) continue; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } + if (p->selected) { + grpc_subchannel_process_transport_op(exec_ctx, selected, op); + GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); + } gpr_free(subchannels); } |