Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc  104
1 file changed, 62 insertions(+), 42 deletions(-)
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index e9bef8a921..db06fc20b6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -126,7 +126,7 @@
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
-grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
+grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
@@ -217,7 +217,7 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg,
} else {
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
}
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
wc_arg->rr_policy);
}
@@ -623,7 +623,7 @@ static void update_lb_connectivity_status_locked(
GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
}
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(
GPR_INFO,
"[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
@@ -637,7 +637,7 @@ static void update_lb_connectivity_status_locked(
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
* immediately (ignoring its completion callback), we need to perform the
- * cleanups this callback would otherwise be resposible for.
+ * cleanups this callback would otherwise be responsible for.
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
@@ -654,7 +654,7 @@ static bool pick_from_internal_rr_locked(
}
if (server->drop) {
// Not using the RR policy, so unref it.
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
wc_arg->rr_policy);
}
@@ -684,7 +684,7 @@ static bool pick_from_internal_rr_locked(
(void**)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
wc_arg->rr_policy);
}
@@ -766,6 +766,9 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
+ grpc_lb_policy_set_reresolve_closure_locked(
+ exec_ctx, new_rr_policy, glb_policy->base.request_reresolution);
+ glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
const grpc_connectivity_state rr_state =
@@ -806,7 +809,7 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
pp->wrapped_on_complete_arg.client_stats =
grpc_grpclb_client_stats_ref(glb_policy->client_stats);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Pending pick about to (async) PICK from RR %p",
glb_policy, glb_policy->rr_policy);
@@ -821,7 +824,7 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
glb_policy->pending_pings = pping->next;
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy);
}
@@ -837,14 +840,14 @@ static void rr_handover_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_policy_args* args = lb_policy_args_create(exec_ctx, glb_policy);
GPR_ASSERT(args != nullptr);
if (glb_policy->rr_policy != nullptr) {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy,
glb_policy->rr_policy);
}
grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
} else {
create_rr_locked(exec_ctx, glb_policy, args);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy,
glb_policy->rr_policy);
}
@@ -991,6 +994,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
/* We need a copy of the lb_call pointer because we can't cancell the call
@@ -1021,6 +1025,9 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ } else {
+ grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace,
+ GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -1030,28 +1037,27 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = nullptr;
}
- grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+ GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+ "glb_shutdown");
while (pp != nullptr) {
pending_pick* next = pp->next;
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(
- exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_REF(error));
gpr_free(pp);
pp = next;
}
while (pping != nullptr) {
pending_ping* next = pping->next;
- GRPC_CLOSURE_SCHED(
- exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_REF(error));
gpr_free(pping);
pping = next;
}
+ GRPC_ERROR_UNREF(error);
}
// Cancel a specific pending pick.
@@ -1186,7 +1192,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
// need to make sure we aren't trying to pick from a RR policy instance
// that's in shutdown.
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
glb_policy, glb_policy->rr_policy,
@@ -1196,7 +1202,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
on_complete);
pick_done = false;
} else { // RR not in shutdown
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
glb_policy->rr_policy);
}
@@ -1221,7 +1227,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
false /* force_async */, target, wc_arg);
}
} else { // glb_policy->rr_policy == NULL
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[grpclb %p] No RR policy. Adding to grpclb's pending picks",
glb_policy);
@@ -1272,7 +1278,7 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
glb_policy->retry_timer_active = false;
if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
error == GRPC_ERROR_NONE) {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
}
query_for_backends_locked(exec_ctx, glb_policy);
@@ -1293,7 +1299,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx,
grpc_millis next_try =
grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state)
.next_attempt_start_time;
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx);
@@ -1342,6 +1348,9 @@ static void client_load_report_done_locked(grpc_exec_ctx* exec_ctx, void* arg,
glb_policy->client_load_report_timer_pending = false;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"client_load_report");
+ if (glb_policy->lb_call == nullptr) {
+ maybe_restart_lb_call(exec_ctx, glb_policy);
+ }
return;
}
schedule_next_client_load_report(exec_ctx, glb_policy);
@@ -1496,7 +1505,7 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
lb_call_init_locked(exec_ctx, glb_policy);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)",
glb_policy, glb_policy->lb_channel, glb_policy->lb_call);
@@ -1587,7 +1596,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
glb_policy->client_stats_report_interval = GPR_MAX(
GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
&response->client_stats_report_interval));
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Received initial LB response message; "
"client load reporting interval = %" PRIdPTR " milliseconds",
@@ -1599,7 +1608,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
glb_policy->client_load_report_timer_pending = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
schedule_next_client_load_report(exec_ctx, glb_policy);
- } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ } else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Received initial LB response message; client load "
"reporting NOT enabled",
@@ -1612,7 +1621,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
grpc_grpclb_response_parse_serverlist(response_slice);
if (serverlist != nullptr) {
GPR_ASSERT(glb_policy->lb_call != nullptr);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Serverlist with %" PRIuPTR " servers received",
glb_policy, serverlist->num_servers);
@@ -1630,7 +1639,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (serverlist->num_servers > 0) {
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
serverlist)) {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Incoming server list identical to current, "
"ignoring.",
@@ -1659,7 +1668,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
rr_handover_locked(exec_ctx, glb_policy);
}
} else {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Received empty server list, ignoring.",
glb_policy);
@@ -1707,7 +1716,7 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
* actually runs, don't fall back. */
if (glb_policy->serverlist == nullptr) {
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Falling back to use backends from resolver",
glb_policy);
@@ -1724,7 +1733,7 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx,
void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
GPR_ASSERT(glb_policy->lb_call != nullptr);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
char* status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details);
gpr_log(GPR_INFO,
@@ -1752,7 +1761,7 @@ static void fallback_update_locked(grpc_exec_ctx* exec_ctx,
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
if (glb_policy->lb_fallback_timeout_ms > 0 &&
- !glb_policy->fallback_timer_active) {
+ glb_policy->rr_policy != nullptr) {
rr_handover_locked(exec_ctx, glb_policy);
}
}
@@ -1850,7 +1859,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
grpc_call_cancel(glb_policy->lb_call, nullptr);
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
- } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
+ } else if (glb_policy->started_picking) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
@@ -1867,6 +1876,20 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
}
}
+static void glb_set_reresolve_closure_locked(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ grpc_closure* request_reresolution) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
+ GPR_ASSERT(!glb_policy->shutting_down);
+ GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
+ if (glb_policy->rr_policy != nullptr) {
+ grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy,
+ request_reresolution);
+ } else {
+ glb_policy->base.request_reresolution = request_reresolution;
+ }
+}
+
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1878,7 +1901,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
- glb_update_locked};
+ glb_update_locked,
+ glb_set_reresolve_closure_locked};
static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
grpc_lb_policy_factory* factory,
@@ -1906,7 +1930,7 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Will use '%s' as the server name for LB request.",
glb_policy, glb_policy->server_name);
@@ -1998,16 +2022,12 @@ static bool maybe_add_client_load_reporting_filter(
return true;
}
-extern "C" void grpc_lb_policy_grpclb_init() {
+void grpc_lb_policy_grpclb_init() {
grpc_register_lb_policy(grpc_glb_lb_factory_create());
- grpc_register_tracer(&grpc_lb_glb_trace);
-#ifndef NDEBUG
- grpc_register_tracer(&grpc_trace_lb_policy_refcount);
-#endif
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_client_load_reporting_filter,
(void*)&grpc_client_load_reporting_filter);
}
-extern "C" void grpc_lb_policy_grpclb_shutdown() {}
+void grpc_lb_policy_grpclb_shutdown() {}