aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/lb_policies/pick_first.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
commit000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch)
tree883d73a97471f63e616d02c1e17efc62b099c8ad /src/core/client_config/lb_policies/pick_first.c
parent38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff)
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c127
1 files changed, 44 insertions, 83 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 804dbdeadd..8fd8dd7b67 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -108,9 +108,8 @@ void pf_destroy(grpc_lb_policy *pol) {
gpr_free(p);
}
-void pf_shutdown(grpc_lb_policy *pol) {
+void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_flusher f;
pending_pick *pp;
gpr_mu_lock(&p->mu);
del_interested_parties_locked(p);
@@ -118,51 +117,40 @@ void pf_shutdown(grpc_lb_policy *pol) {
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
- "shutdown");
- grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
+ "shutdown", call_list);
gpr_mu_unlock(&p->mu);
- grpc_connectivity_state_end_flush(&f);
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- pp->on_complete->cb(pp->on_complete->cb_arg, 0);
+ grpc_iomgr_call_list_add(call_list, pp->on_complete, 1);
gpr_free(pp);
pp = next;
}
}
-/* returns a closure to call, or NULL */
-static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) {
+static void start_picking(pick_first_lb_policy *p,
+ grpc_iomgr_call_list *call_list) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
- if (grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed)
- .state_already_changed) {
- return &p->connectivity_changed;
- } else {
- return NULL;
- }
+ grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity,
+ &p->connectivity_changed, call_list);
}
-void pf_exit_idle(grpc_lb_policy *pol) {
+void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
- call = start_picking(p);
+ start_picking(p, call_list);
}
gpr_mu_unlock(&p->mu);
- if (call) {
- call->cb(call->cb_arg, 1);
- }
}
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
- grpc_iomgr_closure *on_complete) {
+ grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -171,9 +159,8 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
*target = p->selected;
on_complete->cb(on_complete->cb_arg, 1);
} else {
- grpc_iomgr_closure *call = NULL;
if (!p->started_picking) {
- call = start_picking(p);
+ start_picking(p, call_list);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@@ -184,9 +171,6 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
- if (call) {
- call->cb(call->cb_arg, 1);
- }
}
}
@@ -194,8 +178,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
- grpc_iomgr_closure *cbs = NULL;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&p->mu);
@@ -203,14 +186,11 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
unref = 1;
} else if (p->selected != NULL) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
- "selected_changed");
+ "selected_changed", &call_list);
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- if (grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed,
+ &call_list);
} else {
unref = 1;
}
@@ -219,28 +199,23 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
- "connecting_ready");
+ "connecting_ready", &call_list);
p->selected = p->subchannels[p->checking_subchannel];
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- pp->on_complete->next = cbs;
- cbs = pp->on_complete;
+ grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
- if (grpc_subchannel_notify_on_state_change(p->selected,
- &p->checking_connectivity,
- &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed,
+ &call_list);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ "connecting_transient_failure", &call_list);
del_interested_parties_locked(p);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
@@ -248,13 +223,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- if (grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel],
- &p->checking_connectivity, &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed, &call_list);
} else {
goto loop;
}
@@ -262,14 +233,10 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
- "connecting_changed");
- if (grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel],
- &p->checking_connectivity, &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ "connecting_changed", &call_list);
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed, &call_list);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p);
@@ -280,19 +247,18 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
if (p->num_subchannels == 0) {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE,
- "no_more_channels");
+ "no_more_channels", &call_list);
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- pp->on_complete->next = cbs;
- cbs = pp->on_complete;
+ grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
unref = 1;
} else {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ "subchannel_failed", &call_list);
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
@@ -302,22 +268,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
}
- grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
- grpc_connectivity_state_end_flush(&f);
- while (cbs != NULL) {
- grpc_iomgr_closure *next = cbs->next;
- cbs->cb(cbs->cb_arg, 1);
- cbs = next;
- }
+ grpc_iomgr_call_list_run(call_list);
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
}
}
-static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
+static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
+ grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
size_t n;
@@ -339,7 +300,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
gpr_free(subchannels);
}
-static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
+static grpc_connectivity_state pf_check_connectivity(
+ grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
@@ -348,16 +310,15 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st;
}
-static grpc_connectivity_state_notify_on_state_change_result
-pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current,
- grpc_iomgr_closure *notify) {
+void pf_notify_on_state_change(grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&p->mu);
- r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
- notify);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify, call_list);
gpr_mu_unlock(&p->mu);
- return r;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {