diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
9 files changed, 399 insertions, 492 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index 6d9fadaf30..3eedb08ecc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -25,14 +25,12 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/profiling/timers.h" -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { return GRPC_ERROR_NONE; } -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_channel_element* elem) {} typedef struct { // Stats object to update. @@ -47,28 +45,24 @@ typedef struct { bool recv_initial_metadata_succeeded; } call_data; -static void on_complete_for_send(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void on_complete_for_send(void* arg, grpc_error* error) { call_data* calld = (call_data*)arg; if (error == GRPC_ERROR_NONE) { calld->send_initial_metadata_succeeded = true; } - GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete_for_send, - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error)); } -static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void recv_initial_metadata_ready(void* arg, grpc_error* error) { call_data* calld = (call_data*)arg; if (error == GRPC_ERROR_NONE) { calld->recv_initial_metadata_succeeded = true; } - GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; // Get stats object from context and take a ref. @@ -81,7 +75,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, return GRPC_ERROR_NONE; } -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; @@ -96,8 +90,7 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, } static void start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0); // Intercept send_initial_metadata. @@ -118,7 +111,7 @@ static void start_transport_stream_op_batch( &calld->recv_initial_metadata_ready; } // Chain to next filter. - grpc_call_next_op(exec_ctx, elem, batch); + grpc_call_next_op(elem, batch); GPR_TIMER_END("clr_start_transport_stream_op_batch", 0); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 704cbd8a3b..3c64213fb9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -131,12 +131,12 @@ grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb"); /* add lb_token of selected subchannel (address) to the call's initial * metadata */ static grpc_error* initial_metadata_add_lb_token( - grpc_exec_ctx* exec_ctx, grpc_metadata_batch* initial_metadata, + grpc_metadata_batch* initial_metadata, grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { GPR_ASSERT(lb_token_mdelem_storage != nullptr); GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata, - lb_token_mdelem_storage, lb_token); + return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); } static void destroy_client_stats(void* arg) { @@ -186,20 +186,19 @@ typedef struct wrapped_rr_closure_arg { /* The \a on_complete closure passed as part of the pick requires keeping a * reference to its associated round robin instance. We wrap this closure in * order to unref the round robin instance upon its invocation */ -static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void wrapped_rr_closure(void* arg, grpc_error* error) { wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg; GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); if (wc_arg->rr_policy != nullptr) { - /* if *target is NULL, no pick has been made by the RR policy (eg, all + /* if *target is nullptr, no pick has been made by the RR policy (eg, all * addresses failed to connect). There won't be any user_data/token * available */ if (*wc_arg->target != nullptr) { if (!GRPC_MDISNULL(wc_arg->lb_token)) { - initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata, + initial_metadata_add_lb_token(wc_arg->initial_metadata, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); } else { @@ -221,7 +220,7 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg, gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy, wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure"); } GPR_ASSERT(wc_arg->free_when_done != nullptr); gpr_free(wc_arg->free_when_done); @@ -241,8 +240,8 @@ typedef struct pending_pick { /* original pick()'s arguments */ grpc_lb_policy_pick_args pick_args; - /* output argument where to store the pick()ed connected subchannel, or NULL - * upon error. */ + /* output argument where to store the pick()ed connected subchannel, or + * nullptr upon error. */ grpc_connected_subchannel** target; /* args for wrapped_on_complete */ @@ -340,8 +339,8 @@ typedef struct glb_lb_policy { /** connectivity state of the LB channel */ grpc_connectivity_state lb_channel_connectivity; - /** stores the deserialized response from the LB. May be NULL until one such - * response has arrived. */ + /** stores the deserialized response from the LB. May be nullptr until one + * such response has arrived. */ grpc_grpclb_serverlist* serverlist; /** Index into serverlist for next pick. @@ -471,9 +470,9 @@ static void* lb_token_copy(void* token) { ? nullptr : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; } -static void lb_token_destroy(grpc_exec_ctx* exec_ctx, void* token) { +static void lb_token_destroy(void* token) { if (token != nullptr) { - GRPC_MDELEM_UNREF(exec_ctx, grpc_mdelem{(uintptr_t)token}); + GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); } } static int lb_token_cmp(void* token1, void* token2) { @@ -509,7 +508,7 @@ static void parse_server(const grpc_grpclb_server* server, /* Returns addresses extracted from \a serverlist. */ static grpc_lb_addresses* process_serverlist_locked( - grpc_exec_ctx* exec_ctx, const grpc_grpclb_serverlist* serverlist) { + const grpc_grpclb_serverlist* serverlist) { size_t num_valid = 0; /* first pass: count how many are valid in order to allocate the necessary * memory in a single block */ @@ -540,9 +539,9 @@ static grpc_lb_addresses* process_serverlist_locked( strnlen(server->load_balance_token, lb_token_max_length); grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( server->load_balance_token, lb_token_length); - user_data = (void*)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN, - lb_token_mdstr) - .payload; + user_data = + (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) + .payload; } else { char* uri = grpc_sockaddr_to_uri(&addr); gpr_log(GPR_INFO, @@ -564,7 +563,7 @@ static grpc_lb_addresses* process_serverlist_locked( /* Returns the backend addresses extracted from the given addresses */ static grpc_lb_addresses* extract_backend_addresses_locked( - grpc_exec_ctx* exec_ctx, const grpc_lb_addresses* addresses) { + const grpc_lb_addresses* addresses) { /* first pass: count the number of backend addresses */ size_t num_backends = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { @@ -589,8 +588,8 @@ static grpc_lb_addresses* extract_backend_addresses_locked( } static void update_lb_connectivity_status_locked( - grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, - grpc_connectivity_state rr_state, grpc_error* rr_state_error) { + glb_lb_policy* glb_policy, grpc_connectivity_state rr_state, + grpc_error* rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); @@ -642,7 +641,7 @@ static void update_lb_connectivity_status_locked( glb_policy, grpc_connectivity_state_name(rr_state), glb_policy->rr_policy); } - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, + grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state, rr_state_error, "update_lb_connectivity_status_locked"); } @@ -653,9 +652,9 @@ static void update_lb_connectivity_status_locked( * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( - grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, - const grpc_lb_policy_pick_args* pick_args, bool force_async, - grpc_connected_subchannel** target, wrapped_rr_closure_arg* wc_arg) { + glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args, + bool force_async, grpc_connected_subchannel** target, + wrapped_rr_closure_arg* wc_arg) { // Check for drops if we are not using fallback backend addresses. if (glb_policy->serverlist != nullptr) { // Look at the index into the serverlist to see if we should drop this call. @@ -670,7 +669,7 @@ static bool pick_from_internal_rr_locked( gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy, wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); // Update client load reporting stats to indicate the number of // dropped calls. Note that we have to do this here instead of in // the client_load_reporting filter, because we do not create a @@ -682,7 +681,7 @@ static bool pick_from_internal_rr_locked( grpc_grpclb_client_stats_unref(wc_arg->client_stats); if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -692,7 +691,7 @@ static bool pick_from_internal_rr_locked( } // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context, + wc_arg->rr_policy, pick_args, target, wc_arg->context, (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ @@ -700,9 +699,9 @@ static bool pick_from_internal_rr_locked( gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy, wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); /* add the load reporting initial metadata */ - initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata, + initial_metadata_add_lb_token(pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); // Pass on client stats via context. Passes ownership of the reference. @@ -711,7 +710,7 @@ static bool pick_from_internal_rr_locked( wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -724,12 +723,11 @@ static bool pick_from_internal_rr_locked( return pick_done; } -static grpc_lb_policy_args* lb_policy_args_create(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +static grpc_lb_policy_args* lb_policy_args_create(glb_lb_policy* glb_policy) { grpc_lb_addresses* addresses; if (glb_policy->serverlist != nullptr) { GPR_ASSERT(glb_policy->serverlist->num_servers > 0); - addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist); + addresses = process_serverlist_locked(glb_policy->serverlist); } else { // If rr_handover_locked() is invoked when we haven't received any // serverlist from the balancer, we use the fallback backends returned by @@ -749,24 +747,21 @@ static grpc_lb_policy_args* lb_policy_args_create(grpc_exec_ctx* exec_ctx, args->args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); - grpc_lb_addresses_destroy(exec_ctx, addresses); + grpc_lb_addresses_destroy(addresses); return args; } -static void lb_policy_args_destroy(grpc_exec_ctx* exec_ctx, - grpc_lb_policy_args* args) { - grpc_channel_args_destroy(exec_ctx, args->args); +static void lb_policy_args_destroy(grpc_lb_policy_args* args) { + grpc_channel_args_destroy(args->args); gpr_free(args); } -static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error); -static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, +static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error); +static void create_rr_locked(glb_lb_policy* glb_policy, grpc_lb_policy_args* args) { GPR_ASSERT(glb_policy->rr_policy == nullptr); - grpc_lb_policy* new_rr_policy = - grpc_lb_policy_create(exec_ctx, "round_robin", args); + grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args); if (new_rr_policy == nullptr) { gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy for serverlist " @@ -779,21 +774,19 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, return; } grpc_lb_policy_set_reresolve_closure_locked( - exec_ctx, new_rr_policy, glb_policy->base.request_reresolution); + new_rr_policy, glb_policy->base.request_reresolution); glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; const grpc_connectivity_state rr_state = - grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, &rr_state_error); /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, - rr_state_error); + update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error); /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ - grpc_pollset_set_add_pollset_set(exec_ctx, - glb_policy->rr_policy->interested_parties, + grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); /* Allocate the data for the tracking of the new RR policy's connectivity. @@ -808,10 +801,10 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, /* Subscribe to changes to the connectivity of the new RR */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); - grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); - grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy); + grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); /* Update picks and pings in wait */ pending_pick* pp; @@ -826,7 +819,7 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, "[grpclb %p] Pending pick about to (async) PICK from RR %p", glb_policy, glb_policy->rr_policy); } - pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, + pick_from_internal_rr_locked(glb_policy, &pp->pick_args, true /* force_async */, pp->target, &pp->wrapped_on_complete_arg); } @@ -850,41 +843,37 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate, - on_ack); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); gpr_free(pping); } } -/* glb_policy->rr_policy may be NULL (initial handover) */ -static void rr_handover_locked(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +/* glb_policy->rr_policy may be nullptr (initial handover) */ +static void rr_handover_locked(glb_lb_policy* glb_policy) { if (glb_policy->shutting_down) return; - grpc_lb_policy_args* args = lb_policy_args_create(exec_ctx, glb_policy); + grpc_lb_policy_args* args = lb_policy_args_create(glb_policy); GPR_ASSERT(args != nullptr); if (glb_policy->rr_policy != nullptr) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args); + grpc_lb_policy_update_locked(glb_policy->rr_policy, args); } else { - create_rr_locked(exec_ctx, glb_policy, args); + create_rr_locked(glb_policy, args); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy, glb_policy->rr_policy); } } - lb_policy_args_destroy(exec_ctx, args); + lb_policy_args_destroy(args); } -static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error) { +static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) { rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg; glb_lb_policy* glb_policy = rr_connectivity->glb_policy; if (glb_policy->shutting_down) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); gpr_free(rr_connectivity); return; } @@ -892,25 +881,22 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, /* An RR policy that has transitioned into the SHUTDOWN connectivity state * should not be considered for picks or updates: the SHUTDOWN state is a * sink, policies can't transition back from it. .*/ - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, - "rr_connectivity_shutdown"); + GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); glb_policy->rr_policy = nullptr; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); gpr_free(rr_connectivity); return; } /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ - update_lb_connectivity_status_locked( - exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); + update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state, + GRPC_ERROR_REF(error)); /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); } -static void destroy_balancer_name(grpc_exec_ctx* exec_ctx, - void* balancer_name) { +static void destroy_balancer_name(void* balancer_name) { gpr_free(balancer_name); } @@ -937,7 +923,7 @@ static int balancer_name_cmp_fn(void* a, void* b) { * above the grpclb policy. * - \a args: other args inherited from the grpclb policy. */ static grpc_channel_args* build_lb_channel_args( - grpc_exec_ctx* exec_ctx, const grpc_lb_addresses* addresses, + const grpc_lb_addresses* addresses, grpc_fake_resolver_response_generator* response_generator, const grpc_channel_args* args) { size_t num_grpclb_addrs = 0; @@ -980,7 +966,7 @@ static grpc_channel_args* build_lb_channel_args( gpr_free(targets_info_entries); grpc_channel_args* lb_channel_args = - grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info, + grpc_lb_policy_grpclb_build_lb_channel_args(targets_info, response_generator, args); grpc_arg lb_channel_addresses_arg = @@ -988,34 +974,34 @@ static grpc_channel_args* build_lb_channel_args( grpc_channel_args* result = grpc_channel_args_copy_and_add( lb_channel_args, &lb_channel_addresses_arg, 1); - grpc_slice_hash_table_unref(exec_ctx, targets_info); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); - grpc_lb_addresses_destroy(exec_ctx, lb_addresses); + grpc_slice_hash_table_unref(targets_info); + grpc_channel_args_destroy(lb_channel_args); + grpc_lb_addresses_destroy(lb_addresses); return result; } -static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void glb_destroy(grpc_lb_policy* pol) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; GPR_ASSERT(glb_policy->pending_picks == nullptr); GPR_ASSERT(glb_policy->pending_pings == nullptr); gpr_free((void*)glb_policy->server_name); - grpc_channel_args_destroy(exec_ctx, glb_policy->args); + grpc_channel_args_destroy(glb_policy->args); if (glb_policy->client_stats != nullptr) { grpc_grpclb_client_stats_unref(glb_policy->client_stats); } - grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); + grpc_connectivity_state_destroy(&glb_policy->state_tracker); if (glb_policy->serverlist != nullptr) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } if (glb_policy->fallback_backend_addresses != nullptr) { - grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); gpr_free(glb_policy); } -static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void glb_shutdown_locked(grpc_lb_policy* pol) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; @@ -1034,11 +1020,11 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { /* lb_on_server_status_received will pick up the cancel and clean up */ } if (glb_policy->retry_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + grpc_timer_cancel(&glb_policy->lb_call_retry_timer); glb_policy->retry_timer_active = false; } if (glb_policy->fallback_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); + grpc_timer_cancel(&glb_policy->lb_fallback_timer); glb_policy->fallback_timer_active = false; } @@ -1047,10 +1033,9 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { pending_ping* pping = glb_policy->pending_pings; glb_policy->pending_pings = nullptr; if (glb_policy->rr_policy != nullptr) { - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); } else { - grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace, - GRPC_ERROR_CANCELLED); + grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); } // We destroy the LB channel here because // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy @@ -1060,14 +1045,13 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = nullptr; } - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), - "glb_shutdown"); + grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "glb_shutdown"); while (pp != nullptr) { pending_pick* next = pp->next; *pp->target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_REF(error)); gpr_free(pp); pp = next; @@ -1076,12 +1060,12 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { while (pping != nullptr) { pending_ping* next = pping->next; if (pping->on_initiate != nullptr) { - GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_initiate->wrapper_closure, + GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure, GRPC_ERROR_REF(error)); gpr_free(pping->on_initiate); } if (pping->on_ack != nullptr) { - GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_ack->wrapper_closure, + GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure, GRPC_ERROR_REF(error)); gpr_free(pping->on_ack); } @@ -1100,8 +1084,8 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { // pick needs also be cancelled by the RR instance. // - Otherwise, without an RR instance, picks stay pending at this policy's // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, -// we invoke the completion closure and set *target to NULL right here. -static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +// we invoke the completion closure and set *target to nullptr right here. +static void glb_cancel_pick_locked(grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; @@ -1111,7 +1095,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, pending_pick* next = pp->next; if (pp->target == target) { *target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1121,7 +1105,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, pp = next; } if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target, + grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); @@ -1136,9 +1120,8 @@ static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, // pick needs also be cancelled by the RR instance. // - Otherwise, without an RR instance, picks stay pending at this policy's // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, -// we invoke the completion closure and set *target to NULL right here. -static void glb_cancel_picks_locked(grpc_exec_ctx* exec_ctx, - grpc_lb_policy* pol, +// we invoke the completion closure and set *target to nullptr right here. +static void glb_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error* error) { @@ -1149,7 +1132,7 @@ static void glb_cancel_picks_locked(grpc_exec_ctx* exec_ctx, pending_pick* next = pp->next; if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1160,52 +1143,49 @@ static void glb_cancel_picks_locked(grpc_exec_ctx* exec_ctx, } if (glb_policy->rr_policy != nullptr) { grpc_lb_policy_cancel_picks_locked( - exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask, + glb_policy->rr_policy, initial_metadata_flags_mask, initial_metadata_flags_eq, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } -static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error); -static void query_for_backends_locked(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy); -static void start_picking_locked(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +static void lb_on_fallback_timer_locked(void* arg, grpc_error* error); +static void query_for_backends_locked(glb_lb_policy* glb_policy); +static void start_picking_locked(glb_lb_policy* glb_policy) { /* start a timer to fall back */ if (glb_policy->lb_fallback_timeout_ms > 0 && glb_policy->serverlist == nullptr && !glb_policy->fallback_timer_active) { grpc_millis deadline = - grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_fallback_timeout_ms; + grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->fallback_timer_active = true; - grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline, + grpc_timer_init(&glb_policy->lb_fallback_timer, deadline, &glb_policy->lb_on_fallback); } glb_policy->started_picking = true; grpc_backoff_reset(&glb_policy->lb_call_backoff_state); - query_for_backends_locked(exec_ctx, glb_policy); + query_for_backends_locked(glb_policy); } -static void glb_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void glb_exit_idle_locked(grpc_lb_policy* pol) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; if (!glb_policy->started_picking) { - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } } -static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +static int glb_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, grpc_connected_subchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { if (pick_args->lb_token_mdelem_storage == nullptr) { *target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, on_complete, + GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "No mdelem storage for the LB token. Load reporting " "won't work without it. Failing")); @@ -1215,8 +1195,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, bool pick_done = false; if (glb_policy->rr_policy != nullptr) { const grpc_connectivity_state rr_connectivity_state = - grpc_lb_policy_check_connectivity_locked( - exec_ctx, glb_policy->rr_policy, nullptr); + grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, + nullptr); // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the // callback registered to capture this event // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We @@ -1253,9 +1233,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; wc_arg->glb_policy = pol; - pick_done = - pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, - false /* force_async */, target, wc_arg); + pick_done = pick_from_internal_rr_locked( + glb_policy, pick_args, false /* force_async */, target, wc_arg); } } else { // glb_policy->rr_policy == NULL if (grpc_lb_glb_trace.enabled()) { @@ -1266,7 +1245,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, on_complete); if (!glb_policy->started_picking) { - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } pick_done = false; } @@ -1274,39 +1253,34 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, } static grpc_connectivity_state glb_check_connectivity_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_error** connectivity_error) { + grpc_lb_policy* pol, grpc_error** connectivity_error) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; return grpc_connectivity_state_get(&glb_policy->state_tracker, connectivity_error); } -static void glb_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* on_initiate, +static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate, - on_ack); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); } else { add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); if (!glb_policy->started_picking) { - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } } } -static void glb_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, - grpc_lb_policy* pol, +static void glb_notify_on_state_change_locked(grpc_lb_policy* pol, grpc_connectivity_state* current, grpc_closure* notify) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &glb_policy->state_tracker, current, notify); + grpc_connectivity_state_notify_on_state_change(&glb_policy->state_tracker, + current, notify); } -static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; glb_policy->retry_timer_active = false; if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr && @@ -1314,28 +1288,26 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy); } - query_for_backends_locked(exec_ctx, glb_policy); + query_for_backends_locked(glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer"); } -static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { if (glb_policy->started_picking && glb_policy->updating_lb_call) { if (glb_policy->retry_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + grpc_timer_cancel(&glb_policy->lb_call_retry_timer); } - if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy); + if (!glb_policy->shutting_down) start_picking_locked(glb_policy); glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ - grpc_millis next_try = - grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state) - .next_attempt_start_time; + grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state) + .next_attempt_start_time; if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); - grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); + grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); if (timeout > 0) { gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", @@ -1350,43 +1322,40 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->retry_timer_active = true; - grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, + grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "lb_on_server_status_received_locked"); } -static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error); +static void send_client_load_report_locked(void* arg, grpc_error* error); -static void schedule_next_client_load_report(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +static void schedule_next_client_load_report(glb_lb_policy* glb_policy) { const grpc_millis next_client_load_report_time = - grpc_exec_ctx_now(exec_ctx) + glb_policy->client_stats_report_interval; + grpc_core::ExecCtx::Get()->Now() + + glb_policy->client_stats_report_interval; GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, + grpc_timer_init(&glb_policy->client_load_report_timer, next_client_load_report_time, &glb_policy->client_load_report_closure); } -static void client_load_report_done_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void client_load_report_done_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); glb_policy->client_load_report_payload = nullptr; if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "client_load_report"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { - maybe_restart_lb_call(exec_ctx, glb_policy); + maybe_restart_lb_call(glb_policy); } return; } - schedule_next_client_load_report(exec_ctx, glb_policy); + schedule_next_client_load_report(glb_policy); } static bool load_report_counters_are_zero(grpc_grpclb_request* request) { @@ -1401,15 +1370,13 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) { (drop_entries == nullptr || drop_entries->num_entries == 0); } -static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void send_client_load_report_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) { glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "client_load_report"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { - maybe_restart_lb_call(exec_ctx, glb_policy); + maybe_restart_lb_call(glb_policy); } return; } @@ -1422,7 +1389,7 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg, if (load_report_counters_are_zero(request)) { if (glb_policy->last_client_load_report_counters_were_zero) { grpc_grpclb_request_destroy(request); - schedule_next_client_load_report(exec_ctx, glb_policy); + schedule_next_client_load_report(glb_policy); return; } glb_policy->last_client_load_report_counters_were_zero = true; @@ -1432,7 +1399,7 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); glb_policy->client_load_report_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); // Send load report message. grpc_op op; @@ -1443,20 +1410,16 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg, client_load_report_done_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, &op, 1, - &glb_policy->client_load_report_closure); + glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); if (call_error != GRPC_CALL_OK) { gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } } -static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error); -static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error); -static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +static void lb_on_server_status_received_locked(void* arg, grpc_error* error); +static void lb_on_response_received_locked(void* arg, grpc_error* error); +static void lb_call_init_locked(glb_lb_policy* glb_policy) { GPR_ASSERT(glb_policy->server_name != nullptr); GPR_ASSERT(glb_policy->server_name[0] != '\0'); GPR_ASSERT(glb_policy->lb_call == nullptr); @@ -1469,13 +1432,13 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, grpc_millis deadline = glb_policy->lb_call_timeout_ms == 0 ? GRPC_MILLIS_INF_FUTURE - : grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_call_timeout_ms; + : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms; glb_policy->lb_call = grpc_channel_create_pollset_set_call( - exec_ctx, glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS, + glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, &host, deadline, nullptr); - grpc_slice_unref_internal(exec_ctx, host); + grpc_slice_unref_internal(host); if (glb_policy->client_stats != nullptr) { grpc_grpclb_client_stats_unref(glb_policy->client_stats); @@ -1490,7 +1453,7 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); glb_policy->lb_request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received, @@ -1511,8 +1474,7 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, glb_policy->last_client_load_report_counters_were_zero = false; } -static void lb_call_destroy_locked(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +static void lb_call_destroy_locked(glb_lb_policy* glb_policy) { GPR_ASSERT(glb_policy->lb_call != nullptr); grpc_call_unref(glb_policy->lb_call); glb_policy->lb_call = nullptr; @@ -1521,22 +1483,21 @@ static void lb_call_destroy_locked(grpc_exec_ctx* exec_ctx, grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv); grpc_byte_buffer_destroy(glb_policy->lb_request_payload); - grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details); + grpc_slice_unref_internal(glb_policy->lb_call_status_details); if (glb_policy->client_load_report_timer_pending) { - grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer); + grpc_timer_cancel(&glb_policy->client_load_report_timer); } } /* * Auxiliary functions and LB client callbacks. */ -static void query_for_backends_locked(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy) { +static void query_for_backends_locked(glb_lb_policy* glb_policy) { GPR_ASSERT(glb_policy->lb_channel != nullptr); if (glb_policy->shutting_down) return; - lb_call_init_locked(exec_ctx, glb_policy); + lb_call_init_locked(glb_policy); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, @@ -1567,8 +1528,8 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx, op->flags = 0; op->reserved = nullptr; op++; - call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), nullptr); + call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops, + (size_t)(op - ops), nullptr); GPR_ASSERT(GRPC_CALL_OK == call_error); op = ops; @@ -1586,7 +1547,7 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx, GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received_locked"); call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -1600,13 +1561,12 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx, * lb_on_response_received_locked */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); GPR_ASSERT(GRPC_CALL_OK == call_error); } -static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void lb_on_response_received_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; grpc_op ops[2]; memset(ops, 0, sizeof(ops)); @@ -1640,7 +1600,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, * send_client_load_report_locked() */ glb_policy->client_load_report_timer_pending = true; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); - schedule_next_client_load_report(exec_ctx, glb_policy); + schedule_next_client_load_report(glb_policy); } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Received initial LB response message; client load " @@ -1685,11 +1645,10 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } else { /* or dispose of the fallback */ - grpc_lb_addresses_destroy(exec_ctx, - glb_policy->fallback_backend_addresses); + grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = nullptr; if (glb_policy->fallback_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); + grpc_timer_cancel(&glb_policy->lb_fallback_timer); glb_policy->fallback_timer_active = false; } } @@ -1698,7 +1657,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, * update or in glb_destroy() */ glb_policy->serverlist = serverlist; glb_policy->serverlist_index = 0; - rr_handover_locked(exec_ctx, glb_policy); + rr_handover_locked(glb_policy); } } else { if (grpc_lb_glb_trace.enabled()) { @@ -1708,14 +1667,14 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, } grpc_grpclb_destroy_serverlist(serverlist); } - } else { /* serverlist == NULL */ + } else { /* serverlist == nullptr */ gpr_log(GPR_ERROR, "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", glb_policy, grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } } - grpc_slice_unref_internal(exec_ctx, response_slice); + grpc_slice_unref_internal(response_slice); if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1726,23 +1685,22 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, /* reuse the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "lb_on_response_received_locked_shutdown"); } } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "lb_on_response_received_locked_empty_payload"); } } -static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; glb_policy->fallback_timer_active = false; /* If we receive a serverlist after the timer fires but before this callback @@ -1755,15 +1713,13 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, glb_policy); } GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); - rr_handover_locked(exec_ctx, glb_policy); + rr_handover_locked(glb_policy); } } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "grpclb_fallback_timer"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } -static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error) { +static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; GPR_ASSERT(glb_policy->lb_call != nullptr); if (grpc_lb_glb_trace.enabled()) { @@ -1777,29 +1733,28 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx, gpr_free(status_details); } /* We need to perform cleanups no matter what. */ - lb_call_destroy_locked(exec_ctx, glb_policy); + lb_call_destroy_locked(glb_policy); // If the load report timer is still pending, we wait for it to be // called before restarting the call. Otherwise, we restart the call // here. if (!glb_policy->client_load_report_timer_pending) { - maybe_restart_lb_call(exec_ctx, glb_policy); + maybe_restart_lb_call(glb_policy); } } -static void fallback_update_locked(grpc_exec_ctx* exec_ctx, - glb_lb_policy* glb_policy, +static void fallback_update_locked(glb_lb_policy* glb_policy, const grpc_lb_addresses* addresses) { GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); - grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(exec_ctx, addresses); + extract_backend_addresses_locked(addresses); if (glb_policy->lb_fallback_timeout_ms > 0 && glb_policy->rr_policy != nullptr) { - rr_handover_locked(exec_ctx, glb_policy); + rr_handover_locked(glb_policy); } } -static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, +static void glb_update_locked(grpc_lb_policy* policy, const grpc_lb_policy_args* args) { glb_lb_policy* glb_policy = (glb_lb_policy*)policy; const grpc_arg* arg = @@ -1809,7 +1764,7 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, // If we don't have a current channel to the LB, go into TRANSIENT // FAILURE. grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "glb_update_missing"); } else { @@ -1826,16 +1781,16 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, // If a non-empty serverlist hasn't been received from the balancer, // propagate the update to fallback_backend_addresses. if (glb_policy->serverlist == nullptr) { - fallback_update_locked(exec_ctx, glb_policy, addresses); + fallback_update_locked(glb_policy, addresses); } GPR_ASSERT(glb_policy->lb_channel != nullptr); // Propagate updates to the LB channel (pick_first) through the fake // resolver. grpc_channel_args* lb_channel_args = build_lb_channel_args( - exec_ctx, addresses, glb_policy->response_generator, args->args); + addresses, glb_policy->response_generator, args->args); grpc_fake_resolver_response_generator_set_response( - exec_ctx, glb_policy->response_generator, lb_channel_args); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); + glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(lb_channel_args); // Start watching the LB channel connectivity for connection, if not // already doing so. if (!glb_policy->watching_lb_channel) { @@ -1847,7 +1802,7 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, glb_policy->watching_lb_channel = true; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); grpc_client_channel_watch_connectivity_state( - exec_ctx, client_channel_elem, + client_channel_elem, grpc_polling_entity_create_from_pollset_set( glb_policy->base.interested_parties), &glb_policy->lb_channel_connectivity, @@ -1858,8 +1813,7 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, // Invoked as part of the update process. It continues watching the LB channel // until it shuts down or becomes READY. It's invoked even if the LB channel // stayed READY throughout the update (for example if the update is identical). -static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx, - void* arg, +static void glb_lb_channel_on_connectivity_changed_cb(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (glb_policy->shutting_down) goto done; @@ -1875,7 +1829,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx, grpc_channel_get_channel_stack(glb_policy->lb_channel)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); grpc_client_channel_watch_connectivity_state( - exec_ctx, client_channel_elem, + client_channel_elem, grpc_polling_entity_create_from_pollset_set( glb_policy->base.interested_parties), &glb_policy->lb_channel_connectivity, @@ -1894,29 +1848,28 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx, // lb_call. } else if (glb_policy->started_picking) { if (glb_policy->retry_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + grpc_timer_cancel(&glb_policy->lb_call_retry_timer); glb_policy->retry_timer_active = false; } - start_picking_locked(exec_ctx, glb_policy); + start_picking_locked(glb_policy); } /* fallthrough */ case GRPC_CHANNEL_SHUTDOWN: done: glb_policy->watching_lb_channel = false; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "watch_lb_channel_connectivity_cb_shutdown"); break; } } static void glb_set_reresolve_closure_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* request_reresolution) { + grpc_lb_policy* policy, grpc_closure* request_reresolution) { glb_lb_policy* glb_policy = (glb_lb_policy*)policy; GPR_ASSERT(!glb_policy->shutting_down); GPR_ASSERT(glb_policy->base.request_reresolution == nullptr); if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy, + grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy, request_reresolution); } else { glb_policy->base.request_reresolution = request_reresolution; @@ -1937,8 +1890,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_update_locked, glb_set_reresolve_closure_locked}; -static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx, - grpc_lb_policy_factory* factory, +static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { /* Count the number of gRPC-LB addresses. There must be at least one. */ const grpc_arg* arg = @@ -1959,7 +1911,7 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx, arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); GPR_ASSERT(arg != nullptr); GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri* uri = grpc_uri_parse(exec_ctx, arg->value.string, true); + grpc_uri* uri = grpc_uri_parse(arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); @@ -1992,26 +1944,26 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx, /* Extract the backend addresses (may be empty) from the resolver for * fallback. */ glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(exec_ctx, addresses); + extract_backend_addresses_locked(addresses); /* Create a client channel over them to communicate with a LB service */ glb_policy->response_generator = grpc_fake_resolver_response_generator_create(); grpc_channel_args* lb_channel_args = build_lb_channel_args( - exec_ctx, addresses, glb_policy->response_generator, args->args); + addresses, glb_policy->response_generator, args->args); char* uri_str; gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + uri_str, args->client_channel_factory, lb_channel_args); /* Propagate initial resolution */ grpc_fake_resolver_response_generator_set_response( - exec_ctx, glb_policy->response_generator, lb_channel_args); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); + glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(lb_channel_args); gpr_free(uri_str); if (glb_policy->lb_channel == nullptr) { gpr_free((void*)glb_policy->server_name); - grpc_channel_args_destroy(exec_ctx, glb_policy->args); + grpc_channel_args_destroy(glb_policy->args); gpr_free(glb_policy); return nullptr; } @@ -2042,7 +1994,7 @@ grpc_lb_policy_factory* grpc_glb_lb_factory_create() { // Only add client_load_reporting filter if the grpclb LB policy is used. static bool maybe_add_client_load_reporting_filter( - grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) { + grpc_channel_stack_builder* builder, void* arg) { const grpc_channel_args* args = grpc_channel_stack_builder_get_channel_arguments(builder); const grpc_arg* channel_arg = diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc index aacaec197d..a8ecea4212 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc @@ -26,17 +26,17 @@ #include "src/core/lib/support/string.h" grpc_channel* grpc_lb_policy_grpclb_create_lb_channel( - grpc_exec_ctx* exec_ctx, const char* lb_service_target_addresses, + const char* lb_service_target_addresses, grpc_client_channel_factory* client_channel_factory, grpc_channel_args* args) { grpc_channel* lb_channel = grpc_client_channel_factory_create_channel( - exec_ctx, client_channel_factory, lb_service_target_addresses, + client_channel_factory, lb_service_target_addresses, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, args); return lb_channel; } grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args( - grpc_exec_ctx* exec_ctx, grpc_slice_hash_table* targets_info, + grpc_slice_hash_table* targets_info, grpc_fake_resolver_response_generator* response_generator, const grpc_channel_args* args) { const grpc_arg to_add[] = { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h index 70b1c28b0d..56104b2ec0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -31,12 +31,12 @@ * \a client_channel_factory will be used for the creation of the LB channel, * alongside the channel args passed in \a args. */ grpc_channel* grpc_lb_policy_grpclb_create_lb_channel( - grpc_exec_ctx* exec_ctx, const char* lb_service_target_addresses, + const char* lb_service_target_addresses, grpc_client_channel_factory* client_channel_factory, grpc_channel_args* args); grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args( - grpc_exec_ctx* exec_ctx, grpc_slice_hash_table* targets_info, + grpc_slice_hash_table* targets_info, grpc_fake_resolver_response_generator* response_generator, const grpc_channel_args* args); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc index 8eaa90e97b..76bcddf945 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -29,7 +29,7 @@ #include "src/core/lib/support/string.h" grpc_channel* grpc_lb_policy_grpclb_create_lb_channel( - grpc_exec_ctx* exec_ctx, const char* lb_service_target_addresses, + const char* lb_service_target_addresses, grpc_client_channel_factory* client_channel_factory, grpc_channel_args* args) { grpc_channel_args* new_args = args; @@ -50,19 +50,19 @@ grpc_channel* grpc_lb_policy_grpclb_create_lb_channel( new_args = grpc_channel_args_copy_and_add_and_remove( args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_channel_credentials_unref(exec_ctx, creds_sans_call_creds); + grpc_channel_credentials_unref(creds_sans_call_creds); } grpc_channel* lb_channel = grpc_client_channel_factory_create_channel( - exec_ctx, client_channel_factory, lb_service_target_addresses, + client_channel_factory, lb_service_target_addresses, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args); if (channel_credentials != nullptr) { - grpc_channel_args_destroy(exec_ctx, new_args); + grpc_channel_args_destroy(new_args); } return lb_channel; } grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args( - grpc_exec_ctx* exec_ctx, grpc_slice_hash_table* targets_info, + grpc_slice_hash_table* targets_info, grpc_fake_resolver_response_generator* response_generator, const grpc_channel_args* args) { const grpc_arg to_add[] = { diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index b2007ca301..0861261359 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -57,12 +57,12 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void pf_destroy(grpc_lb_policy* pol) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; GPR_ASSERT(p->subchannel_list == nullptr); GPR_ASSERT(p->latest_pending_subchannel_list == nullptr); GPR_ASSERT(p->pending_picks == nullptr); - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_connectivity_state_destroy(&p->state_tracker); gpr_free(p); grpc_subchannel_index_unref(); if (grpc_lb_pick_first_trace.enabled()) { @@ -70,7 +70,7 @@ static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { } } -static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void pf_shutdown_locked(grpc_lb_policy* pol) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { @@ -81,28 +81,27 @@ static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { while ((pp = p->pending_picks) != nullptr) { p->pending_picks = pp->next; *pp->target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); gpr_free(pp); } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), - "shutdown"); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "shutdown"); if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, "pf_shutdown"); p->subchannel_list = nullptr; } if (p->latest_pending_subchannel_list != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown"); + p->latest_pending_subchannel_list, "pf_shutdown"); p->latest_pending_subchannel_list = nullptr; } - grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace, + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +static void pf_cancel_pick_locked(grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; @@ -112,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, pending_pick* next = pp->next; if (pp->target == target) { *target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -125,7 +124,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, GRPC_ERROR_UNREF(error); } -static void pf_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +static void pf_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error* error) { @@ -136,7 +135,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, pending_pick* next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -149,8 +148,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, GRPC_ERROR_UNREF(error); } -static void start_picking_locked(grpc_exec_ctx* exec_ctx, - pick_first_lb_policy* p) { +static void start_picking_locked(pick_first_lb_policy* p) { p->started_picking = true; if (p->subchannel_list != nullptr && p->subchannel_list->num_subchannels > 0) { @@ -160,21 +158,21 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list_ref_for_connectivity_watch( p->subchannel_list, "connectivity_watch+start_picking"); grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &p->subchannel_list->subchannels[i]); + &p->subchannel_list->subchannels[i]); break; } } } } -static void pf_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void pf_exit_idle_locked(grpc_lb_policy* pol) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + start_picking_locked(p); } } -static int pf_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +static int pf_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, grpc_connected_subchannel** target, grpc_call_context_element* context, void** user_data, @@ -188,7 +186,7 @@ static int pf_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, } // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + start_picking_locked(p); } pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -199,51 +197,47 @@ static int pf_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, return 0; } -static void destroy_unselected_subchannels_locked(grpc_exec_ctx* exec_ctx, - pick_first_lb_policy* p) { +static void destroy_unselected_subchannels_locked(pick_first_lb_policy* p) { for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[i]; if (p->selected != sd) { - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + grpc_lb_subchannel_data_unref_subchannel(sd, "selected_different_subchannel"); } } } static grpc_connectivity_state pf_check_connectivity_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_error** error) { + grpc_lb_policy* pol, grpc_error** error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; return grpc_connectivity_state_get(&p->state_tracker, error); } -static void pf_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, - grpc_lb_policy* pol, +static void pf_notify_on_state_change_locked(grpc_lb_policy* pol, grpc_connectivity_state* current, grpc_closure* notify) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify); } -static void pf_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* on_initiate, +static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel, + grpc_connected_subchannel_ping(p->selected->connected_subchannel, on_initiate, on_ack); } else { - GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, + GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - GRPC_CLOSURE_SCHED(exec_ctx, on_ack, + GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } -static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error); +static void pf_connectivity_changed_locked(void* arg, grpc_error* error); -static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, +static void pf_update_locked(grpc_lb_policy* policy, const grpc_lb_policy_args* args) { pick_first_lb_policy* p = (pick_first_lb_policy*)policy; const grpc_arg* arg = @@ -252,7 +246,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, if (p->subchannel_list == nullptr) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "pf_update_missing"); } else { @@ -271,17 +265,17 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, (void*)p, (unsigned long)addresses->num_addresses); } grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( - exec_ctx, &p->base, &grpc_lb_pick_first_trace, addresses, args, + &p->base, &grpc_lb_pick_first_trace, addresses, args, pf_connectivity_changed_locked); if (subchannel_list->num_subchannels == 0) { // Empty update or no valid subchannels. Unsubscribe from all current // subchannels and put the channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, "sl_shutdown_empty_update"); } p->subchannel_list = subchannel_list; // Empty list. @@ -292,7 +286,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, "pf_update_before_selected"); } p->subchannel_list = subchannel_list; @@ -317,19 +311,19 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, p->selected = sd; if (p->subchannel_list != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "pf_update_includes_selected"); + p->subchannel_list, "pf_update_includes_selected"); } p->subchannel_list = subchannel_list; - destroy_unselected_subchannels_locked(exec_ctx, p); + destroy_unselected_subchannels_locked(p); grpc_lb_subchannel_list_ref_for_connectivity_watch( subchannel_list, "connectivity_watch+replace_selected"); - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_start_connectivity_watch(sd); // If there was a previously pending update (which may or may // not have contained the currently selected subchannel), drop // it, so that it doesn't override what we've done here. if (p->latest_pending_subchannel_list != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, + p->latest_pending_subchannel_list, "pf_update_includes_selected+outdated"); p->latest_pending_subchannel_list = nullptr; } @@ -349,8 +343,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, (void*)subchannel_list); } grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, - "sl_outdated_dont_smash"); + p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); } p->latest_pending_subchannel_list = subchannel_list; } @@ -360,12 +353,11 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, grpc_lb_subchannel_list_ref_for_connectivity_watch( subchannel_list, "connectivity_watch+update"); grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &subchannel_list->subchannels[0]); + &subchannel_list->subchannels[0]); } } -static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; pick_first_lb_policy* p = (pick_first_lb_policy*)sd->subchannel_list->policy; if (grpc_lb_pick_first_trace.enabled()) { @@ -383,18 +375,18 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // If the policy is shutting down, unref and return. if (p->shutdown) { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_shutdown"); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, + "pf_shutdown"); return; } // If the subchannel list is shutting down, stop watching. if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_sl_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_sl_shutdown"); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, + "pf_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in @@ -410,15 +402,15 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list != nullptr) { p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "selected_not_ready+switch_to_update"); + sd->subchannel_list, "selected_not_ready+switch_to_update"); grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); + p->subchannel_list, "selected_not_ready+switch_to_update"); p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = nullptr; grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { // TODO(juanlishen): we re-resolve when the selected subchannel goes to @@ -429,27 +421,26 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, "selected_changed+reresolve"); p->started_picking = false; - grpc_lb_policy_try_reresolve( - exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_NONE); } else { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); } if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_start_connectivity_watch(sd); } else { p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, - "pf_selected_shutdown"); + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown"); } } return; @@ -469,15 +460,14 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, // p->subchannel_list. if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); - grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, "finish_update"); p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = nullptr; } // Cases 1 and 2. - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, GRPC_ERROR_NONE, - "connecting_ready"); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "connecting_ready"); sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(sd->subchannel), "connected"); @@ -487,7 +477,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, (void*)sd->subchannel); } // Drop all other subchannels, since we are now connected. - destroy_unselected_subchannels_locked(exec_ctx, p); + destroy_unselected_subchannels_locked(p); // Update any calls that were waiting for a pick. pending_pick* pp; while ((pp = p->pending_picks)) { @@ -499,15 +489,15 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, "Servicing pending pick with selected subchannel %p", (void*)p->selected); } - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); do { sd->subchannel_list->checking_subchannel = (sd->subchannel_list->checking_subchannel + 1) % @@ -520,29 +510,28 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, if (sd->subchannel_list->checking_subchannel == 0 && sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: { // Only update connectivity state in case 1. if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_REF(error), "connecting_changed"); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_REF(error), + "connecting_changed"); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, - "pf_candidate_shutdown"); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); // Advance to next subchannel and check its state. grpc_lb_subchannel_data* original_sd = sd; do { @@ -554,31 +543,30 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } while (sd->subchannel == nullptr && sd != original_sd); if (sd == original_sd) { grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_exhausted_subchannels"); + sd->subchannel_list, "pf_exhausted_subchannels"); if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, "exhausted_subchannels+reresolve"); p->started_picking = false; - grpc_lb_policy_try_reresolve( - exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_NONE); } } else { if (sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "subchannel_failed"); } // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_start_connectivity_watch(sd); } } } } static void pf_set_reresolve_closure_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* request_reresolution) { + grpc_lb_policy* policy, grpc_closure* request_reresolution) { pick_first_lb_policy* p = (pick_first_lb_policy*)policy; GPR_ASSERT(!p->shutdown); GPR_ASSERT(policy->request_reresolution == nullptr); @@ -602,15 +590,14 @@ static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {} static void pick_first_factory_unref(grpc_lb_policy_factory* factory) {} -static grpc_lb_policy* create_pick_first(grpc_exec_ctx* exec_ctx, - grpc_lb_policy_factory* factory, +static grpc_lb_policy* create_pick_first(grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { GPR_ASSERT(args->client_channel_factory != nullptr); pick_first_lb_policy* p = (pick_first_lb_policy*)gpr_zalloc(sizeof(*p)); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p created.", (void*)p); } - pf_update_locked(exec_ctx, &p->base, args); + pf_update_locked(&p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_subchannel_index_ref(); return &p->base; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index df458d339d..b0c84017df 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -154,7 +154,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, } } -static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void rr_destroy(grpc_lb_policy* pol) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", @@ -162,12 +162,12 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { } GPR_ASSERT(p->subchannel_list == nullptr); GPR_ASSERT(p->latest_pending_subchannel_list == nullptr); - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_connectivity_state_destroy(&p->state_tracker); grpc_subchannel_index_unref(); gpr_free(p); } -static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void rr_shutdown_locked(grpc_lb_policy* pol) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { @@ -178,29 +178,27 @@ static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { while ((pp = p->pending_picks) != nullptr) { p->pending_picks = pp->next; *pp->target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); gpr_free(pp); } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), - "rr_shutdown"); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "rr_shutdown"); if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, "sl_shutdown_rr_shutdown"); p->subchannel_list = nullptr; } if (p->latest_pending_subchannel_list != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, - "sl_shutdown_pending_rr_shutdown"); + p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown"); p->latest_pending_subchannel_list = nullptr; } - grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace, + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +static void rr_cancel_pick_locked(grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; @@ -210,7 +208,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, pending_pick* next = pp->next; if (pp->target == target) { *target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -223,7 +221,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, GRPC_ERROR_UNREF(error); } -static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +static void rr_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error* error) { @@ -235,7 +233,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { *pp->target = nullptr; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -248,27 +246,26 @@ static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, GRPC_ERROR_UNREF(error); } -static void start_picking_locked(grpc_exec_ctx* exec_ctx, - round_robin_lb_policy* p) { +static void start_picking_locked(round_robin_lb_policy* p) { p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { if (p->subchannel_list->subchannels[i].subchannel != nullptr) { grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &p->subchannel_list->subchannels[i]); + &p->subchannel_list->subchannels[i]); } } } -static void rr_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { +static void rr_exit_idle_locked(grpc_lb_policy* pol) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + start_picking_locked(p); } } -static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, +static int rr_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, grpc_connected_subchannel** target, grpc_call_context_element* context, void** user_data, @@ -305,7 +302,7 @@ static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, } /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + start_picking_locked(p); } pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -348,8 +345,7 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { * (the grpc_lb_subchannel_data associated with the updated subchannel) and the * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used * only if the policy transitions to state TRANSIENT_FAILURE. */ -static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx, - grpc_lb_subchannel_data* sd, +static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). @@ -377,35 +373,33 @@ static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx, GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); if (subchannel_list->num_ready > 0) { /* 1) READY */ - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, - "rr_connecting"); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "rr_connecting"); } else if (subchannel_list->num_shutdown == subchannel_list->num_subchannels) { /* 3) IDLE and re-resolve */ - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "rr_exhausted_subchannels+reresolve"); p->started_picking = false; - grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace, + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, GRPC_ERROR_NONE); } else if (subchannel_list->num_shutdown + subchannel_list->num_transient_failures == subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); } GRPC_ERROR_UNREF(error); } -static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; round_robin_lb_policy* p = (round_robin_lb_policy*)sd->subchannel_list->policy; @@ -423,18 +417,18 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // If the policy is shutting down, unref and return. if (p->shutdown) { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "rr_shutdown"); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, + "rr_shutdown"); return; } // If the subchannel list is shutting down, stop watching. if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "rr_sl_shutdown"); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, + "rr_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in @@ -447,14 +441,13 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // Update state counters and new overall state. update_state_counters_locked(sd); - update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); + update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); // If the sd's new state is SHUTDOWN, unref the subchannel. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, - "rr_connectivity_shutdown"); + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown"); + sd->subchannel_list, "rr_connectivity_shutdown"); } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { if (sd->connected_subchannel == nullptr) { @@ -482,8 +475,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } if (p->subchannel_list != nullptr) { // dispose of the current subchannel_list - grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "sl_phase_out_shutdown"); + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + "sl_phase_out_shutdown"); } p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = nullptr; @@ -515,32 +508,30 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, (void*)p, (void*)selected->subchannel, (void*)p->subchannel_list, (unsigned long)next_ready_index); } - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_start_connectivity_watch(sd); } } static grpc_connectivity_state rr_check_connectivity_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_error** error) { + grpc_lb_policy* pol, grpc_error** error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; return grpc_connectivity_state_get(&p->state_tracker, error); } -static void rr_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, - grpc_lb_policy* pol, +static void rr_notify_on_state_change_locked(grpc_lb_policy* pol, grpc_connectivity_state* current, grpc_closure* notify) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify); } -static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* on_initiate, +static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); @@ -549,19 +540,17 @@ static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_ping"); - grpc_connected_subchannel_ping(exec_ctx, target, on_initiate, on_ack); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping"); + grpc_connected_subchannel_ping(target, on_initiate, on_ack); + GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); } else { - GRPC_CLOSURE_SCHED( - exec_ctx, on_initiate, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); - GRPC_CLOSURE_SCHED( - exec_ctx, on_ack, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); + GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Round Robin not connected")); + GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Round Robin not connected")); } } -static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, +static void rr_update_locked(grpc_lb_policy* policy, const grpc_lb_policy_args* args) { round_robin_lb_policy* p = (round_robin_lb_policy*)policy; const grpc_arg* arg = @@ -572,7 +561,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, // Otherwise, keep using the current subchannel list (ignore this update). if (p->subchannel_list == nullptr) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "rr_update_missing"); } @@ -584,15 +573,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, addresses->num_addresses); } grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( - exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args, + &p->base, &grpc_lb_round_robin_trace, addresses, args, rr_connectivity_changed_locked); if (subchannel_list->num_subchannels == 0) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, "sl_shutdown_empty_update"); } p->subchannel_list = subchannel_list; // empty list @@ -608,7 +597,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, (void*)subchannel_list); } grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, "sl_outdated"); + p->latest_pending_subchannel_list, "sl_outdated"); } p->latest_pending_subchannel_list = subchannel_list; for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { @@ -619,22 +608,21 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list, "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &subchannel_list->subchannels[i]); + &subchannel_list->subchannels[i]); } } else { // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. if (p->subchannel_list != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "rr_update_before_started_picking"); + p->subchannel_list, "rr_update_before_started_picking"); } p->subchannel_list = subchannel_list; } } static void rr_set_reresolve_closure_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* request_reresolution) { + grpc_lb_policy* policy, grpc_closure* request_reresolution) { round_robin_lb_policy* p = (round_robin_lb_policy*)policy; GPR_ASSERT(!p->shutdown); GPR_ASSERT(policy->request_reresolution == nullptr); @@ -658,8 +646,7 @@ static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {} -static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx, - grpc_lb_policy_factory* factory, +static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { GPR_ASSERT(args->client_channel_factory != nullptr); round_robin_lb_policy* p = (round_robin_lb_policy*)gpr_zalloc(sizeof(*p)); @@ -667,7 +654,7 @@ static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx, grpc_subchannel_index_ref(); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - rr_update_locked(exec_ctx, &p->base, args); + rr_update_locked(&p->base, args); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p, (unsigned long)p->subchannel_list->num_subchannels); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index b6fce4d207..a3b4c8e524 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -28,8 +28,7 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx, - grpc_lb_subchannel_data* sd, +void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, const char* reason) { if (sd->subchannel != nullptr) { if (sd->subchannel_list->tracer->enabled()) { @@ -41,23 +40,22 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx, (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason); + GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); sd->subchannel = nullptr; if (sd->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel, - reason); + GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason); sd->connected_subchannel = nullptr; } if (sd->user_data != nullptr) { GPR_ASSERT(sd->user_data_vtable != nullptr); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + sd->user_data_vtable->destroy(sd->user_data); sd->user_data = nullptr; } } } void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd) { + grpc_lb_subchannel_data* sd) { if (sd->subchannel_list->tracer->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR @@ -69,13 +67,13 @@ void grpc_lb_subchannel_data_start_connectivity_watch( } sd->connectivity_notification_pending = true; grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties, + sd->subchannel, sd->subchannel_list->policy->interested_parties, &sd->pending_connectivity_state_unsafe, &sd->connectivity_changed_closure); } void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd) { + grpc_lb_subchannel_data* sd) { if (sd->subchannel_list->tracer->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR @@ -90,7 +88,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( } grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* p, grpc_core::TraceFlag* tracer, + grpc_lb_policy* p, grpc_core::TraceFlag* tracer, const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args, grpc_iomgr_cb_func connectivity_changed_cb) { grpc_lb_subchannel_list* subchannel_list = @@ -124,8 +122,8 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( gpr_free(addr_arg.value.string); sc_args.args = new_args; grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args); - grpc_channel_args_destroy(exec_ctx, new_args); + args->client_channel_factory, &sc_args); + grpc_channel_args_destroy(new_args); if (subchannel == nullptr) { // Subchannel could not be created. if (tracer->enabled()) { @@ -172,8 +170,7 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( return subchannel_list; } -static void subchannel_list_destroy(grpc_exec_ctx* exec_ctx, - grpc_lb_subchannel_list* subchannel_list) { +static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) { if (subchannel_list->tracer->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", subchannel_list->tracer->name(), subchannel_list->policy, @@ -181,8 +178,7 @@ static void subchannel_list_destroy(grpc_exec_ctx* exec_ctx, } for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, - "subchannel_list_destroy"); + grpc_lb_subchannel_data_unref_subchannel(sd, "subchannel_list_destroy"); } gpr_free(subchannel_list->subchannels); gpr_free(subchannel_list); @@ -200,8 +196,7 @@ void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, } } -void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx, - grpc_lb_subchannel_list* subchannel_list, +void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, const char* reason) { const bool done = gpr_unref(&subchannel_list->refcount); if (subchannel_list->tracer->enabled()) { @@ -212,7 +207,7 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx, reason); } if (done) { - subchannel_list_destroy(exec_ctx, subchannel_list); + subchannel_list_destroy(subchannel_list); } } @@ -223,14 +218,13 @@ void grpc_lb_subchannel_list_ref_for_connectivity_watch( } void grpc_lb_subchannel_list_unref_for_connectivity_watch( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list, - const char* reason) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason); - grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); + grpc_lb_subchannel_list* subchannel_list, const char* reason) { + GRPC_LB_POLICY_WEAK_UNREF(subchannel_list->policy, reason); + grpc_lb_subchannel_list_unref(subchannel_list, reason); } static void subchannel_data_cancel_connectivity_watch( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, const char* reason) { + grpc_lb_subchannel_data* sd, const char* reason) { if (sd->subchannel_list->tracer->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR @@ -240,14 +234,12 @@ static void subchannel_data_cancel_connectivity_watch( (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel, reason); } - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, nullptr, - nullptr, + grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr, &sd->connectivity_changed_closure); } void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list, - const char* reason) { + grpc_lb_subchannel_list* subchannel_list, const char* reason) { if (subchannel_list->tracer->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", subchannel_list->tracer->name(), subchannel_list->policy, @@ -261,10 +253,10 @@ void grpc_lb_subchannel_list_shutdown_and_unref( // the callback is responsible for unreffing the subchannel. // Otherwise, unref the subchannel directly. if (sd->connectivity_notification_pending) { - subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason); + subchannel_data_cancel_connectivity_watch(sd, reason); } else if (sd->subchannel != nullptr) { - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason); + grpc_lb_subchannel_data_unref_subchannel(sd, reason); } } - grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); + grpc_lb_subchannel_list_unref(subchannel_list, reason); } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index e3e5eba56a..0f8cea9347 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -65,8 +65,7 @@ typedef struct { } grpc_lb_subchannel_data; /// Unrefs the subchannel contained in sd. -void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx, - grpc_lb_subchannel_data* sd, +void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, const char* reason); /// Starts watching the connectivity state of the subchannel. @@ -74,11 +73,11 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx, /// grpc_lb_subchannel_data_stop_connectivity_watch() or again call /// grpc_lb_subchannel_data_start_connectivity_watch(). void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd); + grpc_lb_subchannel_data* sd); /// Stops watching the connectivity state of the subchannel. void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd); + grpc_lb_subchannel_data* sd); struct grpc_lb_subchannel_list { /** backpointer to owning policy */ @@ -117,15 +116,14 @@ struct grpc_lb_subchannel_list { }; grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_exec_ctx* exec_ctx, grpc_lb_policy* p, grpc_core::TraceFlag* tracer, + grpc_lb_policy* p, grpc_core::TraceFlag* tracer, const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args, grpc_iomgr_cb_func connectivity_changed_cb); void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, const char* reason); -void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx, - grpc_lb_subchannel_list* subchannel_list, +void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, const char* reason); /// Takes and releases refs needed for a connectivity notification. @@ -133,13 +131,11 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx, void grpc_lb_subchannel_list_ref_for_connectivity_watch( grpc_lb_subchannel_list* subchannel_list, const char* reason); void grpc_lb_subchannel_list_unref_for_connectivity_watch( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list, - const char* reason); + grpc_lb_subchannel_list* subchannel_list, const char* reason); /// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The /// connectivity state notification callback will ultimately unref it. void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list, - const char* reason); + grpc_lb_subchannel_list* subchannel_list, const char* reason); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ |