diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 399 |
1 files changed, 177 insertions, 222 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ffd58129c6..aa4beb359f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -131,12 +131,12 @@ grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb"); /* add lb_token of selected subchannel (address) to the call's initial * metadata */ static grpc_error *initial_metadata_add_lb_token( - grpc_exec_ctx *exec_ctx, grpc_metadata_batch *initial_metadata, + grpc_metadata_batch *initial_metadata, grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem lb_token) { GPR_ASSERT(lb_token_mdelem_storage != NULL); GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata, - lb_token_mdelem_storage, lb_token); + return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); } static void destroy_client_stats(void *arg) { @@ -182,12 +182,11 @@ typedef struct wrapped_rr_closure_arg { /* The \a on_complete closure passed as part of the pick requires keeping a * reference to its associated round robin instance. We wrap this closure in * order to unref the round robin instance upon its invocation */ -static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void wrapped_rr_closure(void *arg, grpc_error *error) { wrapped_rr_closure_arg *wc_arg = (wrapped_rr_closure_arg *)arg; GPR_ASSERT(wc_arg->wrapped_closure != NULL); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); if (wc_arg->rr_policy != NULL) { /* if *target is NULL, no pick has been made by the RR policy (eg, all @@ -195,7 +194,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, * available */ if (*wc_arg->target != NULL) { if (!GRPC_MDISNULL(wc_arg->lb_token)) { - initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata, + initial_metadata_add_lb_token(wc_arg->initial_metadata, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); } else { @@ -215,7 +214,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure"); } GPR_ASSERT(wc_arg->free_when_done != NULL); gpr_free(wc_arg->free_when_done); @@ -453,9 +452,9 @@ static void *lb_token_copy(void *token) { ? NULL : (void *)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; } -static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) { +static void lb_token_destroy(void *token) { if (token != NULL) { - GRPC_MDELEM_UNREF(exec_ctx, grpc_mdelem{(uintptr_t)token}); + GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); } } static int lb_token_cmp(void *token1, void *token2) { @@ -491,7 +490,7 @@ static void parse_server(const grpc_grpclb_server *server, /* Returns addresses extracted from \a serverlist. */ static grpc_lb_addresses *process_serverlist_locked( - grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) { + const grpc_grpclb_serverlist *serverlist) { size_t num_valid = 0; /* first pass: count how many are valid in order to allocate the necessary * memory in a single block */ @@ -522,9 +521,9 @@ static grpc_lb_addresses *process_serverlist_locked( strnlen(server->load_balance_token, lb_token_max_length); grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( server->load_balance_token, lb_token_length); - user_data = (void *)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN, - lb_token_mdstr) - .payload; + user_data = + (void *)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) + .payload; } else { char *uri = grpc_sockaddr_to_uri(&addr); gpr_log(GPR_INFO, @@ -546,7 +545,7 @@ static grpc_lb_addresses *process_serverlist_locked( /* Returns the backend addresses extracted from the given addresses */ static grpc_lb_addresses *extract_backend_addresses_locked( - grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) { + const grpc_lb_addresses *addresses) { /* first pass: count the number of backend addresses */ size_t num_backends = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { @@ -571,8 +570,8 @@ static grpc_lb_addresses *extract_backend_addresses_locked( } static void update_lb_connectivity_status_locked( - grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, - grpc_connectivity_state rr_state, grpc_error *rr_state_error) { + glb_lb_policy *glb_policy, grpc_connectivity_state rr_state, + grpc_error *rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); @@ -623,7 +622,7 @@ static void update_lb_connectivity_status_locked( GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.", grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy); } - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, + grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state, rr_state_error, "update_lb_connectivity_status_locked"); } @@ -634,9 +633,9 @@ static void update_lb_connectivity_status_locked( * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( - grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, - const grpc_lb_policy_pick_args *pick_args, bool force_async, - grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { + glb_lb_policy *glb_policy, const grpc_lb_policy_pick_args *pick_args, + bool force_async, grpc_connected_subchannel **target, + wrapped_rr_closure_arg *wc_arg) { // Check for drops if we are not using fallback backend addresses. if (glb_policy->serverlist != NULL) { // Look at the index into the serverlist to see if we should drop this call. @@ -651,7 +650,7 @@ static bool pick_from_internal_rr_locked( gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", (intptr_t)wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); // Update client load reporting stats to indicate the number of // dropped calls. Note that we have to do this here instead of in // the client_load_reporting filter, because we do not create a @@ -662,7 +661,7 @@ static bool pick_from_internal_rr_locked( grpc_grpclb_client_stats_unref(wc_arg->client_stats); if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -672,7 +671,7 @@ static bool pick_from_internal_rr_locked( } // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context, + wc_arg->rr_policy, pick_args, target, wc_arg->context, (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ @@ -680,9 +679,9 @@ static bool pick_from_internal_rr_locked( gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); /* add the load reporting initial metadata */ - initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata, + initial_metadata_add_lb_token(pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); // Pass on client stats via context. Passes ownership of the reference. @@ -691,7 +690,7 @@ static bool pick_from_internal_rr_locked( wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -704,12 +703,11 @@ static bool pick_from_internal_rr_locked( return pick_done; } -static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static grpc_lb_policy_args *lb_policy_args_create(glb_lb_policy *glb_policy) { grpc_lb_addresses *addresses; if (glb_policy->serverlist != NULL) { GPR_ASSERT(glb_policy->serverlist->num_servers > 0); - addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist); + addresses = process_serverlist_locked(glb_policy->serverlist); } else { // If rr_handover_locked() is invoked when we haven't received any // serverlist from the balancer, we use the fallback backends returned by @@ -729,24 +727,21 @@ static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, args->args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); - grpc_lb_addresses_destroy(exec_ctx, addresses); + grpc_lb_addresses_destroy(addresses); return args; } -static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_args *args) { - grpc_channel_args_destroy(exec_ctx, args->args); +static void lb_policy_args_destroy(grpc_lb_policy_args *args) { + grpc_channel_args_destroy(args->args); gpr_free(args); } -static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error); -static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, +static void glb_rr_connectivity_changed_locked(void *arg, grpc_error *error); +static void create_rr_locked(glb_lb_policy *glb_policy, grpc_lb_policy_args *args) { GPR_ASSERT(glb_policy->rr_policy == NULL); - grpc_lb_policy *new_rr_policy = - grpc_lb_policy_create(exec_ctx, "round_robin", args); + grpc_lb_policy *new_rr_policy = grpc_lb_policy_create("round_robin", args); if (new_rr_policy == NULL) { gpr_log(GPR_ERROR, "Failure creating a RoundRobin policy for serverlist update with " @@ -760,16 +755,14 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, glb_policy->rr_policy = new_rr_policy; grpc_error *rr_state_error = NULL; const grpc_connectivity_state rr_state = - grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, &rr_state_error); /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, - rr_state_error); + update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error); /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ - grpc_pollset_set_add_pollset_set(exec_ctx, - glb_policy->rr_policy->interested_parties, + grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); /* Allocate the data for the tracking of the new RR policy's connectivity. @@ -784,10 +777,10 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* Subscribe to changes to the connectivity of the new RR */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); - grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); - grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy); + grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); /* Update picks and pings in wait */ pending_pick *pp; @@ -801,7 +794,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p", (void *)glb_policy->rr_policy); } - pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, + pick_from_internal_rr_locked(glb_policy, &pp->pick_args, true /* force_async */, pp->target, &pp->wrapped_on_complete_arg); } @@ -815,40 +808,37 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, &pping->wrapped_notify_arg.wrapper_closure); } } /* glb_policy->rr_policy may be NULL (initial handover) */ -static void rr_handover_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void rr_handover_locked(glb_lb_policy *glb_policy) { if (glb_policy->shutting_down) return; - grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); + grpc_lb_policy_args *args = lb_policy_args_create(glb_policy); GPR_ASSERT(args != NULL); if (glb_policy->rr_policy != NULL) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", (void *)glb_policy->rr_policy); } - grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args); + grpc_lb_policy_update_locked(glb_policy->rr_policy, args); } else { - create_rr_locked(exec_ctx, glb_policy, args); + create_rr_locked(glb_policy, args); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)", (void *)glb_policy->rr_policy); } } - lb_policy_args_destroy(exec_ctx, args); + lb_policy_args_destroy(args); } -static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error) { +static void glb_rr_connectivity_changed_locked(void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = (rr_connectivity_data *)arg; glb_lb_policy *glb_policy = rr_connectivity->glb_policy; if (glb_policy->shutting_down) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); gpr_free(rr_connectivity); return; } @@ -856,25 +846,22 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, /* An RR policy that has transitioned into the SHUTDOWN connectivity state * should not be considered for picks or updates: the SHUTDOWN state is a * sink, policies can't transition back from it. .*/ - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, - "rr_connectivity_shutdown"); + GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); glb_policy->rr_policy = NULL; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); gpr_free(rr_connectivity); return; } /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ - update_lb_connectivity_status_locked( - exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); + update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state, + GRPC_ERROR_REF(error)); /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); } -static void destroy_balancer_name(grpc_exec_ctx *exec_ctx, - void *balancer_name) { +static void destroy_balancer_name(void *balancer_name) { gpr_free(balancer_name); } @@ -901,7 +888,7 @@ static int balancer_name_cmp_fn(void *a, void *b) { * above the grpclb policy. * - \a args: other args inherited from the grpclb policy. */ static grpc_channel_args *build_lb_channel_args( - grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses, + const grpc_lb_addresses *addresses, grpc_fake_resolver_response_generator *response_generator, const grpc_channel_args *args) { size_t num_grpclb_addrs = 0; @@ -944,7 +931,7 @@ static grpc_channel_args *build_lb_channel_args( gpr_free(targets_info_entries); grpc_channel_args *lb_channel_args = - grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info, + grpc_lb_policy_grpclb_build_lb_channel_args(targets_info, response_generator, args); grpc_arg lb_channel_addresses_arg = @@ -952,34 +939,34 @@ static grpc_channel_args *build_lb_channel_args( grpc_channel_args *result = grpc_channel_args_copy_and_add( lb_channel_args, &lb_channel_addresses_arg, 1); - grpc_slice_hash_table_unref(exec_ctx, targets_info); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); - grpc_lb_addresses_destroy(exec_ctx, lb_addresses); + grpc_slice_hash_table_unref(targets_info); + grpc_channel_args_destroy(lb_channel_args); + grpc_lb_addresses_destroy(lb_addresses); return result; } -static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void glb_destroy(grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); - grpc_channel_args_destroy(exec_ctx, glb_policy->args); + grpc_channel_args_destroy(glb_policy->args); if (glb_policy->client_stats != NULL) { grpc_grpclb_client_stats_unref(glb_policy->client_stats); } - grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); + grpc_connectivity_state_destroy(&glb_policy->state_tracker); if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } if (glb_policy->fallback_backend_addresses != NULL) { - grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); gpr_free(glb_policy); } -static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void glb_shutdown_locked(grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_policy->shutting_down = true; @@ -997,11 +984,11 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { /* lb_on_server_status_received will pick up the cancel and clean up */ } if (glb_policy->retry_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + grpc_timer_cancel(&glb_policy->lb_call_retry_timer); glb_policy->retry_timer_active = false; } if (glb_policy->fallback_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); + grpc_timer_cancel(&glb_policy->lb_fallback_timer); glb_policy->fallback_timer_active = false; } @@ -1010,7 +997,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pending_ping *pping = glb_policy->pending_pings; glb_policy->pending_pings = NULL; if (glb_policy->rr_policy != NULL) { - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); } // We destroy the LB channel here because // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy @@ -1021,20 +1008,20 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_policy->lb_channel = NULL; } grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, + &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_NONE); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pping->wrapped_notify_arg.wrapper_closure, GRPC_ERROR_NONE); pping = next; } @@ -1050,7 +1037,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { // - Otherwise, without an RR instance, picks stay pending at this policy's // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // we invoke the completion closure and set *target to NULL right here. -static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, +static void glb_cancel_pick_locked(grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; @@ -1060,7 +1047,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1070,7 +1057,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } if (glb_policy->rr_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target, + grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); @@ -1086,8 +1073,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, // - Otherwise, without an RR instance, picks stay pending at this policy's // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // we invoke the completion closure and set *target to NULL right here. -static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, +static void glb_cancel_picks_locked(grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error *error) { @@ -1098,7 +1084,7 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, pending_pick *next = pp->next; if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1109,52 +1095,49 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, } if (glb_policy->rr_policy != NULL) { grpc_lb_policy_cancel_picks_locked( - exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask, + glb_policy->rr_policy, initial_metadata_flags_mask, initial_metadata_flags_eq, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } -static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy); -static void start_picking_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void lb_on_fallback_timer_locked(void *arg, grpc_error *error); +static void query_for_backends_locked(glb_lb_policy *glb_policy); +static void start_picking_locked(glb_lb_policy *glb_policy) { /* start a timer to fall back */ if (glb_policy->lb_fallback_timeout_ms > 0 && glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) { grpc_millis deadline = - grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_fallback_timeout_ms; + grpc_exec_ctx_now() + glb_policy->lb_fallback_timeout_ms; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->fallback_timer_active = true; - grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline, + grpc_timer_init(&glb_policy->lb_fallback_timer, deadline, &glb_policy->lb_on_fallback); } glb_policy->started_picking = true; grpc_backoff_reset(&glb_policy->lb_call_backoff_state); - query_for_backends_locked(exec_ctx, glb_policy); + query_for_backends_locked(glb_policy); } -static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void glb_exit_idle_locked(grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; if (!glb_policy->started_picking) { - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } } -static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, +static int glb_pick_locked(grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, on_complete, + GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "No mdelem storage for the LB token. Load reporting " "won't work without it. Failing")); @@ -1186,9 +1169,8 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; - pick_done = - pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, - false /* force_async */, target, wc_arg); + pick_done = pick_from_internal_rr_locked( + glb_policy, pick_args, false /* force_async */, target, wc_arg); } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, @@ -1200,7 +1182,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, on_complete); if (!glb_policy->started_picking) { - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } pick_done = false; } @@ -1208,37 +1190,33 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static grpc_connectivity_state glb_check_connectivity_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_error **connectivity_error) { + grpc_lb_policy *pol, grpc_error **connectivity_error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; return grpc_connectivity_state_get(&glb_policy->state_tracker, connectivity_error); } -static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void glb_ping_one_locked(grpc_lb_policy *pol, grpc_closure *closure) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, closure); } else { add_pending_ping(&glb_policy->pending_pings, closure); if (!glb_policy->started_picking) { - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } } } -static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, +static void glb_notify_on_state_change_locked(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_closure *notify) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &glb_policy->state_tracker, current, notify); + grpc_connectivity_state_notify_on_state_change(&glb_policy->state_tracker, + current, notify); } -static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_call_on_retry_timer_locked(void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; glb_policy->retry_timer_active = false; if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { @@ -1247,27 +1225,26 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, (void *)glb_policy); } GPR_ASSERT(glb_policy->lb_call == NULL); - query_for_backends_locked(exec_ctx, glb_policy); + query_for_backends_locked(glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer"); } -static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void maybe_restart_lb_call(glb_lb_policy *glb_policy) { if (glb_policy->started_picking && glb_policy->updating_lb_call) { if (glb_policy->retry_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + grpc_timer_cancel(&glb_policy->lb_call_retry_timer); } - if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy); + if (!glb_policy->shutting_down) start_picking_locked(glb_policy); glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ grpc_millis next_try = - grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state); + grpc_backoff_step(&glb_policy->lb_call_backoff_state); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", (void *)glb_policy); - grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); + grpc_millis timeout = next_try - grpc_exec_ctx_now(); if (timeout > 0) { gpr_log(GPR_DEBUG, "... retry_timer_active in %" PRIdPTR "ms.", timeout); @@ -1280,40 +1257,36 @@ static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->retry_timer_active = true; - grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, + grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "lb_on_server_status_received_locked"); } -static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void send_client_load_report_locked(void *arg, grpc_error *error); -static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void schedule_next_client_load_report(glb_lb_policy *glb_policy) { const grpc_millis next_client_load_report_time = - grpc_exec_ctx_now(exec_ctx) + glb_policy->client_stats_report_interval; + grpc_exec_ctx_now() + glb_policy->client_stats_report_interval; GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, + grpc_timer_init(&glb_policy->client_load_report_timer, next_client_load_report_time, &glb_policy->client_load_report_closure); } -static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void client_load_report_done_locked(void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); glb_policy->client_load_report_payload = NULL; if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) { glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "client_load_report"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); return; } - schedule_next_client_load_report(exec_ctx, glb_policy); + schedule_next_client_load_report(glb_policy); } static bool load_report_counters_are_zero(grpc_grpclb_request *request) { @@ -1328,15 +1301,13 @@ static bool load_report_counters_are_zero(grpc_grpclb_request *request) { (drop_entries == NULL || drop_entries->num_entries == 0); } -static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void send_client_load_report_locked(void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) { glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "client_load_report"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == NULL) { - maybe_restart_lb_call(exec_ctx, glb_policy); + maybe_restart_lb_call(glb_policy); } return; } @@ -1349,7 +1320,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, if (load_report_counters_are_zero(request)) { if (glb_policy->last_client_load_report_counters_were_zero) { grpc_grpclb_request_destroy(request); - schedule_next_client_load_report(exec_ctx, glb_policy); + schedule_next_client_load_report(glb_policy); return; } glb_policy->last_client_load_report_counters_were_zero = true; @@ -1359,7 +1330,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); glb_policy->client_load_report_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); // Send load report message. grpc_op op; @@ -1370,20 +1341,16 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, client_load_report_done_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, &op, 1, - &glb_policy->client_load_report_closure); + glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); if (call_error != GRPC_CALL_OK) { gpr_log(GPR_ERROR, "call_error=%d", call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } } -static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error); -static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void lb_on_server_status_received_locked(void *arg, grpc_error *error); +static void lb_on_response_received_locked(void *arg, grpc_error *error); +static void lb_call_init_locked(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); GPR_ASSERT(glb_policy->lb_call == NULL); @@ -1396,13 +1363,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_millis deadline = glb_policy->lb_call_timeout_ms == 0 ? GRPC_MILLIS_INF_FUTURE - : grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_call_timeout_ms; + : grpc_exec_ctx_now() + glb_policy->lb_call_timeout_ms; glb_policy->lb_call = grpc_channel_create_pollset_set_call( - exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, + glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, &host, deadline, NULL); - grpc_slice_unref_internal(exec_ctx, host); + grpc_slice_unref_internal(host); if (glb_policy->client_stats != NULL) { grpc_grpclb_client_stats_unref(glb_policy->client_stats); @@ -1417,7 +1384,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); glb_policy->lb_request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received, @@ -1438,8 +1405,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, glb_policy->last_client_load_report_counters_were_zero = false; } -static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_call != NULL); grpc_call_unref(glb_policy->lb_call); glb_policy->lb_call = NULL; @@ -1448,22 +1414,21 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv); grpc_byte_buffer_destroy(glb_policy->lb_request_payload); - grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details); + grpc_slice_unref_internal(glb_policy->lb_call_status_details); if (glb_policy->client_load_report_timer_pending) { - grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer); + grpc_timer_cancel(&glb_policy->client_load_report_timer); } } /* * Auxiliary functions and LB client callbacks. */ -static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void query_for_backends_locked(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); if (glb_policy->shutting_down) return; - lb_call_init_locked(exec_ctx, glb_policy); + lb_call_init_locked(glb_policy); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, @@ -1495,8 +1460,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call, - ops, (size_t)(op - ops), NULL); + call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops, + (size_t)(op - ops), NULL); GPR_ASSERT(GRPC_CALL_OK == call_error); op = ops; @@ -1514,7 +1479,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received_locked"); call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -1528,13 +1493,12 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, * lb_on_response_received_locked */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); GPR_ASSERT(GRPC_CALL_OK == call_error); } -static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_on_response_received_locked(void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; grpc_op ops[2]; memset(ops, 0, sizeof(ops)); @@ -1568,7 +1532,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, * send_client_load_report_locked() */ glb_policy->client_load_report_timer_pending = true; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); - schedule_next_client_load_report(exec_ctx, glb_policy); + schedule_next_client_load_report(glb_policy); } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "received initial LB response message; " @@ -1608,11 +1572,10 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } else { /* or dispose of the fallback */ - grpc_lb_addresses_destroy(exec_ctx, - glb_policy->fallback_backend_addresses); + grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = NULL; if (glb_policy->fallback_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); + grpc_timer_cancel(&glb_policy->lb_fallback_timer); glb_policy->fallback_timer_active = false; } } @@ -1621,7 +1584,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, * update or in glb_destroy() */ glb_policy->serverlist = serverlist; glb_policy->serverlist_index = 0; - rr_handover_locked(exec_ctx, glb_policy); + rr_handover_locked(glb_policy); } } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { @@ -1634,7 +1597,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } } - grpc_slice_unref_internal(exec_ctx, response_slice); + grpc_slice_unref_internal(response_slice); if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1645,23 +1608,22 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, /* reuse the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "lb_on_response_received_locked_shutdown"); } } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "lb_on_response_received_locked_empty_payload"); } } -static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_on_fallback_timer_locked(void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; glb_policy->fallback_timer_active = false; /* If we receive a serverlist after the timer fires but before this callback @@ -1674,15 +1636,13 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, (void *)glb_policy); } GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); - rr_handover_locked(exec_ctx, glb_policy); + rr_handover_locked(glb_policy); } } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "grpclb_fallback_timer"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } -static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error) { +static void lb_on_server_status_received_locked(void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; GPR_ASSERT(glb_policy->lb_call != NULL); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { @@ -1696,29 +1656,28 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, gpr_free(status_details); } /* We need to perform cleanups no matter what. */ - lb_call_destroy_locked(exec_ctx, glb_policy); + lb_call_destroy_locked(glb_policy); // If the load report timer is still pending, we wait for it to be // called before restarting the call. Otherwise, we restart the call // here. if (!glb_policy->client_load_report_timer_pending) { - maybe_restart_lb_call(exec_ctx, glb_policy); + maybe_restart_lb_call(glb_policy); } } -static void fallback_update_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy, +static void fallback_update_locked(glb_lb_policy *glb_policy, const grpc_lb_addresses *addresses) { GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); - grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(exec_ctx, addresses); + extract_backend_addresses_locked(addresses); if (glb_policy->lb_fallback_timeout_ms > 0 && !glb_policy->fallback_timer_active) { - rr_handover_locked(exec_ctx, glb_policy); + rr_handover_locked(glb_policy); } } -static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, +static void glb_update_locked(grpc_lb_policy *policy, const grpc_lb_policy_args *args) { glb_lb_policy *glb_policy = (glb_lb_policy *)policy; const grpc_arg *arg = @@ -1728,7 +1687,7 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // If we don't have a current channel to the LB, go into TRANSIENT // FAILURE. grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "glb_update_missing"); } else { @@ -1745,16 +1704,16 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // If a non-empty serverlist hasn't been received from the balancer, // propagate the update to fallback_backend_addresses. if (glb_policy->serverlist == NULL) { - fallback_update_locked(exec_ctx, glb_policy, addresses); + fallback_update_locked(glb_policy, addresses); } GPR_ASSERT(glb_policy->lb_channel != NULL); // Propagate updates to the LB channel (pick_first) through the fake // resolver. grpc_channel_args *lb_channel_args = build_lb_channel_args( - exec_ctx, addresses, glb_policy->response_generator, args->args); + addresses, glb_policy->response_generator, args->args); grpc_fake_resolver_response_generator_set_response( - exec_ctx, glb_policy->response_generator, lb_channel_args); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); + glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(lb_channel_args); // Start watching the LB channel connectivity for connection, if not // already doing so. if (!glb_policy->watching_lb_channel) { @@ -1766,9 +1725,8 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, glb_policy->watching_lb_channel = true; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); grpc_client_channel_watch_connectivity_state( - exec_ctx, client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - glb_policy->base.interested_parties), + client_channel_elem, grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), &glb_policy->lb_channel_connectivity, &glb_policy->lb_channel_on_connectivity_changed, NULL); } @@ -1777,8 +1735,7 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // Invoked as part of the update process. It continues watching the LB channel // until it shuts down or becomes READY. It's invoked even if the LB channel // stayed READY throughout the update (for example if the update is identical). -static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, - void *arg, +static void glb_lb_channel_on_connectivity_changed_cb(void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; if (glb_policy->shutting_down) goto done; @@ -1795,9 +1752,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(glb_policy->lb_channel)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); grpc_client_channel_watch_connectivity_state( - exec_ctx, client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - glb_policy->base.interested_parties), + client_channel_elem, grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), &glb_policy->lb_channel_connectivity, &glb_policy->lb_channel_on_connectivity_changed, NULL); break; @@ -1815,16 +1771,16 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, // lb_call. } else if (glb_policy->started_picking && !glb_policy->shutting_down) { if (glb_policy->retry_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + grpc_timer_cancel(&glb_policy->lb_call_retry_timer); glb_policy->retry_timer_active = false; } - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } /* fallthrough */ case GRPC_CHANNEL_SHUTDOWN: done: glb_policy->watching_lb_channel = false; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "watch_lb_channel_connectivity_cb_shutdown"); break; } @@ -1843,8 +1799,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_notify_on_state_change_locked, glb_update_locked}; -static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, +static grpc_lb_policy *glb_create(grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { /* Count the number of gRPC-LB addresses. There must be at least one. */ const grpc_arg *arg = @@ -1865,7 +1820,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); GPR_ASSERT(arg != NULL); GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); + grpc_uri *uri = grpc_uri_parse(arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); @@ -1897,26 +1852,26 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, /* Extract the backend addresses (may be empty) from the resolver for * fallback. */ glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(exec_ctx, addresses); + extract_backend_addresses_locked(addresses); /* Create a client channel over them to communicate with a LB service */ glb_policy->response_generator = grpc_fake_resolver_response_generator_create(); grpc_channel_args *lb_channel_args = build_lb_channel_args( - exec_ctx, addresses, glb_policy->response_generator, args->args); + addresses, glb_policy->response_generator, args->args); char *uri_str; gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + uri_str, args->client_channel_factory, lb_channel_args); /* Propagate initial resolution */ grpc_fake_resolver_response_generator_set_response( - exec_ctx, glb_policy->response_generator, lb_channel_args); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); + glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(lb_channel_args); gpr_free(uri_str); if (glb_policy->lb_channel == NULL) { gpr_free((void *)glb_policy->server_name); - grpc_channel_args_destroy(exec_ctx, glb_policy->args); + grpc_channel_args_destroy(glb_policy->args); gpr_free(glb_policy); return NULL; } @@ -1947,7 +1902,7 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() { // Only add client_load_reporting filter if the grpclb LB policy is used. static bool maybe_add_client_load_reporting_filter( - grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { + grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); const grpc_arg *channel_arg = |