aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/lb_policies/pick_first.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c37
1 files changed, 20 insertions, 17 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 93312abb00..6d9e6af4a6 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -42,7 +42,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
- grpc_subchannel **target;
+ grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -60,7 +60,7 @@ typedef struct {
/** the selected channel
TODO(ctiller): this should be atomically set so we don't
need to take a mutex in the common case */
- grpc_subchannel *selected;
+ grpc_connected_subchannel *selected;
/** have we started picking? */
int started_picking;
/** are we shut down? */
@@ -102,7 +102,7 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
if (p->selected) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
@@ -131,7 +131,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -174,7 +174,7 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
- grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+ grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@@ -207,7 +207,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_op op;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
- grpc_subchannel *exclude_subchannel;
+ grpc_connected_subchannel *exclude_subchannel;
gpr_mu_lock(&p->mu);
subchannels = p->subchannels;
@@ -218,7 +218,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
- if (subchannels[i] != exclude_subchannel) {
+ if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) {
memset(&op, 0, sizeof(op));
op.disconnect = 1;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
@@ -232,6 +232,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
+ grpc_subchannel *selected_subchannel;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -244,7 +245,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
} else {
@@ -256,8 +257,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
- p->selected = p->subchannels[p->checking_subchannel];
- GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
+ selected_subchannel = p->subchannels[p->checking_subchannel];
+ p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel);
+ GPR_ASSERT(p->selected);
+ GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
grpc_exec_ctx_enqueue(exec_ctx,
@@ -266,12 +269,12 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(exec_ctx, p->selected,
+ grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
break;
@@ -342,14 +345,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
size_t i;
size_t n;
grpc_subchannel **subchannels;
- grpc_subchannel *selected;
+ grpc_connected_subchannel *selected;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
selected = p->selected;
if (selected) {
- GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
+ GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
}
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
@@ -358,13 +361,13 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
- if (selected == subchannels[i]) continue;
+ if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
if (p->selected) {
- grpc_subchannel_process_transport_op(exec_ctx, selected, op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
+ grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
}
gpr_free(subchannels);
}