diff options
author | 2017-10-13 16:07:13 -0700 | |
---|---|---|
committer | 2017-10-18 17:12:19 -0700 | |
commit | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch) | |
tree | e43d5de442fdcc3d39cd5af687f319fa39612d3f /src/core/ext/filters/client_channel/lb_policy/pick_first | |
parent | 6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff) |
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of
grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx
also keeps track of the previous exec_ctx so that nesting of exec_ctx is
allowed. This means that there is only one exec_ctx being used at any
time. Also, grpc_exec_ctx_finish is called in the destructor of the
object, and the previous exec_ctx is restored to avoid breaking current
functionality. The code still explicitly calls grpc_exec_ctx_finish
because removing all such instances causes the code to break.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 183 |
1 files changed, 82 insertions, 101 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index b07fc3b720..5e638d9c86 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -78,20 +78,19 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_destroy(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; GPR_ASSERT(p->pending_picks == NULL); for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy"); + GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first_destroy"); } if (p->selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "picked_first_destroy"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(p->selected, "picked_first_destroy"); } - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_connectivity_state_destroy(&p->state_tracker); grpc_subchannel_index_unref(); if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + grpc_channel_args_destroy(p->pending_update_args->args); gpr_free(p->pending_update_args); } gpr_free(p->subchannels); @@ -102,34 +101,34 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } -static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_shutdown_locked(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; p->shutdown = true; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, + &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown"); /* cancel subscription */ if (p->selected != NULL) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change(p->selected, NULL, NULL, + &p->connectivity_changed); } else if (p->num_subchannels > 0 && p->started_picking) { grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); } while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); pp = next; } } -static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, +static void pf_cancel_pick_locked(grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; @@ -140,7 +139,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -153,7 +152,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, +static void pf_cancel_picks_locked(grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error *error) { @@ -165,7 +164,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -178,8 +177,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void start_picking_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { +static void start_picking_locked(pick_first_lb_policy *p) { p->started_picking = true; if (p->subchannels != NULL) { GPR_ASSERT(p->num_subchannels > 0); @@ -187,20 +185,19 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, p->checking_connectivity = GRPC_CHANNEL_IDLE; GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); + p->subchannels[p->checking_subchannel], p->base.interested_parties, + &p->checking_connectivity, &p->connectivity_changed); } } -static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_exit_idle_locked(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + start_picking_locked(p); } } -static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, +static int pf_pick_locked(grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_call_context_element *context, void **user_data, @@ -216,7 +213,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* No subchannel selected yet, so try again */ if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + start_picking_locked(p); } pp = (pending_pick *)gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -227,50 +224,46 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, return 0; } -static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { +static void destroy_subchannels_locked(pick_first_lb_policy *p) { size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels = p->subchannels; p->num_subchannels = 0; p->subchannels = NULL; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); + GRPC_LB_POLICY_WEAK_UNREF(&p->base, "destroy_subchannels"); for (size_t i = 0; i < num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); + GRPC_SUBCHANNEL_UNREF(subchannels[i], "pick_first"); } gpr_free(subchannels); } static grpc_connectivity_state pf_check_connectivity_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { + grpc_lb_policy *pol, grpc_error **error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; return grpc_connectivity_state_get(&p->state_tracker, error); } -static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, +static void pf_notify_on_state_change_locked(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_closure *notify) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify); } -static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void pf_ping_one_locked(grpc_lb_policy *pol, grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + grpc_connected_subchannel_ping(p->selected, closure); } else { - GRPC_CLOSURE_SCHED(exec_ctx, closure, + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } /* unsubscribe all subchannels */ -static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { +static void stop_connectivity_watchers(pick_first_lb_policy *p) { if (p->num_subchannels > 0) { GPR_ASSERT(p->selected == NULL); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { @@ -278,7 +271,7 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, (void *)p, (void *)p->subchannels[p->checking_subchannel]); } grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); p->updating_subchannels = true; } else if (p->selected != NULL) { @@ -287,14 +280,14 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, "Pick First %p unsubscribing from selected subchannel %p", (void *)p, (void *)p->selected); } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change(p->selected, NULL, NULL, + &p->connectivity_changed); p->updating_selected = true; } } /* true upon success */ -static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, +static void pf_update_locked(grpc_lb_policy *policy, const grpc_lb_policy_args *args) { pick_first_lb_policy *p = (pick_first_lb_policy *)policy; const grpc_arg *arg = @@ -303,7 +296,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (p->subchannels == NULL) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "pf_update_missing"); } else { @@ -321,10 +314,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // Empty update. Unsubscribe from all current subchannels and put the // channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - stop_connectivity_watchers(exec_ctx, p); + stop_connectivity_watchers(p); return; } if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { @@ -363,7 +356,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); const bool found_selected = grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0; - grpc_subchannel_key_destroy(exec_ctx, ith_sc_key); + grpc_subchannel_key_destroy(ith_sc_key); if (found_selected) { // The currently selected subchannel is in the update: we are done. if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { @@ -373,8 +366,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, (void *)p, (void *)p->selected); } for (size_t j = 0; j < sc_args_count; j++) { - grpc_channel_args_destroy(exec_ctx, - (grpc_channel_args *)sc_args[j].args); + grpc_channel_args_destroy((grpc_channel_args *)sc_args[j].args); } gpr_free(sc_args); return; @@ -391,7 +383,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, (void *)p); } if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + grpc_channel_args_destroy(p->pending_update_args->args); gpr_free(p->pending_update_args); } p->pending_update_args = @@ -408,7 +400,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, size_t num_new_subchannels = 0; for (size_t i = 0; i < sc_args_count; i++) { grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args[i]); + args->client_channel_factory, &sc_args[i]); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); @@ -417,7 +409,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, (void *)p, (void *)subchannel, address_uri); gpr_free(address_uri); } - grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args); + grpc_channel_args_destroy((grpc_channel_args *)sc_args[i].args); if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel; } gpr_free(sc_args); @@ -426,15 +418,15 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // Empty update. Unsubscribe from all current subchannels and put the // channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"), "pf_update_no_valid_addresses"); - stop_connectivity_watchers(exec_ctx, p); + stop_connectivity_watchers(p); return; } /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ - stop_connectivity_watchers(exec_ctx, p); + stop_connectivity_watchers(p); /* Save new subchannels. The switch over will happen in * pf_connectivity_changed_locked */ @@ -450,15 +442,13 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); + p->subchannels[p->checking_subchannel], p->base.interested_parties, + &p->checking_connectivity, &p->connectivity_changed); } } } -static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void pf_connectivity_changed_locked(void *arg, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)arg; grpc_subchannel *selected_subchannel; pending_pick *pp; @@ -476,8 +466,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (p->updating_selected && error != GRPC_ERROR_NONE) { /* Captured the unsubscription for p->selected */ GPR_ASSERT(p->selected != NULL); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "pf_update_connectivity"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(p->selected, "pf_update_connectivity"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p", (void *)p, (void *)p->selected); @@ -493,8 +482,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, /* Captured the unsubscription for the checking subchannel */ GPR_ASSERT(p->selected == NULL); for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], - "pf_update_connectivity"); + GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pf_update_connectivity"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p, (void *)p->subchannels[i]); @@ -523,20 +511,19 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity = GRPC_CHANNEL_IDLE; /* reuses the weak ref from start_picking_locked */ grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); + p->subchannels[p->checking_subchannel], p->base.interested_parties, + &p->checking_connectivity, &p->connectivity_changed); } if (p->pending_update_args != NULL) { const grpc_lb_policy_args *args = p->pending_update_args; p->pending_update_args = NULL; - pf_update_locked(exec_ctx, &p->base, args); + pf_update_locked(&p->base, args); } return; } GRPC_ERROR_REF(error); if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(&p->base, "pick_first_connectivity"); GRPC_ERROR_UNREF(error); return; } else if (p->selected != NULL) { @@ -544,15 +531,14 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, /* if the selected channel goes bad, we're done */ p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - p->checking_connectivity, GRPC_ERROR_REF(error), - "selected_changed"); + grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, + GRPC_ERROR_REF(error), "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + p->selected, p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(&p->base, "pick_first_connectivity"); } } else { loop: @@ -560,9 +546,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, case GRPC_CHANNEL_INIT: GPR_UNREACHABLE_CODE(return ); case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, GRPC_ERROR_NONE, - "connecting_ready"); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "connecting_ready"); selected_subchannel = p->subchannels[p->checking_subchannel]; p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected_subchannel), @@ -576,7 +561,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, p->selected_key = grpc_subchannel_get_key(selected_subchannel); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - destroy_subchannels_locked(exec_ctx, p); + destroy_subchannels_locked(p); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -586,12 +571,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, "Servicing pending pick with selected subchannel %p", (void *)p->selected); } - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + p->selected, p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: p->checking_subchannel = @@ -600,7 +585,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, /* only trigger transient failure when we've tried all alternatives */ grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } GRPC_ERROR_UNREF(error); @@ -608,7 +593,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, p->subchannels[p->checking_subchannel], &error); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], + p->subchannels[p->checking_subchannel], p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { @@ -617,37 +602,34 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_REF(error), "connecting_changed"); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_REF(error), + "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); + p->subchannels[p->checking_subchannel], p->base.interested_parties, + &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_SHUTDOWN: p->num_subchannels--; GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], - "pick_first"); + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, + &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick first exhausted channels", &error, 1), "no_more_channels"); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, - "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(&p->base, "pick_first_connectivity"); } else { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "subchannel_failed"); p->checking_subchannel %= p->num_subchannels; GRPC_ERROR_UNREF(error); @@ -677,15 +659,14 @@ static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} -static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, +static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { GPR_ASSERT(args->client_channel_factory != NULL); pick_first_lb_policy *p = (pick_first_lb_policy *)gpr_zalloc(sizeof(*p)); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p); } - pf_update_locked(exec_ctx, &p->base, args); + pf_update_locked(&p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, |