aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c41
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c28
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c22
3 files changed, 47 insertions, 44 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index bed5e6c901..bb0ee9d95c 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -180,8 +180,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
wrapped_rr_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != NULL) {
/* if *target is NULL, no pick has been made by the RR policy (eg, all
@@ -248,7 +247,8 @@ static void add_pending_pick(pending_pick **root,
pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.free_when_done = pp;
grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
- wrapped_rr_closure, &pp->wrapped_on_complete_arg);
+ wrapped_rr_closure, &pp->wrapped_on_complete_arg,
+ grpc_schedule_on_exec_ctx);
*root = pp;
}
@@ -268,7 +268,8 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pping->wrapped_notify_arg.free_when_done = pping;
pping->next = *root;
grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
- wrapped_rr_closure, &pping->wrapped_notify_arg);
+ wrapped_rr_closure, &pping->wrapped_notify_arg,
+ grpc_schedule_on_exec_ctx);
*root = pping;
}
@@ -667,7 +668,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
- rr_connectivity);
+ rr_connectivity, grpc_schedule_on_exec_ctx);
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = new_rr_state;
@@ -908,15 +909,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pping = next;
}
}
@@ -932,9 +933,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -957,9 +958,9 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -994,11 +995,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, on_complete,
GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
- "won't work without it. Failing"),
- NULL);
+ "won't work without it. Failing"));
return 0;
}
@@ -1017,7 +1017,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
- grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
+ grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
+ grpc_schedule_on_exec_ctx);
wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
wc_arg->wrapped_closure = on_complete;
@@ -1117,9 +1118,11 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) {
glb_policy->lb_call_status_details_capacity = 0;
grpc_closure_init(&glb_policy->lb_on_server_status_received,
- lb_on_server_status_received, glb_policy);
+ lb_on_server_status_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&glb_policy->lb_on_response_received,
- lb_on_response_received, glb_policy);
+ lb_on_response_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index b9cfe6b5c0..821becff69 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -120,7 +120,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
pp = next;
}
@@ -138,9 +138,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -165,9 +165,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -306,14 +306,15 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_exec_ctx_sched(exec_ctx,
- grpc_closure_create(destroy_subchannels, p),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(destroy_subchannels, p,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -366,8 +367,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
@@ -419,8 +419,7 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"));
}
}
@@ -485,7 +484,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
p->num_subchannels = subchannel_idx;
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
- grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
+ grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p,
+ grpc_schedule_on_exec_ctx);
gpr_mu_init(&p->mu);
return &p->base;
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index f0305473d2..47f20a1904 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -321,8 +321,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Channel Shutdown"), NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Channel Shutdown"));
gpr_free(pp);
}
grpc_connectivity_state_set(
@@ -348,9 +348,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -376,9 +376,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
*pp->target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -581,7 +581,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
(void *)selected->subchannel, (void *)selected);
}
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
update_lb_connectivity_status(exec_ctx, sd, error);
@@ -634,7 +634,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
@@ -684,8 +684,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_sched(exec_ctx, closure,
- GRPC_ERROR_CREATE("Round Robin not connected"), NULL);
+ grpc_closure_sched(exec_ctx, closure,
+ GRPC_ERROR_CREATE("Round Robin not connected"));
}
}
@@ -749,7 +749,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
}
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed, sd);
+ rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx);
}
}
if (subchannel_idx == 0) {