Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc | 27
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 488
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc | 6
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h | 4
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc | 10
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc | 9
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h | 1
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c | 7
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h | 21
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 194
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 163
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc | 54
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | 18
13 files changed, 464 insertions, 538 deletions
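
The change running through every file below is the removal of the explicit grpc_exec_ctx* parameter: filter vtable entries, closures, timers, and LB-policy methods stop threading an exec_ctx through their signatures, and deadline math switches from grpc_exec_ctx_now(exec_ctx) to grpc_core::ExecCtx::Get()->Now(). As a rough, self-contained sketch of the pattern this implies — an execution context installed in a thread-local at the library boundary and looked up where needed — with SketchExecCtx standing in for the real grpc_core::ExecCtx (names and fields here are illustrative, not the actual gRPC API):

#include <cstdint>

// Illustrative stand-in for grpc_core::ExecCtx: constructed on the stack at
// an API entry point, discoverable via a thread-local for the duration of
// that call, and carrying per-call cached state such as "now".
class SketchExecCtx {
 public:
  SketchExecCtx() { current_ = this; }       // install on entry
  ~SketchExecCtx() { current_ = nullptr; }   // uninstall on exit
  static SketchExecCtx* Get() { return current_; }
  int64_t Now() const { return now_ms_; }    // cached clock reading
 private:
  static thread_local SketchExecCtx* current_;
  int64_t now_ms_ = 0;
};
thread_local SketchExecCtx* SketchExecCtx::current_ = nullptr;

// Callback and helper signatures shrink accordingly:
//   old: static void cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error);
//   new: static void cb(void* arg, grpc_error* error);
// and deadline computations such as the one in lb_call_init_locked become:
static int64_t deadline_from_timeout(int64_t timeout_ms) {
  return SketchExecCtx::Get()->Now() + timeout_ms;
}
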
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index 6d9fadaf30..3eedb08ecc 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -25,14 +25,12 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/profiling/timers.h"
-static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static grpc_error* init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
return GRPC_ERROR_NONE;
}
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {}
+static void destroy_channel_elem(grpc_channel_element* elem) {}
typedef struct {
// Stats object to update.
@@ -47,28 +45,24 @@ typedef struct {
bool recv_initial_metadata_succeeded;
} call_data;
-static void on_complete_for_send(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void on_complete_for_send(void* arg, grpc_error* error) {
call_data* calld = (call_data*)arg;
if (error == GRPC_ERROR_NONE) {
calld->send_initial_metadata_succeeded = true;
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete_for_send,
- GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error));
}
-static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
call_data* calld = (call_data*)arg;
if (error == GRPC_ERROR_NONE) {
calld->recv_initial_metadata_succeeded = true;
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
+ GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
-static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
call_data* calld = (call_data*)elem->call_data;
// Get stats object from context and take a ref.
@@ -81,7 +75,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
return GRPC_ERROR_NONE;
}
-static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = (call_data*)elem->call_data;
@@ -96,8 +90,7 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
}
static void start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* batch) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = (call_data*)elem->call_data;
GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
// Intercept send_initial_metadata.
@@ -118,7 +111,7 @@ static void start_transport_stream_op_batch(
&calld->recv_initial_metadata_ready;
}
// Chain to next filter.
- grpc_call_next_op(exec_ctx, elem, batch);
+ grpc_call_next_op(elem, batch);
GPR_TIMER_END("clr_start_transport_stream_op_batch", 0);
}
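
Beyond dropping exec_ctx, the grpclb.cc diff below also reworks pending pings: each queued ping now carries separate, optional on_initiate and on_ack wrappers instead of a single notify closure, matching the widened grpc_lb_policy_ping_one_locked(rr_policy, on_initiate, on_ack) call. A rough, self-contained sketch of that bookkeeping (stand-in types only: wrapped_arg abbreviates wrapped_rr_closure_arg, closure abbreviates grpc_closure, and the GRPC_CLOSURE_INIT wiring is omitted):

struct closure {};                     // stand-in for grpc_closure
struct wrapped_arg {                   // stand-in for wrapped_rr_closure_arg
  closure* wrapped_closure = nullptr;
  void* free_when_done = nullptr;
};

struct pending_ping {
  pending_ping* next = nullptr;
  wrapped_arg* on_initiate = nullptr;  // fired when the ping is sent
  wrapped_arg* on_ack = nullptr;       // fired when the ping is acknowledged
};

// Mirrors the reworked add_pending_ping(): each side is allocated and
// wrapped only if the caller actually supplied a closure for it.
static void add_pending_ping(pending_ping** root, closure* on_initiate,
                             closure* on_ack) {
  pending_ping* pping = new pending_ping();
  if (on_initiate != nullptr) {
    pping->on_initiate = new wrapped_arg();
    pping->on_initiate->wrapped_closure = on_initiate;
    pping->on_initiate->free_when_done = pping->on_initiate;
  }
  if (on_ack != nullptr) {
    pping->on_ack = new wrapped_arg();
    pping->on_ack->wrapped_closure = on_ack;
    pping->on_ack->free_when_done = pping->on_ack;
  }
  pping->next = *root;
  *root = pping;
}
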
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index d8137636b8..dd6fc602ab 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -131,12 +131,12 @@ grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static grpc_error* initial_metadata_add_lb_token(
- grpc_exec_ctx* exec_ctx, grpc_metadata_batch* initial_metadata,
+ grpc_metadata_batch* initial_metadata,
grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
GPR_ASSERT(lb_token_mdelem_storage != nullptr);
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
- return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata,
- lb_token_mdelem_storage, lb_token);
+ return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
}
static void destroy_client_stats(void* arg) {
@@ -186,20 +186,19 @@ typedef struct wrapped_rr_closure_arg {
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance. We wrap this closure in
* order to unref the round robin instance upon its invocation */
-static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void wrapped_rr_closure(void* arg, grpc_error* error) {
wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg;
GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != nullptr) {
- /* if *target is NULL, no pick has been made by the RR policy (eg, all
+ /* if *target is nullptr, no pick has been made by the RR policy (eg, all
* addresses failed to connect). There won't be any user_data/token
* available */
if (*wc_arg->target != nullptr) {
if (!GRPC_MDISNULL(wc_arg->lb_token)) {
- initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata,
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
} else {
@@ -221,7 +220,7 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg,
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
wc_arg->rr_policy);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
+ GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure");
}
GPR_ASSERT(wc_arg->free_when_done != nullptr);
gpr_free(wc_arg->free_when_done);
@@ -241,8 +240,8 @@ typedef struct pending_pick {
/* original pick()'s arguments */
grpc_lb_policy_pick_args pick_args;
- /* output argument where to store the pick()ed connected subchannel, or NULL
- * upon error. */
+ /* output argument where to store the pick()ed connected subchannel, or
+ * nullptr upon error. */
grpc_connected_subchannel** target;
/* args for wrapped_on_complete */
@@ -275,18 +274,30 @@ static void add_pending_pick(pending_pick** root,
typedef struct pending_ping {
struct pending_ping* next;
- /* args for wrapped_notify */
- wrapped_rr_closure_arg wrapped_notify_arg;
+ /* args for sending the ping */
+ wrapped_rr_closure_arg* on_initiate;
+ wrapped_rr_closure_arg* on_ack;
} pending_ping;
-static void add_pending_ping(pending_ping** root, grpc_closure* notify) {
+static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
- pping->wrapped_notify_arg.wrapped_closure = notify;
- pping->wrapped_notify_arg.free_when_done = pping;
+ if (on_initiate != nullptr) {
+ pping->on_initiate =
+ (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate));
+ pping->on_initiate->wrapped_closure = on_initiate;
+ pping->on_initiate->free_when_done = pping->on_initiate;
+ GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure,
+ &pping->on_initiate, grpc_schedule_on_exec_ctx);
+ }
+ if (on_ack != nullptr) {
+ pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack));
+ pping->on_ack->wrapped_closure = on_ack;
+ pping->on_ack->free_when_done = pping->on_ack;
+ GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure,
+ &pping->on_ack, grpc_schedule_on_exec_ctx);
+ }
pping->next = *root;
- GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure,
- wrapped_rr_closure, &pping->wrapped_notify_arg,
- grpc_schedule_on_exec_ctx);
*root = pping;
}
@@ -328,8 +339,8 @@ typedef struct glb_lb_policy {
/** connectivity state of the LB channel */
grpc_connectivity_state lb_channel_connectivity;
- /** stores the deserialized response from the LB. May be NULL until one such
- * response has arrived. */
+ /** stores the deserialized response from the LB. May be nullptr until one
+ * such response has arrived. */
grpc_grpclb_serverlist* serverlist;
/** Index into serverlist for next pick.
@@ -459,9 +470,9 @@ static void* lb_token_copy(void* token) {
? nullptr
: (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
}
-static void lb_token_destroy(grpc_exec_ctx* exec_ctx, void* token) {
+static void lb_token_destroy(void* token) {
if (token != nullptr) {
- GRPC_MDELEM_UNREF(exec_ctx, grpc_mdelem{(uintptr_t)token});
+ GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
}
}
static int lb_token_cmp(void* token1, void* token2) {
@@ -497,7 +508,7 @@ static void parse_server(const grpc_grpclb_server* server,
/* Returns addresses extracted from \a serverlist. */
static grpc_lb_addresses* process_serverlist_locked(
- grpc_exec_ctx* exec_ctx, const grpc_grpclb_serverlist* serverlist) {
+ const grpc_grpclb_serverlist* serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
@@ -528,9 +539,9 @@ static grpc_lb_addresses* process_serverlist_locked(
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- user_data = (void*)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
- lb_token_mdstr)
- .payload;
+ user_data =
+ (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
+ .payload;
} else {
char* uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -552,7 +563,7 @@ static grpc_lb_addresses* process_serverlist_locked(
/* Returns the backend addresses extracted from the given addresses */
static grpc_lb_addresses* extract_backend_addresses_locked(
- grpc_exec_ctx* exec_ctx, const grpc_lb_addresses* addresses) {
+ const grpc_lb_addresses* addresses) {
/* first pass: count the number of backend addresses */
size_t num_backends = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -577,8 +588,8 @@ static grpc_lb_addresses* extract_backend_addresses_locked(
}
static void update_lb_connectivity_status_locked(
- grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
- grpc_connectivity_state rr_state, grpc_error* rr_state_error) {
+ glb_lb_policy* glb_policy, grpc_connectivity_state rr_state,
+ grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
@@ -630,7 +641,7 @@ static void update_lb_connectivity_status_locked(
glb_policy, grpc_connectivity_state_name(rr_state),
glb_policy->rr_policy);
}
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
+ grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state,
rr_state_error,
"update_lb_connectivity_status_locked");
}
@@ -641,9 +652,9 @@ static void update_lb_connectivity_status_locked(
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
- grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
- const grpc_lb_policy_pick_args* pick_args, bool force_async,
- grpc_connected_subchannel** target, wrapped_rr_closure_arg* wc_arg) {
+ glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args,
+ bool force_async, grpc_connected_subchannel** target,
+ wrapped_rr_closure_arg* wc_arg) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != nullptr) {
// Look at the index into the serverlist to see if we should drop this call.
@@ -658,7 +669,7 @@ static bool pick_from_internal_rr_locked(
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
wc_arg->rr_policy);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+ GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
@@ -670,7 +681,7 @@ static bool pick_from_internal_rr_locked(
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
gpr_free(wc_arg->free_when_done);
return false;
}
@@ -680,7 +691,7 @@ static bool pick_from_internal_rr_locked(
}
// Pick via the RR policy.
const bool pick_done = grpc_lb_policy_pick_locked(
- exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
+ wc_arg->rr_policy, pick_args, target, wc_arg->context,
(void**)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
@@ -688,9 +699,9 @@ static bool pick_from_internal_rr_locked(
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
wc_arg->rr_policy);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+ GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
/* add the load reporting initial metadata */
- initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
+ initial_metadata_add_lb_token(pick_args->initial_metadata,
pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
// Pass on client stats via context. Passes ownership of the reference.
@@ -699,7 +710,7 @@ static bool pick_from_internal_rr_locked(
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
gpr_free(wc_arg->free_when_done);
return false;
}
@@ -712,12 +723,11 @@ static bool pick_from_internal_rr_locked(
return pick_done;
}
-static grpc_lb_policy_args* lb_policy_args_create(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+static grpc_lb_policy_args* lb_policy_args_create(glb_lb_policy* glb_policy) {
grpc_lb_addresses* addresses;
if (glb_policy->serverlist != nullptr) {
GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
- addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist);
+ addresses = process_serverlist_locked(glb_policy->serverlist);
} else {
// If rr_handover_locked() is invoked when we haven't received any
// serverlist from the balancer, we use the fallback backends returned by
@@ -737,24 +747,21 @@ static grpc_lb_policy_args* lb_policy_args_create(grpc_exec_ctx* exec_ctx,
args->args = grpc_channel_args_copy_and_add_and_remove(
glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
1);
- grpc_lb_addresses_destroy(exec_ctx, addresses);
+ grpc_lb_addresses_destroy(addresses);
return args;
}
-static void lb_policy_args_destroy(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy_args* args) {
- grpc_channel_args_destroy(exec_ctx, args->args);
+static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
+ grpc_channel_args_destroy(args->args);
gpr_free(args);
}
-static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error);
-static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
+static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error);
+static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr);
- grpc_lb_policy* new_rr_policy =
- grpc_lb_policy_create(exec_ctx, "round_robin", args);
+ grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
if (new_rr_policy == nullptr) {
gpr_log(GPR_ERROR,
"[grpclb %p] Failure creating a RoundRobin policy for serverlist "
@@ -767,21 +774,19 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
return;
}
grpc_lb_policy_set_reresolve_closure_locked(
- exec_ctx, new_rr_policy, glb_policy->base.request_reresolution);
+ new_rr_policy, glb_policy->base.request_reresolution);
glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
const grpc_connectivity_state rr_state =
- grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
&rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
- update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
- rr_state_error);
+ update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
- grpc_pollset_set_add_pollset_set(exec_ctx,
- glb_policy->rr_policy->interested_parties,
+ grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
/* Allocate the data for the tracking of the new RR policy's connectivity.
@@ -796,10 +801,10 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
/* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
- grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
- grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);
+ grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
/* Update picks and pings in wait */
pending_pick* pp;
@@ -814,7 +819,7 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
"[grpclb %p] Pending pick about to (async) PICK from RR %p",
glb_policy, glb_policy->rr_policy);
}
- pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
+ pick_from_internal_rr_locked(glb_policy, &pp->pick_args,
true /* force_async */, pp->target,
&pp->wrapped_on_complete_arg);
}
@@ -822,46 +827,53 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
pending_ping* pping;
while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next;
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
- pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
+ grpc_closure* on_initiate = nullptr;
+ grpc_closure* on_ack = nullptr;
+ if (pping->on_initiate != nullptr) {
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+ pping->on_initiate->rr_policy = glb_policy->rr_policy;
+ on_initiate = &pping->on_initiate->wrapper_closure;
+ }
+ if (pping->on_ack != nullptr) {
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+ pping->on_ack->rr_policy = glb_policy->rr_policy;
+ on_ack = &pping->on_ack->wrapper_closure;
+ }
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy);
}
- grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
- &pping->wrapped_notify_arg.wrapper_closure);
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
+ gpr_free(pping);
}
}
-/* glb_policy->rr_policy may be NULL (initial handover) */
-static void rr_handover_locked(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+/* glb_policy->rr_policy may be nullptr (initial handover) */
+static void rr_handover_locked(glb_lb_policy* glb_policy) {
if (glb_policy->shutting_down) return;
- grpc_lb_policy_args* args = lb_policy_args_create(exec_ctx, glb_policy);
+ grpc_lb_policy_args* args = lb_policy_args_create(glb_policy);
GPR_ASSERT(args != nullptr);
if (glb_policy->rr_policy != nullptr) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy,
glb_policy->rr_policy);
}
- grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
+ grpc_lb_policy_update_locked(glb_policy->rr_policy, args);
} else {
- create_rr_locked(exec_ctx, glb_policy, args);
+ create_rr_locked(glb_policy, args);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy,
glb_policy->rr_policy);
}
}
- lb_policy_args_destroy(exec_ctx, args);
+ lb_policy_args_destroy(args);
}
-static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error) {
+static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg;
glb_lb_policy* glb_policy = rr_connectivity->glb_policy;
if (glb_policy->shutting_down) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "glb_rr_connectivity_cb");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
return;
}
@@ -869,25 +881,22 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx,
/* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates: the SHUTDOWN state is a
* sink, policies can't transition back from it. .*/
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
- "rr_connectivity_shutdown");
+ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
glb_policy->rr_policy = nullptr;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "glb_rr_connectivity_cb");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
return;
}
/* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
- update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+ update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state,
+ GRPC_ERROR_REF(error));
/* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
}
-static void destroy_balancer_name(grpc_exec_ctx* exec_ctx,
- void* balancer_name) {
+static void destroy_balancer_name(void* balancer_name) {
gpr_free(balancer_name);
}
@@ -914,7 +923,7 @@ static int balancer_name_cmp_fn(void* a, void* b) {
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
static grpc_channel_args* build_lb_channel_args(
- grpc_exec_ctx* exec_ctx, const grpc_lb_addresses* addresses,
+ const grpc_lb_addresses* addresses,
grpc_fake_resolver_response_generator* response_generator,
const grpc_channel_args* args) {
size_t num_grpclb_addrs = 0;
@@ -957,7 +966,7 @@ static grpc_channel_args* build_lb_channel_args(
gpr_free(targets_info_entries);
grpc_channel_args* lb_channel_args =
- grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
+ grpc_lb_policy_grpclb_build_lb_channel_args(targets_info,
response_generator, args);
grpc_arg lb_channel_addresses_arg =
@@ -965,34 +974,34 @@ static grpc_channel_args* build_lb_channel_args(
grpc_channel_args* result = grpc_channel_args_copy_and_add(
lb_channel_args, &lb_channel_addresses_arg, 1);
- grpc_slice_hash_table_unref(exec_ctx, targets_info);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
- grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
+ grpc_slice_hash_table_unref(targets_info);
+ grpc_channel_args_destroy(lb_channel_args);
+ grpc_lb_addresses_destroy(lb_addresses);
return result;
}
-static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void glb_destroy(grpc_lb_policy* pol) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
GPR_ASSERT(glb_policy->pending_picks == nullptr);
GPR_ASSERT(glb_policy->pending_pings == nullptr);
gpr_free((void*)glb_policy->server_name);
- grpc_channel_args_destroy(exec_ctx, glb_policy->args);
+ grpc_channel_args_destroy(glb_policy->args);
if (glb_policy->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
- grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
+ grpc_connectivity_state_destroy(&glb_policy->state_tracker);
if (glb_policy->serverlist != nullptr) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
if (glb_policy->fallback_backend_addresses != nullptr) {
- grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
gpr_free(glb_policy);
}
-static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void glb_shutdown_locked(grpc_lb_policy* pol) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
@@ -1011,11 +1020,11 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
/* lb_on_server_status_received will pick up the cancel and clean up */
}
if (glb_policy->retry_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
}
if (glb_policy->fallback_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
+ grpc_timer_cancel(&glb_policy->lb_fallback_timer);
glb_policy->fallback_timer_active = false;
}
@@ -1024,10 +1033,9 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
pending_ping* pping = glb_policy->pending_pings;
glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) {
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
} else {
- grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace,
- GRPC_ERROR_CANCELLED);
+ grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -1037,14 +1045,13 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = nullptr;
}
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
- "glb_shutdown");
+ grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "glb_shutdown");
while (pp != nullptr) {
pending_pick* next = pp->next;
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_REF(error));
gpr_free(pp);
pp = next;
@@ -1052,8 +1059,16 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
while (pping != nullptr) {
pending_ping* next = pping->next;
- GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_REF(error));
+ if (pping->on_initiate != nullptr) {
+ GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure,
+ GRPC_ERROR_REF(error));
+ gpr_free(pping->on_initiate);
+ }
+ if (pping->on_ack != nullptr) {
+ GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure,
+ GRPC_ERROR_REF(error));
+ gpr_free(pping->on_ack);
+ }
gpr_free(pping);
pping = next;
}
@@ -1069,8 +1084,8 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
// pick needs also be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
-// we invoke the completion closure and set *target to NULL right here.
-static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+// we invoke the completion closure and set *target to nullptr right here.
+static void glb_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
@@ -1080,7 +1095,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
pending_pick* next = pp->next;
if (pp->target == target) {
*target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1090,7 +1105,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
pp = next;
}
if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target,
+ grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target,
GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
@@ -1105,9 +1120,8 @@ static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
// pick needs also be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
-// we invoke the completion closure and set *target to NULL right here.
-static void glb_cancel_picks_locked(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy* pol,
+// we invoke the completion closure and set *target to nullptr right here.
+static void glb_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
@@ -1118,7 +1132,7 @@ static void glb_cancel_picks_locked(grpc_exec_ctx* exec_ctx,
pending_pick* next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1129,52 +1143,49 @@ static void glb_cancel_picks_locked(grpc_exec_ctx* exec_ctx,
}
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_cancel_picks_locked(
- exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask,
+ glb_policy->rr_policy, initial_metadata_flags_mask,
initial_metadata_flags_eq, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
-static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error);
-static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy);
-static void start_picking_locked(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+static void lb_on_fallback_timer_locked(void* arg, grpc_error* error);
+static void query_for_backends_locked(glb_lb_policy* glb_policy);
+static void start_picking_locked(glb_lb_policy* glb_policy) {
/* start a timer to fall back */
if (glb_policy->lb_fallback_timeout_ms > 0 &&
glb_policy->serverlist == nullptr && !glb_policy->fallback_timer_active) {
grpc_millis deadline =
- grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_fallback_timeout_ms;
+ grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
glb_policy->fallback_timer_active = true;
- grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline,
+ grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback);
}
glb_policy->started_picking = true;
glb_policy->lb_call_backoff->Reset();
- query_for_backends_locked(exec_ctx, glb_policy);
+ query_for_backends_locked(glb_policy);
}
-static void glb_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void glb_exit_idle_locked(grpc_lb_policy* pol) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
if (!glb_policy->started_picking) {
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
}
-static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static int glb_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
if (pick_args->lb_token_mdelem_storage == nullptr) {
*target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, on_complete,
+ GRPC_CLOSURE_SCHED(on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No mdelem storage for the LB token. Load reporting "
"won't work without it. Failing"));
@@ -1184,8 +1195,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
bool pick_done = false;
if (glb_policy->rr_policy != nullptr) {
const grpc_connectivity_state rr_connectivity_state =
- grpc_lb_policy_check_connectivity_locked(
- exec_ctx, glb_policy->rr_policy, nullptr);
+ grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
+ nullptr);
// The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
// callback registered to capture this event
// (glb_rr_connectivity_changed_locked) may not have been invoked yet. We
@@ -1222,9 +1233,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
wc_arg->initial_metadata = pick_args->initial_metadata;
wc_arg->free_when_done = wc_arg;
wc_arg->glb_policy = pol;
- pick_done =
- pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
- false /* force_async */, target, wc_arg);
+ pick_done = pick_from_internal_rr_locked(
+ glb_policy, pick_args, false /* force_async */, target, wc_arg);
}
} else { // glb_policy->rr_policy == NULL
if (grpc_lb_glb_trace.enabled()) {
@@ -1235,7 +1245,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
on_complete);
if (!glb_policy->started_picking) {
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
pick_done = false;
}
@@ -1243,37 +1253,34 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
}
static grpc_connectivity_state glb_check_connectivity_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
- grpc_error** connectivity_error) {
+ grpc_lb_policy* pol, grpc_error** connectivity_error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
return grpc_connectivity_state_get(&glb_policy->state_tracker,
connectivity_error);
}
-static void glb_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
- grpc_closure* closure) {
+static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
if (glb_policy->rr_policy) {
- grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
} else {
- add_pending_ping(&glb_policy->pending_pings, closure);
+ add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack);
if (!glb_policy->started_picking) {
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
}
}
-static void glb_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy* pol,
+static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
grpc_connectivity_state* current,
grpc_closure* notify) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &glb_policy->state_tracker, current, notify);
+ grpc_connectivity_state_notify_on_state_change(&glb_policy->state_tracker,
+ current, notify);
}
-static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->retry_timer_active = false;
if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
@@ -1281,26 +1288,25 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
}
- query_for_backends_locked(exec_ctx, glb_policy);
+ query_for_backends_locked(glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer");
}
-static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
if (glb_policy->started_picking && glb_policy->updating_lb_call) {
if (glb_policy->retry_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
- if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
+ if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
- grpc_millis next_try = glb_policy->lb_call_backoff->Step(exec_ctx);
+ grpc_millis next_try = glb_policy->lb_call_backoff->Step();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
- grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx);
+ grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
if (timeout > 0) {
gpr_log(GPR_DEBUG,
"[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
@@ -1315,43 +1321,40 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx,
lb_call_on_retry_timer_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
glb_policy->retry_timer_active = true;
- grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
+ grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"lb_on_server_status_received_locked");
}
-static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error);
+static void send_client_load_report_locked(void* arg, grpc_error* error);
-static void schedule_next_client_load_report(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+static void schedule_next_client_load_report(glb_lb_policy* glb_policy) {
const grpc_millis next_client_load_report_time =
- grpc_exec_ctx_now(exec_ctx) + glb_policy->client_stats_report_interval;
+ grpc_core::ExecCtx::Get()->Now() +
+ glb_policy->client_stats_report_interval;
GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
send_client_load_report_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
+ grpc_timer_init(&glb_policy->client_load_report_timer,
next_client_load_report_time,
&glb_policy->client_load_report_closure);
}
-static void client_load_report_done_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void client_load_report_done_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
glb_policy->client_load_report_payload = nullptr;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "client_load_report");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
- maybe_restart_lb_call(exec_ctx, glb_policy);
+ maybe_restart_lb_call(glb_policy);
}
return;
}
- schedule_next_client_load_report(exec_ctx, glb_policy);
+ schedule_next_client_load_report(glb_policy);
}
static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
@@ -1366,15 +1369,13 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
(drop_entries == nullptr || drop_entries->num_entries == 0);
}
-static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void send_client_load_report_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "client_load_report");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
- maybe_restart_lb_call(exec_ctx, glb_policy);
+ maybe_restart_lb_call(glb_policy);
}
return;
}
@@ -1387,7 +1388,7 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (load_report_counters_are_zero(request)) {
if (glb_policy->last_client_load_report_counters_were_zero) {
grpc_grpclb_request_destroy(request);
- schedule_next_client_load_report(exec_ctx, glb_policy);
+ schedule_next_client_load_report(glb_policy);
return;
}
glb_policy->last_client_load_report_counters_were_zero = true;
@@ -1397,7 +1398,7 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->client_load_report_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(exec_ctx, request_payload_slice);
+ grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
// Send load report message.
grpc_op op;
@@ -1408,20 +1409,16 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
client_load_report_done_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, &op, 1,
- &glb_policy->client_load_report_closure);
+ glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
if (call_error != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
-static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error);
-static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error);
-static void lb_call_init_locked(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
+static void lb_on_response_received_locked(void* arg, grpc_error* error);
+static void lb_call_init_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->server_name != nullptr);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
GPR_ASSERT(glb_policy->lb_call == nullptr);
@@ -1434,13 +1431,13 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx,
grpc_millis deadline =
glb_policy->lb_call_timeout_ms == 0
? GRPC_MILLIS_INF_FUTURE
- : grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_call_timeout_ms;
+ : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
glb_policy->lb_call = grpc_channel_create_pollset_set_call(
- exec_ctx, glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
+ glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
&host, deadline, nullptr);
- grpc_slice_unref_internal(exec_ctx, host);
+ grpc_slice_unref_internal(host);
if (glb_policy->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
@@ -1455,7 +1452,7 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx,
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->lb_request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(exec_ctx, request_payload_slice);
+ grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
@@ -1478,8 +1475,7 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx,
glb_policy->last_client_load_report_counters_were_zero = false;
}
-static void lb_call_destroy_locked(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+static void lb_call_destroy_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_call != nullptr);
grpc_call_unref(glb_policy->lb_call);
glb_policy->lb_call = nullptr;
@@ -1488,22 +1484,21 @@ static void lb_call_destroy_locked(grpc_exec_ctx* exec_ctx,
grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
- grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);
+ grpc_slice_unref_internal(glb_policy->lb_call_status_details);
if (glb_policy->client_load_report_timer_pending) {
- grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
+ grpc_timer_cancel(&glb_policy->client_load_report_timer);
}
}
/*
* Auxiliary functions and LB client callbacks.
*/
-static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy) {
+static void query_for_backends_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != nullptr);
if (glb_policy->shutting_down) return;
- lb_call_init_locked(exec_ctx, glb_policy);
+ lb_call_init_locked(glb_policy);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
@@ -1534,8 +1529,8 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
op->flags = 0;
op->reserved = nullptr;
op++;
- call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), nullptr);
+ call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops,
+ (size_t)(op - ops), nullptr);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
@@ -1553,7 +1548,7 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
"lb_on_server_status_received_locked");
call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
+ glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_server_status_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@@ -1567,13 +1562,12 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
* lb_on_response_received_locked */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
+ glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void lb_on_response_received_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
@@ -1607,7 +1601,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
* send_client_load_report_locked() */
glb_policy->client_load_report_timer_pending = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
- schedule_next_client_load_report(exec_ctx, glb_policy);
+ schedule_next_client_load_report(glb_policy);
} else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Received initial LB response message; client load "
@@ -1652,11 +1646,10 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
} else {
/* or dispose of the fallback */
- grpc_lb_addresses_destroy(exec_ctx,
- glb_policy->fallback_backend_addresses);
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses = nullptr;
if (glb_policy->fallback_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
+ grpc_timer_cancel(&glb_policy->lb_fallback_timer);
glb_policy->fallback_timer_active = false;
}
}
@@ -1665,7 +1658,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
* update or in glb_destroy() */
glb_policy->serverlist = serverlist;
glb_policy->serverlist_index = 0;
- rr_handover_locked(exec_ctx, glb_policy);
+ rr_handover_locked(glb_policy);
}
} else {
if (grpc_lb_glb_trace.enabled()) {
@@ -1675,14 +1668,14 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
grpc_grpclb_destroy_serverlist(serverlist);
}
- } else { /* serverlist == NULL */
+ } else { /* serverlist == nullptr */
gpr_log(GPR_ERROR,
"[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
glb_policy,
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
}
}
- grpc_slice_unref_internal(exec_ctx, response_slice);
+ grpc_slice_unref_internal(response_slice);
if (!glb_policy->shutting_down) {
/* keep listening for serverlist updates */
op->op = GRPC_OP_RECV_MESSAGE;
@@ -1693,23 +1686,22 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
/* reuse the "lb_on_response_received_locked" weak ref taken in
* query_for_backends_locked() */
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
+ glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
} else {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"lb_on_response_received_locked_shutdown");
}
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received_locked" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"lb_on_response_received_locked_empty_payload");
}
}
-static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->fallback_timer_active = false;
/* If we receive a serverlist after the timer fires but before this callback
@@ -1722,15 +1714,13 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
glb_policy);
}
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
- rr_handover_locked(exec_ctx, glb_policy);
+ rr_handover_locked(glb_policy);
}
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "grpclb_fallback_timer");
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
-static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error) {
+static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
GPR_ASSERT(glb_policy->lb_call != nullptr);
if (grpc_lb_glb_trace.enabled()) {
@@ -1744,29 +1734,28 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx,
gpr_free(status_details);
}
/* We need to perform cleanups no matter what. */
- lb_call_destroy_locked(exec_ctx, glb_policy);
+ lb_call_destroy_locked(glb_policy);
// If the load report timer is still pending, we wait for it to be
// called before restarting the call. Otherwise, we restart the call
// here.
if (!glb_policy->client_load_report_timer_pending) {
- maybe_restart_lb_call(exec_ctx, glb_policy);
+ maybe_restart_lb_call(glb_policy);
}
}
-static void fallback_update_locked(grpc_exec_ctx* exec_ctx,
- glb_lb_policy* glb_policy,
+static void fallback_update_locked(glb_lb_policy* glb_policy,
const grpc_lb_addresses* addresses) {
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
- grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
- extract_backend_addresses_locked(exec_ctx, addresses);
+ extract_backend_addresses_locked(addresses);
if (glb_policy->lb_fallback_timeout_ms > 0 &&
glb_policy->rr_policy != nullptr) {
- rr_handover_locked(exec_ctx, glb_policy);
+ rr_handover_locked(glb_policy);
}
}
-static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+static void glb_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* args) {
glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
const grpc_arg* arg =
@@ -1776,7 +1765,7 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
// If we don't have a current channel to the LB, go into TRANSIENT
// FAILURE.
grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"glb_update_missing");
} else {
@@ -1793,16 +1782,16 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
// If a non-empty serverlist hasn't been received from the balancer,
// propagate the update to fallback_backend_addresses.
if (glb_policy->serverlist == nullptr) {
- fallback_update_locked(exec_ctx, glb_policy, addresses);
+ fallback_update_locked(glb_policy, addresses);
}
GPR_ASSERT(glb_policy->lb_channel != nullptr);
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
grpc_channel_args* lb_channel_args = build_lb_channel_args(
- exec_ctx, addresses, glb_policy->response_generator, args->args);
+ addresses, glb_policy->response_generator, args->args);
grpc_fake_resolver_response_generator_set_response(
- exec_ctx, glb_policy->response_generator, lb_channel_args);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(lb_channel_args);
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if (!glb_policy->watching_lb_channel) {
@@ -1814,7 +1803,7 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
glb_policy->watching_lb_channel = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
- exec_ctx, client_channel_elem,
+ client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
glb_policy->base.interested_parties),
&glb_policy->lb_channel_connectivity,
@@ -1825,8 +1814,7 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
-static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
- void* arg,
+static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down) goto done;
@@ -1842,7 +1830,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
- exec_ctx, client_channel_elem,
+ client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
glb_policy->base.interested_parties),
&glb_policy->lb_channel_connectivity,
@@ -1861,29 +1849,28 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
// lb_call.
} else if (glb_policy->started_picking) {
if (glb_policy->retry_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
}
- start_picking_locked(exec_ctx, glb_policy);
+ start_picking_locked(glb_policy);
}
/* fallthrough */
case GRPC_CHANNEL_SHUTDOWN:
done:
glb_policy->watching_lb_channel = false;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
"watch_lb_channel_connectivity_cb_shutdown");
break;
}
}
static void glb_set_reresolve_closure_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
- grpc_closure* request_reresolution) {
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
GPR_ASSERT(!glb_policy->shutting_down);
GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy,
+ grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy,
request_reresolution);
} else {
glb_policy->base.request_reresolution = request_reresolution;
@@ -1904,8 +1891,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_update_locked,
glb_set_reresolve_closure_locked};
-static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy_factory* factory,
+static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const grpc_arg* arg =
@@ -1926,7 +1912,7 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != nullptr);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri* uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
+ grpc_uri* uri = grpc_uri_parse(arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
@@ -1959,26 +1945,26 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
/* Extract the backend addresses (may be empty) from the resolver for
* fallback. */
glb_policy->fallback_backend_addresses =
- extract_backend_addresses_locked(exec_ctx, addresses);
+ extract_backend_addresses_locked(addresses);
/* Create a client channel over them to communicate with an LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();
grpc_channel_args* lb_channel_args = build_lb_channel_args(
- exec_ctx, addresses, glb_policy->response_generator, args->args);
+ addresses, glb_policy->response_generator, args->args);
char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
+ uri_str, args->client_channel_factory, lb_channel_args);
/* Propagate initial resolution */
grpc_fake_resolver_response_generator_set_response(
- exec_ctx, glb_policy->response_generator, lb_channel_args);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(lb_channel_args);
gpr_free(uri_str);
if (glb_policy->lb_channel == nullptr) {
gpr_free((void*)glb_policy->server_name);
- grpc_channel_args_destroy(exec_ctx, glb_policy->args);
+ grpc_channel_args_destroy(glb_policy->args);
gpr_free(glb_policy);
return nullptr;
}
@@ -2009,7 +1995,7 @@ grpc_lb_policy_factory* grpc_glb_lb_factory_create() {
// Only add client_load_reporting filter if the grpclb LB policy is used.
static bool maybe_add_client_load_reporting_filter(
- grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) {
+ grpc_channel_stack_builder* builder, void* arg) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_arg* channel_arg =
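
The mechanical core of this change, repeated across every file below, is that callbacks and helpers lose their explicit grpc_exec_ctx* parameter and the closure macros are invoked without one. A minimal before/after sketch of a callback, assuming the wider refactor makes the execution context implicit for the calling thread rather than threaded through each signature (on_something, my_policy and its notify field are hypothetical names used only for illustration):

    // Before: the execution context is passed explicitly everywhere.
    static void on_something(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
      my_policy* p = (my_policy*)arg;
      GRPC_CLOSURE_SCHED(exec_ctx, p->notify, GRPC_ERROR_REF(error));
    }

    // After: same logic, one parameter fewer at every call site.
    static void on_something(void* arg, grpc_error* error) {
      my_policy* p = (my_policy*)arg;
      GRPC_CLOSURE_SCHED(p->notify, GRPC_ERROR_REF(error));
    }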
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
index aacaec197d..a8ecea4212 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
@@ -26,17 +26,17 @@
#include "src/core/lib/support/string.h"
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
- grpc_exec_ctx* exec_ctx, const char* lb_service_target_addresses,
+ const char* lb_service_target_addresses,
grpc_client_channel_factory* client_channel_factory,
grpc_channel_args* args) {
grpc_channel* lb_channel = grpc_client_channel_factory_create_channel(
- exec_ctx, client_channel_factory, lb_service_target_addresses,
+ client_channel_factory, lb_service_target_addresses,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, args);
return lb_channel;
}
grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args(
- grpc_exec_ctx* exec_ctx, grpc_slice_hash_table* targets_info,
+ grpc_slice_hash_table* targets_info,
grpc_fake_resolver_response_generator* response_generator,
const grpc_channel_args* args) {
const grpc_arg to_add[] = {
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
index 70b1c28b0d..56104b2ec0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@@ -31,12 +31,12 @@
* \a client_channel_factory will be used for the creation of the LB channel,
* alongside the channel args passed in \a args. */
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
- grpc_exec_ctx* exec_ctx, const char* lb_service_target_addresses,
+ const char* lb_service_target_addresses,
grpc_client_channel_factory* client_channel_factory,
grpc_channel_args* args);
grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args(
- grpc_exec_ctx* exec_ctx, grpc_slice_hash_table* targets_info,
+ grpc_slice_hash_table* targets_info,
grpc_fake_resolver_response_generator* response_generator,
const grpc_channel_args* args);
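
A caller-side sketch of how these new prototypes line up with the glb_create changes above; lb_channel_args, response_generator, client_channel_factory and server_name are assumed to have been built already, exactly as in grpclb.cc:

    char* uri_str;
    gpr_asprintf(&uri_str, "fake:///%s", server_name);
    grpc_channel* lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
        uri_str, client_channel_factory, lb_channel_args);
    // Push the initial resolution through the fake resolver, then release
    // the args and the temporary target string.
    grpc_fake_resolver_response_generator_set_response(response_generator,
                                                       lb_channel_args);
    grpc_channel_args_destroy(lb_channel_args);
    gpr_free(uri_str);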
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
index 8eaa90e97b..76bcddf945 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
@@ -29,7 +29,7 @@
#include "src/core/lib/support/string.h"
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
- grpc_exec_ctx* exec_ctx, const char* lb_service_target_addresses,
+ const char* lb_service_target_addresses,
grpc_client_channel_factory* client_channel_factory,
grpc_channel_args* args) {
grpc_channel_args* new_args = args;
@@ -50,19 +50,19 @@ grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
new_args = grpc_channel_args_copy_and_add_and_remove(
args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
- grpc_channel_credentials_unref(exec_ctx, creds_sans_call_creds);
+ grpc_channel_credentials_unref(creds_sans_call_creds);
}
grpc_channel* lb_channel = grpc_client_channel_factory_create_channel(
- exec_ctx, client_channel_factory, lb_service_target_addresses,
+ client_channel_factory, lb_service_target_addresses,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
if (channel_credentials != nullptr) {
- grpc_channel_args_destroy(exec_ctx, new_args);
+ grpc_channel_args_destroy(new_args);
}
return lb_channel;
}
grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args(
- grpc_exec_ctx* exec_ctx, grpc_slice_hash_table* targets_info,
+ grpc_slice_hash_table* targets_info,
grpc_fake_resolver_response_generator* response_generator,
const grpc_channel_args* args) {
const grpc_arg to_add[] = {
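
The secure variant builds the LB channel from a filtered copy of the incoming channel args. A stand-alone sketch of that copy-and-add-and-remove pattern with hypothetical arg keys; grpc_channel_args_copy_and_add_and_remove and GPR_ARRAY_SIZE are the same helpers used in the hunk above:

    // Drop one key and add one string arg; the caller later frees the copy
    // with grpc_channel_args_destroy(), as the hunk above does for new_args.
    static grpc_channel_args* strip_and_tag(const grpc_channel_args* args) {
      static const char* keys_to_remove[] = {"grpc.hypothetical.key_to_drop"};
      grpc_arg args_to_add[1];
      args_to_add[0].type = GRPC_ARG_STRING;
      args_to_add[0].key = (char*)"grpc.hypothetical.tag";
      args_to_add[0].value.string = (char*)"lb-channel";
      return grpc_channel_args_copy_and_add_and_remove(
          args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
          GPR_ARRAY_SIZE(args_to_add));
    }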
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
index 2c8d7f4291..fc781da330 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
@@ -215,9 +215,6 @@ grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
return nullptr;
}
}
- if (res.server_list.has_expiration_interval) {
- sl->expiration_interval = res.server_list.expiration_interval;
- }
return sl;
}
@@ -237,8 +234,6 @@ grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy(
grpc_grpclb_serverlist* copy =
(grpc_grpclb_serverlist*)gpr_zalloc(sizeof(grpc_grpclb_serverlist));
copy->num_servers = sl->num_servers;
- memcpy(&copy->expiration_interval, &sl->expiration_interval,
- sizeof(grpc_grpclb_duration));
copy->servers = (grpc_grpclb_server**)gpr_malloc(sizeof(grpc_grpclb_server*) *
sl->num_servers);
for (size_t i = 0; i < sl->num_servers; i++) {
@@ -257,10 +252,6 @@ bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs,
if (lhs->num_servers != rhs->num_servers) {
return false;
}
- if (grpc_grpclb_duration_compare(&lhs->expiration_interval,
- &rhs->expiration_interval) != 0) {
- return false;
- }
for (size_t i = 0; i < lhs->num_servers; i++) {
if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
return false;
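
With expiration_interval removed from the struct, the serverlist helpers only deal with the server array and its length. What the comparison now amounts to, as a self-contained sketch (grpc_grpclb_server_equals is assumed to behave as in this file):

    static bool serverlists_equal_sketch(const grpc_grpclb_serverlist* lhs,
                                         const grpc_grpclb_serverlist* rhs) {
      if (lhs->num_servers != rhs->num_servers) return false;
      for (size_t i = 0; i < lhs->num_servers; i++) {
        // Element-wise comparison is all that is left after this change.
        if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
          return false;
        }
      }
      return true;
    }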
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index 017c40ec1a..ccb0212643 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -35,7 +35,6 @@ typedef grpc_lb_v1_Duration grpc_grpclb_duration;
typedef struct {
grpc_grpclb_server** servers;
size_t num_servers;
- grpc_grpclb_duration expiration_interval;
} grpc_grpclb_serverlist;
/** Create a request for a gRPC LB service under \a lb_service_name */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 6a5d54c82a..4e6c5cc832 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -61,9 +61,8 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
+const pb_field_t grpc_lb_v1_ServerList_fields[2] = {
PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v1_ServerList, servers, servers, &grpc_lb_v1_Server_fields),
- PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ServerList, expiration_interval, servers, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
@@ -85,7 +84,7 @@ const pb_field_t grpc_lb_v1_Server_fields[5] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -96,7 +95,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 93333d1aed..066c076202 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,6 +14,11 @@ extern "C" {
#endif
/* Struct definitions */
+typedef struct _grpc_lb_v1_ServerList {
+ pb_callback_t servers;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
+} grpc_lb_v1_ServerList;
+
typedef struct _grpc_lb_v1_ClientStatsPerToken {
pb_callback_t load_balance_token;
bool has_num_calls;
@@ -79,13 +84,6 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
-typedef struct _grpc_lb_v1_ServerList {
- pb_callback_t servers;
- bool has_expiration_interval;
- grpc_lb_v1_Duration expiration_interval;
-/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
-} grpc_lb_v1_ServerList;
-
typedef struct _grpc_lb_v1_LoadBalanceRequest {
bool has_initial_request;
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
@@ -113,7 +111,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
+#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}}
#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
@@ -123,10 +121,11 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
+#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}}
#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ServerList_servers_tag 1
#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
#define grpc_lb_v1_Duration_seconds_tag 1
@@ -146,8 +145,6 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
-#define grpc_lb_v1_ServerList_servers_tag 1
-#define grpc_lb_v1_ServerList_expiration_interval_tag 3
#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
#define grpc_lb_v1_LoadBalanceResponse_initial_response_tag 1
@@ -162,7 +159,7 @@ extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
-extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
+extern const pb_field_t grpc_lb_v1_ServerList_fields[2];
extern const pb_field_t grpc_lb_v1_Server_fields[5];
/* Maximum encoded size of messages (where known) */
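
Since grpc_lb_v1_ServerList now holds nothing but the pb_callback_t for the repeated servers field, decoding it is entirely callback-driven. A minimal, hypothetical nanopb hookup (count_server_cb and server_counter are illustrative; pb_decode and the decode-callback signature are standard nanopb):

    #include "pb_decode.h"

    typedef struct { size_t count; } server_counter;  // hypothetical accumulator

    static bool count_server_cb(pb_istream_t* stream, const pb_field_t* field,
                                void** arg) {
      grpc_lb_v1_Server server = grpc_lb_v1_Server_init_zero;
      if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) return false;
      ((server_counter*)*arg)->count++;  // a real caller would copy the entry out
      return true;
    }

    static bool count_servers(pb_istream_t* stream, server_counter* out) {
      grpc_lb_v1_ServerList list = grpc_lb_v1_ServerList_init_zero;
      list.servers.funcs.decode = count_server_cb;
      list.servers.arg = out;
      return pb_decode(stream, grpc_lb_v1_ServerList_fields, &list);
    }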
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 228a77d9db..0861261359 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -57,12 +57,12 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void pf_destroy(grpc_lb_policy* pol) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
GPR_ASSERT(p->subchannel_list == nullptr);
GPR_ASSERT(p->latest_pending_subchannel_list == nullptr);
GPR_ASSERT(p->pending_picks == nullptr);
- grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_connectivity_state_destroy(&p->state_tracker);
gpr_free(p);
grpc_subchannel_index_unref();
if (grpc_lb_pick_first_trace.enabled()) {
@@ -70,7 +70,7 @@ static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
}
}
-static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void pf_shutdown_locked(grpc_lb_policy* pol) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
@@ -81,28 +81,27 @@ static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
while ((pp = p->pending_picks) != nullptr) {
p->pending_picks = pp->next;
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
gpr_free(pp);
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
- "shutdown");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "shutdown");
if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"pf_shutdown");
p->subchannel_list = nullptr;
}
if (p->latest_pending_subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown");
+ p->latest_pending_subchannel_list, "pf_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
- grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace,
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static void pf_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
@@ -112,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
pending_pick* next = pp->next;
if (pp->target == target) {
*target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
gpr_free(pp);
@@ -125,7 +124,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
GRPC_ERROR_UNREF(error);
}
-static void pf_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static void pf_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
@@ -136,7 +135,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
pending_pick* next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
gpr_free(pp);
@@ -149,8 +148,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
GRPC_ERROR_UNREF(error);
}
-static void start_picking_locked(grpc_exec_ctx* exec_ctx,
- pick_first_lb_policy* p) {
+static void start_picking_locked(pick_first_lb_policy* p) {
p->started_picking = true;
if (p->subchannel_list != nullptr &&
p->subchannel_list->num_subchannels > 0) {
@@ -160,21 +158,21 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_subchannel_list_ref_for_connectivity_watch(
p->subchannel_list, "connectivity_watch+start_picking");
grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &p->subchannel_list->subchannels[i]);
+ &p->subchannel_list->subchannels[i]);
break;
}
}
}
}
-static void pf_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void pf_exit_idle_locked(grpc_lb_policy* pol) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
}
-static int pf_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static int pf_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
@@ -188,7 +186,7 @@ static int pf_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
}
// No subchannel selected yet, so handle asynchronously.
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -199,48 +197,47 @@ static int pf_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
return 0;
}
-static void destroy_unselected_subchannels_locked(grpc_exec_ctx* exec_ctx,
- pick_first_lb_policy* p) {
+static void destroy_unselected_subchannels_locked(pick_first_lb_policy* p) {
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[i];
if (p->selected != sd) {
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+ grpc_lb_subchannel_data_unref_subchannel(sd,
"selected_different_subchannel");
}
}
}
static grpc_connectivity_state pf_check_connectivity_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_error** error) {
+ grpc_lb_policy* pol, grpc_error** error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
return grpc_connectivity_state_get(&p->state_tracker, error);
}
-static void pf_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy* pol,
+static void pf_notify_on_state_change_locked(grpc_lb_policy* pol,
grpc_connectivity_state* current,
grpc_closure* notify) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
- grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
- current, notify);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
}
-static void pf_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
- grpc_closure* closure) {
+static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel,
- closure);
+ grpc_connected_subchannel_ping(p->selected->connected_subchannel,
+ on_initiate, on_ack);
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, closure,
+ GRPC_CLOSURE_SCHED(on_initiate,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
+ GRPC_CLOSURE_SCHED(on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
}
}
-static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error);
+static void pf_connectivity_changed_locked(void* arg, grpc_error* error);
-static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+static void pf_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* args) {
pick_first_lb_policy* p = (pick_first_lb_policy*)policy;
const grpc_arg* arg =
@@ -249,7 +246,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
if (p->subchannel_list == nullptr) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"pf_update_missing");
} else {
@@ -268,17 +265,17 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
(void*)p, (unsigned long)addresses->num_addresses);
}
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
- exec_ctx, &p->base, &grpc_lb_pick_first_trace, addresses, args,
+ &p->base, &grpc_lb_pick_first_trace, addresses, args,
pf_connectivity_changed_locked);
if (subchannel_list->num_subchannels == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"sl_shutdown_empty_update");
}
p->subchannel_list = subchannel_list; // Empty list.
@@ -289,7 +286,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"pf_update_before_selected");
}
p->subchannel_list = subchannel_list;
@@ -314,19 +311,19 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
p->selected = sd;
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->subchannel_list, "pf_update_includes_selected");
+ p->subchannel_list, "pf_update_includes_selected");
}
p->subchannel_list = subchannel_list;
- destroy_unselected_subchannels_locked(exec_ctx, p);
+ destroy_unselected_subchannels_locked(p);
grpc_lb_subchannel_list_ref_for_connectivity_watch(
subchannel_list, "connectivity_watch+replace_selected");
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
if (p->latest_pending_subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list,
+ p->latest_pending_subchannel_list,
"pf_update_includes_selected+outdated");
p->latest_pending_subchannel_list = nullptr;
}
@@ -346,8 +343,7 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
(void*)subchannel_list);
}
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list,
- "sl_outdated_dont_smash");
+ p->latest_pending_subchannel_list, "sl_outdated_dont_smash");
}
p->latest_pending_subchannel_list = subchannel_list;
}
@@ -357,12 +353,11 @@ static void pf_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_lb_subchannel_list_ref_for_connectivity_watch(
subchannel_list, "connectivity_watch+update");
grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &subchannel_list->subchannels[0]);
+ &subchannel_list->subchannels[0]);
}
}
-static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg;
pick_first_lb_policy* p = (pick_first_lb_policy*)sd->subchannel_list->policy;
if (grpc_lb_pick_first_trace.enabled()) {
@@ -380,18 +375,18 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "pf_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
+ "pf_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_sl_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "pf_sl_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
+ "pf_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
@@ -407,15 +402,15 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list != nullptr) {
p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "selected_not_ready+switch_to_update");
+ sd->subchannel_list, "selected_not_ready+switch_to_update");
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update");
+ p->subchannel_list, "selected_not_ready+switch_to_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = nullptr;
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
@@ -426,27 +421,26 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE,
"selected_changed+reresolve");
p->started_picking = false;
- grpc_lb_policy_try_reresolve(
- exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
+ GRPC_ERROR_NONE);
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
}
if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
} else {
p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
- "pf_selected_shutdown");
+ sd->subchannel_list, "pf_selected_shutdown");
+ grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown");
}
}
return;
@@ -466,15 +460,14 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
// p->subchannel_list.
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"finish_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = nullptr;
}
// Cases 1 and 2.
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
- "connecting_ready");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "connecting_ready");
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
"connected");
@@ -484,7 +477,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
(void*)sd->subchannel);
}
// Drop all other subchannels, since we are now connected.
- destroy_unselected_subchannels_locked(exec_ctx, p);
+ destroy_unselected_subchannels_locked(p);
// Update any calls that were waiting for a pick.
pending_pick* pp;
while ((pp = p->pending_picks)) {
@@ -496,15 +489,15 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
"Servicing pending pick with selected subchannel %p",
(void*)p->selected);
}
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
do {
sd->subchannel_list->checking_subchannel =
(sd->subchannel_list->checking_subchannel + 1) %
@@ -517,29 +510,28 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (sd->subchannel_list->checking_subchannel == 0 &&
sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
// Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_REF(error), "connecting_changed");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_REF(error),
+ "connecting_changed");
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
- "pf_candidate_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
// Advance to next subchannel and check its state.
grpc_lb_subchannel_data* original_sd = sd;
do {
@@ -551,31 +543,30 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
} while (sd->subchannel == nullptr && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "pf_exhausted_subchannels");
+ sd->subchannel_list, "pf_exhausted_subchannels");
if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE,
"exhausted_subchannels+reresolve");
p->started_picking = false;
- grpc_lb_policy_try_reresolve(
- exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
+ GRPC_ERROR_NONE);
}
} else {
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
}
// Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
}
}
}
static void pf_set_reresolve_closure_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
- grpc_closure* request_reresolution) {
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
pick_first_lb_policy* p = (pick_first_lb_policy*)policy;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
@@ -599,15 +590,14 @@ static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
static void pick_first_factory_unref(grpc_lb_policy_factory* factory) {}
-static grpc_lb_policy* create_pick_first(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy_factory* factory,
+static grpc_lb_policy* create_pick_first(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
GPR_ASSERT(args->client_channel_factory != nullptr);
pick_first_lb_policy* p = (pick_first_lb_policy*)gpr_zalloc(sizeof(*p));
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p created.", (void*)p);
}
- pf_update_locked(exec_ctx, &p->base, args);
+ pf_update_locked(&p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_subchannel_index_ref();
return &p->base;
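
pf_ping_one_locked (and rr_ping_one_locked in the next file) now reports ping initiation and acknowledgement through two separate closures instead of a single one. A hedged caller-side sketch, assuming the usual GRPC_CLOSURE_INIT(closure, cb, arg, scheduler) form and the grpc_schedule_on_exec_ctx scheduler; ping_state and the direct call to the static helper are purely illustrative:

    typedef struct {
      grpc_closure on_initiate;
      grpc_closure on_ack;
    } ping_state;

    static void ping_initiated(void* arg, grpc_error* error) {
      gpr_log(GPR_DEBUG, "ping initiated: %s", grpc_error_string(error));
    }

    static void ping_acked(void* arg, grpc_error* error) {
      gpr_log(GPR_DEBUG, "ping acked: %s", grpc_error_string(error));
    }

    static void ping_selected_subchannel(grpc_lb_policy* pol, ping_state* st) {
      GRPC_CLOSURE_INIT(&st->on_initiate, ping_initiated, st,
                        grpc_schedule_on_exec_ctx);
      GRPC_CLOSURE_INIT(&st->on_ack, ping_acked, st, grpc_schedule_on_exec_ctx);
      pf_ping_one_locked(pol, &st->on_initiate, &st->on_ack);
    }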
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index f68daba474..b0c84017df 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -154,7 +154,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
}
}
-static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void rr_destroy(grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
@@ -162,12 +162,12 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
}
GPR_ASSERT(p->subchannel_list == nullptr);
GPR_ASSERT(p->latest_pending_subchannel_list == nullptr);
- grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_connectivity_state_destroy(&p->state_tracker);
grpc_subchannel_index_unref();
gpr_free(p);
}
-static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void rr_shutdown_locked(grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
@@ -178,29 +178,27 @@ static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
while ((pp = p->pending_picks) != nullptr) {
p->pending_picks = pp->next;
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
gpr_free(pp);
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
- "rr_shutdown");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "rr_shutdown");
if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"sl_shutdown_rr_shutdown");
p->subchannel_list = nullptr;
}
if (p->latest_pending_subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list,
- "sl_shutdown_pending_rr_shutdown");
+ p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
- grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static void rr_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
@@ -210,7 +208,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
pending_pick* next = pp->next;
if (pp->target == target) {
*target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -223,7 +221,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
GRPC_ERROR_UNREF(error);
}
-static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static void rr_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
@@ -235,7 +233,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -248,27 +246,26 @@ static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
GRPC_ERROR_UNREF(error);
}
-static void start_picking_locked(grpc_exec_ctx* exec_ctx,
- round_robin_lb_policy* p) {
+static void start_picking_locked(round_robin_lb_policy* p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &p->subchannel_list->subchannels[i]);
+ &p->subchannel_list->subchannels[i]);
}
}
}
-static void rr_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void rr_exit_idle_locked(grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
}
-static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static int rr_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
@@ -305,7 +302,7 @@ static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
}
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -348,8 +345,7 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
* (the grpc_lb_subchannel_data associated with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
* only if the policy transitions to state TRANSIENT_FAILURE. */
-static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
- grpc_lb_subchannel_data* sd,
+static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
@@ -365,55 +361,45 @@ static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
* CHECK: subchannel_list->num_shutdown ==
* subchannel_list->num_subchannels.
*
- * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
+ * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
- * CHECK: subchannel_list->num_transient_failures ==
+ * CHECK: subchannel_list->num_shutdown +
+ * subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
- *
- * 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels.
- * (Note that all the subchannels will transition from IDLE to CONNECTING
- * in batch when we start trying to connect.)
*/
- // TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN,
- // TRANSIENT_FAILURE}, we don't change the state. We may want to improve on
- // this.
+ // TODO(juanlishen): For rule 4, we may want to re-resolve instead.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
if (subchannel_list->num_ready > 0) {
/* 1) READY */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
/* 2) CONNECTING */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
- "rr_connecting");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "rr_connecting");
} else if (subchannel_list->num_shutdown ==
subchannel_list->num_subchannels) {
/* 3) IDLE and re-resolve */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"rr_exhausted_subchannels+reresolve");
p->started_picking = false;
- grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_NONE);
- } else if (subchannel_list->num_transient_failures ==
+ } else if (subchannel_list->num_shutdown +
+ subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
/* 4) TRANSIENT_FAILURE */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) {
- /* 5) IDLE */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE, "rr_idle");
}
GRPC_ERROR_UNREF(error);
}
-static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg;
round_robin_lb_policy* p =
(round_robin_lb_policy*)sd->subchannel_list->policy;
@@ -431,18 +417,18 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "rr_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
+ "rr_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "rr_sl_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
+ "rr_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
@@ -455,14 +441,13 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and new overall state.
update_state_counters_locked(sd);
- update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+ update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is SHUTDOWN, unref the subchannel.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
- "rr_connectivity_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
+ sd->subchannel_list, "rr_connectivity_shutdown");
} else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == nullptr) {
@@ -490,8 +475,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
if (p->subchannel_list != nullptr) {
// dispose of the current subchannel_list
- grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->subchannel_list, "sl_phase_out_shutdown");
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ "sl_phase_out_shutdown");
}
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = nullptr;
@@ -523,32 +508,31 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
(void*)p, (void*)selected->subchannel,
(void*)p->subchannel_list, (unsigned long)next_ready_index);
}
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
}
static grpc_connectivity_state rr_check_connectivity_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_error** error) {
+ grpc_lb_policy* pol, grpc_error** error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
return grpc_connectivity_state_get(&p->state_tracker, error);
}
-static void rr_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy* pol,
+static void rr_notify_on_state_change_locked(grpc_lb_policy* pol,
grpc_connectivity_state* current,
grpc_closure* notify) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
- current, notify);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
}
-static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
- grpc_closure* closure) {
+static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) {
@@ -556,16 +540,17 @@ static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
&p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_ping");
- grpc_connected_subchannel_ping(exec_ctx, target, closure);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping");
+ grpc_connected_subchannel_ping(target, on_initiate, on_ack);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
} else {
- GRPC_CLOSURE_SCHED(
- exec_ctx, closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected"));
+ GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Round Robin not connected"));
+ GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Round Robin not connected"));
}
}
-static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+static void rr_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* args) {
round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
const grpc_arg* arg =
@@ -576,7 +561,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
// Otherwise, keep using the current subchannel list (ignore this update).
if (p->subchannel_list == nullptr) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing");
}
@@ -588,15 +573,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
addresses->num_addresses);
}
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
- exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args,
+ &p->base, &grpc_lb_round_robin_trace, addresses, args,
rr_connectivity_changed_locked);
if (subchannel_list->num_subchannels == 0) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"sl_shutdown_empty_update");
}
p->subchannel_list = subchannel_list; // empty list
@@ -612,7 +597,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
(void*)subchannel_list);
}
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list, "sl_outdated");
+ p->latest_pending_subchannel_list, "sl_outdated");
}
p->latest_pending_subchannel_list = subchannel_list;
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
@@ -623,22 +608,21 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &subchannel_list->subchannels[i]);
+ &subchannel_list->subchannels[i]);
}
} else {
// The policy isn't picking yet. Save the update for later, disposing of
// previous version if any.
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->subchannel_list, "rr_update_before_started_picking");
+ p->subchannel_list, "rr_update_before_started_picking");
}
p->subchannel_list = subchannel_list;
}
}
static void rr_set_reresolve_closure_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
- grpc_closure* request_reresolution) {
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
@@ -662,8 +646,7 @@ static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {}
-static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy_factory* factory,
+static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
GPR_ASSERT(args->client_channel_factory != nullptr);
round_robin_lb_policy* p = (round_robin_lb_policy*)gpr_zalloc(sizeof(*p));
@@ -671,7 +654,7 @@ static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx,
grpc_subchannel_index_ref();
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
- rr_update_locked(exec_ctx, &p->base, args);
+ rr_update_locked(&p->base, args);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p,
(unsigned long)p->subchannel_list->num_subchannels);
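Most of the round_robin.cc hunk above is the mechanical removal of the threaded grpc_exec_ctx* parameter, but rr_ping_one_locked also changes its contract: it now takes separate on_initiate and on_ack closures and, when no subchannel is READY, schedules both with the same error. A minimal standalone sketch of that two-callback contract, using hypothetical names rather than gRPC's internal types, could look like this:

// Standalone model of a ping API that reports two events: when the ping is
// written to the wire (on_initiate) and when the peer acknowledges it (on_ack).
// If the policy has no READY subchannel, both callbacks fire with an error,
// mirroring the else-branch of rr_ping_one_locked above.
#include <functional>
#include <iostream>
#include <string>

using PingCallback = std::function<void(const std::string& error)>;

struct FakePolicy {
  bool has_ready_subchannel = false;
};

void PingOne(const FakePolicy& policy, PingCallback on_initiate,
             PingCallback on_ack) {
  if (!policy.has_ready_subchannel) {
    // No connected subchannel: fail both stages immediately.
    on_initiate("Round Robin not connected");
    on_ack("Round Robin not connected");
    return;
  }
  // With a connected subchannel the transport would invoke these as the ping
  // progresses; here we simply simulate immediate success.
  on_initiate("");
  on_ack("");
}

int main() {
  FakePolicy p;  // not connected
  PingOne(p,
          [](const std::string& e) { std::cout << "initiate: " << e << "\n"; },
          [](const std::string& e) { std::cout << "ack: " << e << "\n"; });
}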
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index b6fce4d207..a3b4c8e524 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -28,8 +28,7 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
-void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx,
- grpc_lb_subchannel_data* sd,
+void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
const char* reason) {
if (sd->subchannel != nullptr) {
if (sd->subchannel_list->tracer->enabled()) {
@@ -41,23 +40,22 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx,
(size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel);
}
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason);
+ GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
sd->subchannel = nullptr;
if (sd->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel,
- reason);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason);
sd->connected_subchannel = nullptr;
}
if (sd->user_data != nullptr) {
GPR_ASSERT(sd->user_data_vtable != nullptr);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data_vtable->destroy(sd->user_data);
sd->user_data = nullptr;
}
}
}
void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd) {
+ grpc_lb_subchannel_data* sd) {
if (sd->subchannel_list->tracer->enabled()) {
gpr_log(GPR_DEBUG,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
@@ -69,13 +67,13 @@ void grpc_lb_subchannel_data_start_connectivity_watch(
}
sd->connectivity_notification_pending = true;
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties,
+ sd->subchannel, sd->subchannel_list->policy->interested_parties,
&sd->pending_connectivity_state_unsafe,
&sd->connectivity_changed_closure);
}
void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd) {
+ grpc_lb_subchannel_data* sd) {
if (sd->subchannel_list->tracer->enabled()) {
gpr_log(GPR_DEBUG,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
@@ -90,7 +88,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch(
}
grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
+ grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args,
grpc_iomgr_cb_func connectivity_changed_cb) {
grpc_lb_subchannel_list* subchannel_list =
@@ -124,8 +122,8 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
- exec_ctx, args->client_channel_factory, &sc_args);
- grpc_channel_args_destroy(exec_ctx, new_args);
+ args->client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) {
// Subchannel could not be created.
if (tracer->enabled()) {
@@ -172,8 +170,7 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
return subchannel_list;
}
-static void subchannel_list_destroy(grpc_exec_ctx* exec_ctx,
- grpc_lb_subchannel_list* subchannel_list) {
+static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) {
if (subchannel_list->tracer->enabled()) {
gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
subchannel_list->tracer->name(), subchannel_list->policy,
@@ -181,8 +178,7 @@ static void subchannel_list_destroy(grpc_exec_ctx* exec_ctx,
}
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
- "subchannel_list_destroy");
+ grpc_lb_subchannel_data_unref_subchannel(sd, "subchannel_list_destroy");
}
gpr_free(subchannel_list->subchannels);
gpr_free(subchannel_list);
@@ -200,8 +196,7 @@ void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
}
}
-void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx,
- grpc_lb_subchannel_list* subchannel_list,
+void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
const char* reason) {
const bool done = gpr_unref(&subchannel_list->refcount);
if (subchannel_list->tracer->enabled()) {
@@ -212,7 +207,7 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx,
reason);
}
if (done) {
- subchannel_list_destroy(exec_ctx, subchannel_list);
+ subchannel_list_destroy(subchannel_list);
}
}
@@ -223,14 +218,13 @@ void grpc_lb_subchannel_list_ref_for_connectivity_watch(
}
void grpc_lb_subchannel_list_unref_for_connectivity_watch(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason);
- grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+ grpc_lb_subchannel_list* subchannel_list, const char* reason) {
+ GRPC_LB_POLICY_WEAK_UNREF(subchannel_list->policy, reason);
+ grpc_lb_subchannel_list_unref(subchannel_list, reason);
}
static void subchannel_data_cancel_connectivity_watch(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, const char* reason) {
+ grpc_lb_subchannel_data* sd, const char* reason) {
if (sd->subchannel_list->tracer->enabled()) {
gpr_log(GPR_DEBUG,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
@@ -240,14 +234,12 @@ static void subchannel_data_cancel_connectivity_watch(
(size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel, reason);
}
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, nullptr,
- nullptr,
+ grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr,
&sd->connectivity_changed_closure);
}
void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
+ grpc_lb_subchannel_list* subchannel_list, const char* reason) {
if (subchannel_list->tracer->enabled()) {
gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
subchannel_list->tracer->name(), subchannel_list->policy,
@@ -261,10 +253,10 @@ void grpc_lb_subchannel_list_shutdown_and_unref(
// the callback is responsible for unreffing the subchannel.
// Otherwise, unref the subchannel directly.
if (sd->connectivity_notification_pending) {
- subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason);
+ subchannel_data_cancel_connectivity_watch(sd, reason);
} else if (sd->subchannel != nullptr) {
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason);
+ grpc_lb_subchannel_data_unref_subchannel(sd, reason);
}
}
- grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+ grpc_lb_subchannel_list_unref(subchannel_list, reason);
}
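The subchannel_list.cc hunks keep the same ownership model while dropping the context parameter: grpc_lb_subchannel_list_unref destroys the list when the last ref is released, and grpc_lb_subchannel_list_shutdown_and_unref either cancels a pending connectivity watch (whose callback then owns the final subchannel unref) or unrefs the subchannel directly, before dropping the list's own ref. A minimal standalone sketch of that pattern, with hypothetical types rather than the real gRPC ones, might be:

// Standalone model of the unref-destroys-on-last-ref and shutdown-and-unref
// pattern used by the subchannel list (hypothetical types, not gRPC's).
#include <atomic>
#include <cstdio>
#include <vector>

struct FakeSubchannelData {
  bool watch_pending;
  bool has_subchannel;
};

struct FakeSubchannelList {
  std::atomic<int> refcount{1};
  std::vector<FakeSubchannelData> subchannels;
};

void ListUnref(FakeSubchannelList* list, const char* reason) {
  // fetch_sub returns the previous value; the caller dropping the last ref
  // is responsible for destruction, as in grpc_lb_subchannel_list_unref.
  if (list->refcount.fetch_sub(1) == 1) {
    std::printf("destroying list (%s)\n", reason);
    delete list;
  }
}

void ListShutdownAndUnref(FakeSubchannelList* list, const char* reason) {
  for (auto& sd : list->subchannels) {
    if (sd.watch_pending) {
      // Cancel the watch; the (simulated) watch callback owns the final
      // subchannel unref, as in subchannel_data_cancel_connectivity_watch.
      std::printf("cancelling pending watch (%s)\n", reason);
    } else if (sd.has_subchannel) {
      // No watch in flight: drop the subchannel ref directly.
      sd.has_subchannel = false;
      std::printf("unreffing subchannel directly (%s)\n", reason);
    }
  }
  ListUnref(list, reason);
}

int main() {
  auto* list = new FakeSubchannelList;
  list->subchannels = {{true, true}, {false, true}};
  ListShutdownAndUnref(list, "sl_phase_out_shutdown");
}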
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index e3e5eba56a..0f8cea9347 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -65,8 +65,7 @@ typedef struct {
} grpc_lb_subchannel_data;
/// Unrefs the subchannel contained in sd.
-void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx,
- grpc_lb_subchannel_data* sd,
+void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
const char* reason);
/// Starts watching the connectivity state of the subchannel.
@@ -74,11 +73,11 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx,
/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
/// grpc_lb_subchannel_data_start_connectivity_watch().
void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd);
+ grpc_lb_subchannel_data* sd);
/// Stops watching the connectivity state of the subchannel.
void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd);
+ grpc_lb_subchannel_data* sd);
struct grpc_lb_subchannel_list {
/** backpointer to owning policy */
@@ -117,15 +116,14 @@ struct grpc_lb_subchannel_list {
};
grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
+ grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args,
grpc_iomgr_cb_func connectivity_changed_cb);
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
const char* reason);
-void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx,
- grpc_lb_subchannel_list* subchannel_list,
+void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
const char* reason);
/// Takes and releases refs needed for a connectivity notification.
@@ -133,13 +131,11 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx,
void grpc_lb_subchannel_list_ref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason);
void grpc_lb_subchannel_list_unref_for_connectivity_watch(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+ grpc_lb_subchannel_list* subchannel_list, const char* reason);
/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
/// connectivity state notification callback will ultimately unref it.
void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+ grpc_lb_subchannel_list* subchannel_list, const char* reason);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
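Taken together, the subchannel_list.h hunks show the shape of the whole change: every function in this layer previously threaded an explicit grpc_exec_ctx*, and afterwards callers simply omit it. One common way to make such a removal work, and presumably the intent here, is to establish the execution context once at an API boundary and reach it through a thread-local rather than a parameter; the sketch below is a generic illustration of that pattern, not gRPC's actual ExecCtx implementation.

// Generic sketch: replace an explicitly threaded context parameter with a
// thread-local context installed for the duration of a top-level call. This
// only models the effect on signatures; the real context type has additional
// responsibilities (closure lists, flushing, timers).
#include <cassert>
#include <cstdio>

class ExecCtx {
 public:
  ExecCtx() { current_ = this; }   // installed at construction
  ~ExecCtx() { current_ = nullptr; }
  static ExecCtx* Get() { return current_; }
  void Note(const char* what) { std::printf("exec_ctx: %s\n", what); }

 private:
  static thread_local ExecCtx* current_;
};
thread_local ExecCtx* ExecCtx::current_ = nullptr;

// Before: void do_work(ExecCtx* exec_ctx);  After: the parameter is gone and
// helpers fetch the ambient context only when they need it.
void do_work() {
  assert(ExecCtx::Get() != nullptr);
  ExecCtx::Get()->Note("doing work");
}

int main() {
  ExecCtx exec_ctx;  // established once at the API boundary
  do_work();         // no context parameter threaded through
}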