aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/lb_policies
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/lb_policies')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c37
-rw-r--r--src/core/client_config/lb_policies/round_robin.c18
2 files changed, 29 insertions, 26 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 93312abb00..6d9e6af4a6 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -42,7 +42,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
- grpc_subchannel **target;
+ grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -60,7 +60,7 @@ typedef struct {
/** the selected channel
TODO(ctiller): this should be atomically set so we don't
need to take a mutex in the common case */
- grpc_subchannel *selected;
+ grpc_connected_subchannel *selected;
/** have we started picking? */
int started_picking;
/** are we shut down? */
@@ -102,7 +102,7 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
if (p->selected) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
@@ -131,7 +131,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -174,7 +174,7 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
- grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+ grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@@ -207,7 +207,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_op op;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
- grpc_subchannel *exclude_subchannel;
+ grpc_connected_subchannel *exclude_subchannel;
gpr_mu_lock(&p->mu);
subchannels = p->subchannels;
@@ -218,7 +218,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
- if (subchannels[i] != exclude_subchannel) {
+ if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) {
memset(&op, 0, sizeof(op));
op.disconnect = 1;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
@@ -232,6 +232,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
+ grpc_subchannel *selected_subchannel;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -244,7 +245,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
} else {
@@ -256,8 +257,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
- p->selected = p->subchannels[p->checking_subchannel];
- GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
+ selected_subchannel = p->subchannels[p->checking_subchannel];
+ p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel);
+ GPR_ASSERT(p->selected);
+ GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
grpc_exec_ctx_enqueue(exec_ctx,
@@ -266,12 +269,12 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(exec_ctx, p->selected,
+ grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
break;
@@ -342,14 +345,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
size_t i;
size_t n;
grpc_subchannel **subchannels;
- grpc_subchannel *selected;
+ grpc_connected_subchannel *selected;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
selected = p->selected;
if (selected) {
- GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
+ GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
}
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
@@ -358,13 +361,13 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
- if (selected == subchannels[i]) continue;
+ if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
if (p->selected) {
- grpc_subchannel_process_transport_op(exec_ctx, selected, op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
+ grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
}
gpr_free(subchannels);
}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index 1ffe32fff2..08592b79e1 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -46,7 +46,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
- grpc_subchannel **target;
+ grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -144,9 +144,9 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *csc) {
+ grpc_subchannel *sc) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = csc;
+ new_elem->subchannel = sc;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -160,7 +160,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
}
return new_elem;
}
@@ -265,7 +265,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
size_t i;
@@ -314,7 +314,7 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
- grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+ grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -323,9 +323,9 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
gpr_mu_unlock(&p->mu);
- *target = selected->subchannel;
+ *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)",
+ gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
/* only advance the last picked pointer if the selection was used */
@@ -390,7 +390,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = selected->subchannel;
+ *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",