diff options
Diffstat (limited to 'src/core/ext/filters/client_channel')
4 files changed, 111 insertions, 46 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5fb502e2dd..9404bd7064 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -113,6 +113,7 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" @@ -397,7 +398,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_backoff lb_call_backoff_state; + grpc_core::Backoff* lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -986,6 +987,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); + gpr_free(glb_policy->lb_call_backoff); gpr_free(glb_policy); } @@ -1150,7 +1152,7 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, } glb_policy->started_picking = true; - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); query_for_backends_locked(exec_ctx, glb_policy); } @@ -1291,8 +1293,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ grpc_millis next_try = - grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state) - .next_attempt_start_time; + glb_policy->lb_call_backoff->Step(exec_ctx).next_attempt_start_time; if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); @@ -1461,12 +1462,14 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_backoff_init(&glb_policy->lb_call_backoff_state, - GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_GRPCLB_RECONNECT_JITTER, - GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + glb_policy->lb_call_backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; @@ -1573,7 +1576,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, memset(ops, 0, sizeof(ops)); grpc_op* op = ops; if (glb_policy->lb_response_payload != nullptr) { - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 07737b19d2..041e290bb8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -39,6 +39,7 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" @@ -89,7 +90,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::Backoff* backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -137,7 +138,7 @@ static void dns_ares_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(exec_ctx, r); } } @@ -272,8 +273,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - grpc_millis next_try = - grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -307,7 +307,7 @@ static void dns_ares_next_locked(grpc_exec_ctx* exec_ctx, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(exec_ctx, r); } else { dns_ares_maybe_finish_next_locked(exec_ctx, r); @@ -353,6 +353,7 @@ static void dns_ares_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); + gpr_free(r->backoff); gpr_free(r); } @@ -381,11 +382,14 @@ static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 589c74807f..35c909e2a4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -32,6 +32,7 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -70,7 +71,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::Backoff* backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -113,7 +114,7 @@ static void dns_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(exec_ctx, r); } } @@ -126,7 +127,7 @@ static void dns_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(exec_ctx, r); } else { dns_maybe_finish_next_locked(exec_ctx, r); @@ -170,8 +171,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(exec_ctx, addresses); } else { - grpc_millis next_try = - grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -233,6 +233,7 @@ static void dns_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); + gpr_free(r->backoff); gpr_free(r); } @@ -257,11 +258,14 @@ static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 58e294d597..a0b441a73a 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -39,6 +39,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -72,6 +73,9 @@ typedef struct external_state_watcher { } external_state_watcher; struct grpc_subchannel { + grpc_subchannel(const grpc_core::Backoff::Options& backoff_options) + : backoff(backoff_options) {} + grpc_connector* connector; /** refcount @@ -118,8 +122,8 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_backoff backoff_state; - grpc_backoff_result backoff_result; + grpc_core::Backoff backoff; + grpc_core::Backoff::Result backoff_result; /** do we have an active alarm? */ bool have_alarm; @@ -283,6 +287,52 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx, } } +static grpc_core::Backoff::Options extract_backoff_options( + const grpc_channel_args* args) { + int initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + int min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; + int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; + if (args != nullptr) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, + "grpc.testing.fixed_reconnect_backoff_ms")) { + fixed_reconnect_backoff = true; + initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = + grpc_channel_arg_get_integer(&args->args[i], + {initial_backoff_ms, 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + min_connect_timeout_ms = grpc_channel_arg_get_integer( + &args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {max_backoff_ms, 100, INT_MAX}); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {initial_backoff_ms, 100, INT_MAX}); + } + } + } + grpc_core::Backoff::Options backoff_options; + backoff_options.set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_min_connect_timeout(min_connect_timeout_ms) + .set_max_backoff(max_backoff_ms); + return backoff_options; +} + grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, grpc_connector* connector, const grpc_subchannel_args* args) { @@ -294,7 +344,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, } GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); - c = (grpc_subchannel*)gpr_zalloc(sizeof(*c)); + c = GPR_NEW(grpc_subchannel(extract_backoff_options(args->args))); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; @@ -336,7 +386,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, "subchannel"); int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; + int min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (c->args) { @@ -344,14 +395,14 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, if (0 == strcmp(c->args->args[i].key, "grpc.testing.fixed_reconnect_backoff_ms")) { fixed_reconnect_backoff = true; - initial_backoff_ms = min_backoff_ms = max_backoff_ms = + initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = grpc_channel_arg_get_integer(&c->args->args[i], {initial_backoff_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; - min_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {min_backoff_ms, 100, INT_MAX}); + min_connect_timeout_ms = grpc_channel_arg_get_integer( + &c->args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; @@ -365,12 +416,15 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, } } } - grpc_backoff_init( - &c->backoff_state, initial_backoff_ms, - fixed_reconnect_backoff ? 1.0 - : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - min_backoff_ms, max_backoff_ms); + grpc_core::Backoff::Options backoff_options; + backoff_options.set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_min_connect_timeout(min_connect_timeout_ms) + .set_max_backoff(max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -429,7 +483,7 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = grpc_backoff_step(exec_ctx, &c->backoff_state); + c->backoff_result = c->backoff.Step(exec_ctx); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -466,7 +520,7 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = grpc_backoff_begin(exec_ctx, &c->backoff_state); + c->backoff_result = c->backoff.Begin(exec_ctx); continue_connect_locked(exec_ctx, c); } else { GPR_ASSERT(!c->have_alarm); |