aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy/pick_first/pick_first.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c')
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c181
1 files changed, 69 insertions, 112 deletions
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9f2aa461be..501cb6d94d 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -38,6 +38,7 @@
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -57,11 +58,11 @@ typedef struct {
grpc_closure connectivity_changed;
- /** the selected channel (a grpc_connected_subchannel) */
- gpr_atm selected;
+ /** remaining members are protected by the combiner */
+
+ /** the selected channel */
+ grpc_connected_subchannel *selected;
- /** mutex protecting remaining members */
- gpr_mu mu;
/** have we started picking? */
int started_picking;
/** are we shut down? */
@@ -77,32 +78,24 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-#define GET_SELECTED(p) \
- ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
-
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connected_subchannel *selected = GET_SELECTED(p);
size_t i;
GPR_ASSERT(p->pending_picks == NULL);
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
- if (selected != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first");
+ if (p->selected != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
- gpr_mu_destroy(&p->mu);
gpr_free(p);
}
-static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- grpc_connected_subchannel *selected;
- gpr_mu_lock(&p->mu);
- selected = GET_SELECTED(p);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
@@ -110,15 +103,14 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE("Channel shutdown"), "shutdown");
/* cancel subscription */
- if (selected != NULL) {
+ if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, NULL, NULL, &p->connectivity_changed);
+ exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
} else if (p->num_subchannels > 0) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
}
- gpr_mu_unlock(&p->mu);
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
@@ -128,12 +120,11 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
}
-static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target,
- grpc_error *error) {
+static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
@@ -150,17 +141,15 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
-static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error *error) {
+static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
@@ -177,7 +166,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
@@ -192,63 +180,48 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
&p->connectivity_changed);
}
-static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- gpr_mu_lock(&p->mu);
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- gpr_mu_unlock(&p->mu);
}
-static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target, void **user_data,
- grpc_closure *on_complete) {
+static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
/* Check atomically for a selected channel */
- grpc_connected_subchannel *selected = GET_SELECTED(p);
- if (selected != NULL) {
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
+ if (p->selected != NULL) {
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
return 1;
}
- /* No subchannel selected yet, so acquire lock and then attempt again */
- gpr_mu_lock(&p->mu);
- selected = GET_SELECTED(p);
- if (selected) {
- gpr_mu_unlock(&p->mu);
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
- return 1;
- } else {
- if (!p->started_picking) {
- start_picking(exec_ctx, p);
- }
- pp = gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->on_complete = on_complete;
- p->pending_picks = pp;
- gpr_mu_unlock(&p->mu);
- return 0;
+ /* No subchannel selected yet, so try again */
+ if (!p->started_picking) {
+ start_picking(exec_ctx, p);
}
+ pp = gpr_malloc(sizeof(*pp));
+ pp->next = p->pending_picks;
+ pp->target = target;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->on_complete = on_complete;
+ p->pending_picks = pp;
+ return 0;
}
-static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- pick_first_lb_policy *p = arg;
+static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
size_t i;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
- gpr_mu_lock(&p->mu);
subchannels = p->subchannels;
p->num_subchannels = 0;
p->subchannels = NULL;
- gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
@@ -258,25 +231,19 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(subchannels);
}
-static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
- grpc_connected_subchannel *selected;
GRPC_ERROR_REF(error);
- gpr_mu_lock(&p->mu);
-
- selected = GET_SELECTED(p);
-
if (p->shutdown) {
- gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
GRPC_ERROR_UNREF(error);
return;
- } else if (selected != NULL) {
+ } else if (p->selected != NULL) {
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* if the selected channel goes bad, we're done */
p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
@@ -286,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, p->base.interested_parties,
+ exec_ctx, p->selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -301,26 +268,21 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
"connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
- selected =
- grpc_subchannel_get_connected_subchannel(selected_subchannel);
- GPR_ASSERT(selected != NULL);
- GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
+ p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(selected_subchannel),
+ "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
- gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(destroy_subchannels, p,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ destroy_subchannels_locked(exec_ctx, p);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, p->base.interested_parties,
+ exec_ctx, p->selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
@@ -387,48 +349,44 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
}
- gpr_mu_unlock(&p->mu);
-
GRPC_ERROR_UNREF(error);
}
-static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_error **error) {
+static grpc_connectivity_state pf_check_connectivity_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state st;
- gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker, error);
- gpr_mu_unlock(&p->mu);
- return st;
+ return grpc_connectivity_state_get(&p->state_tracker, error);
}
-static void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
+static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
current, notify);
- gpr_mu_unlock(&p->mu);
}
-static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
+static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connected_subchannel *selected = GET_SELECTED(p);
- if (selected) {
- grpc_connected_subchannel_ping(exec_ctx, selected, closure);
+ if (p->selected) {
+ grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
} else {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"));
}
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy, pf_shutdown, pf_pick,
- pf_cancel_pick, pf_cancel_picks, pf_ping_one,
- pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
+ pf_destroy,
+ pf_shutdown_locked,
+ pf_pick_locked,
+ pf_cancel_pick_locked,
+ pf_cancel_picks_locked,
+ pf_ping_one_locked,
+ pf_exit_idle_locked,
+ pf_check_connectivity_locked,
+ pf_notify_on_state_change_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -489,10 +447,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
p->num_subchannels = subchannel_idx;
- grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
- grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p,
- grpc_schedule_on_exec_ctx);
- gpr_mu_init(&p->mu);
+ grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
+ grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
+ grpc_combiner_scheduler(args->combiner, false));
return &p->base;
}