aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/pick_first
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-10-13 16:07:13 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2017-10-18 17:12:19 -0700
commit0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch)
treee43d5de442fdcc3d39cd5af687f319fa39612d3f /src/core/ext/filters/client_channel/lb_policy/pick_first
parent6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff)
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx also keeps track of the previous exec_ctx so that nesting of exec_ctx is allowed. This means that there is only one exec_ctx being used at any time. Also, grpc_exec_ctx_finish is called in the destructor of the object, and the previous exec_ctx is restored to avoid breaking current functionality. The code still explicitly calls grpc_exec_ctx_finish because removing all such instances causes the code to break.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc183
1 files changed, 82 insertions, 101 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index b07fc3b720..5e638d9c86 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -78,20 +78,19 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_destroy(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
GPR_ASSERT(p->pending_picks == NULL);
for (size_t i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
+ GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first_destroy");
}
if (p->selected != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
- "picked_first_destroy");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(p->selected, "picked_first_destroy");
}
- grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_connectivity_state_destroy(&p->state_tracker);
grpc_subchannel_index_unref();
if (p->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ grpc_channel_args_destroy(p->pending_update_args->args);
gpr_free(p->pending_update_args);
}
gpr_free(p->subchannels);
@@ -102,34 +101,34 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
}
-static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_shutdown_locked(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
p->shutdown = true;
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown");
/* cancel subscription */
if (p->selected != NULL) {
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+ grpc_connected_subchannel_notify_on_state_change(p->selected, NULL, NULL,
+ &p->connectivity_changed);
} else if (p->num_subchannels > 0 && p->started_picking) {
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
+ p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
}
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
pp = next;
}
}
-static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+static void pf_cancel_pick_locked(grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -140,7 +139,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
gpr_free(pp);
@@ -153,7 +152,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_UNREF(error);
}
-static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+static void pf_cancel_picks_locked(grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
@@ -165,7 +164,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
gpr_free(pp);
@@ -178,8 +177,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_UNREF(error);
}
-static void start_picking_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
+static void start_picking_locked(pick_first_lb_policy *p) {
p->started_picking = true;
if (p->subchannels != NULL) {
GPR_ASSERT(p->num_subchannels > 0);
@@ -187,20 +185,19 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+ p->subchannels[p->checking_subchannel], p->base.interested_parties,
+ &p->checking_connectivity, &p->connectivity_changed);
}
}
-static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_exit_idle_locked(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
}
-static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+static int pf_pick_locked(grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_call_context_element *context, void **user_data,
@@ -216,7 +213,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* No subchannel selected yet, so try again */
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
pp = (pending_pick *)gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -227,50 +224,46 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
return 0;
}
-static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
+static void destroy_subchannels_locked(pick_first_lb_policy *p) {
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels = p->subchannels;
p->num_subchannels = 0;
p->subchannels = NULL;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
+ GRPC_LB_POLICY_WEAK_UNREF(&p->base, "destroy_subchannels");
for (size_t i = 0; i < num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
+ GRPC_SUBCHANNEL_UNREF(subchannels[i], "pick_first");
}
gpr_free(subchannels);
}
static grpc_connectivity_state pf_check_connectivity_locked(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
+ grpc_lb_policy *pol, grpc_error **error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
return grpc_connectivity_state_get(&p->state_tracker, error);
}
-static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
+static void pf_notify_on_state_change_locked(grpc_lb_policy *pol,
grpc_connectivity_state *current,
grpc_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
- current, notify);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
}
-static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
+static void pf_ping_one_locked(grpc_lb_policy *pol, grpc_closure *closure) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
+ grpc_connected_subchannel_ping(p->selected, closure);
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, closure,
+ GRPC_CLOSURE_SCHED(closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
}
}
/* unsubscribe all subchannels */
-static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
+static void stop_connectivity_watchers(pick_first_lb_policy *p) {
if (p->num_subchannels > 0) {
GPR_ASSERT(p->selected == NULL);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
@@ -278,7 +271,7 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
(void *)p, (void *)p->subchannels[p->checking_subchannel]);
}
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
+ p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
p->updating_subchannels = true;
} else if (p->selected != NULL) {
@@ -287,14 +280,14 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
"Pick First %p unsubscribing from selected subchannel %p",
(void *)p, (void *)p->selected);
}
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+ grpc_connected_subchannel_notify_on_state_change(p->selected, NULL, NULL,
+ &p->connectivity_changed);
p->updating_selected = true;
}
}
/* true upon success */
-static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+static void pf_update_locked(grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
const grpc_arg *arg =
@@ -303,7 +296,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
if (p->subchannels == NULL) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"pf_update_missing");
} else {
@@ -321,10 +314,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
// Empty update. Unsubscribe from all current subchannels and put the
// channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
- stop_connectivity_watchers(exec_ctx, p);
+ stop_connectivity_watchers(p);
return;
}
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
@@ -363,7 +356,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]);
const bool found_selected =
grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
- grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
+ grpc_subchannel_key_destroy(ith_sc_key);
if (found_selected) {
// The currently selected subchannel is in the update: we are done.
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
@@ -373,8 +366,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
(void *)p, (void *)p->selected);
}
for (size_t j = 0; j < sc_args_count; j++) {
- grpc_channel_args_destroy(exec_ctx,
- (grpc_channel_args *)sc_args[j].args);
+ grpc_channel_args_destroy((grpc_channel_args *)sc_args[j].args);
}
gpr_free(sc_args);
return;
@@ -391,7 +383,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
(void *)p);
}
if (p->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ grpc_channel_args_destroy(p->pending_update_args->args);
gpr_free(p->pending_update_args);
}
p->pending_update_args =
@@ -408,7 +400,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
size_t num_new_subchannels = 0;
for (size_t i = 0; i < sc_args_count; i++) {
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
- exec_ctx, args->client_channel_factory, &sc_args[i]);
+ args->client_channel_factory, &sc_args[i]);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
@@ -417,7 +409,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
(void *)p, (void *)subchannel, address_uri);
gpr_free(address_uri);
}
- grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
+ grpc_channel_args_destroy((grpc_channel_args *)sc_args[i].args);
if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
}
gpr_free(sc_args);
@@ -426,15 +418,15 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
// Empty update. Unsubscribe from all current subchannels and put the
// channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
"pf_update_no_valid_addresses");
- stop_connectivity_watchers(exec_ctx, p);
+ stop_connectivity_watchers(p);
return;
}
/* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */
- stop_connectivity_watchers(exec_ctx, p);
+ stop_connectivity_watchers(p);
/* Save new subchannels. The switch over will happen in
* pf_connectivity_changed_locked */
@@ -450,15 +442,13 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+ p->subchannels[p->checking_subchannel], p->base.interested_parties,
+ &p->checking_connectivity, &p->connectivity_changed);
}
}
}
-static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void pf_connectivity_changed_locked(void *arg, grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
@@ -476,8 +466,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (p->updating_selected && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for p->selected */
GPR_ASSERT(p->selected != NULL);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
- "pf_update_connectivity");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(p->selected, "pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
(void *)p, (void *)p->selected);
@@ -493,8 +482,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
/* Captured the unsubscription for the checking subchannel */
GPR_ASSERT(p->selected == NULL);
for (size_t i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
- "pf_update_connectivity");
+ GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
(void *)p->subchannels[i]);
@@ -523,20 +511,19 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_connectivity = GRPC_CHANNEL_IDLE;
/* reuses the weak ref from start_picking_locked */
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+ p->subchannels[p->checking_subchannel], p->base.interested_parties,
+ &p->checking_connectivity, &p->connectivity_changed);
}
if (p->pending_update_args != NULL) {
const grpc_lb_policy_args *args = p->pending_update_args;
p->pending_update_args = NULL;
- pf_update_locked(exec_ctx, &p->base, args);
+ pf_update_locked(&p->base, args);
}
return;
}
GRPC_ERROR_REF(error);
if (p->shutdown) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ GRPC_LB_POLICY_WEAK_UNREF(&p->base, "pick_first_connectivity");
GRPC_ERROR_UNREF(error);
return;
} else if (p->selected != NULL) {
@@ -544,15 +531,14 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
/* if the selected channel goes bad, we're done */
p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- p->checking_connectivity, GRPC_ERROR_REF(error),
- "selected_changed");
+ grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
+ GRPC_ERROR_REF(error), "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, p->base.interested_parties,
- &p->checking_connectivity, &p->connectivity_changed);
+ p->selected, p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
} else {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ GRPC_LB_POLICY_WEAK_UNREF(&p->base, "pick_first_connectivity");
}
} else {
loop:
@@ -560,9 +546,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
case GRPC_CHANNEL_INIT:
GPR_UNREACHABLE_CODE(return );
case GRPC_CHANNEL_READY:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
- "connecting_ready");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected_subchannel),
@@ -576,7 +561,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
- destroy_subchannels_locked(exec_ctx, p);
+ destroy_subchannels_locked(p);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
@@ -586,12 +571,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
"Servicing pending pick with selected subchannel %p",
(void *)p->selected);
}
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, p->base.interested_parties,
- &p->checking_connectivity, &p->connectivity_changed);
+ p->selected, p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
p->checking_subchannel =
@@ -600,7 +585,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
/* only trigger transient failure when we've tried all alternatives
*/
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
GRPC_ERROR_UNREF(error);
@@ -608,7 +593,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->subchannels[p->checking_subchannel], &error);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
+ p->subchannels[p->checking_subchannel],
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
} else {
@@ -617,37 +602,34 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_REF(error), "connecting_changed");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_REF(error),
+ "connecting_changed");
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+ p->subchannels[p->checking_subchannel], p->base.interested_parties,
+ &p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_SHUTDOWN:
p->num_subchannels--;
GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
- "pick_first");
+ GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
if (p->num_subchannels == 0) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick first exhausted channels", &error, 1),
"no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
- "pick_first_connectivity");
+ GRPC_LB_POLICY_WEAK_UNREF(&p->base, "pick_first_connectivity");
} else {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
p->checking_subchannel %= p->num_subchannels;
GRPC_ERROR_UNREF(error);
@@ -677,15 +659,14 @@ static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
-static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
+static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
pick_first_lb_policy *p = (pick_first_lb_policy *)gpr_zalloc(sizeof(*p));
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p);
}
- pf_update_locked(exec_ctx, &p->base, args);
+ pf_update_locked(&p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,