diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 104 |
1 files changed, 62 insertions, 42 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index e9bef8a921..db06fc20b6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -126,7 +126,7 @@ #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 -grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb"); +grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb"); /* add lb_token of selected subchannel (address) to the call's initial * metadata */ @@ -217,7 +217,7 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg, } else { grpc_grpclb_client_stats_unref(wc_arg->client_stats); } - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy, wc_arg->rr_policy); } @@ -623,7 +623,7 @@ static void update_lb_connectivity_status_locked( GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); } - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log( GPR_INFO, "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.", @@ -637,7 +637,7 @@ static void update_lb_connectivity_status_locked( /* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return * immediately (ignoring its completion callback), we need to perform the - * cleanups this callback would otherwise be resposible for. + * cleanups this callback would otherwise be responsible for. * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( @@ -654,7 +654,7 @@ static bool pick_from_internal_rr_locked( } if (server->drop) { // Not using the RR policy, so unref it. - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy, wc_arg->rr_policy); } @@ -684,7 +684,7 @@ static bool pick_from_internal_rr_locked( (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy, wc_arg->rr_policy); } @@ -766,6 +766,9 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, glb_policy->rr_policy); return; } + grpc_lb_policy_set_reresolve_closure_locked( + exec_ctx, new_rr_policy, glb_policy->base.request_reresolution); + glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; const grpc_connectivity_state rr_state = @@ -806,7 +809,7 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; pp->wrapped_on_complete_arg.client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending pick about to (async) PICK from RR %p", glb_policy, glb_policy->rr_policy); @@ -821,7 +824,7 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, glb_policy->pending_pings = pping->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } @@ -837,14 +840,14 @@ static void rr_handover_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy_args* args = lb_policy_args_create(exec_ctx, glb_policy); GPR_ASSERT(args != nullptr); if (glb_policy->rr_policy != nullptr) { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy, glb_policy->rr_policy); } grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args); } else { create_rr_locked(exec_ctx, glb_policy, args); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy, glb_policy->rr_policy); } @@ -991,6 +994,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; /* We need a copy of the lb_call pointer because we can't cancell the call @@ -1021,6 +1025,9 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { glb_policy->pending_pings = nullptr; if (glb_policy->rr_policy != nullptr) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + } else { + grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace, + GRPC_ERROR_CANCELLED); } // We destroy the LB channel here because // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy @@ -1030,28 +1037,27 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = nullptr; } - grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "glb_shutdown"); while (pp != nullptr) { pending_pick* next = pp->next; *pp->target = nullptr; - GRPC_CLOSURE_SCHED( - exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_REF(error)); gpr_free(pp); pp = next; } while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED( - exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_REF(error)); gpr_free(pping); pping = next; } + GRPC_ERROR_UNREF(error); } // Cancel a specific pending pick. @@ -1186,7 +1192,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, // need to make sure we aren't trying to pick from a RR policy instance // that's in shutdown. if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] NOT picking from from RR %p: RR conn state=%s", glb_policy, glb_policy->rr_policy, @@ -1196,7 +1202,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, on_complete); pick_done = false; } else { // RR not in shutdown - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, glb_policy->rr_policy); } @@ -1221,7 +1227,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, false /* force_async */, target, wc_arg); } } else { // glb_policy->rr_policy == NULL - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] No RR policy. Adding to grpclb's pending picks", glb_policy); @@ -1272,7 +1278,7 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, glb_policy->retry_timer_active = false; if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr && error == GRPC_ERROR_NONE) { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy); } query_for_backends_locked(exec_ctx, glb_policy); @@ -1293,7 +1299,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, grpc_millis next_try = grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state) .next_attempt_start_time; - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); @@ -1342,6 +1348,9 @@ static void client_load_report_done_locked(grpc_exec_ctx* exec_ctx, void* arg, glb_policy->client_load_report_timer_pending = false; GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "client_load_report"); + if (glb_policy->lb_call == nullptr) { + maybe_restart_lb_call(exec_ctx, glb_policy); + } return; } schedule_next_client_load_report(exec_ctx, glb_policy); @@ -1496,7 +1505,7 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx, lb_call_init_locked(exec_ctx, glb_policy); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)", glb_policy, glb_policy->lb_channel, glb_policy->lb_call); @@ -1587,7 +1596,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, glb_policy->client_stats_report_interval = GPR_MAX( GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis( &response->client_stats_report_interval)); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Received initial LB response message; " "client load reporting interval = %" PRIdPTR " milliseconds", @@ -1599,7 +1608,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, glb_policy->client_load_report_timer_pending = true; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); schedule_next_client_load_report(exec_ctx, glb_policy); - } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Received initial LB response message; client load " "reporting NOT enabled", @@ -1612,7 +1621,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_grpclb_response_parse_serverlist(response_slice); if (serverlist != nullptr) { GPR_ASSERT(glb_policy->lb_call != nullptr); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Serverlist with %" PRIuPTR " servers received", glb_policy, serverlist->num_servers); @@ -1630,7 +1639,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, if (serverlist->num_servers > 0) { if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Incoming server list identical to current, " "ignoring.", @@ -1659,7 +1668,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, rr_handover_locked(exec_ctx, glb_policy); } } else { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", glb_policy); @@ -1707,7 +1716,7 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, * actually runs, don't fall back. */ if (glb_policy->serverlist == nullptr) { if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Falling back to use backends from resolver", glb_policy); @@ -1724,7 +1733,7 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; GPR_ASSERT(glb_policy->lb_call != nullptr); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { char* status_details = grpc_slice_to_c_string(glb_policy->lb_call_status_details); gpr_log(GPR_INFO, @@ -1752,7 +1761,7 @@ static void fallback_update_locked(grpc_exec_ctx* exec_ctx, glb_policy->fallback_backend_addresses = extract_backend_addresses_locked(exec_ctx, addresses); if (glb_policy->lb_fallback_timeout_ms > 0 && - !glb_policy->fallback_timer_active) { + glb_policy->rr_policy != nullptr) { rr_handover_locked(exec_ctx, glb_policy); } } @@ -1850,7 +1859,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx, grpc_call_cancel(glb_policy->lb_call, nullptr); // lb_on_server_status_received() will pick up the cancel and reinit // lb_call. - } else if (glb_policy->started_picking && !glb_policy->shutting_down) { + } else if (glb_policy->started_picking) { if (glb_policy->retry_timer_active) { grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); glb_policy->retry_timer_active = false; @@ -1867,6 +1876,20 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx, } } +static void glb_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution) { + glb_lb_policy* glb_policy = (glb_lb_policy*)policy; + GPR_ASSERT(!glb_policy->shutting_down); + GPR_ASSERT(glb_policy->base.request_reresolution == nullptr); + if (glb_policy->rr_policy != nullptr) { + grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy, + request_reresolution); + } else { + glb_policy->base.request_reresolution = request_reresolution; + } +} + /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1878,7 +1901,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_exit_idle_locked, glb_check_connectivity_locked, glb_notify_on_state_change_locked, - glb_update_locked}; + glb_update_locked, + glb_set_reresolve_closure_locked}; static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx, grpc_lb_policy_factory* factory, @@ -1906,7 +1930,7 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx, GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Will use '%s' as the server name for LB request.", glb_policy, glb_policy->server_name); @@ -1998,16 +2022,12 @@ static bool maybe_add_client_load_reporting_filter( return true; } -extern "C" void grpc_lb_policy_grpclb_init() { +void grpc_lb_policy_grpclb_init() { grpc_register_lb_policy(grpc_glb_lb_factory_create()); - grpc_register_tracer(&grpc_lb_glb_trace); -#ifndef NDEBUG - grpc_register_tracer(&grpc_trace_lb_policy_refcount); -#endif grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_client_load_reporting_filter, (void*)&grpc_client_load_reporting_filter); } -extern "C" void grpc_lb_policy_grpclb_shutdown() {} +void grpc_lb_policy_grpclb_shutdown() {} |