diff options
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 93312abb00..6d9e6af4a6 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -42,7 +42,7 @@ typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; - grpc_subchannel **target; + grpc_connected_subchannel **target; grpc_closure *on_complete; } pending_pick; @@ -60,7 +60,7 @@ typedef struct { /** the selected channel TODO(ctiller): this should be atomically set so we don't need to take a mutex in the common case */ - grpc_subchannel *selected; + grpc_connected_subchannel *selected; /** have we started picking? */ int started_picking; /** are we shut down? */ @@ -102,7 +102,7 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } if (p->selected) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); @@ -131,7 +131,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_subchannel **target) { + grpc_connected_subchannel **target) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -174,7 +174,7 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; @@ -207,7 +207,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_op op; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; - grpc_subchannel *exclude_subchannel; + grpc_connected_subchannel *exclude_subchannel; gpr_mu_lock(&p->mu); subchannels = p->subchannels; @@ -218,7 +218,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (subchannels[i] != exclude_subchannel) { + if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) { memset(&op, 0, sizeof(op)); op.disconnect = 1; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); @@ -232,6 +232,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; + grpc_subchannel *selected_subchannel; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -244,7 +245,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change(exec_ctx, p->selected, + grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); } else { @@ -256,8 +257,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, case GRPC_CHANNEL_READY: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); - p->selected = p->subchannels[p->checking_subchannel]; - GRPC_SUBCHANNEL_REF(p->selected, "picked_first"); + selected_subchannel = p->subchannels[p->checking_subchannel]; + p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel); + GPR_ASSERT(p->selected); + GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); grpc_exec_ctx_enqueue(exec_ctx, @@ -266,12 +269,12 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(exec_ctx, p->selected, + grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - grpc_subchannel_notify_on_state_change(exec_ctx, p->selected, + grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); break; @@ -342,14 +345,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, size_t i; size_t n; grpc_subchannel **subchannels; - grpc_subchannel *selected; + grpc_connected_subchannel *selected; gpr_mu_lock(&p->mu); n = p->num_subchannels; subchannels = gpr_malloc(n * sizeof(*subchannels)); selected = p->selected; if (selected) { - GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); + GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); } for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; @@ -358,13 +361,13 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - if (selected == subchannels[i]) continue; + if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } if (p->selected) { - grpc_subchannel_process_transport_op(exec_ctx, selected, op); - GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); + grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); } gpr_free(subchannels); } |