Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
 -rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 399
 1 file changed, 177 insertions(+), 222 deletions(-)
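
This diff is one mechanical change applied file-wide: every function sheds its
explicit grpc_exec_ctx *exec_ctx parameter, and every call site
(GRPC_CLOSURE_SCHED, grpc_timer_init, grpc_exec_ctx_now, the UNREF macros, and
so on) drops the matching first argument. The hunks only show the parameter
being removed, not its replacement; the usual way to make such a change
compile is to stash the context in thread-local storage. Below is a minimal
sketch of that pattern, using hypothetical names (my_exec_ctx, exec_ctx_get,
op_locked_*) rather than the real gRPC API:

/* Sketch only: the names below are illustrative stand-ins, not gRPC's API. */
#include <stdio.h>

typedef struct my_exec_ctx {
  int pending_work; /* stand-in for the real context's bookkeeping */
} my_exec_ctx;

/* Old convention: the context is threaded explicitly, so every signature
 * and every call site carries the extra first parameter. */
static void op_locked_before(my_exec_ctx *exec_ctx) {
  exec_ctx->pending_work++;
}

/* New convention (assumed): the context lives in thread-local storage,
 * so signatures shrink and call sites drop the argument. */
static __thread my_exec_ctx *g_exec_ctx;

static my_exec_ctx *exec_ctx_get(void) { return g_exec_ctx; }

static void op_locked_after(void) { exec_ctx_get()->pending_work++; }

int main(void) {
  my_exec_ctx ctx = {0};
  op_locked_before(&ctx); /* before: pass the context down the stack */
  g_exec_ctx = &ctx;      /* after: install it once per thread... */
  op_locked_after();      /* ...and fetch it implicitly where needed */
  printf("pending_work = %d\n", ctx.pending_work); /* prints 2 */
  return 0;
}

Each hunk that follows is an instance of that rewrite (plus reindentation of
the continuation lines); no behavioral change is visible at this level.
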
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index ffd58129c6..aa4beb359f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -131,12 +131,12 @@ grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static grpc_error *initial_metadata_add_lb_token(
- grpc_exec_ctx *exec_ctx, grpc_metadata_batch *initial_metadata,
+ grpc_metadata_batch *initial_metadata,
grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem lb_token) {
GPR_ASSERT(lb_token_mdelem_storage != NULL);
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
- return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata,
- lb_token_mdelem_storage, lb_token);
+ return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
}
static void destroy_client_stats(void *arg) {
@@ -182,12 +182,11 @@ typedef struct wrapped_rr_closure_arg {
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance. We wrap this closure in
* order to unref the round robin instance upon its invocation */
-static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void wrapped_rr_closure(void *arg, grpc_error *error) {
wrapped_rr_closure_arg *wc_arg = (wrapped_rr_closure_arg *)arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != NULL) {
/* if *target is NULL, no pick has been made by the RR policy (eg, all
@@ -195,7 +194,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
* available */
if (*wc_arg->target != NULL) {
if (!GRPC_MDISNULL(wc_arg->lb_token)) {
- initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata,
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
} else {
@@ -215,7 +214,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
+ GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure");
}
GPR_ASSERT(wc_arg->free_when_done != NULL);
gpr_free(wc_arg->free_when_done);
@@ -453,9 +452,9 @@ static void *lb_token_copy(void *token) {
? NULL
: (void *)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
}
-static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
+static void lb_token_destroy(void *token) {
if (token != NULL) {
- GRPC_MDELEM_UNREF(exec_ctx, grpc_mdelem{(uintptr_t)token});
+ GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
}
}
static int lb_token_cmp(void *token1, void *token2) {
@@ -491,7 +490,7 @@ static void parse_server(const grpc_grpclb_server *server,
/* Returns addresses extracted from \a serverlist. */
static grpc_lb_addresses *process_serverlist_locked(
- grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
+ const grpc_grpclb_serverlist *serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
@@ -522,9 +521,9 @@ static grpc_lb_addresses *process_serverlist_locked(
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- user_data = (void *)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
- lb_token_mdstr)
- .payload;
+ user_data =
+ (void *)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
+ .payload;
} else {
char *uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -546,7 +545,7 @@ static grpc_lb_addresses *process_serverlist_locked(
/* Returns the backend addresses extracted from the given addresses */
static grpc_lb_addresses *extract_backend_addresses_locked(
- grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) {
+ const grpc_lb_addresses *addresses) {
/* first pass: count the number of backend addresses */
size_t num_backends = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -571,8 +570,8 @@ static grpc_lb_addresses *extract_backend_addresses_locked(
}
static void update_lb_connectivity_status_locked(
- grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
+ glb_lb_policy *glb_policy, grpc_connectivity_state rr_state,
+ grpc_error *rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
@@ -623,7 +622,7 @@ static void update_lb_connectivity_status_locked(
GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
}
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
+ grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state,
rr_state_error,
"update_lb_connectivity_status_locked");
}
@@ -634,9 +633,9 @@ static void update_lb_connectivity_status_locked(
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
- grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- const grpc_lb_policy_pick_args *pick_args, bool force_async,
- grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
+ glb_lb_policy *glb_policy, const grpc_lb_policy_pick_args *pick_args,
+ bool force_async, grpc_connected_subchannel **target,
+ wrapped_rr_closure_arg *wc_arg) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != NULL) {
// Look at the index into the serverlist to see if we should drop this call.
@@ -651,7 +650,7 @@ static bool pick_from_internal_rr_locked(
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
(intptr_t)wc_arg->rr_policy);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+ GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
@@ -662,7 +661,7 @@ static bool pick_from_internal_rr_locked(
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
gpr_free(wc_arg->free_when_done);
return false;
}
@@ -672,7 +671,7 @@ static bool pick_from_internal_rr_locked(
}
// Pick via the RR policy.
const bool pick_done = grpc_lb_policy_pick_locked(
- exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
+ wc_arg->rr_policy, pick_args, target, wc_arg->context,
(void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
@@ -680,9 +679,9 @@ static bool pick_from_internal_rr_locked(
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
(intptr_t)wc_arg->rr_policy);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+ GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
/* add the load reporting initial metadata */
- initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
+ initial_metadata_add_lb_token(pick_args->initial_metadata,
pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
// Pass on client stats via context. Passes ownership of the reference.
@@ -691,7 +690,7 @@ static bool pick_from_internal_rr_locked(
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
gpr_free(wc_arg->free_when_done);
return false;
}
@@ -704,12 +703,11 @@ static bool pick_from_internal_rr_locked(
return pick_done;
}
-static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static grpc_lb_policy_args *lb_policy_args_create(glb_lb_policy *glb_policy) {
grpc_lb_addresses *addresses;
if (glb_policy->serverlist != NULL) {
GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
- addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist);
+ addresses = process_serverlist_locked(glb_policy->serverlist);
} else {
// If rr_handover_locked() is invoked when we haven't received any
// serverlist from the balancer, we use the fallback backends returned by
@@ -729,24 +727,21 @@ static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
args->args = grpc_channel_args_copy_and_add_and_remove(
glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
1);
- grpc_lb_addresses_destroy(exec_ctx, addresses);
+ grpc_lb_addresses_destroy(addresses);
return args;
}
-static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_args *args) {
- grpc_channel_args_destroy(exec_ctx, args->args);
+static void lb_policy_args_destroy(grpc_lb_policy_args *args) {
+ grpc_channel_args_destroy(args->args);
gpr_free(args);
}
-static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error);
-static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+static void glb_rr_connectivity_changed_locked(void *arg, grpc_error *error);
+static void create_rr_locked(glb_lb_policy *glb_policy,
grpc_lb_policy_args *args) {
GPR_ASSERT(glb_policy->rr_policy == NULL);
- grpc_lb_policy *new_rr_policy =
- grpc_lb_policy_create(exec_ctx, "round_robin", args);
+ grpc_lb_policy *new_rr_policy = grpc_lb_policy_create("round_robin", args);
if (new_rr_policy == NULL) {
gpr_log(GPR_ERROR,
"Failure creating a RoundRobin policy for serverlist update with "
@@ -760,16 +755,14 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
glb_policy->rr_policy = new_rr_policy;
grpc_error *rr_state_error = NULL;
const grpc_connectivity_state rr_state =
- grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
&rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
- update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
- rr_state_error);
+ update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
- grpc_pollset_set_add_pollset_set(exec_ctx,
- glb_policy->rr_policy->interested_parties,
+ grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
/* Allocate the data for the tracking of the new RR policy's connectivity.
@@ -784,10 +777,10 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
/* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
- grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
- grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);
+ grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
/* Update picks and pings in wait */
pending_pick *pp;
@@ -801,7 +794,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p",
(void *)glb_policy->rr_policy);
}
- pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
+ pick_from_internal_rr_locked(glb_policy, &pp->pick_args,
true /* force_async */, pp->target,
&pp->wrapped_on_complete_arg);
}
@@ -815,40 +808,37 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy,
&pping->wrapped_notify_arg.wrapper_closure);
}
}
/* glb_policy->rr_policy may be NULL (initial handover) */
-static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void rr_handover_locked(glb_lb_policy *glb_policy) {
if (glb_policy->shutting_down) return;
- grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
+ grpc_lb_policy_args *args = lb_policy_args_create(glb_policy);
GPR_ASSERT(args != NULL);
if (glb_policy->rr_policy != NULL) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
(void *)glb_policy->rr_policy);
}
- grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
+ grpc_lb_policy_update_locked(glb_policy->rr_policy, args);
} else {
- create_rr_locked(exec_ctx, glb_policy, args);
+ create_rr_locked(glb_policy, args);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
(void *)glb_policy->rr_policy);
}
}
- lb_policy_args_destroy(exec_ctx, args);
+ lb_policy_args_destroy(args);
}
-static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error) {
+static void glb_rr_connectivity_changed_locked(void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = (rr_connectivity_data *)arg;
glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
if (glb_policy->shutting_down) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "glb_rr_connectivity_cb");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
return;
}
@@ -856,25 +846,22 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
/* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates: the SHUTDOWN state is a
* sink, policies can't transition back from it. */
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
- "rr_connectivity_shutdown");
+ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
glb_policy->rr_policy = NULL;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "glb_rr_connectivity_cb");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
return;
}
/* rr state != SHUTDOWN && !glb_policy->shutting_down: business as usual */
- update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+ update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state,
+ GRPC_ERROR_REF(error));
/* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
}
-static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
- void *balancer_name) {
+static void destroy_balancer_name(void *balancer_name) {
gpr_free(balancer_name);
}
@@ -901,7 +888,7 @@ static int balancer_name_cmp_fn(void *a, void *b) {
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
static grpc_channel_args *build_lb_channel_args(
- grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
+ const grpc_lb_addresses *addresses,
grpc_fake_resolver_response_generator *response_generator,
const grpc_channel_args *args) {
size_t num_grpclb_addrs = 0;
@@ -944,7 +931,7 @@ static grpc_channel_args *build_lb_channel_args(
gpr_free(targets_info_entries);
grpc_channel_args *lb_channel_args =
- grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
+ grpc_lb_policy_grpclb_build_lb_channel_args(targets_info,
response_generator, args);
grpc_arg lb_channel_addresses_arg =
@@ -952,34 +939,34 @@ static grpc_channel_args *build_lb_channel_args(
grpc_channel_args *result = grpc_channel_args_copy_and_add(
lb_channel_args, &lb_channel_addresses_arg, 1);
- grpc_slice_hash_table_unref(exec_ctx, targets_info);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
- grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
+ grpc_slice_hash_table_unref(targets_info);
+ grpc_channel_args_destroy(lb_channel_args);
+ grpc_lb_addresses_destroy(lb_addresses);
return result;
}
-static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void glb_destroy(grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
GPR_ASSERT(glb_policy->pending_picks == NULL);
GPR_ASSERT(glb_policy->pending_pings == NULL);
gpr_free((void *)glb_policy->server_name);
- grpc_channel_args_destroy(exec_ctx, glb_policy->args);
+ grpc_channel_args_destroy(glb_policy->args);
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
- grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
+ grpc_connectivity_state_destroy(&glb_policy->state_tracker);
if (glb_policy->serverlist != NULL) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
if (glb_policy->fallback_backend_addresses != NULL) {
- grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
gpr_free(glb_policy);
}
-static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void glb_shutdown_locked(grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
glb_policy->shutting_down = true;
@@ -997,11 +984,11 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
/* lb_on_server_status_received will pick up the cancel and clean up */
}
if (glb_policy->retry_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
}
if (glb_policy->fallback_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
+ grpc_timer_cancel(&glb_policy->lb_fallback_timer);
glb_policy->fallback_timer_active = false;
}
@@ -1010,7 +997,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pending_ping *pping = glb_policy->pending_pings;
glb_policy->pending_pings = NULL;
if (glb_policy->rr_policy != NULL) {
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -1021,20 +1008,20 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_policy->lb_channel = NULL;
}
grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_NONE);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pping->wrapped_notify_arg.wrapper_closure,
GRPC_ERROR_NONE);
pping = next;
}
@@ -1050,7 +1037,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
// - Otherwise, without an RR instance, picks stay pending at this policy's
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to NULL right here.
-static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+static void glb_cancel_pick_locked(grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
@@ -1060,7 +1047,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1070,7 +1057,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
if (glb_policy->rr_policy != NULL) {
- grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target,
+ grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target,
GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
@@ -1086,8 +1073,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
// - Otherwise, without an RR instance, picks stay pending at this policy's
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to NULL right here.
-static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
+static void glb_cancel_picks_locked(grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
@@ -1098,7 +1084,7 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1109,52 +1095,49 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
}
if (glb_policy->rr_policy != NULL) {
grpc_lb_policy_cancel_picks_locked(
- exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask,
+ glb_policy->rr_policy, initial_metadata_flags_mask,
initial_metadata_flags_eq, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
-static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy);
-static void start_picking_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void lb_on_fallback_timer_locked(void *arg, grpc_error *error);
+static void query_for_backends_locked(glb_lb_policy *glb_policy);
+static void start_picking_locked(glb_lb_policy *glb_policy) {
/* start a timer to fall back */
if (glb_policy->lb_fallback_timeout_ms > 0 &&
glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) {
grpc_millis deadline =
- grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_fallback_timeout_ms;
+ grpc_exec_ctx_now() + glb_policy->lb_fallback_timeout_ms;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
glb_policy->fallback_timer_active = true;
- grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline,
+ grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback);
}
glb_policy->started_picking = true;
grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
- query_for_backends_locked(exec_ctx, glb_policy);
+ query_for_backends_locked(glb_policy);
}
-static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void glb_exit_idle_locked(grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
if (!glb_policy->started_picking) {
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
}
-static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+static int glb_pick_locked(grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, on_complete,
+ GRPC_CLOSURE_SCHED(on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No mdelem storage for the LB token. Load reporting "
"won't work without it. Failing"));
@@ -1186,9 +1169,8 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
wc_arg->initial_metadata = pick_args->initial_metadata;
wc_arg->free_when_done = wc_arg;
- pick_done =
- pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
- false /* force_async */, target, wc_arg);
+ pick_done = pick_from_internal_rr_locked(
+ glb_policy, pick_args, false /* force_async */, target, wc_arg);
} else {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG,
@@ -1200,7 +1182,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
on_complete);
if (!glb_policy->started_picking) {
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
pick_done = false;
}
@@ -1208,37 +1190,33 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static grpc_connectivity_state glb_check_connectivity_locked(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_error **connectivity_error) {
+ grpc_lb_policy *pol, grpc_error **connectivity_error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
return grpc_connectivity_state_get(&glb_policy->state_tracker,
connectivity_error);
}
-static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
+static void glb_ping_one_locked(grpc_lb_policy *pol, grpc_closure *closure) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
if (glb_policy->rr_policy) {
- grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, closure);
} else {
add_pending_ping(&glb_policy->pending_pings, closure);
if (!glb_policy->started_picking) {
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
}
}
-static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
+static void glb_notify_on_state_change_locked(grpc_lb_policy *pol,
grpc_connectivity_state *current,
grpc_closure *notify) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &glb_policy->state_tracker, current, notify);
+ grpc_connectivity_state_notify_on_state_change(&glb_policy->state_tracker,
+ current, notify);
}
-static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void lb_call_on_retry_timer_locked(void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
glb_policy->retry_timer_active = false;
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
@@ -1247,27 +1225,26 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
(void *)glb_policy);
}
GPR_ASSERT(glb_policy->lb_call == NULL);
- query_for_backends_locked(exec_ctx, glb_policy);
+ query_for_backends_locked(glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer");
}
-static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void maybe_restart_lb_call(glb_lb_policy *glb_policy) {
if (glb_policy->started_picking && glb_policy->updating_lb_call) {
if (glb_policy->retry_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
- if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
+ if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
grpc_millis next_try =
- grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state);
+ grpc_backoff_step(&glb_policy->lb_call_backoff_state);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
(void *)glb_policy);
- grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx);
+ grpc_millis timeout = next_try - grpc_exec_ctx_now();
if (timeout > 0) {
gpr_log(GPR_DEBUG, "... retry_timer_active in %" PRIdPTR "ms.",
timeout);
@@ -1280,40 +1257,36 @@ static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
lb_call_on_retry_timer_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
glb_policy->retry_timer_active = true;
- grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
+ grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"lb_on_server_status_received_locked");
}
-static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static void send_client_load_report_locked(void *arg, grpc_error *error);
-static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void schedule_next_client_load_report(glb_lb_policy *glb_policy) {
const grpc_millis next_client_load_report_time =
- grpc_exec_ctx_now(exec_ctx) + glb_policy->client_stats_report_interval;
+ grpc_exec_ctx_now() + glb_policy->client_stats_report_interval;
GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
send_client_load_report_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
+ grpc_timer_init(&glb_policy->client_load_report_timer,
next_client_load_report_time,
&glb_policy->client_load_report_closure);
}
-static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void client_load_report_done_locked(void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
glb_policy->client_load_report_payload = NULL;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) {
glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "client_load_report");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
return;
}
- schedule_next_client_load_report(exec_ctx, glb_policy);
+ schedule_next_client_load_report(glb_policy);
}
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
@@ -1328,15 +1301,13 @@ static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
(drop_entries == NULL || drop_entries->num_entries == 0);
}
-static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void send_client_load_report_locked(void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) {
glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "client_load_report");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == NULL) {
- maybe_restart_lb_call(exec_ctx, glb_policy);
+ maybe_restart_lb_call(glb_policy);
}
return;
}
@@ -1349,7 +1320,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (load_report_counters_are_zero(request)) {
if (glb_policy->last_client_load_report_counters_were_zero) {
grpc_grpclb_request_destroy(request);
- schedule_next_client_load_report(exec_ctx, glb_policy);
+ schedule_next_client_load_report(glb_policy);
return;
}
glb_policy->last_client_load_report_counters_were_zero = true;
@@ -1359,7 +1330,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->client_load_report_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(exec_ctx, request_payload_slice);
+ grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
// Send load report message.
grpc_op op;
@@ -1370,20 +1341,16 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
client_load_report_done_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, &op, 1,
- &glb_policy->client_load_report_closure);
+ glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
if (call_error != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "call_error=%d", call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
-static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error);
-static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void lb_on_server_status_received_locked(void *arg, grpc_error *error);
+static void lb_on_response_received_locked(void *arg, grpc_error *error);
+static void lb_call_init_locked(glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
GPR_ASSERT(glb_policy->lb_call == NULL);
@@ -1396,13 +1363,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_millis deadline =
glb_policy->lb_call_timeout_ms == 0
? GRPC_MILLIS_INF_FUTURE
- : grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_call_timeout_ms;
+ : grpc_exec_ctx_now() + glb_policy->lb_call_timeout_ms;
glb_policy->lb_call = grpc_channel_create_pollset_set_call(
- exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
+ glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
&host, deadline, NULL);
- grpc_slice_unref_internal(exec_ctx, host);
+ grpc_slice_unref_internal(host);
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
@@ -1417,7 +1384,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->lb_request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(exec_ctx, request_payload_slice);
+ grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
@@ -1438,8 +1405,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_policy->last_client_load_report_counters_were_zero = false;
}
-static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->lb_call != NULL);
grpc_call_unref(glb_policy->lb_call);
glb_policy->lb_call = NULL;
@@ -1448,22 +1414,21 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
- grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);
+ grpc_slice_unref_internal(glb_policy->lb_call_status_details);
if (glb_policy->client_load_report_timer_pending) {
- grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
+ grpc_timer_cancel(&glb_policy->client_load_report_timer);
}
}
/*
* Auxiliary functions and LB client callbacks.
*/
-static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void query_for_backends_locked(glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != NULL);
if (glb_policy->shutting_down) return;
- lb_call_init_locked(exec_ctx, glb_policy);
+ lb_call_init_locked(glb_policy);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
@@ -1495,8 +1460,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op->flags = 0;
op->reserved = NULL;
op++;
- call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call,
- ops, (size_t)(op - ops), NULL);
+ call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops,
+ (size_t)(op - ops), NULL);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
@@ -1514,7 +1479,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
"lb_on_server_status_received_locked");
call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
+ glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_server_status_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@@ -1528,13 +1493,12 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
* lb_on_response_received_locked */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
+ glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void lb_on_response_received_locked(void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
@@ -1568,7 +1532,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
* send_client_load_report_locked() */
glb_policy->client_load_report_timer_pending = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
- schedule_next_client_load_report(exec_ctx, glb_policy);
+ schedule_next_client_load_report(glb_policy);
} else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"received initial LB response message; "
@@ -1608,11 +1572,10 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
} else {
/* or dispose of the fallback */
- grpc_lb_addresses_destroy(exec_ctx,
- glb_policy->fallback_backend_addresses);
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses = NULL;
if (glb_policy->fallback_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
+ grpc_timer_cancel(&glb_policy->lb_fallback_timer);
glb_policy->fallback_timer_active = false;
}
}
@@ -1621,7 +1584,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
* update or in glb_destroy() */
glb_policy->serverlist = serverlist;
glb_policy->serverlist_index = 0;
- rr_handover_locked(exec_ctx, glb_policy);
+ rr_handover_locked(glb_policy);
}
} else {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
@@ -1634,7 +1597,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
}
}
- grpc_slice_unref_internal(exec_ctx, response_slice);
+ grpc_slice_unref_internal(response_slice);
if (!glb_policy->shutting_down) {
/* keep listening for serverlist updates */
op->op = GRPC_OP_RECV_MESSAGE;
@@ -1645,23 +1608,22 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
/* reuse the "lb_on_response_received_locked" weak ref taken in
* query_for_backends_locked() */
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
+ glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
} else {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"lb_on_response_received_locked_shutdown");
}
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received_locked" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"lb_on_response_received_locked_empty_payload");
}
}
-static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void lb_on_fallback_timer_locked(void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
glb_policy->fallback_timer_active = false;
/* If we receive a serverlist after the timer fires but before this callback
@@ -1674,15 +1636,13 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
(void *)glb_policy);
}
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
- rr_handover_locked(exec_ctx, glb_policy);
+ rr_handover_locked(glb_policy);
}
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "grpclb_fallback_timer");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
-static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error) {
+static void lb_on_server_status_received_locked(void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
GPR_ASSERT(glb_policy->lb_call != NULL);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
@@ -1696,29 +1656,28 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
gpr_free(status_details);
}
/* We need to perform cleanups no matter what. */
- lb_call_destroy_locked(exec_ctx, glb_policy);
+ lb_call_destroy_locked(glb_policy);
// If the load report timer is still pending, we wait for it to be
// called before restarting the call. Otherwise, we restart the call
// here.
if (!glb_policy->client_load_report_timer_pending) {
- maybe_restart_lb_call(exec_ctx, glb_policy);
+ maybe_restart_lb_call(glb_policy);
}
}
-static void fallback_update_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy,
+static void fallback_update_locked(glb_lb_policy *glb_policy,
const grpc_lb_addresses *addresses) {
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
- grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
- extract_backend_addresses_locked(exec_ctx, addresses);
+ extract_backend_addresses_locked(addresses);
if (glb_policy->lb_fallback_timeout_ms > 0 &&
!glb_policy->fallback_timer_active) {
- rr_handover_locked(exec_ctx, glb_policy);
+ rr_handover_locked(glb_policy);
}
}
-static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+static void glb_update_locked(grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
const grpc_arg *arg =
@@ -1728,7 +1687,7 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
// If we don't have a current channel to the LB, go into TRANSIENT
// FAILURE.
grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"glb_update_missing");
} else {
@@ -1745,16 +1704,16 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
// If a non-empty serverlist hasn't been received from the balancer,
// propagate the update to fallback_backend_addresses.
if (glb_policy->serverlist == NULL) {
- fallback_update_locked(exec_ctx, glb_policy, addresses);
+ fallback_update_locked(glb_policy, addresses);
}
GPR_ASSERT(glb_policy->lb_channel != NULL);
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
grpc_channel_args *lb_channel_args = build_lb_channel_args(
- exec_ctx, addresses, glb_policy->response_generator, args->args);
+ addresses, glb_policy->response_generator, args->args);
grpc_fake_resolver_response_generator_set_response(
- exec_ctx, glb_policy->response_generator, lb_channel_args);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(lb_channel_args);
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if (!glb_policy->watching_lb_channel) {
@@ -1766,9 +1725,8 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
glb_policy->watching_lb_channel = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
- exec_ctx, client_channel_elem,
- grpc_polling_entity_create_from_pollset_set(
- glb_policy->base.interested_parties),
+ client_channel_elem, grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
&glb_policy->lb_channel_connectivity,
&glb_policy->lb_channel_on_connectivity_changed, NULL);
}
@@ -1777,8 +1735,7 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
-static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
- void *arg,
+static void glb_lb_channel_on_connectivity_changed_cb(void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
if (glb_policy->shutting_down) goto done;
@@ -1795,9 +1752,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
- exec_ctx, client_channel_elem,
- grpc_polling_entity_create_from_pollset_set(
- glb_policy->base.interested_parties),
+ client_channel_elem, grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
&glb_policy->lb_channel_connectivity,
&glb_policy->lb_channel_on_connectivity_changed, NULL);
break;
@@ -1815,16 +1771,16 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
// lb_call.
} else if (glb_policy->started_picking && !glb_policy->shutting_down) {
if (glb_policy->retry_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
}
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
/* fallthrough */
case GRPC_CHANNEL_SHUTDOWN:
done:
glb_policy->watching_lb_channel = false;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"watch_lb_channel_connectivity_cb_shutdown");
break;
}
@@ -1843,8 +1799,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_notify_on_state_change_locked,
glb_update_locked};
-static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
+static grpc_lb_policy *glb_create(grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const grpc_arg *arg =
@@ -1865,7 +1820,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
+ grpc_uri *uri = grpc_uri_parse(arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
@@ -1897,26 +1852,26 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
/* Extract the backend addresses (may be empty) from the resolver for
* fallback. */
glb_policy->fallback_backend_addresses =
- extract_backend_addresses_locked(exec_ctx, addresses);
+ extract_backend_addresses_locked(addresses);
/* Create a client channel over them to communicate with a LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();
grpc_channel_args *lb_channel_args = build_lb_channel_args(
- exec_ctx, addresses, glb_policy->response_generator, args->args);
+ addresses, glb_policy->response_generator, args->args);
char *uri_str;
gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
+ uri_str, args->client_channel_factory, lb_channel_args);
/* Propagate initial resolution */
grpc_fake_resolver_response_generator_set_response(
- exec_ctx, glb_policy->response_generator, lb_channel_args);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(lb_channel_args);
gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
- grpc_channel_args_destroy(exec_ctx, glb_policy->args);
+ grpc_channel_args_destroy(glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
@@ -1947,7 +1902,7 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
// Only add client_load_reporting filter if the grpclb LB policy is used.
static bool maybe_add_client_load_reporting_filter(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
+ grpc_channel_stack_builder *builder, void *arg) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_arg *channel_arg =